UNPKG

6.56 kBJavaScriptView Raw
1'use strict';
2
3const EventEmitter = require('events');
4
5// Port of lower_bound from http://en.cppreference.com/w/cpp/algorithm/lower_bound
6// Used to compute insertion index to keep queue sorted after insertion
7function lowerBound(array, value, comp) {
8 let first = 0;
9 let count = array.length;
10
11 while (count > 0) {
12 const step = (count / 2) | 0;
13 let it = first + step;
14
15 if (comp(array[it], value) <= 0) {
16 first = ++it;
17 count -= step + 1;
18 } else {
19 count = step;
20 }
21 }
22
23 return first;
24}
25
26class PriorityQueue {
27 constructor() {
28 this._queue = [];
29 }
30
31 enqueue(run, options) {
32 options = Object.assign({
33 priority: 0
34 }, options);
35
36 const element = {priority: options.priority, run};
37
38 if (this.size && this._queue[this.size - 1].priority >= options.priority) {
39 this._queue.push(element);
40 return;
41 }
42
43 const index = lowerBound(this._queue, element, (a, b) => b.priority - a.priority);
44 this._queue.splice(index, 0, element);
45 }
46
47 dequeue() {
48 return this._queue.shift().run;
49 }
50
51 get size() {
52 return this._queue.length;
53 }
54}
55
56class PQueue extends EventEmitter {
57 constructor(options) {
58 super();
59
60 options = Object.assign({
61 carryoverConcurrencyCount: false,
62 intervalCap: Infinity,
63 interval: 0,
64 concurrency: Infinity,
65 autoStart: true,
66 queueClass: PriorityQueue
67 }, options);
68
69 if (!(typeof options.concurrency === 'number' && options.concurrency >= 1)) {
70 throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${options.concurrency}\` (${typeof options.concurrency})`);
71 }
72
73 if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) {
74 throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap}\` (${typeof options.intervalCap})`);
75 }
76
77 if (!(Number.isFinite(options.interval) && options.interval >= 0)) {
78 throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval}\` (${typeof options.interval})`);
79 }
80
81 this._carryoverConcurrencyCount = options.carryoverConcurrencyCount;
82 this._isIntervalIgnored = options.intervalCap === Infinity || options.interval === 0;
83 this._intervalCount = 0;
84 this._intervalCap = options.intervalCap;
85 this._interval = options.interval;
86 this._intervalId = null;
87 this._intervalEnd = 0;
88 this._timeoutId = null;
89
90 this.queue = new options.queueClass(); // eslint-disable-line new-cap
91 this._queueClass = options.queueClass;
92 this._pendingCount = 0;
93 this._concurrency = options.concurrency;
94 this._isPaused = options.autoStart === false;
95 this._resolveEmpty = () => {};
96 this._resolveIdle = () => {};
97 }
98
99 get _doesIntervalAllowAnother() {
100 return this._isIntervalIgnored || this._intervalCount < this._intervalCap;
101 }
102
103 get _doesConcurrentAllowAnother() {
104 return this._pendingCount < this._concurrency;
105 }
106
107 _next() {
108 this._pendingCount--;
109 this._tryToStartAnother();
110 }
111
112 _resolvePromises() {
113 this._resolveEmpty();
114 this._resolveEmpty = () => {};
115
116 if (this._pendingCount === 0) {
117 this._resolveIdle();
118 this._resolveIdle = () => {};
119 }
120 }
121
122 _onResumeInterval() {
123 this._onInterval();
124 this._initializeIntervalIfNeeded();
125 this._timeoutId = null;
126 }
127
128 _intervalPaused() {
129 const now = Date.now();
130
131 if (this._intervalId === null) {
132 const delay = this._intervalEnd - now;
133 if (delay < 0) {
134 // Act as the interval was done
135 // We don't need to resume it here,
136 // because it'll be resumed on line 160
137 this._intervalCount = (this._carryoverConcurrencyCount) ? this._pendingCount : 0;
138 } else {
139 // Act as the interval is pending
140 if (this._timeoutId === null) {
141 this._timeoutId = setTimeout(() => this._onResumeInterval(), delay);
142 }
143
144 return true;
145 }
146 }
147
148 return false;
149 }
150
151 _tryToStartAnother() {
152 if (this.queue.size === 0) {
153 // We can clear the interval ("pause")
154 // because we can redo it later ("resume")
155 clearInterval(this._intervalId);
156 this._intervalId = null;
157
158 this._resolvePromises();
159
160 return false;
161 }
162
163 if (!this._isPaused) {
164 const canInitializeInterval = !this._intervalPaused();
165 if (this._doesIntervalAllowAnother && this._doesConcurrentAllowAnother) {
166 this.emit('active');
167 this.queue.dequeue()();
168 if (canInitializeInterval) {
169 this._initializeIntervalIfNeeded();
170 }
171
172 return true;
173 }
174 }
175
176 return false;
177 }
178
179 _initializeIntervalIfNeeded() {
180 if (this._isIntervalIgnored || this._intervalId !== null) {
181 return;
182 }
183
184 this._intervalId = setInterval(() => this._onInterval(), this._interval);
185 this._intervalEnd = Date.now() + this._interval;
186 }
187
188 _onInterval() {
189 if (this._intervalCount === 0 && this._pendingCount === 0) {
190 clearInterval(this._intervalId);
191 this._intervalId = null;
192 }
193
194 this._intervalCount = (this._carryoverConcurrencyCount) ? this._pendingCount : 0;
195 while (this._tryToStartAnother()) {} // eslint-disable-line no-empty
196 }
197
198 add(fn, options) {
199 return new Promise((resolve, reject) => {
200 const run = () => {
201 this._pendingCount++;
202 this._intervalCount++;
203
204 try {
205 Promise.resolve(fn()).then(
206 val => {
207 resolve(val);
208 this._next();
209 },
210 err => {
211 reject(err);
212 this._next();
213 }
214 );
215 } catch (error) {
216 reject(error);
217 this._next();
218 }
219 };
220
221 this.queue.enqueue(run, options);
222 this._tryToStartAnother();
223 });
224 }
225
226 addAll(fns, options) {
227 return Promise.all(fns.map(fn => this.add(fn, options)));
228 }
229
230 start() {
231 if (!this._isPaused) {
232 return;
233 }
234
235 this._isPaused = false;
236 while (this._tryToStartAnother()) {} // eslint-disable-line no-empty
237 }
238
239 pause() {
240 this._isPaused = true;
241 }
242
243 clear() {
244 this.queue = new this._queueClass();
245 }
246
247 onEmpty() {
248 // Instantly resolve if the queue is empty
249 if (this.queue.size === 0) {
250 return Promise.resolve();
251 }
252
253 return new Promise(resolve => {
254 const existingResolve = this._resolveEmpty;
255 this._resolveEmpty = () => {
256 existingResolve();
257 resolve();
258 };
259 });
260 }
261
262 onIdle() {
263 // Instantly resolve if none pending and if nothing else is queued
264 if (this._pendingCount === 0 && this.queue.size === 0) {
265 return Promise.resolve();
266 }
267
268 return new Promise(resolve => {
269 const existingResolve = this._resolveIdle;
270 this._resolveIdle = () => {
271 existingResolve();
272 resolve();
273 };
274 });
275 }
276
277 get size() {
278 return this.queue.size;
279 }
280
281 get pending() {
282 return this._pendingCount;
283 }
284
285 get isPaused() {
286 return this._isPaused;
287 }
288}
289
290module.exports = PQueue;
291module.exports.default = PQueue;