[feat] add done() callback

This commit is contained in:
Anton Nesterov 2024-07-11 17:09:29 +02:00
parent 2550f7e165
commit 4494c91bec
6 changed files with 169 additions and 78 deletions

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Anton Nesterov
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -19,44 +19,58 @@ Features:
bun add github:nesterow/limiter # or pnpm
```
## API
- limit - default 10
- maxRetry - number of retries, use Infinity to retry until dead
- rps - if set throttles task execution based on provided rate per second
- onError() - if set, the errors are handled silently
```typescript
limiter = new Limiter({
limit?: number;
maxRetry?: number;
rps?: number;
onError?: (error: Error) => Promise<void> | void;
})
```
## Usage
### Add tasks
```typescript
import { Limiter, LimiterRetryError } from "@nesterow/limiter";
const task = ({ url }) => {
await fetch(url);
// ... write
};
import { Limiter } from "@nesterow/limiter";
const limiter = new Limiter({
limit: 20,
onError(error) {
// Logger.error(error)
},
});
const task = () => {
await fetch(url);
};
limiter.process(task);
limiter.process(task);
limiter.process(task);
await limiter.done()
```
### Limit number of requests
### Batch processing
```typescript
import { Limiter } from "@nesterow/limiter";
const task = () => {
await fetch("https://my.api.xyz");
// ... write
};
const limiter = new Limiter({
limit: 10,
});
// process 100 tasks, 10 at the same time
await limiter.process(...Array.from({ length: 100 }, () => task()));
```
@ -67,7 +81,6 @@ import { Limiter } from "@nesterow/limiter";
const execEvery100ms = () => {
await fetch("https://my.api.xyz");
// ... write
};
const limiter = new Limiter({
@ -75,6 +88,7 @@ const limiter = new Limiter({
rps: 10,
});
// trottle every 100ms
await limiter.process(...Array.from({ length: 100 }, () => execEvery100ms()));
```
@ -86,7 +100,6 @@ import { Limiter, LimiterRetryError } from "@nesterow/limiter";
const retry5times = () => {
await fetch("https://my.api.xyz");
throw new Error("Connection refused");
// ... write
};
const limiter = new Limiter({
@ -96,7 +109,7 @@ const limiter = new Limiter({
for (let i = 0; i < 100; i++) {
try {
await limiter.process(Array.from({ length: 100 }, () => retry5times()));
await limiter.process(...Array.from({ length: 100 }, () => retry5times()));
} catch (e) {
if (e instanceof LimiterRetryError) {
// Logger.log(e)
@ -105,26 +118,6 @@ for (let i = 0; i < 100; i++) {
}
```
### Handle errors in background
## License
```typescript
import { Limiter, LimiterRetryError } from "@nesterow/limiter";
const wontStopPooling = () => {
await fetch("https://my.api.xyz");
throw new Error("Connection refused");
// ... write
};
const limiter = new Limiter({
limit: 20,
maxRetry: 5,
onError(error) {
// Logger.error(error)
},
});
for (let i = 0; i < 100; i++) {
await limiter.process(Array.from({ length: 100 }, () => wontStopPooling()));
}
```
MIT

31
dist/limiter.js vendored
View file

@ -10,6 +10,7 @@ export class LimiterRetryError extends Error {
}
export class Limiter {
#limit = 10;
#processing = false;
#promisesCount = 0;
#promises = [];
#retryQueue = [];
@ -43,6 +44,8 @@ export class Limiter {
this.#promises = [];
} catch (error) {
if (!this.#onError) {
this.#promises = [];
this.#processing = false;
throw error;
}
for (;;) {
@ -52,7 +55,13 @@ export class Limiter {
}
}
}
/**
* Process the callbacks.
* A callback must be a function that returns a promise.
* @param callbacks
*/
async process(...callbacks) {
this.#processing = true;
for (;;) {
const item = callbacks.pop();
if (!item) break;
@ -97,6 +106,9 @@ export class Limiter {
new LimiterRetryError("Retry limit exceeded", item.error),
);
} else {
this.#promises = [];
this.#retryQueue = [];
this.#processing = false;
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
@ -104,8 +116,27 @@ export class Limiter {
await this.process(...retryItems);
}
}
this.#processing = false;
}
/**
* Wait until all the promises are resolved.
**/
async done() {
if (this.isProcessing) {
await new Promise((resolve) => setTimeout(resolve, 10));
await this.done();
}
}
/**
* Get the number of promises in the queue.
*/
get length() {
return this.#promisesCount;
}
/**
* Get the processing status.
*/
get isProcessing() {
return this.#processing;
}
}

47
dist/limiter.test.js vendored
View file

@ -22,27 +22,32 @@ const setup = ({ send, close, delay = 300 }) => {
};
});
};
test("Limiter: opens #limit of concurent connections", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
});
test(
"Limiter: opens #limit of concurent connections",
async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
await limiter.done();
expect(connections[0].send).toBeCalledTimes(7);
},
{ timeout: 5000 },
);
test("Limiter: can add new connections to poll", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),

View file

@ -25,33 +25,38 @@ const setup = ({ send, close, delay = 300 }: any) => {
});
};
test("Limiter: opens #limit of concurent connections", async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
test(
"Limiter: opens #limit of concurent connections",
async () => {
const connection = setup({
send: jest.fn(() => Promise.resolve()),
close: jest.fn(() => Promise.resolve()),
delay: 500,
});
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
const limiter = new Limiter({ limit: 3 });
const connections = Array.from({ length: 7 }, () => connection());
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
limiter.process(
...connections.map((c) => {
return c.process;
}),
);
await delay(0);
expect(limiter.length).toBe(3);
await delay(0);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(3);
await delay(500);
expect(limiter.length).toBe(1);
await delay(500);
expect(limiter.length).toBe(1);
expect(connections[0].send).toBeCalledTimes(7);
});
await limiter.done();
expect(connections[0].send).toBeCalledTimes(7);
},
{ timeout: 5000 },
);
test("Limiter: can add new connections to poll", async () => {
const connection = setup({

View file

@ -2,6 +2,9 @@ type AsyncCallback = () => any | Promise<any>;
export interface ILimiter {
process: (...cb: AsyncCallback[]) => Promise<void>;
done: () => Promise<void>;
length: number;
isProcessing: boolean;
}
export interface ILimiterOptions {
@ -30,6 +33,7 @@ export class LimiterRetryError extends Error {
export class Limiter implements ILimiter {
#limit = 10;
#processing: boolean = false;
#promisesCount = 0;
#promises: Promise<any>[] = [];
#retryQueue: Array<ILimiterRetryItem> = [];
@ -71,6 +75,8 @@ export class Limiter implements ILimiter {
this.#promises = [];
} catch (error) {
if (!this.#onError) {
this.#promises = [];
this.#processing = false;
throw error;
}
for (;;) {
@ -81,7 +87,13 @@ export class Limiter implements ILimiter {
}
}
/**
* Process the callbacks.
* A callback must be a function that returns a promise.
* @param callbacks
*/
async process(...callbacks: AsyncCallback[] | ILimiterRetryItem[]) {
this.#processing = true;
for (;;) {
const item = callbacks.pop();
if (!item) break;
@ -131,6 +143,9 @@ export class Limiter implements ILimiter {
new LimiterRetryError("Retry limit exceeded", item.error),
);
} else {
this.#promises = [];
this.#retryQueue = [];
this.#processing = false;
throw new LimiterRetryError("Retry limit exceeded", item.error);
}
}
@ -138,9 +153,30 @@ export class Limiter implements ILimiter {
await this.process(...retryItems);
}
}
this.#processing = false;
}
/**
* Wait until all the promises are resolved.
**/
async done() {
if (this.isProcessing) {
await new Promise((resolve) => setTimeout(resolve, 10));
await this.done();
}
}
/**
* Get the number of promises in the queue.
*/
get length(): number {
return this.#promisesCount;
}
/**
* Get the processing status.
*/
get isProcessing(): boolean {
return this.#processing;
}
}