UNPKG

17.3 kBJavaScriptView Raw
1"use strict";
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6exports.default = exports.QueueClosedError = void 0;
7
8var _autoBind = _interopRequireDefault(require("auto-bind"));
9
10var _Module = _interopRequireDefault(require("./Module"));
11
/**
 * Normalises the result of a `require` call so it can always be read through `.default`.
 *
 * @param {*} obj - The required module value.
 * @returns {*} `obj` itself when it is truthy and flagged with `__esModule`;
 *   otherwise a new wrapper object of the form `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return { default: obj };
}
13
/**
 * Sets `obj[key]` to `value` and returns `obj`.
 *
 * When `key` is not yet reachable on `obj` (own or inherited, per the `in`
 * operator) a plain assignment is used. Otherwise the property is (re)defined
 * directly on `obj` as an enumerable, configurable, writable data property.
 *
 * @param {object} obj - Target object.
 * @param {string|symbol} key - Property key.
 * @param {*} value - Value to store.
 * @returns {object} The same `obj`, for chaining.
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}
15
// Helpers pulled from the project's Module wrapper, which is constructed with
// this file's path (presumably logging/metrics utilities, judging by their
// names). Only some of them are used in this file, hence the lint suppression.
const { MODULE_NAME, log, warn, error, noteGauge, noteCount, trackOp } = new _Module.default(__filename); // eslint-disable-line no-unused-vars

// Fallback capacity used when no `maxSize` is configured. The queue only
// enforces a limit when the value is greater than zero, so -1 means unbounded.
const DEFAULT_MAX_QUEUE_SIZE = -1;
27
/**
 * Error raised when an operation cannot proceed because the queue is closed.
 *
 * Instances carry `name === 'QueueClosedError'` so they can be told apart from
 * generic errors in logs, and expose the originating queue's name (if any) as
 * `queueName`.
 */
class QueueClosedError extends Error {
  /**
   * @param {string} [queueName] - Name of the closed queue; when omitted (or
   *   otherwise falsy) a generic message is used.
   */
  constructor(queueName) {
    super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed');
    // Without this the inherited name would be the generic 'Error'.
    this.name = 'QueueClosedError';
    this.queueName = queueName;
  }

}
34
35exports.QueueClosedError = QueueClosedError;
36
/**
 * Promise-based FIFO queue with an optional capacity limit.
 *
 * - `enqueue` waits while a bounded queue is full.
 * - `dequeue` waits while the queue is empty.
 * - `close` stops further enqueues, waits until the buffered items have been
 *   dequeued and then rejects every consumer that is still waiting.
 *
 * Every "queue is closed" failure is reported as a `QueueClosedError`, so
 * producers and consumers can detect closure with `instanceof`.
 *
 * When the queue has a `name`, enqueue/dequeue counts and the current number
 * of buffered items are reported through `noteCount` / `noteGauge`.
 */
class AsyncQueue {
  /**
   * @param {{name?: string, maxSize?: number}} [config]
   *   `name` labels metrics and error messages. `maxSize` is the capacity
   *   limit; a missing, zero or negative value leaves the queue unbounded.
   */
  constructor(config) {
    _defineProperty(this, "name", void 0);

    _defineProperty(this, "_items", void 0);

    _defineProperty(this, "_closed", void 0);

    _defineProperty(this, "_maxSize", void 0);

    _defineProperty(this, "_enqueueWaitingList", void 0);

    _defineProperty(this, "_dequeueWaitingList", void 0);

    _defineProperty(this, "_closeWaitingList", void 0);

    // Number of items currently buffered (waiting producers are not counted).
    _defineProperty(this, "getLength", () => {
      return this._items.length;
    });

    // config
    const { name, maxSize } = config || {};
    this.name = name;
    this._maxSize = maxSize || DEFAULT_MAX_QUEUE_SIZE;

    // state
    this._items = [];
    this._closed = false;
    // Each waiting list holds `{ res, rej }` pairs of pending promises.
    this._enqueueWaitingList = [];
    this._dequeueWaitingList = [];
    this._closeWaitingList = [];
    (0, _autoBind.default)(this);
  }

  /**
   * Adds an item to the queue, waiting for free space if a bounded queue is full.
   *
   * If a consumer is already waiting in `dequeue`, the item is handed to the
   * oldest one directly instead of being buffered.
   *
   * @param {*} newItem - The item to add.
   * @returns {Promise<void>} Resolves once the item has been buffered or handed over.
   * @throws {QueueClosedError} If the queue is already closed, or is closed
   *   while this call is waiting for space.
   */
  async enqueue(newItem) {
    if (this._closed) {
      throw new QueueClosedError(this.name);
    }

    // Wait for space to enqueue.
    // NOTE: capacity is checked once; after being released the item is added
    // without re-checking the limit.
    if (this._maxSize > 0 && this._items.length >= this._maxSize) {
      await new Promise((res, rej) => this._enqueueWaitingList.push({
        res,
        rej
      }));
    }

    // Release the oldest waiting dequeue, or buffer the item.
    if (this._dequeueWaitingList.length) {
      this._dequeueWaitingList.shift().res(newItem);
    } else {
      this._items.push(newItem);
    }

    if (this.name) {
      noteCount(`${this.name}.enqueue`, 1);
      noteGauge(`${this.name}.items`, this._items.length);
    }
  }

