UNPKG

8.99 kBJavaScriptView Raw
1var assert = require('assert');
2var helper = require('./lib/helper');
3var fs = require('fs-extra');
4
5var Queue = require('../lib/queue');
6var MemoryStore = require('../lib/stores/memory');
7var SQLiteStore = require('../lib/stores/sqlite');
8
9describe('Complex Queue', function() {
10 afterEach(helper.destroyQueues);
11
// Batch mode: three tasks are pushed while the queue is paused, the queue is
// resumed once all three are queued, and the processor must then receive them
// as one batch of three. Every push callback is handed the batch result.
it('should run in batch mode', function (done) {
  var queuedCount = 0;

  // Adds up every task in the batch and reports the sum as the result.
  function sumBatch(batch, cb) {
    assert.equal(batch.length, 3);
    var sum = batch.reduce(function (acc, task) {
      return acc + task;
    }, 0);
    cb(null, sum);
  }

  // Shared check for the push callbacks: 1 + 2 + 3.
  function expectBatchTotal(err, total) {
    assert.equal(total, 6);
  }

  var q = new Queue({ batchSize: 3, process: sumBatch });

  q.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount < 3) {
      return;
    }
    q.resume();
  });

  q.pause();
  q.push(1, expectBatchTotal);
  q.push(2, expectBatchTotal);
  q.push(3, function (err, total) {
    expectBatchTotal(err, total);
    done();
  });

  // Stored on the mocha context (afterEach runs helper.destroyQueues).
  this.q = q;
});
44
// Two queues share one MemoryStore: the first queue stays paused and only
// accepts tasks; once three tasks are queued a second queue is created on the
// same store and is expected to process all three.
it('should store properly', function (done) {
  var ctx = this;
  var sharedStore = new MemoryStore();
  var processed = 0;
  var queuedCount = 0;

  // The first queue is paused before anything is pushed; its processor throws
  // if it is ever invoked.
  var q1 = new Queue(function (n, cb) {
    throw new Error('failed');
  }, { store: sharedStore });

  q1.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount < 3) {
      return;
    }
    ctx.q2 = new Queue(function (n, cb) {
      processed += 1;
      cb();
      if (processed === 3) {
        done();
      }
    }, { store: sharedStore });
  });

  q1.pause();
  [1, 2, 3].forEach(function (task) {
    q1.push(task);
  });
  this.q1 = q1;
});
70
// The processor fails on its first two invocations and succeeds on the third;
// with maxRetries: 3 the test completes on that third invocation.
it('should retry', function (done) {
  var attempts = 0;

  var q = new Queue(function (n, cb) {
    attempts += 1;
    if (attempts !== 3) {
      cb('fail');
      return;
    }
    cb();
    done();
  }, { maxRetries: 3 });

  q.push(1);
  this.q = q;
});
85
// The processor would only succeed on its third invocation, but the queue is
// configured with maxRetries: 2; the test completes when 'task_failed' fires.
it('should fail retry', function (done) {
  var attempts = 0;

  var q = new Queue(function (n, cb) {
    attempts += 1;
    if (attempts !== 3) {
      cb('fail');
      return;
    }
    cb();
  }, { maxRetries: 2, autoResume: true });

  // Wrapped so that the event arguments are not forwarded to done().
  q.on('task_failed', function () {
    done();
  });

  q.push(1);
  this.q = q;
});
102
// Two single-task batches are processed back to back; the time between the
// first and second processor invocation must be at least afterProcessDelay.
it('should respect afterProcessDelay', function (done) {
  var DELAY_MS = 100;
  var completed = 0;
  var queuedCount = 0;
  var firstFinishedAt;

  var q = new Queue(function (task, cb) {
    completed += 1;
    cb();
    switch (completed) {
      case 1:
        // Timestamp taken right after the first task reports completion.
        firstFinishedAt = Date.now();
        break;
      case 2:
        assert(Date.now() - firstFinishedAt >= DELAY_MS);
        done();
        break;
    }
  }, { batchSize: 1, afterProcessDelay: DELAY_MS });

  // Processing starts only once both tasks are queued.
  q.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount < 2) {
      return;
    }
    q.resume();
  });

  q.pause();
  q.push(1);
  q.push(2);
  this.q = q;
});
131
// The processor never calls back, so with maxTimeout: 1 the task is expected
// to fail with 'task_timeout', both on the event and on the push callback.
it('should max timeout', function (done) {
  var TIMEOUT_MESSAGE = 'task_timeout';

  var q = new Queue(function (tasks, cb) {}, { maxTimeout: 1 });

  q.on('task_failed', function (taskId, reason) {
    assert.equal(reason, TIMEOUT_MESSAGE);
    done();
  });

  q.push(1, function (failure) {
    assert.equal(failure, TIMEOUT_MESSAGE);
  });

  this.q = q;
});
143
// Tasks pushed with the same id while the queue is paused are expected to be
// combined by the `merge` option, so the processor should see a single task
// for id '1' whose `x` is the sum of both pushes.
it('should merge tasks', function (done) {
  var q = new Queue(function (o, cb) {
    // Ids are pushed as strings below, so the check must compare against the
    // string '1'; a numeric 1 can never match under === and would leave the
    // merged value unverified.
    if (o.id === '1') {
      assert.equal(o.x, 3); // 1 + 2 from the two pushes with id '1'
    }
    cb();
  }, {
    id: 'id',
    merge: function (a, b, cb) {
      a.x += b.x;
      cb(null, a);
    }
  });

  // Start processing once two task_queued events have been seen.
  var queued = 0;
  q.on('task_queued', function () {
    queued++;
    if (queued >= 2) {
      q.resume();
    }
  });

  // The test is complete when the merged task reports that it finished.
  q.on('task_finish', function (taskId) {
    if (taskId === '1') {
      done();
    }
  });

  q.pause();
  q.push({ id: '0', x: 4 });
  q.push({ id: '1', x: 1 }, function (err) {
    assert.ok(!err);
  });
  q.push({ id: '1', x: 2 }, function (err) {
    assert.ok(!err);
  });
  this.q = q;
});
181
// With `id: 'name'`, tasks sharing a name are combined by `merge` (which adds
// their `x` values). Five pushes across three names should therefore finish
// as three tasks with x = 4 (john), 5 (mary) and 2 (jim).
it('should respect id property (string)', function (done) {
  var finishedCount = 0;
  var queuedCount = 0;

  var q = new Queue(function (o, cb) {
    var expectedX;
    switch (o.name) {
      case 'john':
        expectedX = 4;
        break;
      case 'mary':
        expectedX = 5;
        break;
      case 'jim':
        expectedX = 2;
        break;
      default:
        // Unknown names are neither checked nor acknowledged.
        return;
    }
    assert.equal(o.x, expectedX);
    cb();
  }, {
    id: 'name',
    merge: function (a, b, cb) {
      a.x += b.x;
      cb(null, a);
    }
  });

  q.on('task_finish', function () {
    finishedCount += 1;
    if (finishedCount >= 3) {
      done();
    }
  });

  // Start processing after the third task_queued event.
  q.on('task_queued', function () {
    queuedCount += 1;
    if (queuedCount < 3) {
      return;
    }
    q.resume();
  });

  q.pause();
  q.push({ name: 'john', x: 4 });
  q.push({ name: 'mary', x: 3 });
  q.push({ name: 'jim', x: 1 });
  q.push({ name: 'jim', x: 1 });
  q.push({ name: 'mary', x: 2 });
  this.q = q;
});
223
// With a function-valued `id`, every number is filed under 'odd' or 'even'
// and same-id tasks are combined by `merge` (addition). Pushing 1..5 should
// therefore finish two tasks: 'odd' with 9 and 'even' with 6.
it('should respect id property (function)', function (done) {
  var finished = 0;

  var q = new Queue(function (n, cb) {
    // Echo the (merged) task back as the result so task_finish can check it.
    cb(null, n);
  }, {
    batchDelay: 3,
    id: function (n, cb) {
      cb(null, n % 2 === 0 ? 'even' : 'odd');
    },
    merge: function (a, b, cb) {
      cb(null, a + b);
    }
  });

  q.on('task_finish', function (taskId, r) {
    finished++;
    if (taskId === 'odd') {
      assert.equal(r, 9); // 1 + 3 + 5
    }
    if (taskId === 'even') {
      assert.equal(r, 6); // 2 + 4
    }
    if (finished >= 2) {
      done();
    }
  });

  q.push(1);
  q.push(2);
  q.push(3);
  q.push(4);
  q.push(5);
  this.q = q;
});
260
// With cancelIfRunning, pushing a task whose id matches one that is currently
// running is expected to invoke the `cancel` hook returned by the processor.
// The first invocation never calls back; the third checks the hook ran.
it('should cancel if running', function (done) {
  var runs = 0;
  var wasCancelled = false;

  function markCancelled() {
    wasCancelled = true;
  }

  var q = new Queue(function (n, cb) {
    runs += 1;
    if (runs >= 2) {
      cb();
    }
    if (runs === 3) {
      assert.ok(wasCancelled);
      done();
    }
    return { cancel: markCancelled };
  }, { id: 'id', cancelIfRunning: true });

  var firstTicket = q.push({ id: 1 });
  firstTicket.on('started', function () {
    q.push({ id: 2 });
    // Re-push id 1 shortly after the first task has started.
    setTimeout(function () {
      q.push({ id: 1 });
    }, 1);
  });

  this.q = q;
});
288
// After a task has been processed, the SQLite store should report no running
// tasks.
it('should release lock after running (sqlite)', function (done) {
  var s = new SQLiteStore();

  var q = new Queue(function (n, cb) {
    cb();
    // Check on a later tick (presumably to give the queue time to release
    // the task after cb() has been called).
    setTimeout(function () {
      s.getRunningTasks(function (err, tasks) {
        // Report store errors to mocha instead of failing with a TypeError
        // from Object.keys(undefined).
        if (err) {
          return done(err);
        }
        assert.ok(!Object.keys(tasks).length);
        done();
      });
    }, 1);
  }, { store: s, autoResume: true });

  q.push(1);
  this.q = q;
});
303
// The first queue's processor never calls back; instead it creates a second
// queue on the same SQLite store. The test completes as soon as the second
// queue's processor is invoked.
it('should resume running task (sqlite)', function (done) {
  var ctx = this;
  var sharedStore = new SQLiteStore();

  // Processor of the first queue: spins up the second queue and never
  // acknowledges its own task.
  function startSecondQueue() {
    ctx.q2 = new Queue(function () {
      done();
    }, { store: sharedStore });
  }

  var q1 = new Queue(startSecondQueue, { store: sharedStore });
  q1.push(1);
  this.q1 = q1;
});
316
// The processor fails 100 times in a row before succeeding; with unlimited
// retries the test completes on the 101st invocation.
it('failed task should not stack overflow', function (done) {
  var FAILURES_BEFORE_SUCCESS = 100;
  var attempts = 0;

  var q = new Queue(function (n, cb) {
    attempts += 1;
    if (attempts <= FAILURES_BEFORE_SUCCESS) {
      cb('fail');
      return;
    }
    cb();
    done();
  }, { maxRetries: Infinity });

  q.push(1);
  this.q = q;
});
333
// A queue backed by an on-disk SQLite store must still emit 'drain' after its
// only task has been processed. The queue is destroyed by the test itself
// rather than being stored on the mocha context.
it('drain should still work with persistent queues', function (done) {
  var sawDrain = false;
  var storeConfig = {
    type: 'sql',
    dialect: 'sqlite',
    path: 'testqueue.sql'
  };

  var initialQueue = new Queue(function (n, cb) {
    setTimeout(cb, 1);
  }, { store: storeConfig });

  initialQueue.on('drain', function () {
    sawDrain = true;
  });
  initialQueue.push(1);

  // Check after 20 ms, by which time the 1 ms task is expected to be done.
  setTimeout(function () {
    initialQueue.destroy();

    assert.ok(sawDrain);
    done();
  }, 20);
});
357
// A queue opened on a store file that already contains tasks must still emit
// 'drain'. The first queue works slowly (100 ms per task) so that its store
// file can be copied while tasks are still in it; a second queue is then
// opened on the copy.
it('drain should still work when there are persisted items at load time', function (done) {
  // Store options for an on-disk SQLite database at the given path.
  function sqliteStore(path) {
    return {
      type: 'sql',
      dialect: 'sqlite',
      path: path
    };
  }

  var initialQueue = new Queue(function (n, cb) {
    setTimeout(cb, 100);
  }, { store: sqliteStore('testqueue.sql') });

  initialQueue.push('1');
  initialQueue.push('2');

  setTimeout(function () {
    // This effectively captures the queue in a state where there were unprocessed items
    fs.copySync('testqueue.sql', 'testqueue2.sql');
    initialQueue.destroy();

    var sawDrain = false;
    var persistedQueue = new Queue(function (n, cb) {
      setTimeout(cb, 1);
    }, { store: sqliteStore('testqueue2.sql') });

    persistedQueue.on('drain', function () {
      sawDrain = true;
    });
    persistedQueue.push(2);

    setTimeout(function () {
      persistedQueue.destroy();

      assert.ok(sawDrain);
      done();
    }, 140);
  }, 40);
});
396})