commit 2d559dfb3168d5fe7ac83b479a0772f7831d1ad9 Author: Anton Nesterov Date: Fri Oct 25 23:19:39 2024 +0200 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9b1ee42 --- /dev/null +++ b/.gitignore @@ -0,0 +1,175 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby 
and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..79c949c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Anton Nesterov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..5aa388c --- /dev/null +++ b/README.md @@ -0,0 +1,57 @@ +# Offload + +Offload CPU-intensive tasks using WebWorkers. Offload creates a limited execution pool and can operate in two modes: +- callback mode (default) - spawns web workers on demand when the function is called, self terminated +- background - runs workers as background jobs, distributes load among them, must be terminated manually + + +*Currently uses unstable APIs.* + +To install: + +```bash +bun add github:nesterow/offload # or pnpm +``` + +## Usage + +Consider the following worker: + +```typescript +// echo.worker.ts +import { handler } from "@nesterow/offload"; +declare var self: Worker; + +handler(async (data: string) => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + console.log(data); + return true; +}); +``` + +In the callback mode, `print()` will spawn a worker and terminate it after the task is done. +Maximum of 5 workers may be spawned at the same time, the rest will be queued: + +```typescript +import { offload } from "@nesterow/offload"; + +const [print, terminate] = offload("./echo.worker.ts", 5); + +await print("Hello, World!"); // => true +``` + +In the background mode, offload will spawn 5 workers, `print()` will distribute the tasks among them: + +```typescript +import { offload } from "@nesterow/offload"; + +const [print, terminate] = offload("./echo.worker.ts", 5, 'bg'); + +await print("Hello, World!"); // => true + +terminate(); // terminate all workers, for example on exit signal +``` + +## License + +MIT diff --git a/__test__/count-primes.worker.ts b/__test__/count-primes.worker.ts new file mode 100644 index 0000000..46395d1 --- /dev/null +++ b/__test__/count-primes.worker.ts @@ -0,0 +1,28 @@ +import { handler } from "../mod.ts"; + +const isPrime = (n: number) => { + let result = true; + if (n <= 1) { + result = false; + } else { + for (let i = 2; i * i <= n; i++) { + if (n % i === 0) { + result = 
false;
+        break;
+      }
+    }
+  }
+  return result;
+};
+
+handler((n: number) => {
+  let result = 0;
+
+  for (let i = 0; i < n; i++) {
+    if (isPrime(i)) {
+      result++;
+    }
+  }
+
+  return result;
+});
diff --git a/__test__/echo-1s.worker.ts b/__test__/echo-1s.worker.ts
new file mode 100644
index 0000000..b3b0df1
--- /dev/null
+++ b/__test__/echo-1s.worker.ts
@@ -0,0 +1,7 @@
+import { handler } from "../mod.ts";
+declare var self: Worker;
+
+handler(async (data: any) => {
+  await new Promise((resolve) => setTimeout(resolve, 1000));
+  return data;
+});
diff --git a/__test__/offload.test.ts b/__test__/offload.test.ts
new file mode 100644
index 0000000..6bb5ba0
--- /dev/null
+++ b/__test__/offload.test.ts
@@ -0,0 +1,72 @@
+import { test, expect } from "bun:test";
+import { offload } from "../offload.ts";
+
+test(
+  "count-primes.worker.ts",
+  async () => {
+    const [calculatePrime] = offload<number, number>(
+      "__test__/count-primes.worker.ts",
+    );
+    const result = await calculatePrime(1000000);
+    expect(result).toBeNumber();
+  },
+  { timeout: 30000 },
+);
+
+test("throw-error.worker.ts", async () => {
+  const [throwError] = offload<void, void>("__test__/throw-error.worker.ts");
+  // Record the rejection explicitly: asserting only inside `catch` would let
+  // the test pass vacuously if the call resolved instead of rejecting.
+  let caught: unknown;
+  try {
+    await throwError();
+  } catch (e) {
+    caught = e;
+  }
+  expect(caught).toBeInstanceOf(Error);
+});
+
+test("limited queue (size of 3)", async () => {
+  const [echo] = offload<number, number>("__test__/echo-1s.worker.ts", 3);
+  const results: number[] = [];
+  for (let i = 0; i < 10; i++) {
+    echo(i).then((e) => results.push(e));
+  }
+  await new Promise((resolve) => setTimeout(resolve, 1050));
+  expect(results.length).toBe(3);
+
+  await new Promise((resolve) => setTimeout(resolve, 1050));
+  expect(results.length).toBe(6);
+  console.log("limited queue: Order is not guaranteed:", results);
+
+  await new Promise((resolve) => setTimeout(resolve, 2000));
+  expect(results.length).toBe(10);
+  console.log("limited queue: Order is not guaranteed:", results);
+});
+
+test(
+  "bg daemons (size of 3)",
+  async () => {
+    const [echo, terminate] = offload<number, number>(
+      "__test__/echo-1s.worker.ts",
+      3,
+      "bg",
+    );
+    const results: number[] = [];
+    try {
+      for (let i = 0; i < 10; i++) {
+        echo(i).then((e) => results.push(e));
+      }
+      await new Promise((resolve) => setTimeout(resolve, 1050));
+      expect(results.length).toBeLessThanOrEqual(4);
+      await new Promise((resolve) => setTimeout(resolve, 3000));
+      expect(results.length).toBe(10);
+      console.log("bg daemons: Order is not guaranteed:", results);
+    } finally {
+      // Always terminate background workers, even when an assertion fails.
+      console.log("bg daemons: Always terminate background jobs");
+      terminate();
+    }
+  },
+  { timeout: 7000 },
+);
diff --git a/__test__/throw-error.worker.ts b/__test__/throw-error.worker.ts
new file mode 100644
index 0000000..90c0df1
--- /dev/null
+++ b/__test__/throw-error.worker.ts
@@ -0,0 +1,6 @@
+import { handler } from "../mod.ts";
+
+handler(async () => {
+  await new Promise((resolve) => setTimeout(resolve, 1000));
+  throw new Error("This is an error");
+});
diff --git a/bun.lockb b/bun.lockb
new file mode 100755
index 0000000..a0bdbb5
Binary files /dev/null and b/bun.lockb differ
diff --git a/jsr.json b/jsr.json
new file mode 100644
index 0000000..c41a005
--- /dev/null
+++ b/jsr.json
@@ -0,0 +1,5 @@
+{
+  "name": "@nesterow/offload",
+  "version": "0.0.1",
+  "exports": "./mod.ts"
+}
diff --git a/mod.ts b/mod.ts
new file mode 100644
index 0000000..83cca20
--- /dev/null
+++ b/mod.ts
@@ -0,0 +1,3 @@
+export * from "./offload.error.ts";
+export * from "./offload.handler.ts";
+export * from "./offload.ts";
diff --git a/offload.error.ts b/offload.error.ts
new file mode 100644
index 0000000..944df89
--- /dev/null
+++ b/offload.error.ts
@@ -0,0 +1,6 @@
+export class OffloadError extends Error {
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
+    this.name = "OffloadError";
+  }
+}
diff --git a/offload.handler.ts b/offload.handler.ts
new file mode 100644
index 0000000..b5b4ccd
--- /dev/null
+++ b/offload.handler.ts
@@ -0,0 +1,39 @@
+import type {
+  WorkerFailure,
+  WorkerRequest,
+  WorkerResponse,
+} from "./offload.interface.ts";
+
+declare var self: Worker;
+
+type HandlerCallback<T, E> = (data: E) => T | Promise<T>;
+
+/** Convert any thrown value into a plain, structured-cloneable description. */
+function toFailure(error: unknown): WorkerFailure {
+  if (error instanceof Error) {
+    return { name: error.name, message: error.message, stack: error.stack };
+  }
+  return { name: "Error", message: String(error) };
+}
+
+/**
+ * Register `fn` as the task handler of the current worker.
+ *
+ * Each request is answered with one response carrying the request id: either
+ * the awaited return value of `fn`, or a description of what it threw, so a
+ * failing task can be rejected individually on the calling side.
+ *
+ * @param fn - Task implementation; may be synchronous or return a promise.
+ */
+export async function handler<T, E>(fn: HandlerCallback<T, E>): Promise<void> {
+  self.addEventListener("message", async (event) => {
+    const request = event.data as WorkerRequest<E>;
+    let response: WorkerResponse<Awaited<T>>;
+    try {
+      response = { id: request.id, value: await fn(request.params) };
+    } catch (error) {
+      response = { id: request.id, error: toFailure(error) };
+    }
+    self.postMessage(response);
+  });
+}
diff --git a/offload.interface.ts b/offload.interface.ts
new file mode 100644
index 0000000..f08555f
--- /dev/null
+++ b/offload.interface.ts
@@ -0,0 +1,8 @@
+/** Serializable description of an error thrown inside a worker. */
+export type WorkerFailure = { name: string; message: string; stack?: string };
+/** Reply to a request: the task's value, or the failure it produced. */
+export type WorkerResponse<T> =
+  | { id: number; value: T }
+  | { id: number; error: WorkerFailure };
+/** A task sent to a worker; `id` is echoed back in the response. */
+export type WorkerRequest<T> = { id: number; params: T };
diff --git a/offload.ts b/offload.ts
new file mode 100644
index 0000000..0193abd
--- /dev/null
+++ b/offload.ts
@@ -0,0 +1,245 @@
+import type { WorkerResponse, WorkerRequest } from "./offload.interface.ts";
+import { OffloadError } from "./offload.error.ts";
+
+type URLlike = URL | string;
+type Callback<T, E> = (data: E) => Promise<T>;
+type Terminator = () => void; // 🦾
+type Id = number;
+type PromiseTable = Map<
+  Id,
+  { resolve: (value: any) => void; reject: (reason: unknown) => void }
+>;
+type TaskCallback<T, E> = ((data: E) => Promise<T>) & { [workerId]: Worker };
+type WorkerTasks = Map<Worker, PromiseTable>;
+type Limiter = { acquire: () => Promise<void>; release: () => void };
+
+const workerId = Symbol("workerId");
+/** In-flight tasks of every live worker, keyed by task id. */
+const workerTasks: WorkerTasks = new Map();
+
+/**
+ * OffloadMode
+ * 'cb' - (default) callback mode, spawns a worker on call and terminates it upon completion
+ * 'bg' - runs a max number of workers of poolSize constantly in background, balances callbacks among them
+ */
+export type OffloadMode = "cb" | "bg";
+
+/**
+ * offload - offload tasks to a worker.
+ *
+ * @param url - Location of the worker script.
+ * @param poolSize - Maximum number of tasks running at once (must be >= 1).
+ * @param mode - "cb" spawns one worker per call, "bg" keeps `poolSize` workers alive.
+ * @returns The task function and a terminator that stops the workers.
+ * @throws RangeError when `poolSize` is smaller than 1.
+ */
+export function offload<Return, Param>(
+  url: URLlike,
+  poolSize = 1,
+  mode: OffloadMode = "cb",
+): [Callback<Return, Param>, Terminator] {
+  const spawn = () => withMessageInterceptor(new Worker(url.toString()));
+  switch (mode) {
+    case "bg":
+      return createPooledCallback(poolSize, () =>
+        createTaskCallback<Return, Param>(spawn()),
+      );
+    default:
+      return createBufferedCallback(poolSize, () => {
+        const worker = spawn();
+        // One-shot worker: dispose of it as soon as its task settles.
+        return createTaskCallback<Return, Param>(worker, () =>
+          disposeWorker(worker),
+        );
+      });
+  }
+}
+
+/**
+ * Wrap a worker into a function that runs a single task on it.
+ *
+ * @param worker - Worker already registered through `withMessageInterceptor`.
+ * @param eof - Optional hook invoked once each task has settled.
+ */
+function createTaskCallback<T, E>(
+  worker: Worker,
+  eof?: () => void,
+): TaskCallback<T, E> {
+  const run = async (data: E): Promise<T> => {
+    const tasks = workerTasks.get(worker);
+    if (!tasks) {
+      throw new OffloadError("Worker is not registered or was terminated");
+    }
+    const id = createTaskId();
+    const workerTask = Promise.withResolvers<T>();
+    // Fallback for errors the worker could not report itself (e.g. the script
+    // failed to load). Such an event carries no task id, so every in-flight
+    // task of this worker is rejected by its own listener.
+    const onError = (event: ErrorEvent) => {
+      workerTask.reject(
+        new OffloadError(event.message, { cause: event.error ?? event }),
+      );
+    };
+    worker.addEventListener("error", onError, { once: true });
+    tasks.set(id, workerTask);
+    try {
+      const request: WorkerRequest<E> = { id, params: data };
+      worker.postMessage(request);
+      return await workerTask.promise;
+    } finally {
+      // Without this, every successful call would leave a listener behind.
+      worker.removeEventListener("error", onError);
+      tasks.delete(id);
+      if (eof) eof();
+    }
+  };
+  return Object.assign(run, { [workerId]: worker });
+}
+
+/**
+ * Create a counting semaphore with `size` slots and FIFO hand-over.
+ *
+ * @throws RangeError when `size` is smaller than 1 (no task could ever start).
+ */
+function createLimiter(size: number): Limiter {
+  if (!(size >= 1)) {
+    throw new RangeError(`pool size must be >= 1, got ${size}`);
+  }
+  let free = size;
+  const waiters: Array<() => void> = [];
+  const acquire = (): Promise<void> => {
+    if (free > 0) {
+      free--;
+      return Promise.resolve();
+    }
+    return new Promise<void>((resolve) => waiters.push(resolve));
+  };
+  const release = (): void => {
+    const next = waiters.shift();
+    // Hand the slot straight to the oldest waiter instead of freeing it.
+    if (next) next();
+    else free++;
+  };
+  return { acquire, release };
+}
+
+/**
+ * Callback mode: every call gets a fresh task callback from `fun`; at most
+ * `bufSize` calls run at once and the rest wait in FIFO order.
+ *
+ * @returns The task function and a terminator for the tasks still running.
+ * @throws RangeError when `bufSize` is smaller than 1.
+ */
+export function createBufferedCallback<T, E>(
+  bufSize: number,
+  fun: () => TaskCallback<T, E>,
+): [Callback<T, E>, Terminator] {
+  const limiter = createLimiter(bufSize);
+  const active = new Set<TaskCallback<T, E>>();
+  const term = () => {
+    for (const cb of active) terminate(cb);
+  };
+  const call = async (data: E): Promise<T> => {
+    await limiter.acquire();
+    let cb: TaskCallback<T, E> | undefined;
+    try {
+      cb = fun();
+      active.add(cb);
+      return await cb(data);
+    } finally {
+      // Runs on failure too, so a rejected task cannot leak its slot.
+      if (cb) active.delete(cb);
+      limiter.release();
+    }
+  };
+  return [call, term];
+}
+
+/**
+ * Background mode: creates `poolSize` long-lived task callbacks up front and
+ * gives each call exclusive use of an idle one.
+ *
+ * @returns The task function and a terminator that stops every worker.
+ * @throws RangeError when `poolSize` is smaller than 1.
+ */
+export function createPooledCallback<T, E>(
+  poolSize: number,
+  fun: () => TaskCallback<T, E>,
+): [Callback<T, E>, Terminator] {
+  const limiter = createLimiter(poolSize);
+  const spots: TaskCallback<T, E>[] = [];
+  for (let i = 0; i < poolSize; i++) {
+    spots[i] = fun();
+  }
+  const idle = [...spots];
+  let terminated = false;
+  const term = () => {
+    terminated = true;
+    for (const cb of spots) terminate(cb);
+  };
+  const call = async (data: E): Promise<T> => {
+    await limiter.acquire();
+    // The limiter admits no more calls than there are workers, so an idle
+    // one is always available here.
+    const cb = idle.pop();
+    try {
+      if (terminated || !cb) {
+        throw new OffloadError("Worker pool has been terminated");
+      }
+      return await cb(data);
+    } finally {
+      if (cb) idle.push(cb);
+      limiter.release();
+    }
+  };
+  return [call, term];
+}
+
+function useWorker<T, E>(cb: TaskCallback<T, E>): Worker {
+  return cb[workerId];
+}
+
+/**
+ * Terminate a worker, forget its bookkeeping and reject the tasks it still
+ * owed, so that no caller is left waiting on a promise that cannot settle.
+ */
+function disposeWorker(worker: Worker): void {
+  worker.terminate();
+  const pending = workerTasks.get(worker);
+  workerTasks.delete(worker);
+  if (!pending) return;
+  for (const task of pending.values()) {
+    task.reject(new OffloadError("Worker was terminated"));
+  }
+  pending.clear();
+}
+
+function terminate<T, E>(cb: TaskCallback<T, E>): void {
+  disposeWorker(useWorker(cb));
+}
+
+function createTaskId(): Id {
+  return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
+}
+
+/**
+ * Register `worker` in `workerTasks` and settle its tasks from the responses
+ * it posts back.
+ */
+function withMessageInterceptor(worker: Worker): Worker {
+  const promiseTable: PromiseTable = new Map();
+  workerTasks.set(worker, promiseTable);
+  worker.addEventListener("message", (event) => {
+    const response = event.data as WorkerResponse<unknown>;
+    const task = promiseTable.get(response.id);
+    if (!task) return;
+    promiseTable.delete(response.id);
+    if ("error" in response) {
+      const { error } = response;
+      task.reject(new OffloadError(error.message, { cause: error }));
+    } else {
+      task.resolve(response.value);
+    }
+  });
+  return worker;
+}
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..833a4fe
--- /dev/null
+++ b/package.json
@@ -0,0 +1,13 @@
+{
+  "name": "@nesterow/offload",
+  "version": "0.0.1",
+  "description": "Offload heavy tasks to a separate thread using workers",
+  "module": "mod.ts",
+  "type": "module",
+  "devDependencies": {
+    "@types/bun": "latest"
+  },
+  "peerDependencies": {
+    "typescript": "^5.0.0"
+  }
+}
diff --git a/tsconfig.json b/tsconfig.json
new file mode 100644
index 0000000..238655f
--- /dev/null
+++ b/tsconfig.json
@@ -0,0 +1,27 @@
+{
+  "compilerOptions": {
+    // Enable latest features
+    "lib": ["ESNext", "DOM"],
+    "target": "ESNext",
+    "module": "ESNext",
+    "moduleDetection": "force",
+    "jsx": "react-jsx",
+    "allowJs": true,
+
+    // Bundler mode
+    "moduleResolution": "bundler",
+    "allowImportingTsExtensions": true,
+    "verbatimModuleSyntax": true,
+    "noEmit": true,
+
+    // Best practices
+    "strict": true,
+    "skipLibCheck": true,
+    
"noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false + } +}