1 | var uuid = require('node-uuid');
|
2 | var util = require('util');
|
3 | var EE = require('events').EventEmitter;
|
4 | var Ticket = require('./ticket');
|
5 | var Worker = require('./worker');
|
6 | var Tickets = require('./tickets');
|
7 |
|
8 | var stores = {
|
9 | memory: './stores/memory',
|
10 | sqlite: './stores/sqlite',
|
11 | sql: './stores/SqlStore',
|
12 | }
|
13 |
|
14 | function Queue(process, opts) {
|
15 | var self = this;
|
16 | opts = opts || {};
|
17 | if (typeof process === 'object') {
|
18 | opts = process || {};
|
19 | }
|
20 | if (typeof process === 'function') {
|
21 | opts.process = process;
|
22 | }
|
23 | if (!opts.process) {
|
24 | throw new Error("Queue has no process function.");
|
25 | }
|
26 |
|
27 | opts = opts || {};
|
28 |
|
29 | self.process = opts.process || function (task, cb) { cb(null, {}) };
|
30 | self.filter = opts.filter || function (input, cb) { cb(null, input) };
|
31 | self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) };
|
32 | self.precondition = opts.precondition || function (cb) { cb(null, true) };
|
33 | self.id = opts.id || false;
|
34 | self.priority = opts.priority || null;
|
35 |
|
36 | self.cancelIfRunning = (opts.cancelIfRunning === undefined ? false : !!opts.cancelIfRunning);
|
37 | self.autoResume = (opts.autoResume === undefined ? true : !!opts.autoResume);
|
38 | self.failTaskOnProcessException = (opts.failTaskOnProcessException === undefined ? true : !!opts.failTaskOnProcessException);
|
39 | self.filo = opts.filo || false;
|
40 | self.batchSize = opts.batchSize || 1;
|
41 | self.batchDelay = opts.batchDelay || 0;
|
42 | self.batchDelayTimeout = opts.batchDelayTimeout || Infinity;
|
43 | self.afterProcessDelay = opts.afterProcessDelay || 0;
|
44 | self.concurrent = opts.concurrent || 1;
|
45 | self.maxTimeout = opts.maxTimeout || Infinity;
|
46 | self.maxRetries = opts.maxRetries || 0;
|
47 | self.retryDelay = opts.retryDelay || 0;
|
48 | self.storeMaxRetries = opts.storeMaxRetries || Infinity;
|
49 | self.storeRetryTimeout = opts.storeRetryTimeout || 1000;
|
50 | self.preconditionRetryTimeout = opts.preconditionRetryTimeout || 1000;
|
51 |
|
52 |
|
53 | self._queuedPeak = 0;
|
54 | self._queuedTime = {};
|
55 | self._processedTotalElapsed = 0;
|
56 | self._processedAverage = 0;
|
57 | self._processedTotal = 0;
|
58 | self._failedTotal = 0;
|
59 | self.length = 0;
|
60 | self._stopped = false;
|
61 | self._saturated = false;
|
62 |
|
63 | self._preconditionRetryTimeoutId = null;
|
64 | self._batchTimeoutId = null;
|
65 | self._batchDelayTimeoutId = null;
|
66 | self._connected = false;
|
67 | self._storeRetries = 0;
|
68 |
|
69 |
|
70 | self._hasMore = false;
|
71 | self._isWriting = false;
|
72 | self._writeQueue = [];
|
73 | self._writing = {};
|
74 | self._tasksWaitingForConnect = [];
|
75 |
|
76 | self._calledDrain = true;
|
77 | self._calledEmpty = true;
|
78 | self._fetching = 0;
|
79 | self._running = 0;
|
80 | self._retries = {};
|
81 | self._workers = {};
|
82 | self._tickets = {};
|
83 |
|
84 |
|
85 | self.use(opts.store || 'memory');
|
86 | if (!self._store) {
|
87 | throw new Error('Queue cannot continue without a valid store.')
|
88 | }
|
89 | }
|
90 |
|
91 | util.inherits(Queue, EE);
|
92 |
|
93 | Queue.prototype.destroy = function (cb) {
|
94 | cb = cb || function () {};
|
95 | var self = this;
|
96 |
|
97 |
|
98 | self._hasMore = false;
|
99 | self._isWriting = false;
|
100 | self._writeQueue = [];
|
101 | self._writing = {};
|
102 | self._tasksWaitingForConnect = [];
|
103 |
|
104 |
|
105 | self._tickets = {};
|
106 | self._workers = {};
|
107 | self._fetching = 0;
|
108 | self._running = {};
|
109 | self._retries = {};
|
110 | self._calledEmpty = true;
|
111 | self._calledDrain = true;
|
112 | self._connected = false;
|
113 | self.pause();
|
114 |
|
115 | if (typeof self._store.close === 'function') {
|
116 | self._store.close(cb);
|
117 | } else {
|
118 | cb();
|
119 | }
|
120 | }
|
121 |
|
122 | Queue.prototype.resetStats = function () {
|
123 | this._queuedPeak = 0;
|
124 | this._processedTotalElapsed = 0;
|
125 | this._processedAverage = 0;
|
126 | this._processedTotal = 0;
|
127 | this._failedTotal = 0;
|
128 | }
|
129 |
|
130 | Queue.prototype.getStats = function () {
|
131 | var successRate = this._processedTotal === 0 ? 0 : (1 - (this._failedTotal / this._processedTotal));
|
132 | return {
|
133 | successRate: successRate,
|
134 | peak: this._queuedPeak,
|
135 | average: this._processedAverage,
|
136 | total: this._processedTotal
|
137 | }
|
138 | }
|
139 |
|
140 | Queue.prototype.use = function (store, opts) {
|
141 | var self = this;
|
142 | if (typeof store === 'string' && stores[store]) {
|
143 | try {
|
144 | var Store;
|
145 | if (store === 'memory') {
|
146 | Store = require('./stores/memory');
|
147 | } else if (store === 'sql') {
|
148 | Store = require('./stores/SqlStore');
|
149 | } else if (store === 'sqlite') {
|
150 | Store = require('./stores/sqlite');
|
151 | } else {
|
152 | Store = require(stores[store]);
|
153 | }
|
154 | self._store = new Store(opts);
|
155 | } catch (e) { throw e }
|
156 | } else if (typeof store === 'object' && typeof store.type === 'string' && stores[store.type]) {
|
157 | try {
|
158 | var Store;
|
159 | if (store.type === 'memory') {
|
160 | Store = require('./stores/memory');
|
161 | } else if (store.type === 'sql') {
|
162 | Store = require('./stores/SqlStore');
|
163 | } else if (store.type === 'sqlite') {
|
164 | Store = require('./stores/sqlite');
|
165 | } else {
|
166 | Store = require(stores[store.type]);
|
167 | }
|
168 | self._store = new Store(store);
|
169 | } catch (e) { throw e }
|
170 | } else if (typeof store === 'object' &&
|
171 | store.putTask &&
|
172 | store.getTask &&
|
173 | ((self.filo && store.takeLastN) ||
|
174 | (!self.filo && store.takeFirstN))) {
|
175 | self._store = store;
|
176 | } else {
|
177 | throw new Error('unknown_store');
|
178 | }
|
179 | self._connected = false;
|
180 | self._tasksWaitingForConnect = [];
|
181 | self._connectToStore();
|
182 | }
|
183 |
|
184 | Queue.prototype._connectToStore = function () {
|
185 | var self = this;
|
186 | if (self._connected) return;
|
187 | if (self._storeRetries >= self.storeMaxRetries) {
|
188 | return self.emit('error', new Error('failed_connect_to_store'));
|
189 | }
|
190 | self._storeRetries++;
|
191 | self._store.connect(function (err, len) {
|
192 | if (err) return setTimeout(function () {
|
193 | self._connectToStore();
|
194 | }, self.storeRetryTimeout);
|
195 | if (len === undefined || len === null) throw new Error("store_not_returning_length");
|
196 | self.length = parseInt(len);
|
197 | if (isNaN(self.length)) throw new Error("length_is_not_a_number");
|
198 | if (self.length) self._calledDrain = false;
|
199 | self._connected = true;
|
200 | self._storeRetries = 0;
|
201 | self._store.getRunningTasks(function (err, running) {
|
202 | if (!self._stopped && self.autoResume) {
|
203 | Object.keys(running).forEach(function (lockId) {
|
204 | self._running++;
|
205 | self._startBatch(running[lockId], {}, lockId);
|
206 | })
|
207 | self.resume();
|
208 | }
|
209 | for (var i = 0; i < self._tasksWaitingForConnect.length; i++) {
|
210 | self.push(self._tasksWaitingForConnect[i].input, self._tasksWaitingForConnect[i].ticket);
|
211 | }
|
212 | })
|
213 | })
|
214 |
|
215 | }
|
216 |
|
217 | Queue.prototype.resume = function () {
|
218 | var self = this;
|
219 | self._stopped = false;
|
220 | self._getWorkers().forEach(function (worker) {
|
221 | if (typeof worker.resume === 'function') {
|
222 | worker.resume();
|
223 | }
|
224 | })
|
225 | setTimeout(function () {
|
226 | self._processNextAfterTimeout();
|
227 | }, 0)
|
228 | }
|
229 |
|
230 | Queue.prototype.pause = function () {
|
231 | this._stopped = true;
|
232 | this._getWorkers().forEach(function (worker) {
|
233 | if (typeof worker.pause === 'function') {
|
234 | worker.pause();
|
235 | }
|
236 | })
|
237 | }
|
238 |
|
239 | Queue.prototype.cancel = function (taskId, cb) {
|
240 | cb = cb || function(){};
|
241 | var self = this;
|
242 | var worker = self._workers[taskId];
|
243 | if (worker) {
|
244 | worker.cancel();
|
245 | }
|
246 | self._store.deleteTask(taskId, cb);
|
247 | }
|
248 |
|
249 | Queue.prototype.push = function (input, cb) {
|
250 | var self = this;
|
251 | var ticket = new Ticket();
|
252 | if (cb instanceof Ticket) {
|
253 | ticket = cb;
|
254 | } else if (cb) {
|
255 | ticket
|
256 | .on('finish', function (result) { cb(null, result) })
|
257 | .on('failed', function (err) { cb(err) })
|
258 | }
|
259 |
|
260 | if (!self._connected) {
|
261 | self._tasksWaitingForConnect.push({ input: input, ticket: ticket });
|
262 | return ticket;
|
263 | }
|
264 |
|
265 | self.filter(input, function (err, task) {
|
266 | if (err || task === undefined || task === false || task === null) {
|
267 | return ticket.failed('input_rejected');
|
268 | }
|
269 | var acceptTask = function (taskId) {
|
270 | setTimeout(function () {
|
271 | self._queueTask(taskId, task, ticket);
|
272 | }, 0)
|
273 | }
|
274 | if (typeof self.id === 'function') {
|
275 | self.id(task, function (err, id) {
|
276 | if (err) return ticket.failed('id_error');
|
277 | acceptTask(id);
|
278 | })
|
279 | } else if (typeof self.id === 'string' && typeof task === 'object') {
|
280 | acceptTask(task[self.id])
|
281 | } else {
|
282 | acceptTask();
|
283 | }
|
284 | })
|
285 | return ticket;
|
286 | }
|
287 |
|
288 | Queue.prototype._getWorkers = function () {
|
289 | var self = this;
|
290 | var workers = [];
|
291 | Object.keys(self._workers).forEach(function (taskId) {
|
292 | var worker = self._workers[taskId];
|
293 | if (worker && workers.indexOf(worker) === -1) {
|
294 | workers.push(worker);
|
295 | }
|
296 | })
|
297 | return workers;
|
298 | }
|
299 |
|
300 | Queue.prototype._writeNextTask = function () {
|
301 | var self = this;
|
302 | if (self._isWriting) return;
|
303 | if (!self._writeQueue.length) return;
|
304 | self._isWriting = true;
|
305 |
|
306 | var taskId = self._writeQueue.shift();
|
307 | var finishedWrite = function () {
|
308 | self._isWriting = false;
|
309 | setImmediate(function () {
|
310 | self._writeNextTask();
|
311 | })
|
312 | }
|
313 |
|
314 | if (!self._writing[taskId]) {
|
315 | delete self._writing[taskId];
|
316 | return finishedWrite();
|
317 | }
|
318 |
|
319 | var task = self._writing[taskId].task;
|
320 | var priority = self._writing[taskId].priority;
|
321 | var isNew = self._writing[taskId].isNew;
|
322 | var writeId = self._writing[taskId].id;
|
323 | var tickets = self._writing[taskId].tickets;
|
324 |
|
325 | self._store.putTask(taskId, task, priority, function (err) {
|
326 |
|
327 |
|
328 | if (self._writing[taskId] && self._writing[taskId].id !== writeId) {
|
329 | self._writeQueue.unshift(taskId);
|
330 | return finishedWrite();
|
331 | }
|
332 | delete self._writing[taskId];
|
333 |
|
334 |
|
335 | if (err) {
|
336 | tickets.failed('failed_to_put_task');
|
337 | return finishedWrite();
|
338 | }
|
339 |
|
340 |
|
341 | if (isNew) {
|
342 | self.length++;
|
343 | if (self._queuedPeak < self.length) {
|
344 | self._queuedPeak = self.length;
|
345 | }
|
346 | self._queuedTime[taskId] = new Date().getTime();
|
347 | }
|
348 |
|
349 |
|
350 | if (self._tickets[taskId]) {
|
351 | self._tickets[taskId].push(tickets);
|
352 | } else {
|
353 | self._tickets[taskId] = tickets;
|
354 | }
|
355 | self.emit('task_queued', taskId, task);
|
356 | tickets.queued();
|
357 |
|
358 |
|
359 | if (isNew) {
|
360 | self._calledDrain = false;
|
361 | self._calledEmpty = false;
|
362 | }
|
363 |
|
364 |
|
365 | if (self._fetching > 0) {
|
366 | self._hasMore = true;
|
367 | }
|
368 |
|
369 |
|
370 | if (self.batchDelayTimeout < Infinity) {
|
371 | if (self._batchDelayTimeoutId) clearTimeout(self._batchDelayTimeoutId)
|
372 | self._batchDelayTimeoutId = setTimeout(function () {
|
373 | self._batchDelayTimeoutId = null;
|
374 | if (self._batchTimeoutId) clearTimeout(self._batchTimeoutId);
|
375 | self._batchTimeoutId = null;
|
376 | self._processNextIfAllowed();
|
377 | }, self.batchDelayTimeout)
|
378 | }
|
379 |
|
380 |
|
381 | finishedWrite();
|
382 | self._processNextAfterTimeout();
|
383 | })
|
384 | }
|
385 |
|
386 | Queue.prototype._queueTask = function (taskId, newTask, ticket) {
|
387 | var self = this;
|
388 | var emptyTicket = new Ticket();
|
389 | ticket = ticket || emptyTicket;
|
390 | var isUUID = false;
|
391 | if (!taskId) {
|
392 | taskId = uuid.v4();
|
393 | isUUID = true;
|
394 | }
|
395 | var priority;
|
396 | var oldTask = null;
|
397 | var isNew = true;
|
398 | var putTask = function () {
|
399 | if (!self._connected) return;
|
400 |
|
401 |
|
402 | var tickets = (self._writing[taskId] && self._writing[taskId].tickets) || new Tickets();
|
403 | if (ticket !== emptyTicket) {
|
404 | tickets.push(ticket);
|
405 | }
|
406 |
|
407 |
|
408 | var alreadyQueued = !!self._writing[taskId];
|
409 | self._writing[taskId] = {
|
410 | id: uuid.v4(),
|
411 | isNew: isNew,
|
412 | task: newTask,
|
413 | priority: priority,
|
414 | tickets: tickets
|
415 | };
|
416 | if (!alreadyQueued) {
|
417 | self._writeQueue.push(taskId);
|
418 | }
|
419 |
|
420 | self._writeNextTask();
|
421 | }
|
422 | var updateTask = function () {
|
423 | ticket.accept();
|
424 | self.emit('task_accepted', taskId, newTask);
|
425 |
|
426 | if (!self.priority) return putTask();
|
427 | self.priority(newTask, function (err, p) {
|
428 | if (err) return ticket.failed('failed_to_prioritize');
|
429 | priority = p;
|
430 | putTask();
|
431 | })
|
432 | }
|
433 | var mergeTask = function () {
|
434 | if (!oldTask) return updateTask();
|
435 | self.merge(oldTask, newTask, function (err, mergedTask) {
|
436 | if (err) return ticket.failed('failed_task_merge');
|
437 | if (mergedTask === undefined) return;
|
438 | newTask = mergedTask;
|
439 | updateTask();
|
440 | })
|
441 | }
|
442 |
|
443 | if (isUUID) {
|
444 | return updateTask();
|
445 | }
|
446 |
|
447 | var worker = self._workers[taskId];
|
448 | if (self.cancelIfRunning && worker) {
|
449 | worker.cancel();
|
450 | }
|
451 |
|
452 |
|
453 | if (self._writing[taskId]) {
|
454 | oldTask = self._writing[taskId].task;
|
455 | return mergeTask();
|
456 | }
|
457 |
|
458 |
|
459 | self._store.getTask(taskId, function (err, savedTask) {
|
460 | if (err) return ticket.failed('failed_to_get');
|
461 |
|
462 |
|
463 | if (savedTask !== undefined) {
|
464 | isNew = false;
|
465 | }
|
466 |
|
467 |
|
468 | if (self._writing[taskId]) {
|
469 | oldTask = self._writing[taskId].task;
|
470 | return mergeTask();
|
471 | }
|
472 |
|
473 |
|
474 | if (savedTask === undefined) {
|
475 | return updateTask();
|
476 | }
|
477 |
|
478 | oldTask = savedTask;
|
479 | mergeTask();
|
480 | })
|
481 | }
|
482 |
|
483 | Queue.prototype._emptied = function () {
|
484 | if (this._calledEmpty) return;
|
485 | this._calledEmpty = true;
|
486 | this.emit('empty');
|
487 | }
|
488 |
|
489 | Queue.prototype._drained = function () {
|
490 | if (this._calledDrain) return;
|
491 | this._calledDrain = true;
|
492 | this.emit('drain');
|
493 | }
|
494 |
|
495 | Queue.prototype._getNextBatch = function (cb) {
|
496 | this._store[this.filo ? 'takeLastN' : 'takeFirstN'](this.batchSize, cb)
|
497 | }
|
498 |
|
499 | Queue.prototype._processNextAfterTimeout = function () {
|
500 | var self = this;
|
501 | if (self.batchSize === 1 && self.batchDelay > 0) {
|
502 | if (!self._batchTimeoutId) {
|
503 | self._batchTimeoutId = setTimeout(function () {
|
504 | self._batchTimeoutId = null;
|
505 | self._processNextIfAllowed();
|
506 | }, self.batchDelay)
|
507 | }
|
508 | } else if (self.length >= self.batchSize) {
|
509 | if (self._batchTimeoutId) {
|
510 | clearTimeout(self._batchTimeoutId);
|
511 | self._batchTimeoutId = null;
|
512 | }
|
513 | setImmediate(function () {
|
514 | self._processNextIfAllowed();
|
515 | })
|
516 | } else if (!self._batchTimeoutId && self.batchDelay < Infinity) {
|
517 | self._batchTimeoutId = setTimeout(function () {
|
518 | self._batchTimeoutId = null;
|
519 | self._processNextIfAllowed();
|
520 | }, self.batchDelay)
|
521 | }
|
522 | }
|
523 |
|
524 | Queue.prototype._processNextIfAllowed = function () {
|
525 | var self = this;
|
526 | if (!self._connected) return;
|
527 | if (self._stopped) return;
|
528 |
|
529 | self._saturated = (self._running + self._fetching >= self.concurrent);
|
530 | if (self._saturated) return;
|
531 | if (!self.length) {
|
532 | if (!self._hasMore) {
|
533 | self._emptied();
|
534 | if (!self._running) {
|
535 | self._drained();
|
536 | }
|
537 | }
|
538 | return;
|
539 | }
|
540 |
|
541 | self.precondition(function (err, pass) {
|
542 | if (err || !pass) {
|
543 | if (!self._preconditionRetryTimeoutId && self.preconditionRetryTimeout) {
|
544 | self._preconditionRetryTimeoutId = setTimeout(function () {
|
545 | self._preconditionRetryTimeoutId = null;
|
546 | self._processNextIfAllowed();
|
547 | }, self.preconditionRetryTimeout)
|
548 | }
|
549 | } else {
|
550 | self._processNext();
|
551 | }
|
552 | })
|
553 | }
|
554 |
|
555 | Queue.prototype._processNext = function () {
|
556 | var self = this;
|
557 |
|
558 | self._hasMore = false;
|
559 | self._fetching++;
|
560 | self._getNextBatch(function (err, lockId) {
|
561 | self._fetching--;
|
562 | if (err || !lockId) return;
|
563 | self._store.getLock(lockId, function (err, batch) {
|
564 | if (err || !batch) return;
|
565 | var batchSize = Object.keys(batch).length;
|
566 | var isEmpty = (batchSize === 0);
|
567 |
|
568 | if (self.length < batchSize) {
|
569 | self.length = batchSize;
|
570 | }
|
571 |
|
572 | if (!self._hasMore && isEmpty) {
|
573 | self._emptied();
|
574 | if (!self._running) {
|
575 | self._drained();
|
576 | }
|
577 | return;
|
578 | }
|
579 |
|
580 |
|
581 | if (self._hasMore && isEmpty) {
|
582 | return self._processNextAfterTimeout()
|
583 | }
|
584 |
|
585 | var tickets = {};
|
586 | Object.keys(batch).forEach(function (taskId) {
|
587 | var ticket = self._tickets[taskId];
|
588 | if (ticket) {
|
589 | ticket.started();
|
590 | tickets[taskId] = ticket;
|
591 | delete self._tickets[taskId];
|
592 | }
|
593 | })
|
594 |
|
595 |
|
596 | self._running++;
|
597 |
|
598 | if (self.concurrent - self._running > 1) {
|
599 |
|
600 | self._processNextIfAllowed();
|
601 | }
|
602 | self._startBatch(batch, tickets, lockId);
|
603 | });
|
604 | });
|
605 | }
|
606 |
|
607 | Queue.prototype._startBatch = function (batch, tickets, lockId) {
|
608 | var self = this;
|
609 | var taskIds = Object.keys(batch);
|
610 | var timeout = null;
|
611 | var worker = new Worker({
|
612 | fn: self.process,
|
613 | batch: batch,
|
614 | single: (self.batchSize === 1),
|
615 | failTaskOnProcessException: self.failTaskOnProcessException
|
616 | })
|
617 | var updateStatsForEndedTask = function (taskId) {
|
618 | self._processedTotal++;
|
619 | var stats = {};
|
620 | if (!self._queuedTime[taskId]) return stats;
|
621 |
|
622 | var elapsed = (new Date().getTime() - self._queuedTime[taskId]);
|
623 | delete self._queuedTime[taskId];
|
624 |
|
625 | if (elapsed > 0) {
|
626 | stats.elapsed = elapsed;
|
627 | self._processedTotalElapsed += elapsed;
|
628 | self._processedAverage = self._processedTotalElapsed/self._processedTotal;
|
629 | }
|
630 | return stats;
|
631 | }
|
632 |
|
633 | if (self.maxTimeout < Infinity) {
|
634 | timeout = setTimeout(function () {
|
635 | worker.failedBatch('task_timeout');
|
636 | }, self.maxTimeout);
|
637 | }
|
638 | worker.on('task_failed', function (id, msg) {
|
639 | var taskId = taskIds[id];
|
640 | self._retries[taskId] = self._retries[taskId] || 0;
|
641 | self._retries[taskId]++;
|
642 | if (worker.cancelled || self._retries[taskId] >= self.maxRetries) {
|
643 | var stats = updateStatsForEndedTask(taskId);
|
644 | if (tickets[taskId]) {
|
645 |
|
646 | tickets[taskId].failed(msg);
|
647 | delete tickets[taskId];
|
648 | }
|
649 | self._failedTotal++;
|
650 | self.emit('task_failed', taskId, msg, stats);
|
651 | } else {
|
652 | if (self.retryDelay) {
|
653 |
|
654 | setTimeout(function () {
|
655 | self.emit('task_retry', taskId, self._retries[taskId]);
|
656 | self._queueTask(taskId, batch[taskId], tickets[taskId]);
|
657 | }, self.retryDelay)
|
658 | } else {
|
659 | setImmediate(function () {
|
660 | self.emit('task_retry', taskId, self._retries[taskId]);
|
661 | self._queueTask(taskId, batch[taskId], tickets[taskId]);
|
662 | });
|
663 | }
|
664 | }
|
665 | })
|
666 | worker.on('task_finish', function (id, result) {
|
667 | var taskId = taskIds[id];
|
668 | var stats = updateStatsForEndedTask(taskId);
|
669 | if (tickets[taskId]) {
|
670 | tickets[taskId].finish(result);
|
671 | delete tickets[taskId];
|
672 | }
|
673 | self.emit('task_finish', taskId, result, stats);
|
674 | })
|
675 | worker.on('task_progress', function (id, progress) {
|
676 | var taskId = taskIds[id];
|
677 | if (tickets[taskId]) {
|
678 | tickets[taskId].progress(progress);
|
679 | delete tickets[taskId];
|
680 | }
|
681 | self.emit('task_progress', taskId, progress);
|
682 | })
|
683 | worker.on('progress', function (progress) {
|
684 | self.emit('batch_progress', progress);
|
685 | })
|
686 | worker.on('finish', function (result) {
|
687 | self.emit('batch_finish', result);
|
688 | })
|
689 | worker.on('failed', function (err) {
|
690 | self.emit('batch_failed', err);
|
691 | })
|
692 | worker.on('end', function () {
|
693 | self.length -= Object.keys(batch).length;
|
694 | if (timeout) {
|
695 | clearTimeout(timeout);
|
696 | }
|
697 | var finishAndGetNext = function () {
|
698 | if (!self._connected) return;
|
699 | self._store.releaseLock(lockId, function (err) {
|
700 | if (err) {
|
701 |
|
702 | return setTimeout(function () {
|
703 | finishAndGetNext();
|
704 | }, 1)
|
705 | }
|
706 | self._running--;
|
707 | taskIds.forEach(function (taskId) {
|
708 | if (self._workers[taskId] && !self._workers[taskId].active) {
|
709 | delete self._workers[taskId];
|
710 | }
|
711 | });
|
712 | self._processNextAfterTimeout();
|
713 | })
|
714 | }
|
715 | if (self.afterProcessDelay) {
|
716 | setTimeout(function () {
|
717 | finishAndGetNext()
|
718 | }, self.afterProcessDelay);
|
719 | } else {
|
720 | setImmediate(function () {
|
721 | finishAndGetNext()
|
722 | })
|
723 | }
|
724 | })
|
725 |
|
726 | taskIds.forEach(function (taskId) {
|
727 | self._workers[taskId] = worker;
|
728 | });
|
729 |
|
730 | try {
|
731 | worker.start();
|
732 | taskIds.forEach(function (taskId) {
|
733 | self.emit('task_started', taskId, batch[taskId])
|
734 | });
|
735 | } catch (e) {
|
736 | self.emit('error', e);
|
737 | }
|
738 |
|
739 | }
|
740 |
|
741 | module.exports = Queue;
|