UNPKG

5.04 kB · JavaScript · View Raw
1"use strict";
/**
 * Downlevel helper that runs a generator function as if it were an `async` function.
 *
 * Each value the generator yields is treated as awaited: it is wrapped in `P` (unless it
 * already is an instance of `P`), and the generator is resumed with the fulfilled value or
 * has the rejection reason thrown into it.
 *
 * An already-installed `this.__awaiter` is reused when present.
 *
 * @param thisArg The `this` value the generator function is invoked with.
 * @param _arguments Arguments forwarded to the generator function (defaults to none).
 * @param P Promise constructor to use; falls back to the global `Promise` when falsy.
 * @param generator The generator function to drive.
 * @returns A `P` that resolves with the generator's return value, or rejects with
 *          whatever the generator throws.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Wrap non-P values so `.then` can always be called on what the generator yielded.
    function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with a fulfilled value; anything it throws rejects the result.
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        // Throw a rejection reason into the generator so its own try/catch can handle it.
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        // Either finish with the return value or wait on the next yielded value.
        function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
        // From here on `generator` holds the generator object rather than the function.
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
/**
 * Marker wrapper used by `__asyncGenerator` to tell an awaited value apart from a yielded one.
 *
 * Works with or without `new`: a plain call re-invokes itself as a constructor.
 * An already-installed `this.__await` is reused when present.
 *
 * @param v The value being awaited.
 * @returns An `__await` instance whose `v` property holds the value.
 */
var __await = (this && this.__await) || function (v) {
    if (!(this instanceof __await)) {
        return new __await(v);
    }
    this.v = v;
    return this;
};
/**
 * Downlevel helper that exposes a generator function as an async iterator.
 *
 * Inside the generator, yielding an `__await` instance means "await this value", while
 * yielding anything else produces a result for the consumer. Calls to `next`, `throw` and
 * `return` are queued and processed one at a time, in call order.
 *
 * An already-installed `this.__asyncGenerator` is reused when present.
 *
 * @param thisArg The `this` value the generator function is invoked with.
 * @param _arguments Arguments forwarded to the generator function (defaults to none).
 * @param generator The generator function to drive.
 * @returns An object with `next`/`throw`/`return` (for each one the generator object has)
 *          and a `Symbol.asyncIterator` method that returns the object itself.
 * @throws {TypeError} If `Symbol.asyncIterator` is not defined.
 */
var __asyncGenerator = (this && this.__asyncGenerator) || function (thisArg, _arguments, generator) {
    if (!Symbol.asyncIterator) throw new TypeError("Symbol.asyncIterator is not defined.");
    var g = generator.apply(thisArg, _arguments || []);
    // Pending requests, oldest first; each entry is [verb, argument, resolve, reject].
    var q = [];
    var i = {};
    verb("next");
    verb("throw");
    verb("return");
    i[Symbol.asyncIterator] = function () { return this; };
    return i;
    // Expose generator method `n` as a promise-returning method that enqueues a request.
    function verb(n) {
        if (g[n]) i[n] = function (v) {
            return new Promise(function (a, b) {
                // Only start processing when this request is the sole entry in the queue;
                // otherwise `settle` picks it up after the earlier requests finish.
                if (q.push([n, v, a, b]) === 1) resume(n, v);
            });
        };
    }
    // Run one generator step; an exception rejects the request at the head of the queue.
    function resume(n, v) { try { step(g[n](v)); } catch (e) { settle(q[0][3], e); } }
    // An `__await` marker is awaited and fed back in; any other result settles the head request.
    function step(r) {
        if (r.value instanceof __await) {
            Promise.resolve(r.value.v).then(fulfill, reject);
        }
        else {
            settle(q[0][2], r);
        }
    }
    function fulfill(value) { resume("next", value); }
    function reject(value) { resume("throw", value); }
    // Complete the head request, drop it, then start the next queued request if there is one.
    function settle(f, v) {
        f(v);
        q.shift();
        if (q.length) resume(q[0][0], q[0][1]);
    }
};
23Object.defineProperty(exports, "__esModule", { value: true });
24require("./asyncIterable");
25const queue_1 = require("./queue");
26const utils_1 = require("./utils");
// Module-private sentinel that end() places in the underlying queue to mark the point after
// which no more values follow. It is never handed to callers: get() maps it to `undefined`
// and drain() stops when it sees it.
const BOUNDARY = Symbol("AsyncBoundedQueue.BOUNDARY");
/**
 * An asynchronous queue with a bounded endpoint.
 *
 * Producers add values with `put()` and mark the end of the sequence with `end()`.
 * Consumers take values one at a time with `get()` or iterate over them with `drain()`.
 *
 * Internal state moves from "open" to "closing" (end() was called, the end marker has not
 * been dequeued yet) to "closed" (the end marker was dequeued, or a dequeue rejected).
 */
