1 | var assert = require('assert');
|
2 | var helper = require('./lib/helper');
|
3 | var Queue = require('../lib/queue');
|
4 |
|
5 | describe('Basic Queue', function() {
|
6 | afterEach(helper.destroyQueues);
|
7 |
|
8 | it('should succeed', function (done) {
|
9 | var q = new Queue(function (n, cb) {
|
10 | cb(null, n+1)
|
11 | }, { autoResume: true })
|
12 | q.on('task_finish', function (taskId, r) {
|
13 | assert.equal(r, 2);
|
14 | done();
|
15 | })
|
16 | q.push(1, function (err, r) {
|
17 | assert.equal(r, 2);
|
18 | })
|
19 | this.q = q;
|
20 | });
|
21 |
|
22 | it('should fail task if failTaskOnProcessException is true', function (done) {
|
23 | var q = new Queue(function (n, cb) {
|
24 | throw new Error("failed");
|
25 | }, { autoResume: true })
|
26 | q.on('task_failed', function (taskId, msg) {
|
27 | assert.equal(msg, "failed");
|
28 | done();
|
29 | })
|
30 | q.push(1)
|
31 | this.q = q;
|
32 | });
|
33 |
|
34 | it('should emit an error if failTaskOnProcessException is false', function (done) {
|
35 | var q = new Queue(function (n, cb) {
|
36 | throw new Error("failed");
|
37 | }, { failTaskOnProcessException: false, autoResume: true })
|
38 | q.on('error', function () {
|
39 | done();
|
40 | })
|
41 | q.push(1)
|
42 | this.q = q;
|
43 | });
|
44 |
|
45 | it('should fail', function (done) {
|
46 | var q = new Queue(function (n, cb) {
|
47 | cb('nope')
|
48 | }, { autoResume: true })
|
49 | q.on('task_failed', function (taskId, msg) {
|
50 | assert.equal(msg, 'nope');
|
51 | done();
|
52 | })
|
53 | q.push(1, function (err, r) {
|
54 | assert.equal(err, 'nope');
|
55 | })
|
56 | this.q = q;
|
57 | });
|
58 |
|
59 | it('should run fifo', function (done) {
|
60 | var finished = 0;
|
61 | var queued = 0;
|
62 | var q = new Queue(function (num, cb) { cb() })
|
63 | q.on('task_finish', function () {
|
64 | if (finished >= 3) {
|
65 | done();
|
66 | }
|
67 | })
|
68 | q.on('task_queued', function () {
|
69 | queued++;
|
70 | if (queued >= 3) {
|
71 | q.resume();
|
72 | }
|
73 | })
|
74 | q.pause();
|
75 | q.push(1, function (err, r) {
|
76 | assert.equal(finished, 0);
|
77 | finished++;
|
78 | })
|
79 | q.push(2, function (err, r) {
|
80 | assert.equal(finished, 1);
|
81 | finished++;
|
82 | })
|
83 | q.push(3, function (err, r) {
|
84 | assert.equal(finished, 2);
|
85 | finished++;
|
86 | })
|
87 | this.q = q;
|
88 | })
|
89 |
|
90 | it('should prioritize', function (done) {
|
91 | var q = new Queue(function (num, cb) { cb() }, {
|
92 | priority: function (n, cb) {
|
93 | if (n === 2) return cb(null, 10);
|
94 | if (n === 1) return cb(null, 5);
|
95 | return cb(null, 1);
|
96 | }
|
97 | })
|
98 | q.pause();
|
99 | var finished = 0;
|
100 | var queued = 0;
|
101 | q.on('task_queued', function () {
|
102 | queued++;
|
103 | if (queued === 3) {
|
104 | q.resume();
|
105 | }
|
106 | })
|
107 | q.push(3, function (err, r) {
|
108 | assert.equal(finished, 2);
|
109 | finished++;
|
110 | });
|
111 | q.push(2, function (err, r) {
|
112 | assert.equal(finished, 0);
|
113 | finished++;
|
114 | });
|
115 | q.push(1, function (err, r) {
|
116 | assert.equal(finished, 1);
|
117 | finished++;
|
118 | done()
|
119 | });
|
120 | this.q = q;
|
121 | })
|
122 |
|
123 | it('should run filo', function (done) {
|
124 | var finished = 0;
|
125 | var queued = 0;
|
126 | var q = new Queue(function (num, cb) {
|
127 | cb();
|
128 | }, { filo: true })
|
129 | q.on('task_finish', function () {
|
130 | if (finished >= 3) {
|
131 | done();
|
132 | }
|
133 | })
|
134 | q.on('task_queued', function () {
|
135 | queued++;
|
136 | if (queued >= 3) {
|
137 | q.resume();
|
138 | }
|
139 | })
|
140 | q.pause();
|
141 | q.push(1, function (err, r) {
|
142 | assert.equal(finished, 2);
|
143 | finished++;
|
144 | })
|
145 | q.push(2, function (err, r) {
|
146 | assert.equal(finished, 1);
|
147 | finished++;
|
148 | })
|
149 | q.push(3, function (err, r) {
|
150 | assert.equal(finished, 0);
|
151 | finished++;
|
152 | })
|
153 | this.q = q;
|
154 | })
|
155 |
|
156 | it('should filter before process', function (done) {
|
157 | var q = new Queue(function (n, cb) { cb(null, n) }, {
|
158 | filter: function (n, cb) {
|
159 | cb(null, n === 2 ? false : n);
|
160 | }
|
161 | })
|
162 | q.push(2, function (err, r) {
|
163 | assert.equal(err, 'input_rejected');
|
164 | })
|
165 | q.push(3, function (err, r) {
|
166 | assert.equal(r, 3);
|
167 | done();
|
168 | })
|
169 | this.q = q;
|
170 | })
|
171 |
|
172 | it('should batch delay', function (done) {
|
173 | var batches = 0;
|
174 | var q = new Queue(function (batch, cb) {
|
175 | batches++;
|
176 | if (batches === 1) {
|
177 | assert.equal(batch.length, 2);
|
178 | return cb();
|
179 | }
|
180 | if (batches === 2) {
|
181 | assert.equal(batch.length, 1);
|
182 | cb();
|
183 | return done();
|
184 | }
|
185 | }, { batchSize: 2, batchDelay: 5, failTaskOnProcessException: false });
|
186 | q.push(1);
|
187 | q.push(2);
|
188 | q.push(3);
|
189 | this.q = q;
|
190 | })
|
191 |
|
192 | it('should batch 2', function (done) {
|
193 | var finished = 0;
|
194 | var q = new Queue(function (batch, cb) {
|
195 | finished++;
|
196 | assert.equal(batch.length, 1);
|
197 | if (finished >= 2) {
|
198 | done();
|
199 | }
|
200 | cb();
|
201 | }, { batchSize: 2, batchDelay: 1, autoResume: true });
|
202 | q.push(1)
|
203 | .on('queued', function () {
|
204 | setTimeout(function () {
|
205 | q.push(2);
|
206 | }, 2)
|
207 | })
|
208 | this.q = q;
|
209 | })
|
210 |
|
211 | it('should drain and empty', function (done) {
|
212 | var emptied = false;
|
213 | var q = new Queue(function (n, cb) { cb() })
|
214 | q.on('empty', function () {
|
215 | emptied = true;
|
216 | }, { autoResume: true })
|
217 | q.on('drain', function () {
|
218 | assert.ok(emptied);
|
219 | done();
|
220 | });
|
221 | var queued = 0;
|
222 | q.on('task_queued', function () {
|
223 | queued++;
|
224 | if (queued >= 3) {
|
225 | q.resume();
|
226 | }
|
227 | })
|
228 | q.pause();
|
229 | q.push(1)
|
230 | q.push(2)
|
231 | q.push(3)
|
232 | this.q = q;
|
233 | })
|
234 |
|
235 | it('should drain only once the task is complete', function (done) {
|
236 | var finished_task = false;
|
237 | var q = new Queue(function (n, cb) {
|
238 | finished_task = true;
|
239 | cb();
|
240 | }, { concurrent: 2 });
|
241 | q.on('drain', function () {
|
242 | assert.ok(finished_task);
|
243 | done();
|
244 | });
|
245 | q.push(1);
|
246 | this.q = q;
|
247 | });
|
248 |
|
249 | it('should queue 50 things', function (done) {
|
250 | var q = new Queue(function (n, cb) {
|
251 | cb(null, n+1);
|
252 | })
|
253 | var finished = 0;
|
254 | for (var i = 0; i < 50; i++) {
|
255 | (function (n) {
|
256 | q.push(n, function (err, r) {
|
257 | assert.equal(r, n+1);
|
258 | finished++;
|
259 | if (finished === 50) {
|
260 | done();
|
261 | }
|
262 | })
|
263 | })(i)
|
264 | }
|
265 | this.q = q;
|
266 | });
|
267 |
|
268 | it('should concurrently handle tasks', function (done) {
|
269 | var concurrent = 0;
|
270 | var ok = false;
|
271 | var q = new Queue(function (n, cb) {
|
272 | var wait = function () {
|
273 | if (concurrent === 3) {
|
274 | ok = true;
|
275 | }
|
276 | if (ok) return cb();
|
277 | setImmediate(function () {
|
278 | wait();
|
279 | })
|
280 | }
|
281 | concurrent++;
|
282 | wait();
|
283 | }, { concurrent: 3 })
|
284 | var finished = 0;
|
285 | var finish = function () {
|
286 | finished++;
|
287 | if (finished >= 4) {
|
288 | done();
|
289 | }
|
290 | }
|
291 | q.push(0, finish);
|
292 | q.push(1, finish);
|
293 | q.push(2, finish);
|
294 | q.push(3, finish);
|
295 | this.q = q;
|
296 | })
|
297 |
|
298 | it('should pause and resume', function (done) {
|
299 | var running = false;
|
300 | var q = new Queue(function (n, cb) {
|
301 | running = true;
|
302 | return {
|
303 | pause: function () {
|
304 | running = false;
|
305 | },
|
306 | resume: function () {
|
307 | running = true;
|
308 | cb();
|
309 | done();
|
310 | }
|
311 | }
|
312 | })
|
313 | q.pause();
|
314 | q.push(1)
|
315 | .on('started', function () {
|
316 | setTimeout(function () {
|
317 | assert.ok(running);
|
318 | q.pause();
|
319 | assert.ok(!running);
|
320 | q.resume();
|
321 | }, 1)
|
322 | })
|
323 | assert.ok(!running);
|
324 | q.resume();
|
325 | this.q = q;
|
326 | })
|
327 |
|
328 | it('should timeout and fail', function (done) {
|
329 | var tries = 0;
|
330 | var q = new Queue(function (n, cb) {
|
331 | tries++;
|
332 | setTimeout(function () {
|
333 | cb(null, 'done!')
|
334 | }, 3)
|
335 | }, { maxTimeout: 1, maxRetries: 2 })
|
336 | q.push(1)
|
337 | .on('finish', function (result) {
|
338 | assert.ok(false)
|
339 | })
|
340 | .on('failed', function (err) {
|
341 | assert.equal(tries, 2);
|
342 | setTimeout(function () {
|
343 | done();
|
344 | }, 5)
|
345 | })
|
346 | this.q = q;
|
347 | })
|
348 |
|
349 | it('should cancel while running and in queue', function (done) {
|
350 | var q = new Queue(function (task, cb) {
|
351 | assert.ok(task.n, 2)
|
352 | setTimeout(function () {
|
353 | q.cancel(1);
|
354 | }, 1)
|
355 | return {
|
356 | cancel: function () {
|
357 | done();
|
358 | }
|
359 | }
|
360 | }, {
|
361 | id: 'id',
|
362 | merge: function (a,b) {
|
363 | assert.ok(false);
|
364 | }
|
365 | })
|
366 | q.push({ id: 1, n: 1 })
|
367 | .on('queued', function () {
|
368 | q.cancel(1, function () {
|
369 | q.push({ id: 1, n: 2 });
|
370 | })
|
371 | });
|
372 | this.q = q;
|
373 | })
|
374 |
|
375 | it('should stop if precondition fails', function (done) {
|
376 | var retries = 0;
|
377 | var q = new Queue(function (n) {
|
378 | assert.equal(retries, 2);
|
379 | done();
|
380 | }, {
|
381 | precondition: function (cb) {
|
382 | retries++;
|
383 | cb(null, retries === 2)
|
384 | },
|
385 | preconditionRetryTimeout: 1
|
386 | })
|
387 | q.push(1);
|
388 | this.q = q;
|
389 | })
|
390 |
|
391 | it('should call cb on throw', function (done) {
|
392 | var called = false;
|
393 | var q = new Queue(function (task, cb) {
|
394 | throw new Error('fail');
|
395 | });
|
396 | q.push(1, function (err) {
|
397 | called = true;
|
398 | assert.ok(err);
|
399 | });
|
400 | q.on('drain', function () {
|
401 | assert.ok(called);
|
402 | done();
|
403 | });
|
404 | this.q = q;
|
405 | })
|
406 |
|
407 | it('should respect batchDelayTimeout', function (done) {
|
408 | var q = new Queue(function (arr) {
|
409 | assert.equal(arr.length, 2);
|
410 | done();
|
411 | }, {
|
412 | batchSize: 3,
|
413 | batchDelay: Infinity,
|
414 | batchDelayTimeout: 5
|
415 | })
|
416 | q.push(1);
|
417 | setTimeout(function () {
|
418 | q.push(2);
|
419 | }, 1)
|
420 | this.q = q;
|
421 | })
|
422 |
|
423 | it('should merge but not batch until the delay has happened', function (done) {
|
424 | var running = false;
|
425 | var q = new Queue(function (arr) {
|
426 | running = true;
|
427 | }, {
|
428 | autoResume: true,
|
429 | batchSize: 2,
|
430 | batchDelay: Infinity,
|
431 | id: 'id'
|
432 | })
|
433 | setTimeout(function () {
|
434 | q.push({ id: 'a', x: 1 });
|
435 | q.push({ id: 'a', x: 2 });
|
436 | }, 1)
|
437 | setTimeout(function () {
|
438 | assert.ok(!running);
|
439 | done();
|
440 | }, 10)
|
441 | this.q = q;
|
442 | })
|
443 |
|
444 | it('merge batches should call all push callbacks', function (done) {
|
445 | var count = 0
|
446 | function finish() {
|
447 | count++
|
448 | if (count === 2) done()
|
449 | }
|
450 | var q = new Queue(function (arr, cb) {
|
451 | cb()
|
452 | }, {
|
453 | autoResume: true,
|
454 | batchSize: 2,
|
455 | id: 'id'
|
456 | })
|
457 | q.push({ id: 'a', x: 1 }, finish)
|
458 | q.push({ id: 'a', x: 2 }, finish)
|
459 | this.q = q;
|
460 | })
|
461 |
|
462 | it('cancel should not retry', function (done) {
|
463 | var count = 0;
|
464 | var q = new Queue(function (n, cb) {
|
465 | count++;
|
466 | if (count === 2) {
|
467 | q.cancel('a', function () {
|
468 | cb('failed again');
|
469 | setTimeout(function () {
|
470 | if (count === 2) {
|
471 | done();
|
472 | }
|
473 | }, 100)
|
474 | })
|
475 | } else {
|
476 | cb('failed');
|
477 | }
|
478 | }, {
|
479 | autoResume: true,
|
480 | failTaskOnProcessException: true,
|
481 | maxRetries: Infinity,
|
482 | id: 'id'
|
483 | })
|
484 | q.push({ id: 'a', x: 1 });
|
485 | this.q = q;
|
486 | })
|
487 |
|
488 | })
|