import type { WorkerResponse, WorkerRequest } from "./offload.interface"; import { OffloadError } from "./offload.error"; type URLlike = URL | string; type Callback = (data: E) => Promise; type Terminator = () => void; // 🦾 type Id = number; type PromiseTable = Map< Id, { resolve: (value: any) => void; reject: (reason: unknown) => void } >; type TaskCallback = ((data: E) => Promise) & { [workerId]: Worker }; type WorkerTasks = Map; const workerId = Symbol("workerId"); const workerTasks: WorkerTasks = new Map(); /** * OffloadMode * 'cb' - (default) callback mode, spawns a worker on call and terminates it upon completion * 'bg' - runs a max number of workers of poolSize constantly in background, balances callbacks among them */ export type OffloadMode = "cb" | "bg"; /** * offload - offload a tasks to a worker */ export function offload( url: URLlike, poolSize = 1, mode: OffloadMode = "cb", ): [Callback, Terminator] { switch (mode) { case "bg": return createPooledCallback(poolSize, () => { const bg = withMessageInterceptor(new Worker(url.toString())); const bgcb = createTaskCallback(bg); return bgcb; }); default: return createBufferedCallback(poolSize, () => { const worker = withMessageInterceptor(new Worker(url.toString())); const cb = createTaskCallback(worker, () => { worker.terminate(); }); return cb; }); } } function createTaskCallback( worker: Worker, eof?: () => void, ): TaskCallback { const cb = async function (data: E): Promise { const id = createTaskId(); const errorCallback = (event: ErrorEvent) => { const error = event.message; workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id)); workerTasks.get(worker)?.delete(id); }; worker.addEventListener("error", errorCallback, { once: true }); const workerTask = Promise.withResolvers(); workerTasks.get(worker)?.set(id, workerTask); const request: WorkerRequest = { id, params: data }; worker.postMessage(request); try { const result = await workerTask.promise; workerTasks.get(worker)?.delete(id); if (eof) eof(); worker.removeEventListener("error", errorCallback); return result; } catch (error) { workerTasks.get(worker)?.delete(id); if (eof) eof(); worker.removeEventListener("error", errorCallback); throw error; } }; cb[workerId] = worker; return cb; } function createBufferedCallback( bufSize: number, fun: () => TaskCallback, ): [Callback, Terminator] { let free = bufSize; const waitFree = async () => { if (free <= 0) { await new Promise((resolve) => setTimeout(resolve)); return await waitFree(); } }; const spots: TaskCallback[] = []; const term = () => { for (const cb of spots) { if (cb) terminate(cb); } }; const call = async (data: E) => { if (free <= 0) await waitFree(); --free; const cb = fun(); spots[free] = cb; const result = await cb(data); delete spots[free]; free++; return result; }; return [call, term]; } function createPooledCallback( poolSize: number, fun: () => TaskCallback, ): [Callback, Terminator] { let free = poolSize; const waitFree = async () => { if (free <= 0) { await new Promise((resolve) => setTimeout(resolve)); return await waitFree(); } }; const spots: TaskCallback[] = []; for (let i = 0; i < poolSize; i++) { spots[i] = fun(); } const term = () => { for (const cb of spots) { terminate(cb); } }; const call = async (data: E) => { if (free <= 0) await waitFree(); --free; const cb = spots[0]; const result = await cb(data); free++; return result; }; return [call, term]; } function useWorker(cb: TaskCallback): Worker { return cb[workerId]; } function terminate(cb: TaskCallback): void { const worker = useWorker(cb); worker.terminate(); } function createTaskId(): Id { return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); } function withMessageInterceptor(worker: Worker): Worker { const promiseTable: PromiseTable = new Map(); workerTasks.set(worker, promiseTable); worker.onmessage = (event) => { const { id, value } = event.data as WorkerResponse; promiseTable.get(id)?.resolve(value); promiseTable.delete(id); }; return worker; }