This commit is contained in:
Anton Nesterov 2024-07-11 01:14:10 +02:00
commit ac6faab5c7
9 changed files with 992 additions and 0 deletions

174
.gitignore vendored Normal file
View file

@ -0,0 +1,174 @@
# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore
# Logs
logs
_.log
npm-debug.log_
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# Caches
.cache
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
# Runtime data
pids
_.pid
_.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# parcel-bundler cache (https://parceljs.org/)
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
# Gatsby files
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*
# IntelliJ based IDEs
.idea
# Finder (MacOS) folder config
.DS_Store

116
README.md Normal file
View file

@ -0,0 +1,116 @@
# Limiter
A promise poll with RPS limiter.
Features:
- [x] TypeScript first
- [x] Limits parrallel promises execution
- [x] Limits RPS (requests per second), evenly distributes the requests over time
- [x] Able to retry
- [x] Simple API
- [x] Simple async/await flow
- [x] Allows to handle errors silently using onError callback
- [x] Works with any runtime (Bun/Deno/Node)
## Install
```bash
bun add github:nesterow/limiter # or pnpm
```
## Usage
### Limit number of requests
```typescript
import {Limiter} from '@nesterow/limiter'
const task = () => {
await fetch('https://my.api.xyz')
// ... write
}
const limiter = new Limiter({
limit: 10
})
for (let i=0; i<100; i++) {
await limiter.process(task)
}
```
### Limit RPS
```typescript
import {Limiter} from '@nesterow/limiter'
const execEvery100ms = () => {
await fetch('https://my.api.xyz')
// ... write
}
const limiter = new Limiter({
limit: 20
rps: 10
})
for (let i=0; i < 100; i++) {
await limiter.process(execEvery100ms)
}
```
### Retry
```typescript
import {Limiter, LimiterRetryError} from '@nesterow/limiter'
const retry5times = () => {
await fetch('https://my.api.xyz')
throw new Error("Connection refused")
// ... write
}
const limiter = new Limiter({
limit: 20
maxRetry: 5
})
for (let i=0; i < 100; i++) {
try {
await limiter.process(retry5times)
} catch(e) {
if (e instanceof LimiterRetryError) {
// Logger.log(e)
}
}
}
```
### Handle errors in background
```typescript
import {Limiter, LimiterRetryError} from '@nesterow/limiter'
const wontStopPooling = () => {
await fetch('https://my.api.xyz')
throw new Error("Connection refused")
// ... write
}
const limiter = new Limiter({
limit: 20
maxRetry: 5,
onError(error) {
// Logger.error(error)
}
})
for (let i=0; i < 100; i++) {
await limiter.process(wontStopPooling)
}
```

BIN
bun.lockb Executable file

Binary file not shown.

117
dist/limiter.js vendored Normal file
View file

@ -0,0 +1,117 @@
export class LimiterRetryError extends Error {
constructor(message, error) {
super(message);
this.name = "RetryError";
if (error) {
this.stack = error.stack;
this.cause = error;
}
}
}
export class Limiter {
#limit = 10;
#promisesCount = 0;
#promises = [];
#retryQueue = [];
#maxRetry = 0;
#rps;
#onError;
constructor({ limit = 10, rps, maxRetry = 0, onError = undefined, }) {
this.#limit = limit;
this.#rps = rps;
this.#maxRetry = maxRetry;
this.#onError = onError?.bind(this);
}
#tick = Date.now();
async #limitRps(callback, delay = 0) {
if (!this.#rps) {
return await callback();
}
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
const diff = Date.now() - this.#tick;
if (diff < 1000 / this.#rps) {
return await this.#limitRps(callback, 1000 / this.#rps - diff);
}
this.#tick = Date.now();
return await callback();
}
async #execute() {
try {
await Promise.all(this.#promises);
this.#promises = [];
}
catch (error) {
if (!this.#onError) {
throw error;
}
for (;;) {
const promise = this.#promises.pop();
if (!promise)
break;
promise.catch(this.#onError);
}
}
}
async process(...callbacks) {
for (;;) {
const item = callbacks.pop();
if (!item)
break;
if (this.#promisesCount >= this.#limit) {
await this.#execute();
}
this.#promisesCount++;
const promise = (async (item) => {
const callback = item.callback || item;
try {
const res = await this.#limitRps(callback);
this.#promisesCount--;
return res;
}
catch (error) {
this.#promisesCount--;
if (this.#maxRetry > 0) {
this.#retryQueue.push({
callback,
retries: item.retries ?? this.#maxRetry,
error: error,
});
}
else {
throw error;
}
}
})(item);
this.#promises.push(promise);
}
if (this.#promises.length > 0) {
await this.#execute();
}
if (this.#retryQueue.length > 0) {
const retryItems = [];
for (;;) {
const item = this.#retryQueue.pop();
if (!item)
break;
if (item.retries > 0) {
item.retries--;
retryItems.push(item);
}
else if (this.#onError) {
this.#onError(new LimiterRetryError("Retry limit exceeded", item.error));
}
else {
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
if (retryItems.length) {
await this.process(...retryItems);
}
}
}
get length() {
return this.#promisesCount;
}
}

