node compat

This commit is contained in:
Anton Nesterov 2024-10-25 23:21:59 +02:00
parent 2d559dfb31
commit 975671ee34
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5
2 changed files with 156 additions and 0 deletions

148
index.js Normal file
View file

@ -0,0 +1,148 @@
// offload.error.ts
class OffloadError extends Error {
constructor(message, options) {
super(message, options);
this.name = "OffloadError";
}
}
// offload.handler.ts
async function handler(fn) {
self.addEventListener("message", async (event) => {
const request = event.data;
const result = await fn(request.params);
const response = { id: request.id, value: result };
self.postMessage(response);
});
}
// offload.ts
var workerId = Symbol("workerId");
var workerTasks = new Map;
function offload(url, poolSize = 1, mode = "cb") {
switch (mode) {
case "bg":
return createPooledCallback(poolSize, () => {
const bg = withMessageInterceptor(new Worker(url.toString()));
const bgcb = createTaskCallback(bg);
return bgcb;
});
default:
return createBufferedCallback(poolSize, () => {
const worker = withMessageInterceptor(new Worker(url.toString()));
const cb = createTaskCallback(worker, () => {
worker.terminate();
});
return cb;
});
}
}
function createTaskCallback(worker, eof) {
const cb = async function(data) {
const id = createTaskId();
worker.addEventListener("error", (event) => {
const error = event.message;
workerTasks.get(worker)?.get(id)?.reject(new OffloadError(error, id));
workerTasks.get(worker)?.delete(id);
}, { once: true });
const workerTask = Promise.withResolvers();
workerTasks.get(worker)?.set(id, workerTask);
const request = { id, params: data };
worker.postMessage(request);
try {
const result = await workerTask.promise;
workerTasks.get(worker)?.delete(id);
if (eof)
eof();
return result;
} catch (error) {
workerTasks.get(worker)?.delete(id);
if (eof)
eof();
throw error;
}
};
cb[workerId] = worker;
return cb;
}
function createBufferedCallback(bufSize, fun) {
let free = bufSize;
const waitFree = async () => {
if (free <= 0) {
await new Promise((resolve) => setTimeout(resolve));
return await waitFree();
}
};
const spots = [];
const term = () => {
for (const cb of spots) {
if (cb)
terminate(cb);
}
};
const call = async (data) => {
if (free <= 0)
await waitFree();
--free;
const cb = fun();
spots[free] = cb;
const result = await cb(data);
delete spots[free];
free++;
return result;
};
return [call, term];
}
function createPooledCallback(poolSize, fun) {
let free = poolSize;
const waitFree = async () => {
if (free <= 0) {
await new Promise((resolve) => setTimeout(resolve));
return await waitFree();
}
};
const spots = [];
for (let i = 0;i < poolSize; i++) {
spots[i] = fun();
}
const term = () => {
for (const cb of spots) {
terminate(cb);
}
};
const call = async (data) => {
if (free <= 0)
await waitFree();
--free;
const cb = spots[0];
const result = await cb(data);
free++;
return result;
};
return [call, term];
}
function useWorker(cb) {
return cb[workerId];
}
function terminate(cb) {
const worker = useWorker(cb);
worker.terminate();
}
function createTaskId() {
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
}
function withMessageInterceptor(worker) {
const promiseTable = new Map;
workerTasks.set(worker, promiseTable);
worker.addEventListener("message", (event) => {
const { id, value } = event.data;
promiseTable.get(id)?.resolve(value);
promiseTable.delete(id);
});
return worker;
}
export {
offload,
handler,
createPooledCallback,
createBufferedCallback,
OffloadError
};

View file

@ -1,6 +1,10 @@
{
"name": "@nesterow/offload",
"version": "0.0.1",
"author": {
"name": "Anton Nesterov",
"url": "https://github.com/nesterow"
},
"description": "Offload heavy tasks to a separate thread using workers",
"module": "offload.ts",
"type": "module",
@ -9,5 +13,9 @@
},
"peerDependencies": {
"typescript": "^5.0.0"
},
"scripts": {
"bundle": "bun build ./mod.ts --outfile=index.js",
"test": "bun test"
}
}