All files / lib AsyncProcessingQueue.ts

15.79% Statements 3/19
0% Branches 0/2
0% Functions 0/4
16.67% Lines 3/18

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 641x   1x           1x                                                                                                              
import { EventEmitter } from 'events';
 
import { Queue } from './Queue';
 
/**
 * Serializes execution of events. This class is constructed with
 * a delegate function that is executed, one item at a time and in
 * enqueue order, for each item that is enqueued.
 *
 * Events emitted:
 *  - `flushing`: a flush pass is about to start draining the queue
 *  - `flushed`: a flush pass has drained the queue; it is safe to call
 *    `enqueue` from a listener of this event
 *  - `error`: the delegate threw or rejected for an item; the error is
 *    passed as the argument and processing continues with the next item
 *
 * Note: as with any EventEmitter, emitting `error` without a registered
 * `error` listener throws. In that case the flush pass aborts (its promise
 * rejects), but the queue re-arms itself so remaining items are not stranded.
 */
export class AsyncProcessingQueue<T> extends EventEmitter {
  private _fn: (...args: T[]) => Promise<void>;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private _queue: Queue<any>;
  // Handle of the pending/active flush; undefined when no flush is scheduled.
  private _flushHandle: ReturnType<typeof setImmediate> | undefined;

  /**
   * @param fn delegate invoked with each enqueued item; the next item is not
   * started until the promise returned for the previous one has settled
   */
  constructor(fn: (...args: T[]) => Promise<void>) {
    super();
    this._fn = fn;
    this._queue = new Queue();
    // bind once so the same function reference can be handed to setImmediate
    this._flush = this._flush.bind(this);
  }

  /**
   * Adds a new item to the processing queue
   */
  public enqueue(value: T): void {
    this._queue.enqueue(value);

    // Postpone flushing until end of event loop to allow multiple operations
    // to enqueue. This handle will be cleared once flushing has completed.
    if (!this._flushHandle) this._flushHandle = setImmediate(this._flush);
  }

  /**
   * Gets the number of pending items in the processor queue
   */
  get size(): number {
    return this._queue.length;
  }

  /**
   * Drains the queue, awaiting the delegate for each item in order.
   * Delegate failures are reported through the `error` event and do not
   * stop the remaining items from being processed.
   */
  private async _flush(): Promise<void> {
    try {
      // emit that flushing is starting
      this.emit('flushing');

      // process all items on the queue, even items that
      // are added to the queue while flushing is occurring
      while (this._queue.length > 0) {
        try {
          const value = this._queue.dequeue();
          await this._fn(value);
        } catch (ex) {
          this.emit('error', ex);
        }
      }
    } finally {
      // Always clear the flush handle so that the next enqueue will trigger
      // a flush, even when a listener (or an unhandled `error` emit) threw.
      this._flushHandle = undefined;

      // If this pass was aborted part-way, re-arm so the leftover items are
      // still processed instead of waiting for some future enqueue.
      if (this._queue.length > 0) {
        this._flushHandle = setImmediate(this._flush);
      }
    }

    // Emit completion only after the handle has been cleared: a listener that
    // enqueues synchronously must be able to schedule the next flush.
    this.emit('flushed');
  }
}