1 | var assert = require('assert');
|
2 | var helper = require('./lib/helper');
|
3 | var fs = require('fs-extra');
|
4 |
|
5 | var Queue = require('../lib/queue');
|
6 | var MemoryStore = require('../lib/stores/memory');
|
7 | var SQLiteStore = require('../lib/stores/sqlite');
|
8 |
|
9 | describe('Complex Queue', function() {
|
10 | afterEach(helper.destroyQueues);
|
11 |
|
// Batch mode: with batchSize 3 the worker is expected to receive the three
// queued tasks as one array, and every push callback is expected to see the
// batch's combined result.
it('should run in batch mode', function (done) {
  var expectedTotal = 6;
  var queuedCount = 0;

  var q = new Queue({
    batchSize: 3,
    process: function (batch, cb) {
      assert.equal(batch.length, 3);
      var sum = batch.reduce(function (acc, task) {
        return acc + task;
      }, 0);
      cb(null, sum);
    },
  });

  // Keep the queue paused until all three tasks have been queued.
  q.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount >= 3) {
      q.resume();
    }
  });
  q.pause();

  q.push(1, function (err, total) {
    assert.equal(total, expectedTotal);
  });
  q.push(2, function (err, total) {
    assert.equal(total, expectedTotal);
  });
  // The last push callback also completes the test.
  q.push(3, function (err, total) {
    assert.equal(total, expectedTotal);
    done();
  });

  this.q = q;
});
|
44 |
|
// Shared store: tasks pushed through a paused first queue are expected to be
// picked up and processed by a second queue built on the same MemoryStore.
it('should store properly', function (done) {
  var test = this;
  var store = new MemoryStore();
  var queuedCount = 0;
  var processedCount = 0;

  // q1 is paused below before anything is pushed; its worker only throws.
  var q1 = new Queue(function (n, cb) { throw new Error('failed') }, { store: store });

  // Creates the second queue on the same store; finishes the test once it has
  // processed three tasks.
  function startSecondQueue() {
    var q2 = new Queue(function (n, cb) {
      processedCount += 1;
      cb();
      if (processedCount === 3) {
        done();
      }
    }, { store: store });
    test.q2 = q2;
  }

  q1.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount >= 3) {
      startSecondQueue();
    }
  });

  q1.pause();
  q1.push(1);
  q1.push(2);
  q1.push(3);
  this.q1 = q1;
});
|
70 |
|
// Retry: the worker fails its first two attempts and succeeds on the third,
// which must be reached with maxRetries set to 3.
it('should retry', function (done) {
  var attempts = 0;

  var q = new Queue(function (n, cb) {
    attempts += 1;
    if (attempts !== 3) {
      cb('fail');
      return;
    }
    cb();
    done();
  }, { maxRetries: 3 });

  q.push(1);
  this.q = q;
});
|
85 |
|
// Retry exhaustion: the worker would only succeed on its third attempt, but
// maxRetries is 2, so the test completes on the 'task_failed' event.
it('should fail retry', function (done) {
  var attempts = 0;

  var worker = function (n, cb) {
    attempts += 1;
    if (attempts !== 3) {
      cb('fail');
      return;
    }
    cb();
  };

  var q = new Queue(worker, { maxRetries: 2, autoResume: true });
  q.on('task_failed', function () {
    done();
  });

  q.push(1);
  this.q = q;
});
|
102 |
|
// afterProcessDelay: with batchSize 1, the second task must not be processed
// until at least `delay` ms after the first one finished.
it('should respect afterProcessDelay', function (done) {
  var delay = 100;
  var processed = 0;
  var queuedCount = 0;
  var firstFinishedAt;

  var q = new Queue(function (task, cb) {
    processed += 1;
    cb();
    switch (processed) {
      case 1:
        // Timestamp taken right after the first task reports completion.
        firstFinishedAt = Date.now();
        break;
      case 2:
        assert(Date.now() - firstFinishedAt >= delay);
        done();
        break;
    }
  }, { batchSize: 1, afterProcessDelay: delay });

  // Resume only once both tasks have been queued.
  q.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount >= 2) {
      q.resume();
    }
  });

  q.pause();
  q.push(1);
  q.push(2);
  this.q = q;
});
|
131 |
|
// maxTimeout: the worker never calls back, so with maxTimeout 1 both the
// 'task_failed' event and the push callback are expected to report
// 'task_timeout'.
it('should max timeout', function (done) {
  var expectedReason = 'task_timeout';

  var q = new Queue(function (tasks, cb) {}, { maxTimeout: 1 });

  q.on('task_failed', function (taskId, msg) {
    assert.equal(msg, expectedReason);
    done();
  });

  q.push(1, function (err, r) {
    assert.equal(err, expectedReason);
  });

  this.q = q;
});
|
143 |
|
// Merge: two pushes sharing id '1' are expected to be combined by `merge`
// (x: 1 + 2) before the worker sees the task, so the worker must observe x === 3.
it('should merge tasks', function (done) {
  var q = new Queue(function (o, cb) {
    // Tasks are pushed with string ids, so compare against '1'. The previous
    // numeric comparison (o.id === 1) never matched, which left the merge
    // assertion unreachable and the test unable to detect a broken merge.
    if (o.id === '1') {
      assert.equal(o.x, 3);
    }
    cb();
  }, {
    id: 'id',
    merge: function (a, b, cb) {
      // Fold the newer task's x into the already-stored task.
      a.x += b.x;
      cb(null, a);
    }
  });

  // Hold processing until two tasks have been reported as queued.
  var queued = 0;
  q.on('task_queued', function () {
    queued++;
    if (queued >= 2) {
      q.resume();
    }
  });

  // The test completes when the merged task finishes.
  q.on('task_finish', function (taskId, r) {
    if (taskId === '1') {
      done();
    }
  });

  q.pause();
  q.push({ id: '0', x: 4 });
  q.push({ id: '1', x: 1 }, function (err, r) {
    assert.ok(!err);
  });
  q.push({ id: '1', x: 2 }, function (err, r) {
    assert.ok(!err);
  });
  this.q = q;
});
|
181 |
|
// String id: tasks are keyed by their `name` property, and pushes that share
// a name are expected to be merged (x values summed) before processing.
it('should respect id property (string)', function (done) {
  // Expected merged x per name, based on the pushes at the bottom of the test.
  var expectedX = { john: 4, mary: 5, jim: 2 };
  var finishedCount = 0;
  var queuedCount = 0;

  var q = new Queue(function (o, cb) {
    // Names outside the table are ignored, so the callback is never invoked
    // for them.
    if (Object.prototype.hasOwnProperty.call(expectedX, o.name)) {
      assert.equal(o.x, expectedX[o.name]);
      cb();
    }
  }, {
    id: 'name',
    merge: function (a, b, cb) {
      a.x += b.x;
      cb(null, a);
    }
  });

  q.on('task_finish', function (taskId, r) {
    finishedCount += 1;
    if (finishedCount >= 3) done();
  });

  // Resume once three tasks have been reported as queued.
  q.on('task_queued', function (taskId, r) {
    queuedCount += 1;
    if (queuedCount >= 3) {
      q.resume();
    }
  });

  q.pause();
  q.push({ name: 'john', x: 4 });
  q.push({ name: 'mary', x: 3 });
  q.push({ name: 'jim', x: 1 });
  q.push({ name: 'jim', x: 1 });
  q.push({ name: 'mary', x: 2 });
  this.q = q;
});
|
223 |
|
// Function id: the `id` option is a function that buckets numbers into 'odd'
// and 'even'; pushes in the same bucket are summed by `merge`, so the expected
// results are 1 + 3 + 5 = 9 for 'odd' and 2 + 4 = 6 for 'even'.
it('should respect id property (function)', function (done) {
  // Declared once: the original redeclared `finished`, kept an unused `queued`
  // counter and registered an empty 'task_queued' listener.
  var finished = 0;

  var q = new Queue(function (n, cb) {
    // Echo the (merged) task back as the result.
    cb(null, n);
  }, {
    batchDelay: 3,
    id: function (n, cb) {
      cb(null, n % 2 === 0 ? 'even' : 'odd');
    },
    merge: function (a, b, cb) {
      cb(null, a + b);
    }
  });

  q.on('task_finish', function (taskId, r) {
    finished++;
    if (taskId === 'odd') {
      assert.equal(r, 9);
    }
    if (taskId === 'even') {
      assert.equal(r, 6);
    }
    // Both buckets have finished.
    if (finished >= 2) {
      done();
    }
  });

  q.push(1);
  q.push(2);
  q.push(3);
  q.push(4);
  q.push(5);
  this.q = q;
});
|
260 |
|
// cancelIfRunning: pushing id 1 again while it is still running is expected
// to call the `cancel` hook returned by the worker before the task runs again.
it('should cancel if running', function (done) {
  var runs = 0;
  var wasCancelled = false;

  var worker = function (n, cb) {
    runs += 1;
    // The first run never completes; later runs complete immediately.
    if (runs >= 2) {
      cb();
    }
    if (runs === 3) {
      assert.ok(wasCancelled);
      done();
    }
    return {
      cancel: function () {
        wasCancelled = true;
      }
    };
  };

  var q = new Queue(worker, { id: 'id', cancelIfRunning: true });

  var ticket = q.push({ id: 1 });
  ticket.on('started', function () {
    q.push({ id: 2 });
    // Re-push the id that is currently running, slightly later.
    setTimeout(function () {
      q.push({ id: 1 });
    }, 1);
  });

  this.q = q;
});
|
288 |
|
// Lock release: shortly after the worker reports completion, the SQLite store
// is expected to list no running tasks.
it('should release lock after running (sqlite)', function (done) {
  var s = new SQLiteStore();
  var q = new Queue(function (n, cb) {
    cb();
    // Check the store on a later tick, after the callback above has returned.
    setTimeout(function () {
      s.getRunningTasks(function (err, tasks) {
        // Surface store errors directly; previously `err` was ignored and a
        // failure showed up as a TypeError from Object.keys(undefined).
        if (err) return done(err);
        assert.ok(!Object.keys(tasks).length);
        done();
      });
    }, 1);
  }, { store: s, autoResume: true });
  q.push(1);
  this.q = q;
});
|
303 |
|
// Resume: q1's worker never calls back; instead it creates a second queue on
// the same SQLite store, and the test completes when that second queue's
// worker is invoked.
it('should resume running task (sqlite)', function (done) {
  var test = this;
  var store = new SQLiteStore();

  var q1 = new Queue(function () {
    test.q2 = new Queue(function () {
      done();
    }, { store: store });
  }, { store: store });

  q1.push(1);
  this.q1 = q1;
});
|
316 |
|
// Repeated failure: the worker fails 100 times in a row before succeeding,
// with unlimited retries; the test passes if the 101st attempt is reached.
it('failed task should not stack overflow', function (done) {
  var attempts = 0;

  var q = new Queue(function (n, cb) {
    attempts += 1;
    if (attempts <= 100) {
      cb('fail');
      return;
    }
    cb();
    done();
  }, {
    maxRetries: Infinity
  });

  q.push(1);
  this.q = q;
});
|
333 |
|
// Drain with a SQL (sqlite) store: after one task is pushed, 'drain' must
// have fired by the time the 20 ms check runs.
it('drain should still work with persistent queues', function (done) {
  var storeConfig = {
    type: 'sql',
    dialect: 'sqlite',
    path: 'testqueue.sql'
  };
  var sawDrain = false;

  var initialQueue = new Queue(function (n, cb) {
    setTimeout(cb, 1);
  }, { store: storeConfig });

  initialQueue.on('drain', function () {
    sawDrain = true;
  });
  initialQueue.push(1);

  // The queue is destroyed here by the test itself, before asserting.
  setTimeout(function () {
    initialQueue.destroy();

    assert.ok(sawDrain);
    done();
  }, 20);
});
|
357 |
|
// Drain with pre-existing items: the first queue's sqlite file is copied
// while slow (100 ms) tasks have been pushed to it, then a second queue is
// opened on the copy and must still emit 'drain' before the 140 ms check.
it('drain should still work when there are persisted items at load time', function (done) {
  // Builds the store options for a sqlite file at `path`.
  function sqliteStore(path) {
    return {
      type: 'sql',
      dialect: 'sqlite',
      path: path
    };
  }

  var initialQueue = new Queue(function (n, cb) {
    setTimeout(cb, 100);
  }, { store: sqliteStore('testqueue.sql') });

  // Tasks are pushed as the strings '1' and '2'.
  initialQueue.push(String(1));
  initialQueue.push(String(2));

  setTimeout(function () {
    // Snapshot the database file, then shut the first queue down.
    fs.copySync('testqueue.sql', 'testqueue2.sql');
    initialQueue.destroy();

    var sawDrain = false;
    var persistedQueue = new Queue(function (n, cb) {
      setTimeout(cb, 1);
    }, { store: sqliteStore('testqueue2.sql') });

    persistedQueue.on('drain', function () {
      sawDrain = true;
    });
    persistedQueue.push(2);

    setTimeout(function () {
      persistedQueue.destroy();

      assert.ok(sawDrain);
      done();
    }, 140);
  }, 40);
});
|
396 | })
|