From ae3881975c40132e6a6038ccf85128f84604152f Mon Sep 17 00:00:00 2001 From: Anton Nesterov Date: Sat, 26 Oct 2024 19:36:47 +0200 Subject: [PATCH] minor improvements; fix: error handler --- README.md | 36 ++++++++++++++++++++++++++++++------ index.js | 11 +++++++---- jsr.json | 2 +- offload.ts | 22 ++++++++++------------ package.json | 2 +- 5 files changed, 49 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 5aa388c..89941cc 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,15 @@ # Offload -Offload cpu-itensive tasks using WebWorkers. Offload creates a limited execution pool and can operate in two modes: +Offload heavy tasks using WebWorkers. + +Offload creates a limited execution pool and can operate in two modes: - callback mode (default) - spawns web workers on demand when the function is called, self terminated - background - runs workers as backround jobs, distributes load among them, must be terminated manually -*Currently uses unstable api's.* +*Currently uses unstable WebWorkers API.* -To install: +## Install: ```bash bun add githib:nesterow/offload # or pnpm @@ -18,7 +20,7 @@ bun add githib:nesterow/offload # or pnpm Considering following worker: ```typescript -// echo.worker.ts +// print.worker.ts import { handler } from "@nesterow/offload"; declare var self: Worker; @@ -29,29 +31,51 @@ handler(async (data: string) => { }); ``` +### Callback operation mode + In the callback mode, `print()` will spawn a worker and terminate it after the task is done. Maximum of 5 workers may be spawned at the same time, the rest will be queued: ```typescript import { offload } from "@nesterow/offload"; -const [print, terminate] = offload("./echo.worker.ts", 5); +const [print, terminate] = offload("./print.worker.ts", 5); await print("Hello, World!"); // => true ``` +Callback operatinal mode us useful when thread startup delay doesn't matter. +You don't need to worry about worker termination as it exits after the callback returns result. +This is default "safe" option as it allows to call `offload` in any part of the application. + +### Background operation mode + In the background mode, offload will spawn 5 workers, `print()` will distribute the tasks among them: ```typescript import { offload } from "@nesterow/offload"; -const [print, terminate] = offload("./echo.worker.ts", 5, 'bg'); +const [print, terminate] = offload("./print.worker.ts", 5, 'bg'); await print("Hello, World!"); // => true terminate(); // terminate all workers, for example on exit signal ``` +Background operation mode is useful when you need to spawn pre-defined number of the threads on application start. +Generally it is more effective as it balances the load among the threads and doesn't have startup delay. + +## Types + +Because offload doesn't know params and return types of your worker, you need to pass type arguments manually: + +```typescript +const [callback, termiate] = offload("./my.worker.ts", 1); + +const param: ParamType = {}; +const result: ReturnType = await callback(param: ParamType); +``` + ## License MIT diff --git a/index.js b/index.js index 6b1caed..c92f905 100644 --- a/index.js +++ b/index.js @@ -38,11 +38,12 @@ function offload(url, poolSize = 1, mode = "cb") { function createTaskCallback(worker, eof) { const cb = async function(data) { const id = createTaskId(); - worker.addEventListener("error", (event) => { + const errorCallback = (event) => { const error = event.message; workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); workerTasks.get(worker)?.delete(id); - }, { once: true }); + }; + worker.addEventListener("error", errorCallback, { once: true }); const workerTask = Promise.withResolvers(); workerTasks.get(worker)?.set(id, workerTask); const request = { id, params: data }; @@ -52,11 +53,13 @@ function createTaskCallback(worker, eof) { workerTasks.get(worker)?.delete(id); if (eof) eof(); + worker.removeEventListener("error", errorCallback); return result; } catch (error) { workerTasks.get(worker)?.delete(id); if (eof) eof(); + worker.removeEventListener("error", errorCallback); throw error; } }; @@ -132,11 +135,11 @@ function createTaskId() { function withMessageInterceptor(worker) { const promiseTable = new Map; workerTasks.set(worker, promiseTable); - worker.addEventListener("message", (event) => { + worker.onmessage = (event) => { const { id, value } = event.data; promiseTable.get(id)?.resolve(value); promiseTable.delete(id); - }); + }; return worker; } export { diff --git a/jsr.json b/jsr.json index 305f5bf..773920d 100644 --- a/jsr.json +++ b/jsr.json @@ -1,5 +1,5 @@ { "name": "@nesterow/offload", - "version": "0.0.2", + "version": "0.0.3", "exports": "./mod.ts" } diff --git a/offload.ts b/offload.ts index 85486f2..c3953b4 100644 --- a/offload.ts +++ b/offload.ts @@ -54,15 +54,12 @@ function createTaskCallback( ): TaskCallback { const cb = async function (data: E): Promise { const id = createTaskId(); - worker.addEventListener( - "error", - (event) => { - const error = event.message; - workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); - workerTasks.get(worker)?.delete(id); - }, - { once: true }, - ); + const errorCallback = (event: ErrorEvent) => { + const error = event.message; + workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); + workerTasks.get(worker)?.delete(id); + }; + worker.addEventListener("error", errorCallback, { once: true }); const workerTask = Promise.withResolvers(); workerTasks.get(worker)?.set(id, workerTask); const request: WorkerRequest = { id, params: data }; @@ -71,10 +68,12 @@ function createTaskCallback( const result = await workerTask.promise; workerTasks.get(worker)?.delete(id); if (eof) eof(); + worker.removeEventListener("error", errorCallback); return result; } catch (error) { workerTasks.get(worker)?.delete(id); if (eof) eof(); + worker.removeEventListener("error", errorCallback); throw error; } }; @@ -155,14 +154,13 @@ function terminate(cb: TaskCallback): void { function createTaskId(): Id { return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); } - function withMessageInterceptor(worker: Worker): Worker { const promiseTable: PromiseTable = new Map(); workerTasks.set(worker, promiseTable); - worker.addEventListener("message", (event) => { + worker.onmessage = (event) => { const { id, value } = event.data as WorkerResponse; promiseTable.get(id)?.resolve(value); promiseTable.delete(id); - }); + }; return worker; } diff --git a/package.json b/package.json index f51ad8f..1d29ac1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@nesterow/offload", - "version": "0.0.2", + "version": "0.0.3", "author": { "name": "Anton Nesterov", "url": "https://github.com/nesterow"