diff --git a/README.md b/README.md index a6e1c96..e2e645d 100644 --- a/README.md +++ b/README.md @@ -24,21 +24,20 @@ bun add github:nesterow/limiter # or pnpm ### Limit number of requests ```typescript -import {Limiter} from '@nesterow/limiter' +import { Limiter } from "@nesterow/limiter"; const task = () => { - await fetch('https://my.api.xyz') - // ... write -} + await fetch("https://my.api.xyz"); + // ... write +}; const limiter = new Limiter({ - limit: 10 -}) + limit: 10, +}); -for (let i=0; i<100; i++) { - await limiter.process(task) +for (let i = 0; i < 100; i++) { + await limiter.process(task); } - ``` ### Limit RPS diff --git a/dist/limiter.js b/dist/limiter.js index aa56816..d42c39a 100644 --- a/dist/limiter.js +++ b/dist/limiter.js @@ -1,117 +1,111 @@ export class LimiterRetryError extends Error { - constructor(message, error) { - super(message); - this.name = "RetryError"; - if (error) { - this.stack = error.stack; - this.cause = error; - } + constructor(message, error) { + super(message); + this.name = "RetryError"; + if (error) { + this.stack = error.stack; + this.cause = error; } + } } export class Limiter { - #limit = 10; - #promisesCount = 0; - #promises = []; - #retryQueue = []; - #maxRetry = 0; - #rps; - #onError; - constructor({ limit = 10, rps, maxRetry = 0, onError = undefined, }) { - this.#limit = limit; - this.#rps = rps; - this.#maxRetry = maxRetry; - this.#onError = onError?.bind(this); + #limit = 10; + #promisesCount = 0; + #promises = []; + #retryQueue = []; + #maxRetry = 0; + #rps; + #onError; + constructor({ limit = 10, rps, maxRetry = 0, onError = undefined }) { + this.#limit = limit; + this.#rps = rps; + this.#maxRetry = maxRetry; + this.#onError = onError?.bind(this); + } + #tick = Date.now(); + async #limitRps(callback, delay = 0) { + if (!this.#rps) { + return await callback(); } - #tick = Date.now(); - async #limitRps(callback, delay = 0) { - if (!this.#rps) { - return await callback(); - } - if (delay > 0) { - await new Promise((resolve) => setTimeout(resolve, delay)); - } - const diff = Date.now() - this.#tick; - if (diff < 1000 / this.#rps) { - return await this.#limitRps(callback, 1000 / this.#rps - diff); - } - this.#tick = Date.now(); - return await callback(); + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); } - async #execute() { + const diff = Date.now() - this.#tick; + if (diff < 1000 / this.#rps) { + return await this.#limitRps(callback, 1000 / this.#rps - diff); + } + this.#tick = Date.now(); + return await callback(); + } + async #execute() { + try { + await Promise.all(this.#promises); + this.#promises = []; + } catch (error) { + if (!this.#onError) { + throw error; + } + for (;;) { + const promise = this.#promises.pop(); + if (!promise) break; + promise.catch(this.#onError); + } + } + } + async process(...callbacks) { + for (;;) { + const item = callbacks.pop(); + if (!item) break; + if (this.#promisesCount >= this.#limit) { + await this.#execute(); + } + this.#promisesCount++; + const promise = (async (item) => { + const callback = item.callback || item; try { - await Promise.all(this.#promises); - this.#promises = []; - } - catch (error) { - if (!this.#onError) { - throw error; - } - for (;;) { - const promise = this.#promises.pop(); - if (!promise) - break; - promise.catch(this.#onError); - } + const res = await this.#limitRps(callback); + this.#promisesCount--; + return res; + } catch (error) { + this.#promisesCount--; + if (this.#maxRetry > 0) { + this.#retryQueue.push({ + callback, + retries: item.retries ?? this.#maxRetry, + error: error, + }); + } else { + throw error; + } } + })(item); + this.#promises.push(promise); } - async process(...callbacks) { - for (;;) { - const item = callbacks.pop(); - if (!item) - break; - if (this.#promisesCount >= this.#limit) { - await this.#execute(); - } - this.#promisesCount++; - const promise = (async (item) => { - const callback = item.callback || item; - try { - const res = await this.#limitRps(callback); - this.#promisesCount--; - return res; - } - catch (error) { - this.#promisesCount--; - if (this.#maxRetry > 0) { - this.#retryQueue.push({ - callback, - retries: item.retries ?? this.#maxRetry, - error: error, - }); - } - else { - throw error; - } - } - })(item); - this.#promises.push(promise); - } - if (this.#promises.length > 0) { - await this.#execute(); - } - if (this.#retryQueue.length > 0) { - const retryItems = []; - for (;;) { - const item = this.#retryQueue.pop(); - if (!item) - break; - if (item.retries > 0) { - item.retries--; - retryItems.push(item); - } - else if (this.#onError) { - this.#onError(new LimiterRetryError("Retry limit exceeded", item.error)); - } - else { - throw new LimiterRetryError("Retry limit exceeded", item.error); - } - } - if (retryItems.length) { - await this.process(...retryItems); - } - } + if (this.#promises.length > 0) { + await this.#execute(); } - get length() { - return this.#promisesCount; + if (this.#retryQueue.length > 0) { + const retryItems = []; + for (;;) { + const item = this.#retryQueue.pop(); + if (!item) break; + if (item.retries > 0) { + item.retries--; + retryItems.push(item); + } else if (this.#onError) { + this.#onError( + new LimiterRetryError("Retry limit exceeded", item.error), + ); + } else { + throw new LimiterRetryError("Retry limit exceeded", item.error); + } + } + if (retryItems.length) { + await this.process(...retryItems); + } } + } + get length() { + return this.#promisesCount; + } } diff --git a/dist/limiter.test.js b/dist/limiter.test.js index 7c20a9a..35eb1f1 100644 --- a/dist/limiter.test.js +++ b/dist/limiter.test.js @@ -2,172 +2,182 @@ import { beforeAll, expect, test, jest } from "bun:test"; import { Limiter, LimiterRetryError } from "./limiter"; const delay = (ms) => new Promise((r) => setTimeout(r, ms)); const setup = ({ send, close, delay = 300 }) => { - return jest.fn(() => { - let closed = false; - let loading = false; - return { - process: jest.fn(async () => { - if (closed) - throw new Error("Connection closed"); - //if (loading) throw new Error("Connection in use"); - loading = true; - await send(); - await new Promise((resolve) => setTimeout(resolve, delay)); - loading = false; - }), - close: jest.fn(async () => { - close(); - closed = true; - }), - send, - }; - }); + return jest.fn(() => { + let closed = false; + let loading = false; + return { + process: jest.fn(async () => { + if (closed) throw new Error("Connection closed"); + //if (loading) throw new Error("Connection in use"); + loading = true; + await send(); + await new Promise((resolve) => setTimeout(resolve, delay)); + loading = false; + }), + close: jest.fn(async () => { + close(); + closed = true; + }), + send, + }; + }); }; test("Limiter: opens #limit of concurent connections", async () => { - const connection = setup({ - send: jest.fn(() => Promise.resolve()), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); - const limiter = new Limiter({ limit: 3 }); - const connections = Array.from({ length: 7 }, () => connection()); - limiter.process(...connections.map((c) => { - return c.process; - })); - await delay(0); - expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(1); - expect(connections[0].send).toBeCalledTimes(7); + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(1); + expect(connections[0].send).toBeCalledTimes(7); }); test("Limiter: can add new connections to poll", async () => { - const connection = setup({ - send: jest.fn(() => Promise.resolve()), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); - const limiter = new Limiter({ limit: 3 }); - limiter.process(connection().process); - limiter.process(connection().process); - limiter.process(connection().process); - limiter.process(connection().process, connection().process); - await delay(0); - expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(2); - await delay(500); - expect(limiter.length).toBe(0); + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process, connection().process); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(2); + await delay(500); + expect(limiter.length).toBe(0); }); test("Limiter: limit RPS - requests are evenly distributed", async () => { - const connection = setup({ - send: jest.fn(() => { - return Promise.resolve(); - }), - close: jest.fn(() => Promise.resolve()), - delay: 0, - }); - const limiter = new Limiter({ limit: 20, rps: 10 }); - const connections = Array.from({ length: 45 }, () => connection()); - let count = 0; - const timestamps = []; - await limiter.process(...connections.map((c) => { - return () => { - ++count; - timestamps.push(Date.now()); - return c.process(); - }; - })); - expect(count).toBe(45); - const diffsAvg = timestamps - .map((t, i) => { + const connection = setup({ + send: jest.fn(() => { + return Promise.resolve(); + }), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ limit: 20, rps: 10 }); + const connections = Array.from({ length: 45 }, () => connection()); + let count = 0; + const timestamps = []; + await limiter.process( + ...connections.map((c) => { + return () => { + ++count; + timestamps.push(Date.now()); + return c.process(); + }; + }), + ); + expect(count).toBe(45); + const diffsAvg = + timestamps + .map((t, i) => { return i === 0 ? 100 : t - timestamps[i - 1]; - }) - .reduce((a, b) => a + b) / timestamps.length; - expect(diffsAvg).toBeGreaterThan(99); - expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms + }) + .reduce((a, b) => a + b) / timestamps.length; + expect(diffsAvg).toBeGreaterThan(99); + expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms }); test("Limiter: throws an error by deafult", async () => { - const connection = setup({ - send: jest.fn(() => Promise.reject(1)), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); - const limiter = new Limiter({ limit: 3 }); - const connections = Array.from({ length: 6 }, () => connection()); - try { - await limiter.process(...connections.map((c) => { - return c.process; - })); - } - catch (e) { - expect(e).toBe(1); - } - expect(limiter.length).toBe(0); - expect(connections[0].send).toBeCalledTimes(3); + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 6 }, () => connection()); + try { + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + } catch (e) { + expect(e).toBe(1); + } + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(3); }); test("Limiter: #onError, no trow", async () => { - const connection = setup({ - send: jest.fn(() => Promise.reject(1)), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); - const onError = jest.fn(() => { }); - const limiter = new Limiter({ - limit: 3, - onError, - }); - const connections = Array.from({ length: 6 }, () => connection()); - await limiter.process(...connections.map((c) => { - return c.process; - })); - expect(limiter.length).toBe(0); - expect(connections[0].send).toBeCalledTimes(6); - expect(onError).toBeCalledTimes(6); + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const onError = jest.fn(() => {}); + const limiter = new Limiter({ + limit: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(6); + expect(onError).toBeCalledTimes(6); }); test("Limiter: #maxRetry, exit on fail", async () => { - const connection = setup({ - send: () => Promise.reject(1), - close: jest.fn(() => Promise.resolve()), - delay: 0, - }); - const limiter = new Limiter({ - limit: 3, - maxRetry: 3, - }); - const connections = Array.from({ length: 6 }, () => connection()); - let count = 0; - try { - await limiter.process(...connections.map((c) => { - ++count; - return c.process; - })); - } - catch (e) { - expect(e).toBeInstanceOf(LimiterRetryError); - } - expect(limiter.length).toBe(0); + const connection = setup({ + send: () => Promise.reject(1), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + }); + const connections = Array.from({ length: 6 }, () => connection()); + let count = 0; + try { + await limiter.process( + ...connections.map((c) => { + ++count; + return c.process; + }), + ); + } catch (e) { + expect(e).toBeInstanceOf(LimiterRetryError); + } + expect(limiter.length).toBe(0); }); test("Limiter: #onError, #maxRetry", async () => { - const connection = setup({ - send: jest.fn(() => Promise.reject(new Error("Connection error"))), - close: jest.fn(() => Promise.resolve()), - delay: 0, - }); - let error; - const onError = jest.fn((err) => { - error = err; - }); - const limiter = new Limiter({ - limit: 3, - maxRetry: 3, - onError, - }); - const connections = Array.from({ length: 6 }, () => connection()); - await limiter.process(...connections.map((c) => { - return c.process; - })); - expect(onError).toBeCalledTimes(6); - expect(error).toBeInstanceOf(LimiterRetryError); + const connection = setup({ + send: jest.fn(() => Promise.reject(new Error("Connection error"))), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + let error; + const onError = jest.fn((err) => { + error = err; + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + expect(onError).toBeCalledTimes(6); + expect(error).toBeInstanceOf(LimiterRetryError); }); diff --git a/jsr.json b/jsr.json new file mode 100644 index 0000000..8cea72c --- /dev/null +++ b/jsr.json @@ -0,0 +1,5 @@ +{ + "name": "@nesterow/limiter", + "version": "0.1.0", + "exports": "./limiter.ts" +} diff --git a/limiter.ts b/limiter.ts index 7d11426..6b0a088 100644 --- a/limiter.ts +++ b/limiter.ts @@ -140,7 +140,7 @@ export class Limiter implements ILimiter { } } - get length() { + get length(): number { return this.#promisesCount; } } diff --git a/package.json b/package.json index 1b3a409..29577eb 100644 --- a/package.json +++ b/package.json @@ -1,4 +1,5 @@ { + "version": "0.1.0", "name": "@nesterow/limiter", "module": "limiter.ts", "type": "module",