173
dist/limiter.test.js vendored Normal file
View file

@ -0,0 +1,173 @@
import { beforeAll, expect, test, jest } from "bun:test";
import { Limiter, LimiterRetryError } from "./limiter";
const delay = (ms) => new Promise((r) => setTimeout(r, ms));
const setup = ({ send, close, delay = 300 }) => {
return jest.fn(() => {
let closed = false;
let loading = false;
return {
process: jest.fn(async () => {
if (closed)
throw new Error("Connection closed");
//if (loading) throw new Error("Connection in use");
loading = true;
await send();
await new Promise((resolve) => setTimeout(resolve, delay));
loading = false;
}),
close: jest.fn(async () => {
close();
closed = true;
}),
send,
};
});
};
test("Limiter: opens #limit of concurent connections", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(...connections.map((c) => {
return c.process;
}));
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
});
test("Limiter: can add new connections to poll", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process, connection().process);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(2);
await delay(500);
expect(limiter.length).toBe(0);
});
test("Limiter: limit RPS - requests are evenly distributed", async () => {
const connection = setup({
send: jest.fn(() => {
return Promise.resolve();
}),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({ limit: 20, rps: 10 });
const connections = Array.from({ length: 45 }, () => connection());
let count = 0;
const timestamps = [];
await limiter.process(...connections.map((c) => {
return () => {
++count;
timestamps.push(Date.now());
return c.process();
};
}));
expect(count).toBe(45);
const diffsAvg = timestamps
.map((t, i) => {
return i === 0 ? 100 : t - timestamps[i - 1];
})
.reduce((a, b) => a + b) / timestamps.length;
expect(diffsAvg).toBeGreaterThan(99);
expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms
});
test("Limiter: throws an error by deafult", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 6 }, () => connection());
try {
await limiter.process(...connections.map((c) => {
return c.process;
}));
}
catch (e) {
expect(e).toBe(1);
}
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(3);
});
test("Limiter: #onError, no trow", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const onError = jest.fn(() => { });
const limiter = new Limiter({
limit: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(...connections.map((c) => {
return c.process;
}));
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(6);
expect(onError).toBeCalledTimes(6);
});
test("Limiter: #maxRetry, exit on fail", async () => {
const connection = setup({
send: () => Promise.reject(1),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
});
const connections = Array.from({ length: 6 }, () => connection());
let count = 0;
try {
await limiter.process(...connections.map((c) => {
++count;
return c.process;
}));
}
catch (e) {
expect(e).toBeInstanceOf(LimiterRetryError);
}
expect(limiter.length).toBe(0);
});
test("Limiter: #onError, #maxRetry", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(new Error("Connection error"))),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
let error;
const onError = jest.fn((err) => {
error = err;
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(...connections.map((c) => {
return c.process;
}));
expect(onError).toBeCalledTimes(6);
expect(error).toBeInstanceOf(LimiterRetryError);
});

219
limiter.test.ts Normal file
View file

@ -0,0 +1,219 @@
import { beforeAll, expect, test, jest } from "bun:test";
import { Limiter, LimiterRetryError } from "./limiter";
const delay = (ms: number) => new Promise((r) => setTimeout(r, ms));
const setup = ({ send, close, delay = 300 }: any) => {
return jest.fn(() => {
let closed = false;
let loading = false;
return {
process: jest.fn(async () => {
if (closed) throw new Error("Connection closed");
//if (loading) throw new Error("Connection in use");
loading = true;
await send();
await new Promise((resolve) => setTimeout(resolve, delay));
loading = false;
}),
close: jest.fn(async () => {
close();
closed = true;
}),
send,
};
});
};
test("Limiter: opens #limit of concurent connections", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
});
test("Limiter: can add new connections to poll", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process, connection().process);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(2);
await delay(500);
expect(limiter.length).toBe(0);
});
test("Limiter: limit RPS - requests are evenly distributed", async () => {
const connection = setup({
send: jest.fn(() => {
return Promise.resolve();
}),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({ limit: 20, rps: 10 });
const connections = Array.from({ length: 45 }, () => connection());
let count = 0;
const timestamps: number[] = [];
await limiter.process(
...connections.map((c) => {
return () => {
++count;
timestamps.push(Date.now());
return c.process();
};
}),
);
expect(count).toBe(45);
const diffsAvg =
timestamps
.map((t, i) => {
return i === 0 ? 100 : t - timestamps[i - 1];
})
.reduce((a, b) => a + b) / timestamps.length;
expect(diffsAvg).toBeGreaterThan(99);
expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms
});
test("Limiter: throws an error by deafult", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 6 }, () => connection());
try {
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
} catch (e) {
expect(e).toBe(1);
}
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(3);
});
test("Limiter: #onError, no trow", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const onError = jest.fn(() => {});
const limiter = new Limiter({
limit: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(6);
expect(onError).toBeCalledTimes(6);
});
test("Limiter: #maxRetry, exit on fail", async () => {
const connection = setup({
send: () => Promise.reject(1),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
});
const connections = Array.from({ length: 6 }, () => connection());
let count = 0;
try {
await limiter.process(
...connections.map((c) => {
++count;
return c.process;
}),
);
} catch (e) {
expect(e).toBeInstanceOf(LimiterRetryError);
}
expect(limiter.length).toBe(0);
});
test("Limiter: #onError, #maxRetry", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(new Error("Connection error"))),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
let error;
const onError = jest.fn((err) => {
error = err;
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
expect(onError).toBeCalledTimes(6);
expect(error).toBeInstanceOf(LimiterRetryError);
});

146
limiter.ts Normal file
View file

@ -0,0 +1,146 @@
type AsyncCallback = () => any | Promise<any>;
export interface ILimiter {
process: (...cb: AsyncCallback[]) => Promise<void>;
}
export interface ILimiterOptions {
limit?: number;
maxRetry?: number;
rps?: number;
onError?: (error: Error) => Promise<void> | void;
}
interface ILimiterRetryItem {
callback: AsyncCallback;
retries: number;
error?: Error;
}
export class LimiterRetryError extends Error {
constructor(message: string, error?: Error) {
super(message);
this.name = "RetryError";
if (error) {
this.stack = error.stack;
this.cause = error;
}
}
}
export class Limiter implements ILimiter {
#limit = 10;
#promisesCount = 0;
#promises: Promise<any>[] = [];
#retryQueue: Array<ILimiterRetryItem> = [];
#maxRetry = 0;
#rps: number | undefined;
#onError?: (error: Error) => void | Promise<void>;
constructor({
limit = 10,
rps,
maxRetry = 0,
onError = undefined,
}: ILimiterOptions) {
this.#limit = limit;
this.#rps = rps;
this.#maxRetry = maxRetry;
this.#onError = onError?.bind(this);
}
#tick = Date.now();
async #limitRps(callback: AsyncCallback, delay = 0): Promise<any> {
if (!this.#rps) {
return await callback();
}
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
const diff = Date.now() - this.#tick;
if (diff < 1000 / this.#rps!) {
return await this.#limitRps(callback, 1000 / this.#rps! - diff);
}
this.#tick = Date.now();
return await callback();
}
async #execute() {
try {
await Promise.all(this.#promises);
this.#promises = [];
} catch (error) {
if (!this.#onError) {
throw error;
}
for (;;) {
const promise = this.#promises.pop();
if (!promise) break;
promise.catch(this.#onError);
}
}
}
async process(...callbacks: AsyncCallback[] | ILimiterRetryItem[]) {
for (;;) {
const item = callbacks.pop();
if (!item) break;
if (this.#promisesCount >= this.#limit) {
await this.#execute();
}
this.#promisesCount++;
const promise = (async (item) => {
const callback =
(item as ILimiterRetryItem).callback || (item as AsyncCallback);
try {
const res = await this.#limitRps(callback);
this.#promisesCount--;
return res;
} catch (error) {
this.#promisesCount--;
if (this.#maxRetry > 0) {
this.#retryQueue.push({
callback,
retries: (item as ILimiterRetryItem).retries ?? this.#maxRetry,
error: error as Error,
});
} else {
throw error;
}
}
})(item);
this.#promises.push(promise);
}
if (this.#promises.length > 0) {
await this.#execute();
}
if (this.#retryQueue.length > 0) {
const retryItems: ILimiterRetryItem[] = [];
for (;;) {
const item = this.#retryQueue.pop();
if (!item) break;
if (item.retries > 0) {
item.retries--;
retryItems.push(item);
} else if (this.#onError) {
this.#onError(
new LimiterRetryError("Retry limit exceeded", item.error),
);
} else {
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
if (retryItems.length) {
await this.process(...retryItems);
}
}
}
get length() {
return this.#promisesCount;
}
}

16
package.json Normal file
View file

@ -0,0 +1,16 @@
{
"name": "@nesterow/limiter",
"module": "limiter.ts",
"type": "module",
"devDependencies": {
"@types/bun": "latest",
"prettier": "^3.3.2"
},
"peerDependencies": {
"typescript": "^5.0.0"
},
"scripts": {
"build": "tsc",
"format": "prettier --write ."
}
}

31
tsconfig.json Normal file
View file

@ -0,0 +1,31 @@
{
"compilerOptions": {
// Enable latest features
"lib": ["ESNext", "DOM"],
"target": "ESNext",
"module": "ESNext",
"moduleDetection": "force",
"jsx": "react-jsx",
"allowJs": true,
// Bundler mode
"moduleResolution": "bundler",
"allowImportingTsExtensions": false,
"verbatimModuleSyntax": true,
"noEmit": false,
// Best practices
"strict": true,
"skipLibCheck": true,
"noFallthroughCasesInSwitch": true,
// Some stricter flags (disabled by default)
"noUnusedLocals": false,
"noUnusedParameters": false,
"noPropertyAccessFromIndexSignature": false,
"outDir": "dist",
"rootDir": "."
},
"include": ["./*"],
"files": ["./limiter.ts"]
}