UNPKG

20.5 kBJavaScriptView Raw
1var uuid = require('node-uuid');
2var util = require('util');
3var EE = require('events').EventEmitter;
4var Ticket = require('./ticket');
5var Worker = require('./worker');
6var Tickets = require('./tickets');
7
8var stores = {
9 memory: './stores/memory',
10 sqlite: './stores/sqlite',
11 sql: './stores/SqlStore',
12}
13
14function Queue(process, opts) {
15 var self = this;
16 opts = opts || {};
17 if (typeof process === 'object') {
18 opts = process || {};
19 }
20 if (typeof process === 'function') {
21 opts.process = process;
22 }
23 if (!opts.process) {
24 throw new Error("Queue has no process function.");
25 }
26
27 opts = opts || {};
28
29 self.process = opts.process || function (task, cb) { cb(null, {}) };
30 self.filter = opts.filter || function (input, cb) { cb(null, input) };
31 self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) };
32 self.precondition = opts.precondition || function (cb) { cb(null, true) };
33 self.id = opts.id || false;
34 self.priority = opts.priority || null;
35
36 self.cancelIfRunning = (opts.cancelIfRunning === undefined ? false : !!opts.cancelIfRunning);
37 self.autoResume = (opts.autoResume === undefined ? true : !!opts.autoResume);
38 self.failTaskOnProcessException = (opts.failTaskOnProcessException === undefined ? true : !!opts.failTaskOnProcessException);
39 self.filo = opts.filo || false;
40 self.batchSize = opts.batchSize || 1;
41 self.batchDelay = opts.batchDelay || 0;
42 self.batchDelayTimeout = opts.batchDelayTimeout || Infinity;
43 self.afterProcessDelay = opts.afterProcessDelay || 0;
44 self.concurrent = opts.concurrent || 1;
45 self.maxTimeout = opts.maxTimeout || Infinity;
46 self.maxRetries = opts.maxRetries || 0;
47 self.retryDelay = opts.retryDelay || 0;
48 self.storeMaxRetries = opts.storeMaxRetries || Infinity;
49 self.storeRetryTimeout = opts.storeRetryTimeout || 1000;
50 self.preconditionRetryTimeout = opts.preconditionRetryTimeout || 1000;
51
52 // Statuses
53 self._queuedPeak = 0;
54 self._queuedTime = {};
55 self._processedTotalElapsed = 0;
56 self._processedAverage = 0;
57 self._processedTotal = 0;
58 self._failedTotal = 0;
59 self.length = 0;
60 self._stopped = false;
61 self._saturated = false;
62
63 self._preconditionRetryTimeoutId = null;
64 self._batchTimeoutId = null;
65 self._batchDelayTimeoutId = null;
66 self._connected = false;
67 self._storeRetries = 0;
68
69 // Locks
70 self._hasMore = false;
71 self._isWriting = false;
72 self._writeQueue = [];
73 self._writing = {};
74 self._tasksWaitingForConnect = [];
75
76 self._calledDrain = true;
77 self._calledEmpty = true;
78 self._fetching = 0;
79 self._running = 0; // Active running tasks
80 self._retries = {}; // Map of taskId => retries
81 self._workers = {}; // Map of taskId => active job
82 self._tickets = {}; // Map of taskId => tickets
83
84 // Initialize Storage
85 self.use(opts.store || 'memory');
86 if (!self._store) {
87 throw new Error('Queue cannot continue without a valid store.')
88 }
89}
90
91util.inherits(Queue, EE);
92
93Queue.prototype.destroy = function (cb) {
94 cb = cb || function () {};
95 var self = this;
96
97 // Statuses
98 self._hasMore = false;
99 self._isWriting = false;
100 self._writeQueue = [];
101 self._writing = {};
102 self._tasksWaitingForConnect = [];
103
104 // Clear internals
105 self._tickets = {};
106 self._workers = {};
107 self._fetching = 0;
108 self._running = {};
109 self._retries = {};
110 self._calledEmpty = true;
111 self._calledDrain = true;
112 self._connected = false;
113 self.pause();
114
115 if (typeof self._store.close === 'function') {
116 self._store.close(cb);
117 } else {
118 cb();
119 }
120}
121
122Queue.prototype.resetStats = function () {
123 this._queuedPeak = 0;
124 this._processedTotalElapsed = 0;
125 this._processedAverage = 0;
126 this._processedTotal = 0;
127 this._failedTotal = 0;
128}
129
130Queue.prototype.getStats = function () {
131 var successRate = this._processedTotal === 0 ? 0 : (1 - (this._failedTotal / this._processedTotal));
132 return {
133 successRate: successRate,
134 peak: this._queuedPeak,
135 average: this._processedAverage,
136 total: this._processedTotal
137 }
138}
139
140Queue.prototype.use = function (store, opts) {
141 var self = this;
142 if (typeof store === 'string' && stores[store]) {
143 try {
144 var Store;
145 if (store === 'memory') {
146 Store = require('./stores/memory');
147 } else if (store === 'sql') {
148 Store = require('./stores/SqlStore');
149 } else if (store === 'sqlite') {
150 Store = require('./stores/sqlite');
151 } else {
152 Store = require(stores[store]);
153 }
154 self._store = new Store(opts);
155 } catch (e) { throw e }
156 } else if (typeof store === 'object' && typeof store.type === 'string' && stores[store.type]) {
157 try {
158 var Store;
159 if (store.type === 'memory') {
160 Store = require('./stores/memory');
161 } else if (store.type === 'sql') {
162 Store = require('./stores/SqlStore');
163 } else if (store.type === 'sqlite') {
164 Store = require('./stores/sqlite');
165 } else {
166 Store = require(stores[store.type]);
167 }
168 self._store = new Store(store);
169 } catch (e) { throw e }
170 } else if (typeof store === 'object' &&
171 store.putTask &&
172 store.getTask &&
173 ((self.filo && store.takeLastN) ||
174 (!self.filo && store.takeFirstN))) {
175 self._store = store;
176 } else {
177 throw new Error('unknown_store');
178 }
179 self._connected = false;
180 self._tasksWaitingForConnect = [];
181 self._connectToStore();
182}
183
184Queue.prototype._connectToStore = function () {
185 var self = this;
186 if (self._connected) return;
187 if (self._storeRetries >= self.storeMaxRetries) {
188 return self.emit('error', new Error('failed_connect_to_store'));
189 }
190 self._storeRetries++;
191 self._store.connect(function (err, len) {
192 if (err) return setTimeout(function () {
193 self._connectToStore();
194 }, self.storeRetryTimeout);
195 if (len === undefined || len === null) throw new Error("store_not_returning_length");
196 self.length = parseInt(len);
197 if (isNaN(self.length)) throw new Error("length_is_not_a_number");
198 if (self.length) self._calledDrain = false;
199 self._connected = true;
200 self._storeRetries = 0;
201 self._store.getRunningTasks(function (err, running) {
202 if (!self._stopped && self.autoResume) {
203 Object.keys(running).forEach(function (lockId) {
204 self._running++;
205 self._startBatch(running[lockId], {}, lockId);
206 })
207 self.resume();
208 }
209 for (var i = 0; i < self._tasksWaitingForConnect.length; i++) {
210 self.push(self._tasksWaitingForConnect[i].input, self._tasksWaitingForConnect[i].ticket);
211 }
212 })
213 })
214
215}
216
217Queue.prototype.resume = function () {
218 var self = this;
219 self._stopped = false;
220 self._getWorkers().forEach(function (worker) {
221 if (typeof worker.resume === 'function') {
222 worker.resume();
223 }
224 })
225 setTimeout(function () {
226 self._processNextAfterTimeout();
227 }, 0)
228}
229
230Queue.prototype.pause = function () {
231 this._stopped = true;
232 this._getWorkers().forEach(function (worker) {
233 if (typeof worker.pause === 'function') {
234 worker.pause();
235 }
236 })
237}
238
239Queue.prototype.cancel = function (taskId, cb) {
240 cb = cb || function(){};
241 var self = this;
242 var worker = self._workers[taskId];
243 if (worker) {
244 worker.cancel();
245 }
246 self._store.deleteTask(taskId, cb);
247}
248
249Queue.prototype.push = function (input, cb) {
250 var self = this;
251 var ticket = new Ticket();
252 if (cb instanceof Ticket) {
253 ticket = cb;
254 } else if (cb) {
255 ticket
256 .on('finish', function (result) { cb(null, result) })
257 .on('failed', function (err) { cb(err) })
258 }
259
260 if (!self._connected) {
261 self._tasksWaitingForConnect.push({ input: input, ticket: ticket });
262 return ticket;
263 }
264
265 self.filter(input, function (err, task) {
266 if (err || task === undefined || task === false || task === null) {
267 return ticket.failed('input_rejected');
268 }
269 var acceptTask = function (taskId) {
270 setTimeout(function () {
271 self._queueTask(taskId, task, ticket);
272 }, 0)
273 }
274 if (typeof self.id === 'function') {
275 self.id(task, function (err, id) {
276 if (err) return ticket.failed('id_error');
277 acceptTask(id);
278 })
279 } else if (typeof self.id === 'string' && typeof task === 'object') {
280 acceptTask(task[self.id])
281 } else {
282 acceptTask();
283 }
284 })
285 return ticket;
286}
287
288Queue.prototype._getWorkers = function () {
289 var self = this;
290 var workers = [];
291 Object.keys(self._workers).forEach(function (taskId) {
292 var worker = self._workers[taskId];
293 if (worker && workers.indexOf(worker) === -1) {
294 workers.push(worker);
295 }
296 })
297 return workers;
298}
299
300Queue.prototype._writeNextTask = function () {
301 var self = this;
302 if (self._isWriting) return;
303 if (!self._writeQueue.length) return;
304 self._isWriting = true;
305
306 var taskId = self._writeQueue.shift();
307 var finishedWrite = function () {
308 self._isWriting = false;
309 setImmediate(function () {
310 self._writeNextTask();
311 })
312 }
313
314 if (!self._writing[taskId]) {
315 delete self._writing[taskId];
316 return finishedWrite();
317 }
318
319 var task = self._writing[taskId].task;
320 var priority = self._writing[taskId].priority;
321 var isNew = self._writing[taskId].isNew;
322 var writeId = self._writing[taskId].id;
323 var tickets = self._writing[taskId].tickets;
324
325 self._store.putTask(taskId, task, priority, function (err) {
326
327 // Check if task has changed since put
328 if (self._writing[taskId] && self._writing[taskId].id !== writeId) {
329 self._writeQueue.unshift(taskId);
330 return finishedWrite();
331 }
332 delete self._writing[taskId];
333
334 // If something else has written to taskId, then wait.
335 if (err) {
336 tickets.failed('failed_to_put_task');
337 return finishedWrite();
338 }
339
340 // Task is in the queue -- update stats
341 if (isNew) {
342 self.length++;
343 if (self._queuedPeak < self.length) {
344 self._queuedPeak = self.length;
345 }
346 self._queuedTime[taskId] = new Date().getTime();
347 }
348
349 // Notify the ticket
350 if (self._tickets[taskId]) {
351 self._tickets[taskId].push(tickets);
352 } else {
353 self._tickets[taskId] = tickets;
354 }
355 self.emit('task_queued', taskId, task);
356 tickets.queued();
357
358 // If it's a new task, make sure to call drain after.
359 if (isNew) {
360 self._calledDrain = false;
361 self._calledEmpty = false;
362 }
363
364 // If already fetching, mark that there are additions to the queue
365 if (self._fetching > 0) {
366 self._hasMore = true;
367 }
368
369 // Clear batchDelayTimeout
370 if (self.batchDelayTimeout < Infinity) {
371 if (self._batchDelayTimeoutId) clearTimeout(self._batchDelayTimeoutId)
372 self._batchDelayTimeoutId = setTimeout(function () {
373 self._batchDelayTimeoutId = null;
374 if (self._batchTimeoutId) clearTimeout(self._batchTimeoutId);
375 self._batchTimeoutId = null;
376 self._processNextIfAllowed();
377 }, self.batchDelayTimeout)
378 }
379
380 // Finish writing
381 finishedWrite();
382 self._processNextAfterTimeout();
383 })
384}
385
386Queue.prototype._queueTask = function (taskId, newTask, ticket) {
387 var self = this;
388 var emptyTicket = new Ticket();
389 ticket = ticket || emptyTicket;
390 var isUUID = false;
391 if (!taskId) {
392 taskId = uuid.v4();
393 isUUID = true;
394 }
395 var priority;
396 var oldTask = null;
397 var isNew = true;
398 var putTask = function () {
399 if (!self._connected) return;
400
401 // Save ticket
402 var tickets = (self._writing[taskId] && self._writing[taskId].tickets) || new Tickets();
403 if (ticket !== emptyTicket) {
404 tickets.push(ticket);
405 }
406
407 // Add to queue
408 var alreadyQueued = !!self._writing[taskId];
409 self._writing[taskId] = {
410 id: uuid.v4(),
411 isNew: isNew,
412 task: newTask,
413 priority: priority,
414 tickets: tickets
415 };
416 if (!alreadyQueued) {
417 self._writeQueue.push(taskId);
418 }
419
420 self._writeNextTask();
421 }
422 var updateTask = function () {
423 ticket.accept();
424 self.emit('task_accepted', taskId, newTask);
425
426 if (!self.priority) return putTask();
427 self.priority(newTask, function (err, p) {
428 if (err) return ticket.failed('failed_to_prioritize');
429 priority = p;
430 putTask();
431 })
432 }
433 var mergeTask = function () {
434 if (!oldTask) return updateTask();
435 self.merge(oldTask, newTask, function (err, mergedTask) {
436 if (err) return ticket.failed('failed_task_merge');
437 if (mergedTask === undefined) return;
438 newTask = mergedTask;
439 updateTask();
440 })
441 }
442
443 if (isUUID) {
444 return updateTask();
445 }
446
447 var worker = self._workers[taskId];
448 if (self.cancelIfRunning && worker) {
449 worker.cancel();
450 }
451
452 // Check if task is writing
453 if (self._writing[taskId]) {
454 oldTask = self._writing[taskId].task;
455 return mergeTask();
456 }
457
458 // Check store for task
459 self._store.getTask(taskId, function (err, savedTask) {
460 if (err) return ticket.failed('failed_to_get');
461
462 // Check if it's already in the store
463 if (savedTask !== undefined) {
464 isNew = false;
465 }
466
467 // Check if task is writing
468 if (self._writing[taskId]) {
469 oldTask = self._writing[taskId].task;
470 return mergeTask();
471 }
472
473 // No task before
474 if (savedTask === undefined) {
475 return updateTask();
476 }
477
478 oldTask = savedTask;
479 mergeTask();
480 })
481}
482
483Queue.prototype._emptied = function () {
484 if (this._calledEmpty) return;
485 this._calledEmpty = true;
486 this.emit('empty');
487}
488
489Queue.prototype._drained = function () {
490 if (this._calledDrain) return;
491 this._calledDrain = true;
492 this.emit('drain');
493}
494
495Queue.prototype._getNextBatch = function (cb) {
496 this._store[this.filo ? 'takeLastN' : 'takeFirstN'](this.batchSize, cb)
497}
498
499Queue.prototype._processNextAfterTimeout = function () {
500 var self = this;
501 if (self.batchSize === 1 && self.batchDelay > 0) {
502 if (!self._batchTimeoutId) {
503 self._batchTimeoutId = setTimeout(function () {
504 self._batchTimeoutId = null;
505 self._processNextIfAllowed();
506 }, self.batchDelay)
507 }
508 } else if (self.length >= self.batchSize) {
509 if (self._batchTimeoutId) {
510 clearTimeout(self._batchTimeoutId);
511 self._batchTimeoutId = null;
512 }
513 setImmediate(function () {
514 self._processNextIfAllowed();
515 })
516 } else if (!self._batchTimeoutId && self.batchDelay < Infinity) {
517 self._batchTimeoutId = setTimeout(function () {
518 self._batchTimeoutId = null;
519 self._processNextIfAllowed();
520 }, self.batchDelay)
521 }
522}
523
524Queue.prototype._processNextIfAllowed = function () {
525 var self = this;
526 if (!self._connected) return;
527 if (self._stopped) return;
528
529 self._saturated = (self._running + self._fetching >= self.concurrent);
530 if (self._saturated) return;
531 if (!self.length) {
532 if (!self._hasMore) {
533 self._emptied();
534 if (!self._running) {
535 self._drained();
536 }
537 }
538 return;
539 }
540
541 self.precondition(function (err, pass) {
542 if (err || !pass) {
543 if (!self._preconditionRetryTimeoutId && self.preconditionRetryTimeout) {
544 self._preconditionRetryTimeoutId = setTimeout(function () {
545 self._preconditionRetryTimeoutId = null;
546 self._processNextIfAllowed();
547 }, self.preconditionRetryTimeout)
548 }
549 } else {
550 self._processNext();
551 }
552 })
553}
554
555Queue.prototype._processNext = function () {
556 var self = this;
557 // FIXME: There may still be things writing
558 self._hasMore = false;
559 self._fetching++;
560 self._getNextBatch(function (err, lockId) {
561 self._fetching--;
562 if (err || !lockId) return;
563 self._store.getLock(lockId, function (err, batch) {
564 if (err || !batch) return;
565 var batchSize = Object.keys(batch).length;
566 var isEmpty = (batchSize === 0);
567
568 if (self.length < batchSize) {
569 self.length = batchSize;
570 }
571
572 if (!self._hasMore && isEmpty) {
573 self._emptied();
574 if (!self._running) {
575 self._drained();
576 }
577 return;
578 }
579
580 // The write queue wasn't empty on fetch, so we should fetch more.
581 if (self._hasMore && isEmpty) {
582 return self._processNextAfterTimeout()
583 }
584
585 var tickets = {};
586 Object.keys(batch).forEach(function (taskId) {
587 var ticket = self._tickets[taskId];
588 if (ticket) {
589 ticket.started();
590 tickets[taskId] = ticket;
591 delete self._tickets[taskId];
592 }
593 })
594
595 // Acquire lock on process
596 self._running++;
597
598 if (self.concurrent - self._running > 1) {
599 // Continue processing until saturated
600 self._processNextIfAllowed();
601 }
602 self._startBatch(batch, tickets, lockId);
603 });
604 });
605}
606
607Queue.prototype._startBatch = function (batch, tickets, lockId) {
608 var self = this;
609 var taskIds = Object.keys(batch);
610 var timeout = null;
611 var worker = new Worker({
612 fn: self.process,
613 batch: batch,
614 single: (self.batchSize === 1),
615 failTaskOnProcessException: self.failTaskOnProcessException
616 })
617 var updateStatsForEndedTask = function (taskId) {
618 self._processedTotal++;
619 var stats = {};
620 if (!self._queuedTime[taskId]) return stats;
621
622 var elapsed = (new Date().getTime() - self._queuedTime[taskId]);
623 delete self._queuedTime[taskId];
624
625 if (elapsed > 0) {
626 stats.elapsed = elapsed;
627 self._processedTotalElapsed += elapsed;
628 self._processedAverage = self._processedTotalElapsed/self._processedTotal;
629 }
630 return stats;
631 }
632
633 if (self.maxTimeout < Infinity) {
634 timeout = setTimeout(function () {
635 worker.failedBatch('task_timeout');
636 }, self.maxTimeout);
637 }
638 worker.on('task_failed', function (id, msg) {
639 var taskId = taskIds[id];
640 self._retries[taskId] = self._retries[taskId] || 0;
641 self._retries[taskId]++;
642 if (worker.cancelled || self._retries[taskId] >= self.maxRetries) {
643 var stats = updateStatsForEndedTask(taskId);
644 if (tickets[taskId]) {
645 // Mark as a failure
646 tickets[taskId].failed(msg);
647 delete tickets[taskId];
648 }
649 self._failedTotal++;
650 self.emit('task_failed', taskId, msg, stats);
651 } else {
652 if (self.retryDelay) {
653 // Pop back onto queue and retry
654 setTimeout(function () {
655 self.emit('task_retry', taskId, self._retries[taskId]);
656 self._queueTask(taskId, batch[taskId], tickets[taskId]);
657 }, self.retryDelay)
658 } else {
659 setImmediate(function () {
660 self.emit('task_retry', taskId, self._retries[taskId]);
661 self._queueTask(taskId, batch[taskId], tickets[taskId]);
662 });
663 }
664 }
665 })
666 worker.on('task_finish', function (id, result) {
667 var taskId = taskIds[id];
668 var stats = updateStatsForEndedTask(taskId);
669 if (tickets[taskId]) {
670 tickets[taskId].finish(result);
671 delete tickets[taskId];
672 }
673 self.emit('task_finish', taskId, result, stats);
674 })
675 worker.on('task_progress', function (id, progress) {
676 var taskId = taskIds[id];
677 if (tickets[taskId]) {
678 tickets[taskId].progress(progress);
679 delete tickets[taskId];
680 }
681 self.emit('task_progress', taskId, progress);
682 })
683 worker.on('progress', function (progress) {
684 self.emit('batch_progress', progress);
685 })
686 worker.on('finish', function (result) {
687 self.emit('batch_finish', result);
688 })
689 worker.on('failed', function (err) {
690 self.emit('batch_failed', err);
691 })
692 worker.on('end', function () {
693 self.length -= Object.keys(batch).length;
694 if (timeout) {
695 clearTimeout(timeout);
696 }
697 var finishAndGetNext = function () {
698 if (!self._connected) return;
699 self._store.releaseLock(lockId, function (err) {
700 if (err) {
701 // If we cannot release the lock then retry
702 return setTimeout(function () {
703 finishAndGetNext();
704 }, 1)
705 }
706 self._running--;
707 taskIds.forEach(function (taskId) {
708 if (self._workers[taskId] && !self._workers[taskId].active) {
709 delete self._workers[taskId];
710 }
711 });
712 self._processNextAfterTimeout();
713 })
714 }
715 if (self.afterProcessDelay) {
716 setTimeout(function () {
717 finishAndGetNext()
718 }, self.afterProcessDelay);
719 } else {
720 setImmediate(function () {
721 finishAndGetNext()
722 })
723 }
724 })
725
726 taskIds.forEach(function (taskId) {
727 self._workers[taskId] = worker;
728 });
729
730 try {
731 worker.start();
732 taskIds.forEach(function (taskId) {
733 self.emit('task_started', taskId, batch[taskId])
734 });
735 } catch (e) {
736 self.emit('error', e);
737 }
738
739}
740
741module.exports = Queue;