This commit is contained in:
Anton Nesterov 2024-07-11 01:20:10 +02:00
parent a6c579e429
commit 6884386903
6 changed files with 277 additions and 268 deletions

View file

@ -24,21 +24,20 @@ bun add github:nesterow/limiter # or pnpm
### Limit number of requests
```typescript
import {Limiter} from '@nesterow/limiter'
import { Limiter } from "@nesterow/limiter";
const task = () => {
await fetch('https://my.api.xyz')
// ... write
}
await fetch("https://my.api.xyz");
// ... write
};
const limiter = new Limiter({
limit: 10
})
limit: 10,
});
for (let i=0; i<100; i++) {
await limiter.process(task)
for (let i = 0; i < 100; i++) {
await limiter.process(task);
}
```
### Limit RPS

204
dist/limiter.js vendored
View file

@ -1,117 +1,111 @@
export class LimiterRetryError extends Error {
constructor(message, error) {
super(message);
this.name = "RetryError";
if (error) {
this.stack = error.stack;
this.cause = error;
}
constructor(message, error) {
super(message);
this.name = "RetryError";
if (error) {
this.stack = error.stack;
this.cause = error;
}
}
}
export class Limiter {
#limit = 10;
#promisesCount = 0;
#promises = [];
#retryQueue = [];
#maxRetry = 0;
#rps;
#onError;
constructor({ limit = 10, rps, maxRetry = 0, onError = undefined, }) {
this.#limit = limit;
this.#rps = rps;
this.#maxRetry = maxRetry;
this.#onError = onError?.bind(this);
#limit = 10;
#promisesCount = 0;
#promises = [];
#retryQueue = [];
#maxRetry = 0;
#rps;
#onError;
constructor({ limit = 10, rps, maxRetry = 0, onError = undefined }) {
this.#limit = limit;
this.#rps = rps;
this.#maxRetry = maxRetry;
this.#onError = onError?.bind(this);
}
#tick = Date.now();
async #limitRps(callback, delay = 0) {
if (!this.#rps) {
return await callback();
}
#tick = Date.now();
async #limitRps(callback, delay = 0) {
if (!this.#rps) {
return await callback();
}
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
const diff = Date.now() - this.#tick;
if (diff < 1000 / this.#rps) {
return await this.#limitRps(callback, 1000 / this.#rps - diff);
}
this.#tick = Date.now();
return await callback();
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
async #execute() {
const diff = Date.now() - this.#tick;
if (diff < 1000 / this.#rps) {
return await this.#limitRps(callback, 1000 / this.#rps - diff);
}
this.#tick = Date.now();
return await callback();
}
async #execute() {
try {
await Promise.all(this.#promises);
this.#promises = [];
} catch (error) {
if (!this.#onError) {
throw error;
}
for (;;) {
const promise = this.#promises.pop();
if (!promise) break;
promise.catch(this.#onError);
}
}
}
async process(...callbacks) {
for (;;) {
const item = callbacks.pop();
if (!item) break;
if (this.#promisesCount >= this.#limit) {
await this.#execute();
}
this.#promisesCount++;
const promise = (async (item) => {
const callback = item.callback || item;
try {
await Promise.all(this.#promises);
this.#promises = [];
}
catch (error) {
if (!this.#onError) {
throw error;
}
for (;;) {
const promise = this.#promises.pop();
if (!promise)
break;
promise.catch(this.#onError);
}
const res = await this.#limitRps(callback);
this.#promisesCount--;
return res;
} catch (error) {
this.#promisesCount--;
if (this.#maxRetry > 0) {
this.#retryQueue.push({
callback,
retries: item.retries ?? this.#maxRetry,
error: error,
});
} else {
throw error;
}
}
})(item);
this.#promises.push(promise);
}
async process(...callbacks) {
for (;;) {
const item = callbacks.pop();
if (!item)
break;
if (this.#promisesCount >= this.#limit) {
await this.#execute();
}
this.#promisesCount++;
const promise = (async (item) => {
const callback = item.callback || item;
try {
const res = await this.#limitRps(callback);
this.#promisesCount--;
return res;
}
catch (error) {
this.#promisesCount--;
if (this.#maxRetry > 0) {
this.#retryQueue.push({
callback,
retries: item.retries ?? this.#maxRetry,
error: error,
});
}
else {
throw error;
}
}
})(item);
this.#promises.push(promise);
}
if (this.#promises.length > 0) {
await this.#execute();
}
if (this.#retryQueue.length > 0) {
const retryItems = [];
for (;;) {
const item = this.#retryQueue.pop();
if (!item)
break;
if (item.retries > 0) {
item.retries--;
retryItems.push(item);
}
else if (this.#onError) {
this.#onError(new LimiterRetryError("Retry limit exceeded", item.error));
}
else {
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
if (retryItems.length) {
await this.process(...retryItems);
}
}
if (this.#promises.length > 0) {
await this.#execute();
}
get length() {
return this.#promisesCount;
if (this.#retryQueue.length > 0) {
const retryItems = [];
for (;;) {
const item = this.#retryQueue.pop();
if (!item) break;
if (item.retries > 0) {
item.retries--;
retryItems.push(item);
} else if (this.#onError) {
this.#onError(
new LimiterRetryError("Retry limit exceeded", item.error),
);
} else {
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
if (retryItems.length) {
await this.process(...retryItems);
}
}
}
get length() {
return this.#promisesCount;
}
}

316
dist/limiter.test.js vendored
View file

@ -2,172 +2,182 @@ import { beforeAll, expect, test, jest } from "bun:test";
import { Limiter, LimiterRetryError } from "./limiter";
const delay = (ms) => new Promise((r) => setTimeout(r, ms));
const setup = ({ send, close, delay = 300 }) => {
return jest.fn(() => {
let closed = false;
let loading = false;
return {
process: jest.fn(async () => {
if (closed)
throw new Error("Connection closed");
//if (loading) throw new Error("Connection in use");
loading = true;
await send();
await new Promise((resolve) => setTimeout(resolve, delay));
loading = false;
}),
close: jest.fn(async () => {
close();
closed = true;
}),
send,
};
});
return jest.fn(() => {
let closed = false;
let loading = false;
return {
process: jest.fn(async () => {
if (closed) throw new Error("Connection closed");
//if (loading) throw new Error("Connection in use");
loading = true;
await send();
await new Promise((resolve) => setTimeout(resolve, delay));
loading = false;
}),
close: jest.fn(async () => {
close();
closed = true;
}),
send,
};
});
};
test("Limiter: opens #limit of concurent connections", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(...connections.map((c) => {
return c.process;
}));
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
});
test("Limiter: can add new connections to poll", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process, connection().process);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(2);
await delay(500);
expect(limiter.length).toBe(0);
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process);
limiter.process(connection().process, connection().process);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(2);
await delay(500);
expect(limiter.length).toBe(0);
});
test("Limiter: limit RPS - requests are evenly distributed", async () => {
const connection = setup({
send: jest.fn(() => {
return Promise.resolve();
}),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({ limit: 20, rps: 10 });
const connections = Array.from({ length: 45 }, () => connection());
let count = 0;
const timestamps = [];
await limiter.process(...connections.map((c) => {
return () => {
++count;
timestamps.push(Date.now());
return c.process();
};
}));
expect(count).toBe(45);
const diffsAvg = timestamps
.map((t, i) => {
const connection = setup({
send: jest.fn(() => {
return Promise.resolve();
}),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({ limit: 20, rps: 10 });
const connections = Array.from({ length: 45 }, () => connection());
let count = 0;
const timestamps = [];
await limiter.process(
...connections.map((c) => {
return () => {
++count;
timestamps.push(Date.now());
return c.process();
};
}),
);
expect(count).toBe(45);
const diffsAvg =
timestamps
.map((t, i) => {
return i === 0 ? 100 : t - timestamps[i - 1];
})
.reduce((a, b) => a + b) / timestamps.length;
expect(diffsAvg).toBeGreaterThan(99);
expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms
})
.reduce((a, b) => a + b) / timestamps.length;
expect(diffsAvg).toBeGreaterThan(99);
expect(diffsAvg).toBeLessThan(102); // 100ms +- 2ms
});
test("Limiter: throws an error by deafult", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 6 }, () => connection());
try {
await limiter.process(...connections.map((c) => {
return c.process;
}));
}
catch (e) {
expect(e).toBe(1);
}
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(3);
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 6 }, () => connection());
try {
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
} catch (e) {
expect(e).toBe(1);
}
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(3);
});
test("Limiter: #onError, no trow", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const onError = jest.fn(() => { });
const limiter = new Limiter({
limit: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(...connections.map((c) => {
return c.process;
}));
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(6);
expect(onError).toBeCalledTimes(6);
const connection = setup({
send: jest.fn(() => Promise.reject(1)),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const onError = jest.fn(() => {});
const limiter = new Limiter({
limit: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
expect(limiter.length).toBe(0);
expect(connections[0].send).toBeCalledTimes(6);
expect(onError).toBeCalledTimes(6);
});
test("Limiter: #maxRetry, exit on fail", async () => {
const connection = setup({
send: () => Promise.reject(1),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
});
const connections = Array.from({ length: 6 }, () => connection());
let count = 0;
try {
await limiter.process(...connections.map((c) => {
++count;
return c.process;
}));
}
catch (e) {
expect(e).toBeInstanceOf(LimiterRetryError);
}
expect(limiter.length).toBe(0);
const connection = setup({
send: () => Promise.reject(1),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
});
const connections = Array.from({ length: 6 }, () => connection());
let count = 0;
try {
await limiter.process(
...connections.map((c) => {
++count;
return c.process;
}),
);
} catch (e) {
expect(e).toBeInstanceOf(LimiterRetryError);
}
expect(limiter.length).toBe(0);
});
test("Limiter: #onError, #maxRetry", async () => {
const connection = setup({
send: jest.fn(() => Promise.reject(new Error("Connection error"))),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
let error;
const onError = jest.fn((err) => {
error = err;
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(...connections.map((c) => {
return c.process;
}));
expect(onError).toBeCalledTimes(6);
expect(error).toBeInstanceOf(LimiterRetryError);
const connection = setup({
send: jest.fn(() => Promise.reject(new Error("Connection error"))),
close: jest.fn(() => Promise.resolve()),
delay: 0,
});
let error;
const onError = jest.fn((err) => {
error = err;
});
const limiter = new Limiter({
limit: 3,
maxRetry: 3,
onError,
});
const connections = Array.from({ length: 6 }, () => connection());
await limiter.process(
...connections.map((c) => {
return c.process;
}),
);
expect(onError).toBeCalledTimes(6);
expect(error).toBeInstanceOf(LimiterRetryError);
});

5
jsr.json Normal file
View file

@ -0,0 +1,5 @@
{
"name": "@nesterow/limiter",
"version": "0.1.0",
"exports": "./limiter.ts"
}

View file

@ -140,7 +140,7 @@ export class Limiter implements ILimiter {
}
}
get length() {
get length(): number {
return this.#promisesCount;
}
}

View file

@ -1,4 +1,5 @@
{
"version": "0.1.0",
"name": "@nesterow/limiter",
"module": "limiter.ts",
"type": "module",