import { EventEmitter } from 'events';
import { Queue } from './Queue';
/**
 * Serializes execution of enqueued items. The queue is constructed with a
 * delegate function that is awaited once per item, one item at a time, in
 * whatever order the underlying queue hands items back.
 *
 * Events:
 * - `flushing`: emitted when a flush pass starts.
 * - `error`: emitted with the thrown value when the delegate throws or
 *   rejects; processing then continues with the next item. As with any
 *   EventEmitter, emitting `error` with no listener attached throws, which
 *   aborts the current flush pass.
 * - `flushed`: emitted after a flush pass has drained the queue.
 */
export class AsyncProcessingQueue<T> extends EventEmitter {
  private _fn: (...args: T[]) => Promise<void>;

  // Element type is left loose because the Queue API is declared elsewhere.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private _queue: Queue<any>;

  // Handle of the pending/running flush; undefined when no flush is scheduled.
  private _flushHandle: ReturnType<typeof setImmediate> | undefined;

  /**
   * @param fn delegate that is awaited for every enqueued item
   */
  constructor(fn: (...args: T[]) => Promise<void>) {
    super();
    this._fn = fn;
    this._queue = new Queue();
    // Bound once so the method can be passed directly to setImmediate.
    this._flush = this._flush.bind(this);
  }

  /**
   * Adds a new item to the processing queue and schedules a flush if one is
   * not already scheduled or running.
   *
   * @param value item to hand to the delegate
   */
  public enqueue(value: T): void {
    this._queue.enqueue(value);

    // Postpone flushing until the end of the current event-loop turn so that
    // multiple synchronous enqueue calls are handled by a single flush pass.
    // The handle is cleared once that pass has finished.
    if (!this._flushHandle) {
      this._flushHandle = setImmediate(this._flush);
    }
  }

  /**
   * Gets the number of pending items in the processing queue.
   */
  get size(): number {
    return this._queue.length;
  }

  /**
   * Drains the queue, awaiting the delegate for each item in turn. Items
   * added while the pass is running are processed by the same pass.
   */
  private async _flush(): Promise<void> {
    try {
      // emit that flushing is starting
      this.emit('flushing');

      // process all items on the queue, even items that
      // are added to the queue while flushing is occurring
      while (this._queue.length > 0) {
        try {
          const value = this._queue.dequeue();
          await this._fn(value);
        } catch (ex) {
          this.emit('error', ex);
        }
      }
    } finally {
      // Always release the handle, even when a listener throws (or `error`
      // has no listener), so a later enqueue can schedule a new flush
      // instead of leaving the queue stuck forever.
      this._flushHandle = undefined;
    }

    // Emitted only after the handle is released: a `flushed` listener that
    // enqueues more work must be able to schedule the next flush pass.
    this.emit('flushed');
  }
}
|