From ac6faab5c7aab5407c983c5a5459423b0b281205 Mon Sep 17 00:00:00 2001 From: Anton Nesterov Date: Thu, 11 Jul 2024 01:14:10 +0200 Subject: [PATCH] init --- .gitignore | 174 ++++++++++++++++++++++++++++++++++ README.md | 116 +++++++++++++++++++++++ bun.lockb | Bin 0 -> 3484 bytes dist/limiter.js | 117 +++++++++++++++++++++++ dist/limiter.test.js | 173 ++++++++++++++++++++++++++++++++++ limiter.test.ts | 219 +++++++++++++++++++++++++++++++++++++++++++ limiter.ts | 146 +++++++++++++++++++++++++++++ package.json | 16 ++++ tsconfig.json | 31 ++++++ 9 files changed, 992 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100755 bun.lockb create mode 100644 dist/limiter.js create mode 100644 dist/limiter.test.js create mode 100644 limiter.test.ts create mode 100644 limiter.ts create mode 100644 package.json create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2c8942c --- /dev/null +++ b/.gitignore @@ -0,0 +1,174 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..eed158a --- /dev/null +++ b/README.md @@ -0,0 +1,116 @@ +# Limiter + +A 
promise pool with an RPS limiter.
+
+Features:
+
+- [x] TypeScript first
+- [x] Limits parallel promise execution
+- [x] Limits RPS (requests per second), evenly distributing requests over time
+- [x] Able to retry failed tasks
+- [x] Simple API
+- [x] Simple async/await flow
+- [x] Allows handling errors silently via the onError callback
+- [x] Works with any runtime (Bun/Deno/Node)
+
+## Install
+
+```bash
+bun add github:nesterow/limiter # or pnpm
+```
+
+## Usage
+
+### Limit number of requests
+
+```typescript
+import {Limiter} from '@nesterow/limiter'
+
+const task = async () => {
+  await fetch('https://my.api.xyz')
+  // ... write
+}
+
+const limiter = new Limiter({
+  limit: 10
+})
+
+for (let i=0; i<100; i++) {
+  await limiter.process(task)
+}
+
+```
+
+### Limit RPS
+
+```typescript
+import {Limiter} from '@nesterow/limiter'
+
+const execEvery100ms = async () => {
+  await fetch('https://my.api.xyz')
+  // ... write
+}
+
+const limiter = new Limiter({
+  limit: 20,
+  rps: 10
+})
+
+for (let i=0; i < 100; i++) {
+  await limiter.process(execEvery100ms)
+}
+
+```
+
+### Retry
+
+```typescript
+import {Limiter, LimiterRetryError} from '@nesterow/limiter'
+
+const retry5times = async () => {
+  await fetch('https://my.api.xyz')
+  throw new Error("Connection refused")
+  // ... write
+}
+
+const limiter = new Limiter({
+  limit: 20,
+  maxRetry: 5
+})
+
+for (let i=0; i < 100; i++) {
+  try {
+    await limiter.process(retry5times)
+  } catch(e) {
+    if (e instanceof LimiterRetryError) {
+      // Logger.log(e)
+    }
+  }
+}
+
+```
+
+### Handle errors in background
+
+```typescript
+import {Limiter, LimiterRetryError} from '@nesterow/limiter'
+
+const wontStopPooling = async () => {
+  await fetch('https://my.api.xyz')
+  throw new Error("Connection refused")
+  // ... write
+}
+
+const limiter = new Limiter({
+  limit: 20,
+  maxRetry: 5,
+  onError(error) {
+    // Logger.error(error)
+  }
+})
+
+for (let i=0; i < 100; i++) {
+  await limiter.process(wontStopPooling)
+}
+
+```
diff --git a/bun.lockb b/bun.lockb
new file mode 100755
index 0000000000000000000000000000000000000000..6c2e827f7ae0e1119027009f40e64370f0123082
GIT binary patch
literal 3484
zcmeHJYfuwc6y9J!42p=$CMyEm&XGDs?*67ac2#meP*cI-Q9a_-)H
zzHjfn=j_cF8>1}dIpsXos${GV`MhGQ44T4XGtM`$7Di#RafP{p!l9A{Q52=SZ+Pf3
z{yrhDuSDJ>!rG|47rl2zyWt>xc~^GytzDCP_JSa=P_O%!clyTjNdQ%7wA*YJin4Qz
zAg~N4g9vEOD)2qPUk4usK2}0eJ;DFr8wcl!b0rSE)G>A1y{kE@%smMQ*PIS{^RPj+
zAAd-|9>L#*sF8q=0uSd*g$G3W
zCkb8&iyjO3V3-;J4SiRP;O&4<20Ru9VzCaoVg$b%@Ot0-5sx|Os*#*wJobT#l0rjn
z%mY4mhlYk8pF0Z}`Y(oe_jV)U%QWDVP+nd!wRC52&Y)!jj~?rrwV~|Nu@#QRU%9K!
zZ!M`dJmrza7NVLsx8-*1e4*<67!fZycU-pg zK#gooYSsm|Uxb4nueQWb__U=hm7!0Do$0Ti`Q340dsA4i4F!kmObum<{f>lQ;oELH zCu;hQTRM1d`ycTZm55iIH>tVugKb%nB@g@8@3ZV`e{eywsx*<-R+i7t>^3ZJ!Ht%_ z!J8)5)m|0098bBxFHWx)@?$bo4U(!8e0pUKo7^pZeQ-(9tb|$- zuei>o@sn4lwQLr~$#!HERev&Mx-)8ee$uZEX=g__cQ}Ol)a|aA+mk(sZ0%#nSn^X-$fLZKD>jap602!GH~4MITLXWtPFopXXBri9 z^Y*Z_k@el_p;1M^`!vJaAon@mClT;nJ}vHLkC7Y`|2+K9*wXV*y~2o2V7KqW{Kn^H zdY&`Vtd$pXEf$){4SFiaFzIrQO3t(8jLnoNPnPFeShLk;68#{!AA1eyv7Xy~oIOD> zpvm;bNt$$kK7#~vj5UgP4b}>N!}z`FeE1n%;KBDI2XY|~;t3b}3c}@Ew@g1V-(OyJD!HkyHq|nJv!3>AJqiy%?`;N~) znQxvcbTAw}p;TZvYp!LAz_C_yx?Jr)8r_peLkYam!+Y%SsIAPohS+jF<9ZP2z4^D65tq=W}_39s;$tLI2OHQ>#AfNl6w2?YKJc)1ob0vqvG#{cjC EKdG0I8UO$Q literal 0 HcmV?d00001 diff --git a/dist/limiter.js b/dist/limiter.js new file mode 100644 index 0000000..aa56816 --- /dev/null +++ b/dist/limiter.js @@ -0,0 +1,117 @@ +export class LimiterRetryError extends Error { + constructor(message, error) { + super(message); + this.name = "RetryError"; + if (error) { + this.stack = error.stack; + this.cause = error; + } + } +} +export class Limiter { + #limit = 10; + #promisesCount = 0; + #promises = []; + #retryQueue = []; + #maxRetry = 0; + #rps; + #onError; + constructor({ limit = 10, rps, maxRetry = 0, onError = undefined, }) { + this.#limit = limit; + this.#rps = rps; + this.#maxRetry = maxRetry; + this.#onError = onError?.bind(this); + } + #tick = Date.now(); + async #limitRps(callback, delay = 0) { + if (!this.#rps) { + return await callback(); + } + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + const diff = Date.now() - this.#tick; + if (diff < 1000 / this.#rps) { + return await this.#limitRps(callback, 1000 / this.#rps - diff); + } + this.#tick = Date.now(); + return await callback(); + } + async #execute() { + try { + await Promise.all(this.#promises); + this.#promises = []; + } + catch (error) { + if (!this.#onError) { + throw error; + } + for (;;) { + const promise = this.#promises.pop(); + if (!promise) + break; + promise.catch(this.#onError); + } + } + } + async process(...callbacks) { + for (;;) { + const item = callbacks.pop(); + if (!item) + break; + if (this.#promisesCount >= this.#limit) { + await this.#execute(); + } + this.#promisesCount++; + const promise = (async (item) => { + const callback = item.callback || item; + try { + const res = await this.#limitRps(callback); + this.#promisesCount--; + return res; + } + catch (error) { + this.#promisesCount--; + if (this.#maxRetry > 0) { + this.#retryQueue.push({ + callback, + retries: item.retries ?? 
this.#maxRetry, + error: error, + }); + } + else { + throw error; + } + } + })(item); + this.#promises.push(promise); + } + if (this.#promises.length > 0) { + await this.#execute(); + } + if (this.#retryQueue.length > 0) { + const retryItems = []; + for (;;) { + const item = this.#retryQueue.pop(); + if (!item) + break; + if (item.retries > 0) { + item.retries--; + retryItems.push(item); + } + else if (this.#onError) { + this.#onError(new LimiterRetryError("Retry limit exceeded", item.error)); + } + else { + throw new LimiterRetryError("Retry limit exceeded", item.error); + } + } + if (retryItems.length) { + await this.process(...retryItems); + } + } + } + get length() { + return this.#promisesCount; + } +} diff --git a/dist/limiter.test.js b/dist/limiter.test.js new file mode 100644 index 0000000..7c20a9a --- /dev/null +++ b/dist/limiter.test.js @@ -0,0 +1,173 @@ +import { beforeAll, expect, test, jest } from "bun:test"; +import { Limiter, LimiterRetryError } from "./limiter"; +const delay = (ms) => new Promise((r) => setTimeout(r, ms)); +const setup = ({ send, close, delay = 300 }) => { + return jest.fn(() => { + let closed = false; + let loading = false; + return { + process: jest.fn(async () => { + if (closed) + throw new Error("Connection closed"); + //if (loading) throw new Error("Connection in use"); + loading = true; + await send(); + await new Promise((resolve) => setTimeout(resolve, delay)); + loading = false; + }), + close: jest.fn(async () => { + close(); + closed = true; + }), + send, + }; + }); +}; +test("Limiter: opens #limit of concurent connections", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + limiter.process(...connections.map((c) => { + return c.process; + })); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(1); + expect(connections[0].send).toBeCalledTimes(7); +}); +test("Limiter: can add new connections to poll", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process, connection().process); + await delay(0); + expect(limiter.length).toBe(3); + await delay(500); + expect(limiter.length).toBe(2); + await delay(500); + expect(limiter.length).toBe(0); +}); +test("Limiter: limit RPS - requests are evenly distributed", async () => { + const connection = setup({ + send: jest.fn(() => { + return Promise.resolve(); + }), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ limit: 20, rps: 10 }); + const connections = Array.from({ length: 45 }, () => connection()); + let count = 0; + const timestamps = []; + await limiter.process(...connections.map((c) => { + return () => { + ++count; + timestamps.push(Date.now()); + return c.process(); + }; + })); + expect(count).toBe(45); + const diffsAvg = timestamps + .map((t, i) => { + return i === 0 ? 
100 : t - timestamps[i - 1]; + }) + .reduce((a, b) => a + b) / timestamps.length; + expect(diffsAvg).toBeGreaterThan(99); + expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms +}); +test("Limiter: throws an error by deafult", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 6 }, () => connection()); + try { + await limiter.process(...connections.map((c) => { + return c.process; + })); + } + catch (e) { + expect(e).toBe(1); + } + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(3); +}); +test("Limiter: #onError, no trow", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + const onError = jest.fn(() => { }); + const limiter = new Limiter({ + limit: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process(...connections.map((c) => { + return c.process; + })); + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(6); + expect(onError).toBeCalledTimes(6); +}); +test("Limiter: #maxRetry, exit on fail", async () => { + const connection = setup({ + send: () => Promise.reject(1), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + }); + const connections = Array.from({ length: 6 }, () => connection()); + let count = 0; + try { + await limiter.process(...connections.map((c) => { + ++count; + return c.process; + })); + } + catch (e) { + expect(e).toBeInstanceOf(LimiterRetryError); + } + expect(limiter.length).toBe(0); +}); +test("Limiter: #onError, #maxRetry", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(new Error("Connection error"))), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + let error; + const onError = jest.fn((err) => { + error = err; + }); + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + await limiter.process(...connections.map((c) => { + return c.process; + })); + expect(onError).toBeCalledTimes(6); + expect(error).toBeInstanceOf(LimiterRetryError); +}); diff --git a/limiter.test.ts b/limiter.test.ts new file mode 100644 index 0000000..86a36e7 --- /dev/null +++ b/limiter.test.ts @@ -0,0 +1,219 @@ +import { beforeAll, expect, test, jest } from "bun:test"; +import { Limiter, LimiterRetryError } from "./limiter"; + +const delay = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +const setup = ({ send, close, delay = 300 }: any) => { + return jest.fn(() => { + let closed = false; + let loading = false; + return { + process: jest.fn(async () => { + if (closed) throw new Error("Connection closed"); + //if (loading) throw new Error("Connection in use"); + loading = true; + await send(); + await new Promise((resolve) => setTimeout(resolve, delay)); + loading = false; + }), + close: jest.fn(async () => { + close(); + closed = true; + }), + send, + }; + }); +}; + +test("Limiter: opens #limit of concurent connections", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 7 }, () => connection()); + + 
limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + + await delay(0); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(1); + + expect(connections[0].send).toBeCalledTimes(7); +}); + +test("Limiter: can add new connections to poll", async () => { + const connection = setup({ + send: jest.fn(() => Promise.resolve()), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process); + limiter.process(connection().process, connection().process); + + await delay(0); + expect(limiter.length).toBe(3); + + await delay(500); + expect(limiter.length).toBe(2); + + await delay(500); + expect(limiter.length).toBe(0); +}); + +test("Limiter: limit RPS - requests are evenly distributed", async () => { + const connection = setup({ + send: jest.fn(() => { + return Promise.resolve(); + }), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + + const limiter = new Limiter({ limit: 20, rps: 10 }); + const connections = Array.from({ length: 45 }, () => connection()); + + let count = 0; + const timestamps: number[] = []; + await limiter.process( + ...connections.map((c) => { + return () => { + ++count; + timestamps.push(Date.now()); + return c.process(); + }; + }), + ); + + expect(count).toBe(45); + const diffsAvg = + timestamps + .map((t, i) => { + return i === 0 ? 100 : t - timestamps[i - 1]; + }) + .reduce((a, b) => a + b) / timestamps.length; + expect(diffsAvg).toBeGreaterThan(99); + expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms +}); + +test("Limiter: throws an error by deafult", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const limiter = new Limiter({ limit: 3 }); + const connections = Array.from({ length: 6 }, () => connection()); + + try { + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + } catch (e) { + expect(e).toBe(1); + } + + expect(limiter.length).toBe(0); + + expect(connections[0].send).toBeCalledTimes(3); +}); + +test("Limiter: #onError, no trow", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(1)), + close: jest.fn(() => Promise.resolve()), + delay: 500, + }); + + const onError = jest.fn(() => {}); + const limiter = new Limiter({ + limit: 3, + onError, + }); + const connections = Array.from({ length: 6 }, () => connection()); + + await limiter.process( + ...connections.map((c) => { + return c.process; + }), + ); + + expect(limiter.length).toBe(0); + expect(connections[0].send).toBeCalledTimes(6); + expect(onError).toBeCalledTimes(6); +}); + +test("Limiter: #maxRetry, exit on fail", async () => { + const connection = setup({ + send: () => Promise.reject(1), + close: jest.fn(() => Promise.resolve()), + delay: 0, + }); + + const limiter = new Limiter({ + limit: 3, + maxRetry: 3, + }); + const connections = Array.from({ length: 6 }, () => connection()); + + let count = 0; + + try { + await limiter.process( + ...connections.map((c) => { + ++count; + return c.process; + }), + ); + } catch (e) { + expect(e).toBeInstanceOf(LimiterRetryError); + } + + expect(limiter.length).toBe(0); +}); + +test("Limiter: #onError, #maxRetry", async () => { + const connection = setup({ + send: jest.fn(() => Promise.reject(new Error("Connection 
error"))),
+    close: jest.fn(() => Promise.resolve()),
+    delay: 0,
+  });
+
+  let error;
+  const onError = jest.fn((err) => {
+    error = err;
+  });
+  const limiter = new Limiter({
+    limit: 3,
+    maxRetry: 3,
+    onError,
+  });
+  const connections = Array.from({ length: 6 }, () => connection());
+
+  await limiter.process(
+    ...connections.map((c) => {
+      return c.process;
+    }),
+  );
+  expect(onError).toBeCalledTimes(6);
+  expect(error).toBeInstanceOf(LimiterRetryError);
+});
diff --git a/limiter.ts b/limiter.ts
new file mode 100644
index 0000000..7d11426
--- /dev/null
+++ b/limiter.ts
@@ -0,0 +1,146 @@
+type AsyncCallback = () => any | Promise<any>;
+
+export interface ILimiter {
+  process: (...cb: AsyncCallback[]) => Promise<any>;
+}
+
+export interface ILimiterOptions {
+  limit?: number;
+  maxRetry?: number;
+  rps?: number;
+  onError?: (error: Error) => Promise<any> | void;
+}
+
+interface ILimiterRetryItem {
+  callback: AsyncCallback;
+  retries: number;
+  error?: Error;
+}
+
+export class LimiterRetryError extends Error {
+  constructor(message: string, error?: Error) {
+    super(message);
+    this.name = "RetryError";
+    if (error) {
+      this.stack = error.stack;
+      this.cause = error;
+    }
+  }
+}
+
+export class Limiter implements ILimiter {
+  #limit = 10;
+  #promisesCount = 0;
+  #promises: Promise<any>[] = [];
+  #retryQueue: Array<ILimiterRetryItem> = [];
+  #maxRetry = 0;
+  #rps: number | undefined;
+  #onError?: (error: Error) => void | Promise<any>;
+
+  constructor({
+    limit = 10,
+    rps,
+    maxRetry = 0,
+    onError = undefined,
+  }: ILimiterOptions) {
+    this.#limit = limit;
+    this.#rps = rps;
+    this.#maxRetry = maxRetry;
+    this.#onError = onError?.bind(this);
+  }
+
+  #tick = Date.now();
+  async #limitRps(callback: AsyncCallback, delay = 0): Promise<any> {
+    if (!this.#rps) {
+      return await callback();
+    }
+    if (delay > 0) {
+      await new Promise((resolve) => setTimeout(resolve, delay));
+    }
+    const diff = Date.now() - this.#tick;
+    if (diff < 1000 / this.#rps!) {
+      return await this.#limitRps(callback, 1000 / this.#rps! - diff);
+    }
+    this.#tick = Date.now();
+    return await callback();
+  }
+
+  async #execute() {
+    try {
+      await Promise.all(this.#promises);
+      this.#promises = [];
+    } catch (error) {
+      if (!this.#onError) {
+        throw error;
+      }
+      for (;;) {
+        const promise = this.#promises.pop();
+        if (!promise) break;
+        promise.catch(this.#onError);
+      }
+    }
+  }
+
+  async process(...callbacks: AsyncCallback[] | ILimiterRetryItem[]) {
+    for (;;) {
+      const item = callbacks.pop();
+      if (!item) break;
+
+      if (this.#promisesCount >= this.#limit) {
+        await this.#execute();
+      }
+
+      this.#promisesCount++;
+      const promise = (async (item) => {
+        const callback =
+          (item as ILimiterRetryItem).callback || (item as AsyncCallback);
+        try {
+          const res = await this.#limitRps(callback);
+          this.#promisesCount--;
+          return res;
+        } catch (error) {
+          this.#promisesCount--;
+          if (this.#maxRetry > 0) {
+            this.#retryQueue.push({
+              callback,
+              retries: (item as ILimiterRetryItem).retries ??
this.#maxRetry, + error: error as Error, + }); + } else { + throw error; + } + } + })(item); + this.#promises.push(promise); + } + + if (this.#promises.length > 0) { + await this.#execute(); + } + + if (this.#retryQueue.length > 0) { + const retryItems: ILimiterRetryItem[] = []; + for (;;) { + const item = this.#retryQueue.pop(); + if (!item) break; + if (item.retries > 0) { + item.retries--; + retryItems.push(item); + } else if (this.#onError) { + this.#onError( + new LimiterRetryError("Retry limit exceeded", item.error), + ); + } else { + throw new LimiterRetryError("Retry limit exceeded", item.error); + } + } + if (retryItems.length) { + await this.process(...retryItems); + } + } + } + + get length() { + return this.#promisesCount; + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..1b3a409 --- /dev/null +++ b/package.json @@ -0,0 +1,16 @@ +{ + "name": "@nesterow/limiter", + "module": "limiter.ts", + "type": "module", + "devDependencies": { + "@types/bun": "latest", + "prettier": "^3.3.2" + }, + "peerDependencies": { + "typescript": "^5.0.0" + }, + "scripts": { + "build": "tsc", + "format": "prettier --write ." + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..24fb845 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + // Enable latest features + "lib": ["ESNext", "DOM"], + "target": "ESNext", + "module": "ESNext", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": false, + "verbatimModuleSyntax": true, + "noEmit": false, + + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false, + "outDir": "dist", + "rootDir": "." + }, + "include": ["./*"], + "files": ["./limiter.ts"] +}
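
Usage note (not part of the patch): a minimal end-to-end sketch combining the options this commit introduces — `limit`, `rps`, `maxRetry`, and `onError` — together with the `length` getter from `limiter.ts`. The endpoint URL and the task bodies are placeholders, not part of the library.

```typescript
import { Limiter, LimiterRetryError } from "@nesterow/limiter";

// Placeholder endpoint, for illustration only.
const ENDPOINT = "https://my.api.xyz";

const limiter = new Limiter({
  limit: 5,    // at most 5 tasks in flight at once
  rps: 10,     // pace calls roughly 100ms apart
  maxRetry: 2, // re-queue a failed task up to 2 times
  onError: (error) => {
    // With onError set, failures (including LimiterRetryError once the
    // retries are exhausted) are reported here instead of being thrown.
    console.error(error instanceof LimiterRetryError ? "retries exhausted" : error);
  },
});

// 50 tasks; the pool keeps 5 running and spreads them to ~10 requests/second.
const tasks = Array.from({ length: 50 }, (_, i) => async () => {
  await fetch(`${ENDPOINT}/items/${i}`);
});

await limiter.process(...tasks);
console.log("tasks still in flight:", limiter.length); // 0 once the pool drains
```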