diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..79c949c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Anton Nesterov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index dcbc315..79fc59b 100644 --- a/README.md +++ b/README.md @@ -19,44 +19,58 @@ Features: bun add github:nesterow/limiter # or pnpm ``` +## API + +- limit - default 10 +- maxRetry - number of retries, use Infinity to retry until dead +- rps - if set throttles task execution based on provided rate per second +- onError() - if set, the errors are handled silently + +```typescript +limiter = new Limiter({ + limit?: number; + maxRetry?: number; + rps?: number; + onError?: (error: Error) => Promise | void; +}) +``` + ## Usage ### Add tasks ```typescript -import { Limiter, LimiterRetryError } from "@nesterow/limiter"; - -const task = ({ url }) => { - await fetch(url); - // ... write -}; +import { Limiter } from "@nesterow/limiter"; const limiter = new Limiter({ limit: 20, - onError(error) { - // Logger.error(error) - }, }); +const task = () => { + await fetch(url); +}; + limiter.process(task); limiter.process(task); limiter.process(task); + +await limiter.done() ``` -### Limit number of requests +### Batch processing ```typescript import { Limiter } from "@nesterow/limiter"; const task = () => { await fetch("https://my.api.xyz"); - // ... write }; const limiter = new Limiter({ limit: 10, }); +// process 100 tasks, 10 at the same time await limiter.process(...Array.from({ length: 100 }, () => task())); ``` @@ -67,7 +81,6 @@ import { Limiter } from "@nesterow/limiter"; const execEvery100ms = () => { await fetch("https://my.api.xyz"); - // ... write }; const limiter = new Limiter({ @@ -75,6 +88,7 @@ const limiter = new Limiter({ rps: 10, }); +// trottle every 100ms await limiter.process(...Array.from({ length: 100 }, () => execEvery100ms())); ``` @@ -86,7 +100,6 @@ import { Limiter, LimiterRetryError } from "@nesterow/limiter"; const retry5times = () => { await fetch("https://my.api.xyz"); throw new Error("Connection refused"); - // ... write }; const limiter = new Limiter({ @@ -96,7 +109,7 @@ const limiter = new Limiter({ for (let i = 0; i < 100; i++) { try { - await limiter.process(Array.from({ length: 100 }, () => retry5times())); + await limiter.process(...Array.from({ length: 100 }, () => retry5times())); } catch (e) { if (e instanceof LimiterRetryError) { // Logger.log(e) @@ -105,26 +118,6 @@ for (let i = 0; i < 100; i++) { } ``` -### Handle errors in background +## License -```typescript -import { Limiter, LimiterRetryError } from "@nesterow/limiter"; - -const wontStopPooling = () => { - await fetch("https://my.api.xyz"); - throw new Error("Connection refused"); - // ... write -}; - -const limiter = new Limiter({ - limit: 20, - maxRetry: 5, - onError(error) { - // Logger.error(error) - }, -}); - -for (let i = 0; i < 100; i++) { - await limiter.process(Array.from({ length: 100 }, () => wontStopPooling())); -} -``` +MIT diff --git a/dist/limiter.js b/dist/limiter.js index d42c39a..762d10b 100644 --- a/dist/limiter.js +++ b/dist/limiter.js @@ -10,6 +10,7 @@ export class LimiterRetryError extends Error { } export class Limiter { #limit = 10; + #processing = false; #promisesCount = 0; #promises = []; #retryQueue = []; @@ -43,6 +44,8 @@ export class Limiter { this.#promises = []; } catch (error) { if (!this.#onError) { + this.#promises = []; + this.#processing = false; throw error; } for (;;) { @@ -52,7 +55,13 @@ export class Limiter { } } } + /** + * Process the callbacks. + * A callback must be a function that returns a promise. + * @param callbacks + */ async process(...callbacks) { + this.#processing = true; for (;;) { const item = callbacks.pop(); if (!item) break; @@ -97,6 +106,9 @@ export class Limiter { new LimiterRetryError("Retry limit exceeded", item.error), ); } else { + this.#promises = []; + this.#retryQueue = []; + this.#processing = false; throw new LimiterRetryError("Retry limit exceeded", item.error); } } @@ -104,8 +116,27 @@ export class Limiter { await this.process(...retryItems); } } + this.#processing = false; } + /** + * Wait until all the promises are resolved. + **/ + async done() { + if (this.isProcessing) { + await new Promise((resolve) => setTimeout(resolve, 10)); + await this.done(); + } + } + /** + * Get the number of promises in the queue. + */ get length() { return this.#promisesCount; } + /** + * Get the processing status. + */ + get isProcessing() { + return this.#processing; + } } diff --git a/dist/limiter.test.js b/dist/limiter.test.js index 35eb1f1..9771ce6 100644 --- a/dist/limiter.test.js +++ b/dist/limiter.test.js @@ -22,27 +22,32 @@ const setup = ({ send, close, delay = 300 }) => { }; }); }; -test("Limiter: opens #limit of concurent connections", async () => { - const connection = setup({ - send: jest.fn(() => Promise.resolve()), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); - const limiter = new Limiter({ limit: 3 }); - const connections = Array.from({ length: 7 }, () => connection()); - limiter.process( - ...connections.map((c) => { - return c.process; - }), - ); - await delay(0); - expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(1); - expect(connections[0].send).toBeCalledTimes(7); -}); +test( + "Limiter: opens #limit of concurent connections", + async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(1); + await limiter.done(); + expect(connections[0].send).toBeCalledTimes(7); + }, + { timeout: 5000 }, +); test("Limiter: can add new connections to poll", async () => { const connection = setup({ send: jest.fn(() => Promise.resolve()), diff --git a/limiter.test.ts b/limiter.test.ts index 86a36e7..a1a9ac3 100644 --- a/limiter.test.ts +++ b/limiter.test.ts @@ -25,33 +25,38 @@ const setup = ({ send, close, delay = 300 }: any) => { }); }; -test("Limiter: opens #limit of concurent connections", async () => { - const connection = setup({ - send: jest.fn(() => Promise.resolve()), - close: jest.fn(() => Promise.resolve()), - delay: 500, - }); +test( + "Limiter: opens #limit of concurent connections", + async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); - const limiter = new Limiter({ limit: 3 }); - const connections = Array.from({ length: 7 }, () => connection()); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); - limiter.process( - ...connections.map((c) => { - return c.process; - }), - ); + limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); - await delay(0); - expect(limiter.length).toBe(3); + await delay(0); + expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(3); - await delay(500); - expect(limiter.length).toBe(1); + await delay(500); + expect(limiter.length).toBe(1); - expect(connections[0].send).toBeCalledTimes(7); -}); + await limiter.done(); + expect(connections[0].send).toBeCalledTimes(7); + }, + { timeout: 5000 }, +); test("Limiter: can add new connections to poll", async () => { const connection = setup({ diff --git a/limiter.ts b/limiter.ts index 6b0a088..fe57d26 100644 --- a/limiter.ts +++ b/limiter.ts @@ -2,6 +2,9 @@ type AsyncCallback = () => any | Promise; export interface ILimiter { process: (...cb: AsyncCallback[]) => Promise; + done: () => Promise; + length: number; + isProcessing: boolean; } export interface ILimiterOptions { @@ -30,6 +33,7 @@ export class LimiterRetryError extends Error { export class Limiter implements ILimiter { #limit = 10; + #processing: boolean = false; #promisesCount = 0; #promises: Promise[] = []; #retryQueue: Array = []; @@ -71,6 +75,8 @@ export class Limiter implements ILimiter { this.#promises = []; } catch (error) { if (!this.#onError) { + this.#promises = []; + this.#processing = false; throw error; } for (;;) { @@ -81,7 +87,13 @@ export class Limiter implements ILimiter { } } + /** + * Process the callbacks. + * A callback must be a function that returns a promise. + * @param callbacks + */ async process(...callbacks: AsyncCallback[] | ILimiterRetryItem[]) { + this.#processing = true; for (;;) { const item = callbacks.pop(); if (!item) break; @@ -131,6 +143,9 @@ export class Limiter implements ILimiter { new LimiterRetryError("Retry limit exceeded", item.error), ); } else { + this.#promises = []; + this.#retryQueue = []; + this.#processing = false; throw new LimiterRetryError("Retry limit exceeded", item.error); } } @@ -138,9 +153,30 @@ export class Limiter implements ILimiter { await this.process(...retryItems); } } + this.#processing = false; } + /** + * Wait until all the promises are resolved. + **/ + async done() { + if (this.isProcessing) { + await new Promise((resolve) => setTimeout(resolve, 10)); + await this.done(); + } + } + + /** + * Get the number of promises in the queue. + */ get length(): number { return this.#promisesCount; } + + /** + * Get the processing status. + */ + get isProcessing(): boolean { + return this.#processing; + } }