UNPKG

10.8 kBJavaScriptView Raw
1var assert = require('assert');
2var helper = require('./lib/helper');
3var Queue = require('../lib/queue');
4
5describe('Basic Queue', function() {
6 afterEach(helper.destroyQueues);
7
// A processor that answers with n + 1 is given the input 1; both the
// queue-level `task_finish` event and the push callback must observe 2.
it('should succeed', function (done) {
  var increment = function (value, callback) {
    callback(null, value + 1);
  };
  var queue = new Queue(increment, { autoResume: true });

  // The test completes from the event handler, not from the push callback.
  queue.on('task_finish', function (taskId, result) {
    assert.equal(result, 2);
    done();
  });

  queue.push(1, function (err, result) {
    assert.equal(result, 2);
  });

  // Stored on the mocha context alongside the afterEach cleanup hook.
  this.q = queue;
});
21
// The processor throws synchronously. The test expects the queue to report
// this through `task_failed`, with the thrown error's message as payload.
// NOTE(review): failTaskOnProcessException is not passed here, so the test
// presumably relies on the option's default - confirm against the library.
it('should fail task if failTaskOnProcessException is true', function (done) {
  var alwaysThrow = function (n, cb) {
    throw new Error("failed");
  };
  var queue = new Queue(alwaysThrow, { autoResume: true });

  queue.on('task_failed', function (taskId, message) {
    assert.equal(message, "failed");
    done();
  });

  queue.push(1);
  this.q = queue;
});
33
// With failTaskOnProcessException switched off, a throwing processor is
// expected to make the queue emit `error`; the test completes on that event.
it('should emit an error if failTaskOnProcessException is false', function (done) {
  var queueOptions = { failTaskOnProcessException: false, autoResume: true };
  var queue = new Queue(function (n, cb) {
    throw new Error("failed");
  }, queueOptions);

  // Wrap done() so the event's arguments are not forwarded to mocha.
  queue.on('error', function () {
    done();
  });

  queue.push(1);
  this.q = queue;
});
44
// The processor reports the error 'nope' through its callback; the error
// must reach both the `task_failed` event and the push callback.
it('should fail', function (done) {
  var reject = function (n, cb) {
    cb('nope');
  };
  var queue = new Queue(reject, { autoResume: true });

  queue.on('task_failed', function (taskId, message) {
    assert.equal(message, 'nope');
    done();
  });

  queue.push(1, function (err, result) {
    assert.equal(err, 'nope');
  });

  this.q = queue;
});
58
// Three tasks are pushed while the queue is paused; processing resumes once
// all three are queued. Each push callback asserts how many tasks completed
// before it, which pins the completion order to 1, 2, 3.
it('should run fifo', function (done) {
  var completed = 0;
  var enqueued = 0;
  var queue = new Queue(function (num, cb) { cb(); });

  // Builds a push callback that checks its position in the completion order.
  var expectCompletedBefore = function (expected) {
    return function (err, result) {
      assert.equal(completed, expected);
      completed++;
    };
  };

  queue.on('task_finish', function () {
    if (completed < 3) return;
    done();
  });
  queue.on('task_queued', function () {
    enqueued++;
    if (enqueued < 3) return;
    queue.resume();
  });

  queue.pause();
  queue.push(1, expectCompletedBefore(0));
  queue.push(2, expectCompletedBefore(1));
  queue.push(3, expectCompletedBefore(2));
  this.q = queue;
});
89
// Checks that completion order follows the `priority` option rather than
// push order. The priority function ranks input 2 at 10, input 1 at 5 and
// anything else at 1; the push callbacks assert the completion order 2, 1, 3.
it('should prioritize', function (done) {
  var finished = 0;
  var queued = 0;
  var q = new Queue(function (num, cb) { cb(); }, {
    priority: function (n, cb) {
      if (n === 2) return cb(null, 10);
      if (n === 1) return cb(null, 5);
      return cb(null, 1);
    }
  });

  // Hold processing until all three tasks are queued so that priority, not
  // arrival time, decides the order.
  q.pause();
  q.on('task_queued', function () {
    queued++;
    if (queued === 3) {
      q.resume();
    }
  });

  q.push(3, function (err, r) {
    assert.equal(finished, 2);
    finished++;
    // Complete the test from the callback asserted to run last. Finishing
    // from an earlier callback would let the test end before this ordering
    // assertion has been checked.
    done();
  });
  q.push(2, function (err, r) {
    assert.equal(finished, 0);
    finished++;
  });
  q.push(1, function (err, r) {
    assert.equal(finished, 1);
    finished++;
  });
  this.q = q;
});
122
// Same setup as a paused three-task run, but with `filo: true`: the push
// callbacks assert that the tasks complete in the order 3, 2, 1.
it('should run filo', function (done) {
  var completed = 0;
  var enqueued = 0;
  var queue = new Queue(function (num, cb) {
    cb();
  }, { filo: true });

  // Builds a push callback that checks its position in the completion order.
  var expectCompletedBefore = function (expected) {
    return function (err, result) {
      assert.equal(completed, expected);
      completed++;
    };
  };

  queue.on('task_finish', function () {
    if (completed < 3) return;
    done();
  });
  queue.on('task_queued', function () {
    enqueued++;
    if (enqueued < 3) return;
    queue.resume();
  });

  queue.pause();
  queue.push(1, expectCompletedBefore(2));
  queue.push(2, expectCompletedBefore(1));
  queue.push(3, expectCompletedBefore(0));
  this.q = queue;
});
155
// The filter answers `false` for the input 2 and passes every other input
// through. The push of 2 must fail with 'input_rejected'; the push of 3 must
// be processed and echo 3 back.
it('should filter before process', function (done) {
  var echo = function (n, cb) {
    cb(null, n);
  };
  var queue = new Queue(echo, {
    filter: function (n, cb) {
      if (n === 2) {
        cb(null, false);
      } else {
        cb(null, n);
      }
    }
  });

  queue.push(2, function (err, result) {
    assert.equal(err, 'input_rejected');
  });
  queue.push(3, function (err, result) {
    assert.equal(result, 3);
    done();
  });
  this.q = queue;
});
171
// Three tasks with batchSize 2: the processor must first receive a batch of
// two and then a batch of one. failTaskOnProcessException is disabled in the
// options passed here.
it('should batch delay', function (done) {
  var batchCount = 0;
  var queue = new Queue(function (batch, cb) {
    batchCount++;
    // Any batch beyond the second is ignored without calling back.
    if (batchCount > 2) return;
    if (batchCount === 1) {
      assert.equal(batch.length, 2);
      return cb();
    }
    assert.equal(batch.length, 1);
    cb();
    return done();
  }, { batchSize: 2, batchDelay: 5, failTaskOnProcessException: false });

  queue.push(1);
  queue.push(2);
  queue.push(3);
  this.q = queue;
});
191
// The second task is pushed 2ms after the first one is queued, with a
// batchDelay of 1. Every batch must hold a single task, and the test
// completes once the processor has run twice.
it('should batch 2', function (done) {
  var runs = 0;
  var queue = new Queue(function (batch, cb) {
    runs++;
    assert.equal(batch.length, 1);
    if (runs >= 2) {
      done();
    }
    cb();
  }, { batchSize: 2, batchDelay: 1, autoResume: true });

  var pushSecond = function () {
    queue.push(2);
  };
  queue.push(1).on('queued', function () {
    setTimeout(pushSecond, 2);
  });
  this.q = queue;
});
210
// Pushes three tasks while paused, resumes once all are queued, and checks
// that `empty` has been emitted by the time `drain` fires.
it('should drain and empty', function (done) {
  var emptied = false;
  // The options object belongs to the Queue constructor. It used to be passed
  // as a third argument to q.on('empty', ...), where it had no effect.
  var q = new Queue(function (n, cb) { cb(); }, { autoResume: true });
  q.on('empty', function () {
    emptied = true;
  });
  q.on('drain', function () {
    assert.ok(emptied);
    done();
  });

  var queued = 0;
  q.on('task_queued', function () {
    queued++;
    if (queued >= 3) {
      q.resume();
    }
  });

  q.pause();
  q.push(1);
  q.push(2);
  q.push(3);
  this.q = q;
});
234
// With `concurrent: 2` and a single task, `drain` must not fire before the
// processor has actually run.
it('should drain only once the task is complete', function (done) {
  var taskRan = false;
  var markAndFinish = function (n, cb) {
    taskRan = true;
    cb();
  };
  var queue = new Queue(markAndFinish, { concurrent: 2 });

  queue.on('drain', function () {
    assert.ok(taskRan);
    done();
  });

  queue.push(1);
  this.q = queue;
});
248
// Pushes the integers 0..49 through a processor that returns n + 1 and
// completes once all fifty push callbacks have seen the right result.
it('should queue 50 things', function (done) {
  var queue = new Queue(function (n, cb) {
    cb(null, n + 1);
  });
  var completed = 0;

  // A helper gives each callback its own `n`, since `var` is function-scoped.
  var pushAndVerify = function (n) {
    queue.push(n, function (err, result) {
      assert.equal(result, n + 1);
      completed++;
      if (completed === 50) {
        done();
      }
    });
  };

  for (var i = 0; i < 50; i++) {
    pushAndVerify(i);
  }
  this.q = queue;
});
267
// With `concurrent: 3`, no task calls back until three processors have been
// entered at once: each one polls via setImmediate until the shared counter
// reaches 3. The test completes after all four push callbacks have run.
it('should concurrently handle tasks', function (done) {
  var entered = 0;
  var reachedThree = false;
  var queue = new Queue(function (n, cb) {
    function poll() {
      if (entered === 3) {
        reachedThree = true;
      }
      if (reachedThree) return cb();
      setImmediate(poll);
    }
    entered++;
    poll();
  }, { concurrent: 3 });

  var completed = 0;
  var onTaskDone = function () {
    completed++;
    if (completed >= 4) {
      done();
    }
  };

  queue.push(0, onTaskDone);
  queue.push(1, onTaskDone);
  queue.push(2, onTaskDone);
  queue.push(3, onTaskDone);
  this.q = queue;
});
297
// The processor returns an object with `pause` and `resume` hooks that flip
// the `running` flag. The test checks that the flag follows queue.pause() and
// queue.resume(); the `resume` hook also finishes the task and the test.
it('should pause and resume', function (done) {
  var running = false;
  var queue = new Queue(function (n, cb) {
    running = true;
    var worker = {
      pause: function () {
        running = false;
      },
      resume: function () {
        running = true;
        cb();
        done();
      }
    };
    return worker;
  });

  queue.pause();
  var ticket = queue.push(1);
  ticket.on('started', function () {
    setTimeout(function () {
      assert.ok(running);
      queue.pause();
      assert.ok(!running);
      queue.resume();
    }, 1);
  });

  // The processor must not have run while the queue was paused.
  assert.ok(!running);
  queue.resume();
  this.q = queue;
});
327
// The processor answers after 3ms while maxTimeout is 1, with maxRetries 2.
// The ticket must emit `failed` after exactly two attempts and must never
// emit `finish`.
it('should timeout and fail', function (done) {
  var attempts = 0;
  var slowProcessor = function (n, cb) {
    attempts++;
    setTimeout(function () {
      cb(null, 'done!');
    }, 3);
  };
  var queue = new Queue(slowProcessor, { maxTimeout: 1, maxRetries: 2 });

  var ticket = queue.push(1);
  ticket
    .on('finish', function (result) {
      assert.ok(false);
    })
    .on('failed', function (err) {
      assert.equal(attempts, 2);
      // Wait 5ms before completing, longer than the processor's 3ms timer.
      setTimeout(function () {
        done();
      }, 5);
    });
  this.q = queue;
});
348
// Cancels task id 1 while it is still queued, pushes a replacement with the
// same id, then cancels that one while it is running. The test completes
// when the running task's `cancel` hook is invoked; `merge` must never run.
it('should cancel while running and in queue', function (done) {
  var q = new Queue(function (task, cb) {
    // Only the replacement task (n === 2) should ever be processed.
    // assert.ok(task.n, 2) treated 2 as the failure message and accepted any
    // truthy n, so compare the value instead.
    assert.equal(task.n, 2);
    setTimeout(function () {
      q.cancel(1);
    }, 1);
    return {
      cancel: function () {
        done();
      }
    };
  }, {
    id: 'id',
    merge: function (a, b) {
      assert.ok(false);
    }
  });

  q.push({ id: 1, n: 1 })
    .on('queued', function () {
      q.cancel(1, function () {
        q.push({ id: 1, n: 2 });
      });
    });
  this.q = q;
});
374
// The precondition reports false on its first check and true on its second,
// with a 1ms retry timeout. The processor must only run after that second
// check.
it('should stop if precondition fails', function (done) {
  var checks = 0;
  var queue = new Queue(function (n) {
    assert.equal(checks, 2);
    done();
  }, {
    precondition: function (cb) {
      checks += 1;
      var ready = checks === 2;
      cb(null, ready);
    },
    preconditionRetryTimeout: 1
  });

  queue.push(1);
  this.q = queue;
});
390
// A processor that throws must still cause the push callback to be invoked
// with an error, and this must have happened by the time `drain` fires.
it('should call cb on throw', function (done) {
  var callbackInvoked = false;
  var queue = new Queue(function (task, cb) {
    throw new Error('fail');
  });

  queue.push(1, function (err) {
    callbackInvoked = true;
    assert.ok(err);
  });

  queue.on('drain', function () {
    assert.ok(callbackInvoked);
    done();
  });
  this.q = queue;
});
406
// batchSize is 3 and batchDelay is Infinity, but only two tasks are pushed.
// With batchDelayTimeout set to 5, the processor must still be called, and
// with a batch of exactly those two tasks.
it('should respect batchDelayTimeout', function (done) {
  var batchOptions = {
    batchSize: 3,
    batchDelay: Infinity,
    batchDelayTimeout: 5
  };
  var queue = new Queue(function (arr) {
    assert.equal(arr.length, 2);
    done();
  }, batchOptions);

  queue.push(1);
  setTimeout(function () {
    queue.push(2);
  }, 1);
  this.q = queue;
});
422
// Two tasks sharing the id 'a' are pushed with batchSize 2 and an infinite
// batchDelay. Ten milliseconds in, the processor must still not have run.
it('should merge but not batch until the delay has happened', function (done) {
  var processed = false;
  var queue = new Queue(function (arr) {
    processed = true;
  }, {
    autoResume: true,
    batchSize: 2,
    batchDelay: Infinity,
    id: 'id'
  });

  var pushDuplicates = function () {
    queue.push({ id: 'a', x: 1 });
    queue.push({ id: 'a', x: 2 });
  };
  var verifyNotProcessed = function () {
    assert.ok(!processed);
    done();
  };

  setTimeout(pushDuplicates, 1);
  setTimeout(verifyNotProcessed, 10);
  this.q = queue;
});
443
// Two pushes share the id 'a'; each push's callback must still be invoked.
// The test completes on the second callback invocation.
it('merge batches should call all push callbacks', function (done) {
  var calls = 0;
  var onPushDone = function () {
    calls += 1;
    if (calls === 2) {
      done();
    }
  };

  var queue = new Queue(function (arr, cb) {
    cb();
  }, {
    autoResume: true,
    batchSize: 2,
    id: 'id'
  });

  queue.push({ id: 'a', x: 1 }, onPushDone);
  queue.push({ id: 'a', x: 2 }, onPushDone);
  this.q = queue;
});
461
// The processor fails on its first attempt, with maxRetries set to Infinity.
// On the second attempt it cancels task 'a' and then reports another failure.
// The test passes only if no further attempt has run 100ms later.
it('cancel should not retry', function (done) {
  var attempts = 0;
  var queue = new Queue(function (n, cb) {
    attempts++;
    if (attempts !== 2) {
      cb('failed');
      return;
    }
    queue.cancel('a', function () {
      cb('failed again');
      // done() is only reached if the attempt count is still 2.
      setTimeout(function () {
        if (attempts === 2) {
          done();
        }
      }, 100);
    });
  }, {
    autoResume: true,
    failTaskOnProcessException: true,
    maxRetries: Infinity,
    id: 'id'
  });

  queue.push({ id: 'a', x: 1 });
  this.q = queue;
});
487
488})