minor improvements; fix: error handler
Some checks failed
Publish / publish (push) Has been cancelled

This commit is contained in:
Anton Nesterov 2024-10-26 19:36:47 +02:00
parent 27ce25096d
commit ae3881975c
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5
5 changed files with 49 additions and 24 deletions

View file

@ -1,13 +1,15 @@
# Offload # Offload
Offload cpu-itensive tasks using WebWorkers. Offload creates a limited execution pool and can operate in two modes: Offload heavy tasks using WebWorkers.
Offload creates a limited execution pool and can operate in two modes:
- callback mode (default) - spawns web workers on demand when the function is called, self terminated - callback mode (default) - spawns web workers on demand when the function is called, self terminated
- background - runs workers as backround jobs, distributes load among them, must be terminated manually - background - runs workers as backround jobs, distributes load among them, must be terminated manually
*Currently uses unstable api's.* *Currently uses unstable WebWorkers API.*
To install: ## Install:
```bash ```bash
bun add githib:nesterow/offload # or pnpm bun add githib:nesterow/offload # or pnpm
@ -18,7 +20,7 @@ bun add githib:nesterow/offload # or pnpm
Considering following worker: Considering following worker:
```typescript ```typescript
// echo.worker.ts // print.worker.ts
import { handler } from "@nesterow/offload"; import { handler } from "@nesterow/offload";
declare var self: Worker; declare var self: Worker;
@ -29,29 +31,51 @@ handler(async (data: string) => {
}); });
``` ```
### Callback operation mode
In the callback mode, `print()` will spawn a worker and terminate it after the task is done. In the callback mode, `print()` will spawn a worker and terminate it after the task is done.
Maximum of 5 workers may be spawned at the same time, the rest will be queued: Maximum of 5 workers may be spawned at the same time, the rest will be queued:
```typescript ```typescript
import { offload } from "@nesterow/offload"; import { offload } from "@nesterow/offload";
const [print, terminate] = offload<boolean, string>("./echo.worker.ts", 5); const [print, terminate] = offload<boolean, string>("./print.worker.ts", 5);
await print("Hello, World!"); // => true await print("Hello, World!"); // => true
``` ```
Callback operatinal mode us useful when thread startup delay doesn't matter.
You don't need to worry about worker termination as it exits after the callback returns result.
This is default "safe" option as it allows to call `offload` in any part of the application.
### Background operation mode
In the background mode, offload will spawn 5 workers, `print()` will distribute the tasks among them: In the background mode, offload will spawn 5 workers, `print()` will distribute the tasks among them:
```typescript ```typescript
import { offload } from "@nesterow/offload"; import { offload } from "@nesterow/offload";
const [print, terminate] = offload<boolean, string>("./echo.worker.ts", 5, 'bg'); const [print, terminate] = offload<boolean, string>("./print.worker.ts", 5, 'bg');
await print("Hello, World!"); // => true await print("Hello, World!"); // => true
terminate(); // terminate all workers, for example on exit signal terminate(); // terminate all workers, for example on exit signal
``` ```
Background operation mode is useful when you need to spawn pre-defined number of the threads on application start.
Generally it is more effective as it balances the load among the threads and doesn't have startup delay.
## Types
Because offload doesn't know params and return types of your worker, you need to pass type arguments manually:
```typescript
const [callback, termiate] = offload<ReturnType, ParamType>("./my.worker.ts", 1);
const param: ParamType = {};
const result: ReturnType = await callback(param: ParamType);
```
## License ## License
MIT MIT

View file

