UNPKG

23.4 kBJavaScriptView Raw
1'use strict';
2
3const redis = require('./redis');
4const Emitter = require('events').EventEmitter;
5
6const Job = require('./job');
7const defaults = require('./defaults');
8const lua = require('./lua');
9const helpers = require('./helpers');
10const backoff = require('./backoff');
11const EagerTimer = require('./eager-timer');
12
13class Queue extends Emitter {
14 constructor(name, settings) {
15 super();
16
17 this.name = name;
18 this.paused = false;
19 this.jobs = new Map();
20 this.activeJobs = new Set();
21 this.checkTimer = null;
22
23 this._closed = null;
24 this._isClosed = false;
25
26 this.client = null;
27 this.bclient = null;
28 this.eclient = null;
29
30 settings = settings || {};
31 this.settings = {
32 redis: settings.redis || {},
33 quitCommandClient: settings.quitCommandClient,
34 keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':'
35 };
36
37 for (let prop in defaults) {
38 const def = defaults[prop], setting = settings[prop], type = typeof def;
39 if (type === 'boolean') {
40 this.settings[prop] = typeof setting === 'boolean' ? setting : def;
41 } else if (type === 'number') {
42 this.settings[prop] = Number.isSafeInteger(setting) ? setting : def;
43 }
44 }
45
46 /* istanbul ignore if */
47 if (this.settings.redis.socket) {
48 this.settings.redis = Object.assign({}, this.settings.redis, {
49 path: this.settings.redis.socket
50 });
51 }
52
53 // By default, if we're given a redis client and no additional instructions,
54 // don't quit the connection on Queue#close.
55 if (typeof this.settings.quitCommandClient !== 'boolean') {
56 this.settings.quitCommandClient = !redis.isClient(this.settings.redis);
57 }
58
59 // To avoid changing the hidden class of the Queue.
60 this._delayedTimer = this.settings.activateDelayedJobs
61 ? new EagerTimer(this.settings.nearTermWindow)
62 : null;
63 if (this._delayedTimer) {
64 this._delayedTimer.on('trigger', this._activateDelayed.bind(this));
65 }
66
67 const makeClient = (clientName, createNew) => {
68 return redis.createClient(this.settings.redis, createNew)
69 .then((client) => {
70 client.on('error', this.emit.bind(this, 'error'));
71 return this[clientName] = client;
72 });
73 };
74
75 let eventsPromise = null;
76
77 if (this.settings.getEvents || this.settings.activateDelayedJobs) {
78 eventsPromise = makeClient('eclient', true).then(() => {
79 this.eclient.on('message', this._onMessage.bind(this));
80 const channels = [];
81 if (this.settings.getEvents) {
82 channels.push(this.toKey('events'));
83 }
84 if (this.settings.activateDelayedJobs) {
85 channels.push(this.toKey('earlierDelayed'));
86 }
87 return Promise.all(channels.map((channel) => {
88 const promise = helpers.deferred();
89 this.eclient.subscribe(channel, promise.defer());
90 return promise;
91 }));
92 });
93 }
94
95 this._isReady = false;
96
97 // Wait for Lua scripts and client connections to load. Also wait for
98 // bclient and eclient/subscribe if they're needed.
99 this._ready = Promise.all([
100 // Make the clients
101 makeClient('client', false),
102 this.settings.isWorker ? makeClient('bclient', true) : null,
103 eventsPromise
104 ]).then(() => {
105 if (this.settings.ensureScripts) {
106 return lua.buildCache(this.client);
107 }
108 }).then(() => {
109 this._isReady = true;
110 setImmediate(() => this.emit('ready'));
111 return this;
112 });
113 }
114
115 _onMessage(channel, message) {
116 if (channel === this.toKey('earlierDelayed')) {
117 // We should only receive these messages if activateDelayedJobs is
118 // enabled.
119 this._delayedTimer.schedule(parseInt(message, 10));
120 return;
121 }
122
123 message = JSON.parse(message);
124 if (message.event === 'failed' || message.event === 'retrying') {
125 message.data = new Error(message.data);
126 }
127
128 this.emit('job ' + message.event, message.id, message.data);
129
130 const job = this.jobs.get(message.id);
131 if (job) {
132 if (message.event === 'progress') {
133 job.progress = message.data;
134 } else if (message.event === 'retrying') {
135 job.options.retries -= 1;
136 }
137
138 job.emit(message.event, message.data);
139
140 if (message.event === 'succeeded' || message.event === 'failed') {
141 this.jobs.delete(message.id);
142 }
143 }
144 }
145
146 isRunning() {
147 return !this.paused;
148 }
149
150 ready(cb) {
151 if (cb) this._ready.then(() => cb(null), cb);
152
153 return this._ready;
154 }
155
156 _commandable(requireBlocking) {
157 if (requireBlocking ? this.paused : this._isClosed) {
158 return Promise.reject(new Error('closed'));
159 }
160
161 if (this._isReady) {
162 return Promise.resolve(requireBlocking ? this.bclient : this.client);
163 }
164
165 return this._ready.then(() => this._commandable(requireBlocking));
166 }
167
168 close(timeout, cb) {
169 if (typeof timeout === 'function') {
170 cb = timeout;
171 timeout = defaults['#close'].timeout;
172 } else if (!Number.isSafeInteger(timeout) || timeout <= 0) {
173 timeout = defaults['#close'].timeout;
174 }
175
176 if (this.paused) {
177 return this._closed;
178 }
179
180 this.paused = true;
181
182 if (this.checkTimer) {
183 clearTimeout(this.checkTimer);
184 this.checkTimer = null;
185 }
186
187 if (this._delayedTimer) {
188 this._delayedTimer.stop();
189 }
190
191 const cleanup = () => {
192 this._isClosed = true;
193
194 const clients = [];
195 if (this.settings.quitCommandClient) {
196 clients.push(this.client);
197 }
198 if (this.settings.getEvents) {
199 clients.push(this.eclient);
200 }
201
202 return Promise.all(clients.map((client) => {
203 const promise = helpers.deferred();
204 client.quit(promise.defer());
205 return promise;
206 }));
207 };
208
209 const closed = helpers.withTimeout(this._ready.then(() => {
210 // Stop the blocking connection, ensures that we don't accept additional
211 // jobs while waiting for the ongoing jobs to terminate.
212 if (this.settings.isWorker) {
213 redis.disconnect(this.bclient);
214 }
215
216 // Wait for all the jobs to complete. Ignore job errors during shutdown.
217 const waitJobs = Array.from(this.activeJobs);
218 return Promise.all(waitJobs.map((promise) => promise.catch(() => {})));
219 }), timeout).then(() => {
220 return cleanup().then(() => undefined);
221 }, (err) => {
222 return cleanup().then(() => Promise.reject(err));
223 });
224
225 this._closed = closed;
226
227 if (cb) helpers.asCallback(closed, cb);
228 return closed;
229 }
230
231 destroy(cb) {
232 const promise = this._commandable().then((client) => {
233 const deleted = helpers.deferred();
234 const args = ['id', 'jobs', 'stallBlock', 'stalling', 'waiting', 'active',
235 'succeeded', 'failed', 'delayed']
236 .map((key) => this.toKey(key));
237 args.push(deleted.defer());
238 client.del.apply(client, args);
239 return deleted;
240 });
241
242 if (cb) helpers.asCallback(promise, cb);
243 return promise;
244 }
245
246 checkHealth(cb) {
247 const promise = this._commandable().then((client) => {
248 const multi = helpers.deferred();
249 client.multi()
250 .llen(this.toKey('waiting'))
251 .llen(this.toKey('active'))
252 .scard(this.toKey('succeeded'))
253 .scard(this.toKey('failed'))
254 .zcard(this.toKey('delayed'))
255 .get(this.toKey('id'))
256 .exec(multi.defer());
257 return multi;
258 }).then((results) => ({
259 waiting: results[0],
260 active: results[1],
261 succeeded: results[2],
262 failed: results[3],
263 delayed: results[4],
264 newestJob: results[5] ? parseInt(results[5], 10) : 0
265 }));
266
267 if (cb) helpers.asCallback(promise, cb);
268 return promise;
269 }
270
271 _scanForJobs(key, cursor, size, set, cb) {
272 const batchCount = Math.min(size, this.settings.redisScanCount);
273 this.client.sscan(key, cursor, 'COUNT', batchCount, (err, results) => {
274 /* istanbul ignore if */
275 if (err) {
276 return cb(err);
277 }
278
279 const nextCursor = results[0];
280 const ids = results[1];
281
282 // A given element may be returned multiple times in SSCAN.
283 // So, we use a set to remove duplicates.
284 // https://redis.io/commands/scan#scan-guarantees
285 for (let id of ids) {
286 // For small sets, encoded as intsets, SSCAN will ignore COUNT.
287 // https://redis.io/commands/scan#the-count-option
288 if (set.size === size) break;
289
290 set.add(id);
291 }
292
293 if (nextCursor === '0' || set.size >= size) {
294 return cb(null, set);
295 }
296
297 this._scanForJobs(key, nextCursor, size, set, cb);
298 });
299 }
300
301 _addJobsByIds(jobs, ids) {
302 // We need to re-ensure the queue is commandable, as we might be shutting
303 // down during this operation.
304 return this._commandable().then((client) => {
305 const got = helpers.deferred();
306 const commandArgs = [this.toKey('jobs')].concat(ids, got.defer());
307 client.hmget.apply(client, commandArgs);
308 return got;
309 }).then((dataArray) => {
310 const count = ids.length;
311 // Some jobs returned by the scan may have already been removed, so filter
312 // them out.
313 for (let i = 0; i < count; ++i) {
314 const jobData = dataArray[i];
315 /* istanbul ignore else: not worth unit-testing this edge case */
316 if (jobData) {
317 jobs.push(Job.fromData(this, ids[i], jobData));
318 }
319 }
320 return jobs;
321 });
322 }
323
324 /**
325 * Get jobs from queue type.
326 *
327 * @param {String} type The queue type (failed, succeeded, waiting, etc.)
328 * @param {?Object=} page An object containing some of the following fields.
329 * @param {Number=} page.start Start of query range for waiting/active/delayed
330 * queue types. Defaults to 0.
331 * @param {Number=} page.end End of query range for waiting/active/delayed
332 * queue types. Defaults to 100.
333 * @param {Number=} page.size Number jobs to return for failed/succeeded (SET)
334 * types. Defaults to 100.
335 * @param {Function=} callback Called with the equivalent of the returned
336 * promise.
337 * @return {Promise<Job[]>} Resolves to the jobs the function found.
338 */
339 getJobs(type, page, cb) {
340 if (typeof page === 'function') {
341 cb = page;
342 page = null;
343 }
344 // Set defaults.
345 page = Object.assign({
346 size: 100,
347 start: 0,
348 end: 100
349 }, page);
350 const promise = this._commandable().then((client) => {
351 const idsPromise = helpers.deferred(), next = idsPromise.defer();
352 const key = this.toKey(type);
353 switch (type) {
354 case 'failed':
355 case 'succeeded':
356 this._scanForJobs(key, '0', page.size, new Set(), next);
357 break;
358 case 'waiting':
359 case 'active':
360 client.lrange(key, page.start, page.end, next);
361 break;
362 case 'delayed':
363 client.zrange(key, page.start, page.end, next);
364 break;
365 default:
366 throw new Error('Improper queue type');
367 }
368
369 return idsPromise;
370 }).then((ids) => {
371 const jobs = [], idsToFetch = [];
372 // ids might be a Set or an Array, but this will iterate just the same.
373 for (let jobId of ids) {
374 const job = this.jobs.get(jobId);
375 if (job) {
376 jobs.push(job);
377 } else {
378 idsToFetch.push(jobId);
379 }
380 }
381 if (!idsToFetch.length) {
382 return jobs;
383 }
384 return this._addJobsByIds(jobs, idsToFetch);
385 });
386
387 if (cb) helpers.asCallback(promise, cb);
388 return promise;
389 }
390
391 createJob(data) {
392 return new Job(this, null, data);
393 }
394
395 getJob(jobId, cb) {
396 const promise = this._commandable().then(() => {
397 if (this.jobs.has(jobId)) {
398 return this.jobs.get(jobId);
399 }
400 return Job.fromId(this, jobId);
401 }).then((job) => {
402 if (job && this.settings.storeJobs) {
403 this.jobs.set(jobId, job);
404 }
405 return job;
406 });
407
408 if (cb) helpers.asCallback(promise, cb);
409 return promise;
410 }
411
412 removeJob(jobId, cb) {
413 const promise = this._evalScript('removeJob', 7,
414 this.toKey('succeeded'), this.toKey('failed'), this.toKey('waiting'),
415 this.toKey('active'), this.toKey('stalling'), this.toKey('jobs'),
416 this.toKey('delayed'), jobId)
417 .then(() => this);
418
419 if (cb) helpers.asCallback(promise, cb);
420 return promise;
421 }
422
423 _waitForJob() {
424 const idPromise = helpers.deferred();
425 this.bclient.brpoplpush(this.toKey('waiting'), this.toKey('active'), 0,
426 idPromise.defer());
427
428 return idPromise.then((jobId) => {
429 // Note that the job may be null in the case that the client has removed
430 // the job before processing can take place, but after the brpoplpush has
431 // returned the job id.
432 return Job.fromId(this, jobId);
433 }, (err) => {
434 if (redis.isAbortError(err) && this.paused) {
435 return null;
436 }
437 throw err;
438 });
439 }
440
441 _getNextJob() {
442 // Under normal calling conditions, commandable will not reject because we
443 // will have just checked paused in Queue#process.
444 return this._commandable(true).then(() => this._waitForJob());
445 }
446
447 _runJob(job) {
448 let psTimeout = null, completed = false;
449
450 const preventStalling = () => {
451 psTimeout = null;
452 if (this._isClosed) return;
453 this._preventStall(job.id).then(() => {
454 if (completed || this._isClosed) return;
455 const interval = this.settings.stallInterval / 2;
456 psTimeout = setTimeout(preventStalling, interval);
457 });
458 };
459 preventStalling();
460
461 const handleOutcome = (err, data) => {
462 completed = true;
463 if (psTimeout) {
464 clearTimeout(psTimeout);
465 psTimeout = null;
466 }
467
468 return this._finishJob(err, data, job);
469 };
470
471 let promise = this.handler(job);
472
473 if (job.options.timeout) {
474 const message = `Job ${job.id} timed out (${job.options.timeout} ms)`;
475 promise = helpers.withTimeout(promise, job.options.timeout, message);
476 }
477
478 const jobPromise = promise
479 .then((data) => handleOutcome(null, data), handleOutcome)
480 .then((data) => {
481 this.activeJobs.delete(jobPromise);
482 return data;
483 }, (err) => {
484 // The only error that can happen here is either network- or
485 // Redis-related, or if Queue#close times out while a job is processing,
486 // and the job later finishes.
487 this.activeJobs.delete(jobPromise);
488 throw err;
489 });
490
491 // We specifically use the value produced by then to avoid cases where the
492 // process handler returns the same Promise object each invocation.
493 this.activeJobs.add(jobPromise);
494 return jobPromise;
495 }
496
497 _preventStall(jobId) {
498 const promise = helpers.deferred(), cb = promise.defer();
499 this.client.srem(this.toKey('stalling'), jobId, cb);
500 /* istanbul ignore next: these errors are only redis or network errors */
501 return promise.catch((err) => this.emit('error', err));
502 }
503
504 _finishJob(err, data, job) {
505 const status = err ? 'failed' : 'succeeded';
506
507 if (this._isClosed) {
508 throw new Error(`unable to update the status of ${status} job ${job.id}`);
509 }
510
511 const multi = this.client.multi()
512 .lrem(this.toKey('active'), 0, job.id)
513 .srem(this.toKey('stalling'), job.id);
514
515 const jobEvent = {
516 id: job.id,
517 event: status,
518 data: err ? err.message : data
519 };
520
521 if (err) {
522 const errInfo = err.stack || err.message || err;
523 job.options.stacktraces.unshift(errInfo);
524
525 const strategyName = job.options.backoff
526 ? job.options.backoff.strategy
527 : 'immediate';
528 const strategy = job.options.retries > 0
529 ? backoff.get(strategyName)
530 : null;
531 const delay = strategy ? strategy(job) : -1;
532 if (delay < 0) {
533 job.status = 'failed';
534 if (this.settings.removeOnFailure) {
535 multi.hdel(this.toKey('jobs'), job.id);
536 } else {
537 multi.hset(this.toKey('jobs'), job.id, job.toData());
538 multi.sadd(this.toKey('failed'), job.id);
539 }
540 } else {
541 job.options.retries -= 1;
542 job.status = 'retrying';
543 jobEvent.event = 'retrying';
544 multi.hset(this.toKey('jobs'), job.id, job.toData());
545 if (delay === 0) {
546 multi.lpush(this.toKey('waiting'), job.id);
547 } else {
548 const time = Date.now() + delay;
549 multi.zadd(this.toKey('delayed'), time, job.id)
550 .publish(this.toKey('earlierDelayed'), time);
551 }
552 }
553 } else {
554 job.status = 'succeeded';
555 if (this.settings.removeOnSuccess) {
556 multi.hdel(this.toKey('jobs'), job.id);
557 } else {
558 multi.hset(this.toKey('jobs'), job.id, job.toData());
559 multi.sadd(this.toKey('succeeded'), job.id);
560 }
561 }
562
563 if (this.settings.sendEvents) {
564 multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
565 }
566
567 const result = err || data;
568
569 const promise = helpers.deferred();
570 multi.exec(promise.defer());
571
572 return promise.then(() => [status, result]);
573 }
574
575 process(concurrency, handler) {
576 if (!this.settings.isWorker) {
577 throw new Error('Cannot call Queue#process on a non-worker');
578 }
579
580 if (this.handler) {
581 throw new Error('Cannot call Queue#process twice');
582 }
583
584 if (this.paused) {
585 throw new Error('closed');
586 }
587
588 if (typeof concurrency === 'function') {
589 handler = concurrency;
590 concurrency = defaults['#process'].concurrency;
591 }
592
593 // If the handler throws a synchronous exception (only applicable to
594 // non-`async` functions), catch it, and fail the job.
595 const catchExceptions = true;
596 this.handler = helpers.wrapAsync(handler, catchExceptions);
597 this.running = 0;
598 this.queued = 1;
599 this.concurrency = concurrency;
600
601 const jobTick = () => {
602 if (this.paused) {
603 this.queued -= 1;
604 return;
605 }
606
607 // invariant: in this code path, this.running < this.concurrency, always
608 // after spoolup, this.running + this.queued === this.concurrency
609 this._getNextJob().then((job) => {
610 // We're shutting down.
611 if (this.paused) {
612 // This job will get picked up later as a stalled job if we happen to
613 // get here. We can't easily process this job because at this point
614 // Queue#close has already captured the activeJobs set in a
615 // Promise.all call, and we'd prefer to delay a job than half-process
616 // it.
617 this.queued -= 1;
618 return;
619 }
620
621 this.running += 1;
622 this.queued -= 1;
623 if (this.running + this.queued < this.concurrency) {
624 this.queued += 1;
625 setImmediate(jobTick);
626 }
627
628 if (!job) {
629 // Per comment in Queue#_waitForJob, this branch is possible when the
630 // job is removed before processing can take place, but after being
631 // initially acquired.
632 setImmediate(jobTick);
633 return;
634 }
635
636 return this._runJob(job).catch((err) => {
637 this.emit('error', err);
638 }).then((results) => {
639 this.running -= 1;
640 this.queued += 1;
641
642 setImmediate(jobTick);
643
644 /* istanbul ignore else */
645 if (results) {
646 const status = results[0], result = results[1];
647 this.emit(status, job, result);
648 }
649 });
650 }, (err) => {
651 setImmediate(jobTick);
652 throw err;
653 }).catch((err) => this.emit('error', err));
654 };
655
656 this.checkStalledJobs(jobTick);
657 this._activateDelayed();
658
659 return this;
660 }
661
662 _doStalledJobCheck() {
663 return this._evalScript('checkStalledJobs', 4, this.toKey('stallBlock'),
664 this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),
665 this.settings.stallInterval)
666 .then((stalled) => {
667 for (let jobId of stalled) {
668 this.emit('stalled', jobId);
669 }
670 return stalled.length;
671 });
672 }
673
674 _checkStalledJobs(interval, cb) {
675 const promise = this._doStalledJobCheck();
676 if (cb) helpers.asCallback(promise, cb);
677
678 if (interval && !this.checkTimer) {
679 promise.then(() => {
680 if (this.checkTimer || this.paused) return;
681 this.checkTimer = setTimeout(() => {
682 // The checkTimer is removed when Queue#close is called, so we don't
683 // need to check for it here.
684 this.checkTimer = null;
685 const postStalled = this._checkStalledJobs(interval, cb);
686 // If it's not the first call, and a callback is not defined, then we
687 // must emit errors to avoid unnecessary unhandled rejections.
688 /* istanbul ignore next: these are only redis and connection errors */
689 postStalled.catch(cb ? (err) => this.emit('error', err) : null);
690 }, interval);
691 });
692 }
693
694 return promise;
695 }
696
697 /**
698 * Check for stalled jobs.
699 *
700 * @param {Number=} interval The interval on which to check for stalled jobs.
701 * This should be set to half the stallInterval setting, to avoid
702 * unnecessary work.
703 * @param {Function=} callback Called with the equivalent of the returned
704 * promise. If interval is provided, the callback will be invoked after each
705 * invocation of checkStalledJobs.
706 * @return {Promise<Number>} Resolves to the number of stalled jobs the
707 * function found.
708 */
709 checkStalledJobs(interval, cb) {
710 if (typeof interval === 'function') {
711 cb = interval;
712 interval = null;
713 } else if (!Number.isSafeInteger(interval)) {
714 interval = null;
715 }
716 return this._checkStalledJobs(interval, cb);
717 }
718
719 _activateDelayed() {
720 if (!this.settings.activateDelayedJobs) return;
721 this._evalScript('raiseDelayedJobs', 2,
722 this.toKey('delayed'),
723 this.toKey('waiting'),
724 Date.now(), this.settings.delayedDebounce)
725 .then((results) => {
726 const numRaised = results[0], nextOpportunity = results[1];
727 if (numRaised) {
728 this.emit('raised jobs', numRaised);
729 }
730 this._delayedTimer.schedule(parseInt(nextOpportunity, 10));
731 }, /* istanbul ignore next */ (err) => {
732 // Handle aborted redis connections.
733 if (redis.isAbortError(err)) {
734 if (this.paused) return;
735 // Retry.
736 return this._activateDelayed();
737 }
738 this.emit('error', err);
739 });
740 }
741
742 toKey(str) {
743 return this.settings.keyPrefix + str;
744 }
745
746 /**
747 * Evaluate the named script, return a promise with its results.
748 *
749 * Same parameter list/syntax as evalsha, except for the name.
750 */
751 _evalScript(name) {
752 // Avoid deoptimization by leaking arguments: store them directly in an
753 // array instead of passing them to a helper.
754 const args = new Array(arguments.length);
755 // Skip the first because it's just the name, and it'll get filled in within
756 // the promise.
757 for (let i = 1; i < arguments.length; ++i) {
758 args[i] = arguments[i];
759 }
760
761 return this._commandable().then((client) => {
762 // Get the sha for the script after we're ready to avoid a race condition
763 // with the lua script loader.
764 args[0] = lua.shas[name];
765
766 const promise = helpers.deferred();
767 args.push(promise.defer());
768 client.evalsha.apply(client, args);
769 return promise;
770 });
771 }
772}
773
774module.exports = Queue;