commit ac6faab5c7aab5407c983c5a5459423b0b281205 Author: Anton Nesterov Date: Thu Jul 11 01:14:10 2024 +0200 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2c8942c --- /dev/null +++ b/.gitignore @@ -0,0 +1,174 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..eed158a --- /dev/null +++ b/README.md @@ -0,0 +1,116 @@ +# Limiter + +A promise poll with RPS limiter. + +Features: + +- [x] TypeScript first +- [x] Limits parrallel promises execution +- [x] Limits RPS (requests per second), evenly distributes the requests over time +- [x] Able to retry +- [x] Simple API +- [x] Simple async/await flow +- [x] Allows to handle errors silently using onError callback +- [x] Works with any runtime (Bun/Deno/Node) + +## Install + +```bash +bun add github:nesterow/limiter # or pnpm +``` + +## Usage + +### Limit number of requests + +```typescript +import {Limiter} from '@nesterow/limiter' + +const task = () => { + await fetch('https://my.api.xyz') + // ... write +} + +const limiter = new Limiter({ + limit: 10 +}) + +for (let i=0; i<100; i++) { + await limiter.process(task) +} + +``` + +### Limit RPS + +```typescript +import {Limiter} from '@nesterow/limiter' + +const execEvery100ms = () => { + await fetch('https://my.api.xyz') + // ... write +} + +const limiter = new Limiter({ + limit: 20 + rps: 10 +}) + +for (let i=0; i < 100; i++) { + await limiter.process(execEvery100ms) +} + +``` + +### Retry + +```typescript +import {Limiter, LimiterRetryError} from '@nesterow/limiter' + +const retry5times = () => { + await fetch('https://my.api.xyz') + throw new Error("Connection refused") + // ... write +} + +const limiter = new Limiter({ + limit: 20 + maxRetry: 5 +}) + +for (let i=0; i < 100; i++) { + try { + await limiter.process(retry5times) + } catch(e) { + if (e instanceof LimiterRetryError) { + // Logger.log(e) + } + } +} + +``` + +### Handle errors in background + +```typescript +import {Limiter, LimiterRetryError} from '@nesterow/limiter' + +const wontStopPooling = () => { + await fetch('https://my.api.xyz') + throw new Error("Connection refused") + // ... write +} + +const limiter = new Limiter({ + limit: 20 + maxRetry: 5, + onError(error) { + // Logger.error(error) + } +}) + +for (let i=0; i < 100; i++) { + await limiter.process(wontStopPooling) +} + +``` diff --git a/bun.lockb b/bun.lockb new file mode 100755 index 0000000..6c2e827 Binary files /dev/null and b/bun.lockb differ diff --git a/dist/limiter.js b/dist/limiter.js new file mode 100644 index 0000000..aa56816 --- /dev/null +++ b/dist/limiter.js @@ -0,0 +1,117 @@ +export class LimiterRetryError extends Error { + constructor(message, error) { + super(message); + this.name = "RetryError"; + if (error) { + this.stack = error.stack; + this.cause = error; + } + } +} +export class Limiter { + #limit = 10; + #promisesCount = 0; + #promises = []; + #retryQueue = []; + #maxRetry = 0; + #rps; + #onError; + constructor({ limit = 10, rps, maxRetry = 0, onError = undefined, }) { + this.#limit = limit; + this.#rps = rps; + this.#maxRetry = maxRetry; + this.#onError = onError?.bind(this); + } + #tick = Date.now(); + async #limitRps(callback, delay = 0) { + if (!this.#rps) { + return await callback(); + } + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + const diff = Date.now() - this.#tick; + if (diff < 1000 / this.#rps) { + return await this.#limitRps(callback, 1000 / this.#rps - diff); + } + this.#tick = Date.now(); + return await callback(); + } + async #execute() { + try { + await Promise.all(this.#promises); + this.#promises = []; + } + catch (error) { + if (!this.#onError) { + throw error; + } + for (;;) { + const promise = this.#promises.pop(); + if (!promise) + break; + promise.catch(this.#onError); + } + } + } + async process(...callbacks) { + for (;;) { + const item = callbacks.pop(); + if (!item) + break; + if (this.#promisesCount >= this.#limit) { + await this.#execute(); + } + this.#promisesCount++; + const promise = (async (item) => { + const callback = item.callback || item; + try { + const res = await this.#limitRps(callback); + this.#promisesCount--; + return res; + } + catch (error) { + this.#promisesCount--; + if (this.#maxRetry > 0) { + this.#retryQueue.push({ + callback, + retries: item.retries ?? this.#maxRetry, + error: error, + }); + } + else { + throw error; + } + } + })(item); + this.#promises.push(promise); + } + if (this.#promises.length > 0) { + await this.#execute(); + } + if (this.#retryQueue.length > 0) { + const retryItems = []; + for (;;) { + const item = this.#retryQueue.pop(); + if (!item) + break; + if (item.retries > 0) { + item.retries--; + retryItems.push(item); + } + else if (this.#onError) { + this.#onError(new LimiterRetryError("Retry limit exceeded", item.error)); + } + else { + throw new LimiterRetryError("Retry limit exceeded", item.error); + } + } + if (retryItems.length) { + await this.process(...retryItems); + } + } + } + get length() { + return this.#promisesCount; + } +} diff --git a/dist/limiter.test.js b/dist/limiter.test.js new file mode 100644 index 0000000..7c20a9a --- /dev/null +++ b/dist/limiter.test.js @@ -0,0 +1,173 @@ +import { beforeAll, expect, test, jest } from "bun:test"; +import { Limiter, LimiterRetryError } from "./limiter"; +const delay = (ms) => new Promise((r) => setTimeout(r, ms)); +const setup = ({ send, close, delay = 300 }) => { + return jest.fn(() => { + let closed = false; + let loading = false; + return { + process: jest.fn(async () => { + if (closed) + throw new Error("Connection closed"); + //if (loading) throw new Error("Connection in use"); + loading = true; + await send(); + await new Promise((resolve) => setTimeout(resolve, delay)); + loading = false; + }), + close: jest.fn(async () => { + close(); + closed = true; + }), + send, + }; + }); +}; +test("Limiter: opens #limit of concurent connections", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + limiter.process(...connections.map((c) => { + return c.process; + })); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(1); + expect(connections[0].send).toBeCalledTimes(7); +}); +test("Limiter: can add new connections to poll", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process, connection().process); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(2); + await delay(500); + expect(limiter.length).toBe(0); +}); +test("Limiter: limit RPS - requests are evenly distributed", async () => { + const connection = setup({ + send: jest.fn(() => { + return Promise.resolve(); + }), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ limit: 20, rps: 10 }); + const connections = Array.from({ length: 45 }, () => connection()); + let count = 0; + const timestamps = []; + await limiter.process(...connections.map((c) => { + return () => { + ++count; + timestamps.push(Date.now()); + return c.process(); + }; + })); + expect(count).toBe(45); + const diffsAvg = timestamps + .map((t, i) => { + return i === 0 ? 100 : t - timestamps[i - 1]; + }) + .reduce((a, b) => a + b) / timestamps.length; + expect(diffsAvg).toBeGreaterThan(99); + expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms +}); +test("Limiter: throws an error by deafult", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 6 }, () => connection()); + try { + await limiter.process(...connections.map((c) => { + return c.process; + })); + } + catch (e) { + expect(e).toBe(1); + } + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(3); +}); +test("Limiter: #onError, no trow", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const onError = jest.fn(() => { }); + const limiter = new Limiter({ + limit: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process(...connections.map((c) => { + return c.process; + })); + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(6); + expect(onError).toBeCalledTimes(6); +}); +test("Limiter: #maxRetry, exit on fail", async () => { + const connection = setup({ + send: () => Promise.reject(1), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + }); + const connections = Array.from({ length: 6 }, () => connection()); + let count = 0; + try { + await limiter.process(...connections.map((c) => { + ++count; + return c.process; + })); + } + catch (e) { + expect(e).toBeInstanceOf(LimiterRetryError); + } + expect(limiter.length).toBe(0); +}); +test("Limiter: #onError, #maxRetry", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(new Error("Connection error"))), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + let error; + const onError = jest.fn((err) => { + error = err; + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process(...connections.map((c) => { + return c.process; + })); + expect(onError).toBeCalledTimes(6); + expect(error).toBeInstanceOf(LimiterRetryError); +}); diff --git a/limiter.test.ts b/limiter.test.ts new file mode 100644 index 0000000..86a36e7 --- /dev/null +++ b/limiter.test.ts @@ -0,0 +1,219 @@ +import { beforeAll, expect, test, jest } from "bun:test"; +import { Limiter, LimiterRetryError } from "./limiter"; + +const delay = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +const setup = ({ send, close, delay = 300 }: any) => { + return jest.fn(() => { + let closed = false; + let loading = false; + return { + process: jest.fn(async () => { + if (closed) throw new Error("Connection closed"); + //if (loading) throw new Error("Connection in use"); + loading = true; + await send(); + await new Promise((resolve) => setTimeout(resolve, delay)); + loading = false; + }), + close: jest.fn(async () => { + close(); + closed = true; + }), + send, + }; + }); +}; + +test("Limiter: opens #limit of concurent connections", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + + limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + + await delay(0); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(1); + + expect(connections[0].send).toBeCalledTimes(7); +}); + +test("Limiter: can add new connections to poll", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process, connection().process); + + await delay(0); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(2); + + await delay(500); + expect(limiter.length).toBe(0); +}); + +test("Limiter: limit RPS - requests are evenly distributed", async () => { + const connection = setup({ + send: jest.fn(() => { + return Promise.resolve(); + }), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + + const limiter = new Limiter({ limit: 20, rps: 10 }); + const connections = Array.from({ length: 45 }, () => connection()); + + let count = 0; + const timestamps: number[] = []; + await limiter.process( + ...connections.map((c) => { + return () => { + ++count; + timestamps.push(Date.now()); + return c.process(); + }; + }), + ); + + expect(count).toBe(45); + const diffsAvg = + timestamps + .map((t, i) => { + return i === 0 ? 100 : t - timestamps[i - 1]; + }) + .reduce((a, b) => a + b) / timestamps.length; + expect(diffsAvg).toBeGreaterThan(99); + expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms +}); + +test("Limiter: throws an error by deafult", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 6 }, () => connection()); + + try { + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + } catch (e) { + expect(e).toBe(1); + } + + expect(limiter.length).toBe(0); + + expect(connections[0].send).toBeCalledTimes(3); +}); + +test("Limiter: #onError, no trow", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const onError = jest.fn(() => {}); + const limiter = new Limiter({ + limit: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(6); + expect(onError).toBeCalledTimes(6); +}); + +test("Limiter: #maxRetry, exit on fail", async () => { + const connection = setup({ + send: () => Promise.reject(1), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + }); + const connections = Array.from({ length: 6 }, () => connection()); + + let count = 0; + + try { + await limiter.process( + ...connections.map((c) => { + ++count; + return c.process; + }), + ); + } catch (e) { + expect(e).toBeInstanceOf(LimiterRetryError); + } + + expect(limiter.length).toBe(0); +}); + +test("Limiter: #onError, #maxRetry", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(new Error("Connection error"))), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + + let error; + const onError = jest.fn((err) => { + error = err; + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + expect(onError).toBeCalledTimes(6); + expect(error).toBeInstanceOf(LimiterRetryError); +}); diff --git a/limiter.ts b/limiter.ts new file mode 100644 index 0000000..7d11426 --- /dev/null +++ b/limiter.ts @@ -0,0 +1,146 @@ +type AsyncCallback = () => any | Promise; + +export interface ILimiter { + process: (...cb: AsyncCallback[]) => Promise; +} + +export interface ILimiterOptions { + limit?: number; + maxRetry?: number; + rps?: number; + onError?: (error: Error) => Promise | void; +} + +interface ILimiterRetryItem { + callback: AsyncCallback; + retries: number; + error?: Error; +} + +export class LimiterRetryError extends Error { + constructor(message: string, error?: Error) { + super(message); + this.name = "RetryError"; + if (error) { + this.stack = error.stack; + this.cause = error; + } + } +} + +export class Limiter implements ILimiter { + #limit = 10; + #promisesCount = 0; + #promises: Promise[] = []; + #retryQueue: Array = []; + #maxRetry = 0; + #rps: number | undefined; + #onError?: (error: Error) => void | Promise; + + constructor({ + limit = 10, + rps, + maxRetry = 0, + onError = undefined, + }: ILimiterOptions) { + this.#limit = limit; + this.#rps = rps; + this.#maxRetry = maxRetry; + this.#onError = onError?.bind(this); + } + + #tick = Date.now(); + async #limitRps(callback: AsyncCallback, delay = 0): Promise { + if (!this.#rps) { + return await callback(); + } + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + const diff = Date.now() - this.#tick; + if (diff < 1000 / this.#rps!) { + return await this.#limitRps(callback, 1000 / this.#rps! - diff); + } + this.#tick = Date.now(); + return await callback(); + } + + async #execute() { + try { + await Promise.all(this.#promises); + this.#promises = []; + } catch (error) { + if (!this.#onError) { + throw error; + } + for (;;) { + const promise = this.#promises.pop(); + if (!promise) break; + promise.catch(this.#onError); + } + } + } + + async process(...callbacks: AsyncCallback[] | ILimiterRetryItem[]) { + for (;;) { + const item = callbacks.pop(); + if (!item) break; + + if (this.#promisesCount >= this.#limit) { + await this.#execute(); + } + + this.#promisesCount++; + const promise = (async (item) => { + const callback = + (item as ILimiterRetryItem).callback || (item as AsyncCallback); + try { + const res = await this.#limitRps(callback); + this.#promisesCount--; + return res; + } catch (error) { + this.#promisesCount--; + if (this.#maxRetry > 0) { + this.#retryQueue.push({ + callback, + retries: (item as ILimiterRetryItem).retries ?? this.#maxRetry, + error: error as Error, + }); + } else { + throw error; + } + } + })(item); + this.#promises.push(promise); + } + + if (this.#promises.length > 0) { + await this.#execute(); + } + + if (this.#retryQueue.length > 0) { + const retryItems: ILimiterRetryItem[] = []; + for (;;) { + const item = this.#retryQueue.pop(); + if (!item) break; + if (item.retries > 0) { + item.retries--; + retryItems.push(item); + } else if (this.#onError) { + this.#onError( + new LimiterRetryError("Retry limit exceeded", item.error), + ); + } else { + throw new LimiterRetryError("Retry limit exceeded", item.error); + } + } + if (retryItems.length) { + await this.process(...retryItems); + } + } + } + + get length() { + return this.#promisesCount; + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..1b3a409 --- /dev/null +++ b/package.json @@ -0,0 +1,16 @@ +{ + "name": "@nesterow/limiter", + "module": "limiter.ts", + "type": "module", + "devDependencies": { + "@types/bun": "latest", + "prettier": "^3.3.2" + }, + "peerDependencies": { + "typescript": "^5.0.0" + }, + "scripts": { + "build": "tsc", + "format": "prettier --write ." + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..24fb845 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + // Enable latest features + "lib": ["ESNext", "DOM"], + "target": "ESNext", + "module": "ESNext", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": false, + "verbatimModuleSyntax": true, + "noEmit": false, + + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false, + "outDir": "dist", + "rootDir": "." + }, + "include": ["./*"], + "files": ["./limiter.ts"] +}