class AsyncBoundedQueue {
    /**
     * Initializes a new instance of the AsyncBoundedQueue class.
     *
     * @param iterable An optional iterable of values or promises.
     * @throws {TypeError} If `iterable` is provided but is not iterable.
     */
    constructor(iterable) {
        this._queue = new queue_1.AsyncQueue();
        this._state = "open";
        const canIterate = utils_1.isIterable(iterable);
        if (!canIterate && !utils_1.isMissing(iterable))
            throw new TypeError("Object not iterable: iterable.");
        if (canIterate) {
            for (const value of iterable) {
                this.put(value);
            }
        }
    }
    /**
     * Gets the number of entries in the queue.
     * When positive, indicates the number of entries available to get.
     * When negative, indicates the number of requests waiting to be fulfilled.
     * Once the queue has closed this is always 0: nothing more can be retrieved and
     * no request is left waiting.
     */
    get size() {
        if (this._state === "closed")
            return 0;
        const size = this._queue.size;
        // While closing, the end marker put by end() is counted by the underlying queue
        // but is not an entry a caller can get, so it is subtracted.
        return this._state === "closing" ? size - 1 : size;
    }
    /**
     * Adds a value to the end of the queue. If the queue is empty but has a pending
     * dequeue request, the value will be dequeued and the request fulfilled.
     *
     * @param value A value or promise to add to the queue.
     * @throws {Error} If the queue is no longer open (end() was called or the queue closed).
     */
    put(value) {
        if (this._state !== "open")
            throw new Error("AsyncProducer is done producing values.");
        this._queue.put(value);
    }
    /**
     * Indicates the queue is done adding and that no more items will be added to the queue.
     * Calling this more than once has no further effect.
     */
    end() {
        if (this._state !== "open")
            return;
        this._state = "closing";
        this._queue.put(BOUNDARY);
    }
    /**
     * Removes and returns a Promise for the first value in the queue. If the queue has
     * ended, returns a Promise for `undefined`. If the queue is empty, returns a Promise
     * for the next value to be added to the queue.
     */
    get() {
        return __awaiter(this, void 0, void 0, function* () {
            const result = yield this._dequeue();
            return result === BOUNDARY ? undefined : result;
        });
    }
    /**
     * Consumes all items in the queue until the queue ends.
     *
     * @returns An async iterator that yields each dequeued value and completes when the
     *          end marker is reached.
     */
    drain() {
        return __asyncGenerator(this, [], function* drain_1() {
            let value = yield __await(this._dequeue());
            while (value !== BOUNDARY) {
                yield yield __await(value);
                value = yield __await(this._dequeue());
            }
        });
    }
    /**
     * Dequeues the next entry, returning the end marker itself once the queue has ended.
     * Marks the queue closed when the end marker is dequeued or when the dequeue rejects.
     */
    _dequeue() {
        return __awaiter(this, void 0, void 0, function* () {
            if (this._state === "closed")
                return BOUNDARY;
            let result = BOUNDARY;
            try {
                result = yield this._queue.get();
            }
            finally {
                // `result` still holds the marker either because the marker was dequeued
                // or because the dequeue rejected and the assignment never happened.
                if (result === BOUNDARY) {
                    this._state = "closed";
                    // Pass the marker on: end() enqueues only one, so without this any
                    // other request that was already pending would never settle.
                    this._queue.put(BOUNDARY);
                }
            }
            return result;
        });
    }
}
116exports.AsyncBoundedQueue = AsyncBoundedQueue;