  /**
   * Removes and returns the oldest item, waiting for one if the queue is empty.
   *
   * @returns {Promise<*>} The dequeued item.
   * @throws {QueueClosedError} If the queue is empty and closed, or is closed
   *   while this call is waiting for an item.
   */
  async dequeue() {
    let item;

    if (this._items.length === 0) {
      // No buffered items: fail if closed, otherwise wait for a producer.
      if (this._closed) {
        throw new QueueClosedError(this.name);
      }

      item = await new Promise((res, rej) => this._dequeueWaitingList.push({
        res,
        rej
      }));
    } else {
      item = this._items.shift();

      // A slot was freed: release the oldest waiting enqueue.
      if (this._enqueueWaitingList.length) {
        this._enqueueWaitingList.shift().res();
      }
    }

    if (this.name) {
      noteCount(`${this.name}.dequeue`, 1);
      noteGauge(`${this.name}.items`, this._items.length);
    }

    // Once drained, release (and forget) all waiting closes.
    if (this._items.length === 0) {
      for (const waiter of this._closeWaitingList.splice(0)) {
        waiter.res();
      }
    }

    return item;
  }

  /**
   * Synchronously removes and returns every buffered item.
   *
   * Waiting producers are then released, at most `maxSize` of them, and any
   * pending `close` calls are resolved because the buffer is now empty.
   *
   * @returns {Array<*>} The removed items in FIFO order (possibly empty).
   */
  dequeueAllAvailable() {
    const items = this._items.splice(0);

    if (this.name) {
      noteCount(`${this.name}.dequeue`, items.length);
      noteGauge(`${this.name}.items`, this._items.length);
    }

    if (this._enqueueWaitingList.length > 0) {
      // Producers only wait on a bounded queue, so `_maxSize` is positive here.
      const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize);

      for (const waiter of this._enqueueWaitingList.splice(0, enqueuesToRelease)) {
        waiter.res();
      }
    }

    // Release (and forget) all waiting closes.
    for (const waiter of this._closeWaitingList.splice(0)) {
      waiter.res();
    }

    return items;
  }

  /**
   * Closes the queue.
   *
   * Producers waiting for space are rejected immediately. The returned promise
   * then waits until all buffered items have been dequeued, after which every
   * consumer still waiting for an item is rejected.
   *
   * @returns {Promise<void>} Resolves once the buffer has been drained.
   */
  async close() {
    this._closed = true;

    // Fail all waiting enqueues; draining the list keeps later dequeues from
    // shifting out already-settled entries.
    for (const waiter of this._enqueueWaitingList.splice(0)) {
      waiter.rej(new QueueClosedError(this.name));
    }

    // Wait for all existing items to be dequeued.
    if (this._items.length > 0) {
      await new Promise((res, rej) => this._closeWaitingList.push({
        res,
        rej
      }));
    }

    // Reject all remaining waiting dequeues.
    for (const waiter of this._dequeueWaitingList.splice(0)) {
      waiter.rej(new QueueClosedError(this.name));
    }
  }

  /**
   * Enqueues the given items one after another, in iteration order.
   *
   * @param {Iterable<*>} newItems - Items to add.
   * @returns {Promise<void>}
   * @throws {QueueClosedError} As soon as an `enqueue` fails because the queue is closed.
   */
  async enqueueRange(newItems) {
    for (const newItem of newItems) {
      await this.enqueue(newItem);
    }
  }

  /**
   * Dequeues up to `count` items, waiting for items as needed.
   *
   * If the queue is closed before `count` items were collected, the items
   * gathered so far are returned instead of throwing.
   *
   * @param {number} count - Maximum number of items to collect.
   * @returns {Promise<Array<*>>} The collected items in FIFO order.
   */
  async dequeueRange(count) {
    const results = [];

    while (results.length < count) {
      try {
        const item = await this.dequeue();
        results.push(item);
      } catch (err) {
        if (err instanceof QueueClosedError) break;
        throw err;
      }
    }

    return results;
  }

}
191
192exports.default = AsyncQueue;
193//# sourceMappingURL=data:application/json;charset=utf-8;base64,
\No newline at end of file