@ -38,11 +38,12 @@ function offload(url, poolSize = 1, mode = "cb") {
function createTaskCallback(worker, eof) { function createTaskCallback(worker, eof) {
const cb = async function(data) { const cb = async function(data) {
const id = createTaskId(); const id = createTaskId();
worker.addEventListener("error", (event) => { const errorCallback = (event) => {
const error = event.message; const error = event.message;
workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id));
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
}, { once: true }); };
worker.addEventListener("error", errorCallback, { once: true });
const workerTask = Promise.withResolvers(); const workerTask = Promise.withResolvers();
workerTasks.get(worker)?.set(id, workerTask); workerTasks.get(worker)?.set(id, workerTask);
const request = { id, params: data }; const request = { id, params: data };
@ -52,11 +53,13 @@ function createTaskCallback(worker, eof) {
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
if (eof) if (eof)
eof(); eof();
worker.removeEventListener("error", errorCallback);
return result; return result;
} catch (error) { } catch (error) {
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
if (eof) if (eof)
eof(); eof();
worker.removeEventListener("error", errorCallback);
throw error; throw error;
} }
}; };
@ -132,11 +135,11 @@ function createTaskId() {
function withMessageInterceptor(worker) { function withMessageInterceptor(worker) {
const promiseTable = new Map; const promiseTable = new Map;
workerTasks.set(worker, promiseTable); workerTasks.set(worker, promiseTable);
worker.addEventListener("message", (event) => { worker.onmessage = (event) => {
const { id, value } = event.data; const { id, value } = event.data;
promiseTable.get(id)?.resolve(value); promiseTable.get(id)?.resolve(value);
promiseTable.delete(id); promiseTable.delete(id);
}); };
return worker; return worker;
} }
export { export {

View file

@ -1,5 +1,5 @@
{ {
"name": "@nesterow/offload", "name": "@nesterow/offload",
"version": "0.0.2", "version": "0.0.3",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

View file

@ -54,15 +54,12 @@ function createTaskCallback<T, E>(
): TaskCallback<T, E> { ): TaskCallback<T, E> {
const cb = async function (data: E): Promise<T> { const cb = async function (data: E): Promise<T> {
const id = createTaskId(); const id = createTaskId();
worker.addEventListener( const errorCallback = (event: ErrorEvent) => {
"error",
(event) => {
const error = event.message; const error = event.message;
workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id));
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
}, };
{ once: true }, worker.addEventListener("error", errorCallback, { once: true });
);
const workerTask = Promise.withResolvers<T>(); const workerTask = Promise.withResolvers<T>();
workerTasks.get(worker)?.set(id, workerTask); workerTasks.get(worker)?.set(id, workerTask);
const request: WorkerRequest<E> = { id, params: data }; const request: WorkerRequest<E> = { id, params: data };
@ -71,10 +68,12 @@ function createTaskCallback<T, E>(
const result = await workerTask.promise; const result = await workerTask.promise;
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
if (eof) eof(); if (eof) eof();
worker.removeEventListener("error", errorCallback);
return result; return result;
} catch (error) { } catch (error) {
workerTasks.get(worker)?.delete(id); workerTasks.get(worker)?.delete(id);
if (eof) eof(); if (eof) eof();
worker.removeEventListener("error", errorCallback);
throw error; throw error;
} }
}; };
@ -155,14 +154,13 @@ function terminate<T, E>(cb: TaskCallback<T, E>): void {
function createTaskId(): Id { function createTaskId(): Id {
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
} }
function withMessageInterceptor(worker: Worker): Worker { function withMessageInterceptor(worker: Worker): Worker {
const promiseTable: PromiseTable = new Map(); const promiseTable: PromiseTable = new Map();
workerTasks.set(worker, promiseTable); workerTasks.set(worker, promiseTable);
worker.addEventListener("message", (event) => { worker.onmessage = (event) => {
const { id, value } = event.data as WorkerResponse<unknown>; const { id, value } = event.data as WorkerResponse<unknown>;
promiseTable.get(id)?.resolve(value); promiseTable.get(id)?.resolve(value);
promiseTable.delete(id); promiseTable.delete(id);
}); };
return worker; return worker;
} }

View file

@ -1,6 +1,6 @@
{ {
"name": "@nesterow/offload", "name": "@nesterow/offload",
"version": "0.0.2", "version": "0.0.3",
"author": { "author": {
"name": "Anton Nesterov", "name": "Anton Nesterov",
"url": "https://github.com/nesterow" "url": "https://github.com/nesterow"