UNPKG

5.26 kBJavaScriptView Raw
1'use strict';
2
3const Emitter = require('events').EventEmitter;
4
5const helpers = require('./helpers');
6const strategies = require('./backoff');
7
8class Job extends Emitter {
9 constructor(queue, jobId, data, options) {
10 super();
11
12 this.queue = queue;
13 this.id = jobId;
14 this.progress = 0;
15 this.data = data || {};
16 this.options = options || {};
17 this.options.timestamp = this.options.timestamp || Date.now();
18 this.options.stacktraces = this.options.stacktraces || [];
19 this.status = 'created';
20 }
21
22 static fromId(queue, jobId, cb) {
23 const promise = queue._commandable().then((client) => {
24 const jobPromise = helpers.deferred();
25 client.hget(queue.toKey('jobs'), jobId, jobPromise.defer());
26 return jobPromise;
27 }).then((data) => data ? Job.fromData(queue, jobId, data) : null);
28
29 if (cb) helpers.asCallback(promise, cb);
30 return promise;
31 }
32
33 static fromData(queue, jobId, data) {
34 // no need for try-catch here since we made the JSON ourselves in Job#toData
35 data = JSON.parse(data);
36 const job = new Job(queue, jobId, data.data, data.options);
37 job.status = data.status;
38 return job;
39 }
40
41 toData() {
42 return JSON.stringify({
43 data: this.data,
44 options: this.options,
45 status: this.status
46 });
47 }
48
49 save(cb) {
50 const toKey = this.queue.toKey.bind(this.queue);
51
52 let promise;
53 if (this.options.delay) {
54 promise = this.queue._evalScript('addDelayedJob', 4,
55 toKey('id'), toKey('jobs'), toKey('delayed'), toKey('earlierDelayed'),
56 this.id || '', this.toData(), this.options.delay);
57
58 if (this.queue.settings.activateDelayedJobs) {
59 promise = promise.then((jobId) => {
60 // Only reschedule if the job was actually created.
61 if (jobId) {
62 this.queue._delayedTimer.schedule(this.options.delay);
63 }
64 return jobId;
65 });
66 }
67 } else {
68 promise = this.queue._evalScript('addJob', 3,
69 toKey('id'), toKey('jobs'), toKey('waiting'),
70 this.id || '', this.toData());
71 }
72
73 promise = promise.then((jobId) => {
74 this.id = jobId;
75 // If the jobId is not null, then store the job in the job map.
76 if (jobId && this.queue.settings.storeJobs) {
77 this.queue.jobs.set(jobId, this);
78 }
79 return this;
80 });
81
82 if (cb) helpers.asCallback(promise, cb);
83 return promise;
84 }
85
86 setId(id) {
87 this.id = id;
88 return this;
89 }
90
91 retries(n) {
92 if (n < 0) {
93 throw new Error('Retries cannot be negative');
94 }
95 this.options.retries = n;
96 return this;
97 }
98
99 delayUntil(timestamp) {
100 // Get the timestamp from Date objects.
101 if (timestamp && typeof timestamp.getTime === 'function') {
102 timestamp = timestamp.getTime();
103 } else {
104 timestamp = parseInt(timestamp, 10);
105 }
106 if (isNaN(timestamp) || timestamp < 0) {
107 throw new Error('invalid delay timestamp');
108 }
109 if (timestamp > Date.now()) {
110 this.options.delay = timestamp;
111 }
112 return this;
113 }
114
115 timeout(ms) {
116 if (ms < 0) {
117 throw new Error('Timeout cannot be negative');
118 }
119 this.options.timeout = ms;
120 return this;
121 }
122
123 backoff(strategy, delay) {
124 if (!strategies.has(strategy)) {
125 throw new Error('unknown strategy');
126 }
127 if (!Number.isSafeInteger(delay) || delay <= 0) {
128 throw new Error('delay must be a positive integer');
129 }
130 this.options.backoff = {
131 strategy,
132 delay
133 };
134 return this;
135 }
136
137 reportProgress(progress, cb) {
138 // right now we just send the pubsub event
139 // might consider also updating the job hash for persistence
140 progress = parseInt(progress, 10);
141
142 let promise;
143 if (progress >= 0 && progress <= 100) {
144 this.progress = progress;
145 promise = this.queue._commandable().then((client) => {
146 const publishPromise = helpers.deferred();
147 const payload = JSON.stringify({
148 id: this.id,
149 event: 'progress',
150 data: progress
151 });
152 client.publish(this.queue.toKey('events'), payload,
153 publishPromise.defer());
154 return publishPromise;
155 });
156 } else {
157 promise = Promise.reject(new Error('Progress must be between 0 and 100'));
158 }
159
160 if (cb) helpers.asCallback(promise, cb);
161 return promise;
162 }
163
164 remove(cb) {
165 const promise = this.queue.removeJob(this.id).then(() => this);
166 if (cb) helpers.asCallback(promise, cb);
167 return promise;
168 }
169
170 retry(cb) {
171 const promise = this.queue._commandable().then((client) => {
172 const retryPromise = helpers.deferred();
173 client.multi()
174 .srem(this.queue.toKey('failed'), this.id)
175 .lpush(this.queue.toKey('waiting'), this.id)
176 .exec(retryPromise.defer());
177 return retryPromise;
178 });
179
180 if (cb) helpers.asCallback(promise, cb);
181 return promise;
182 }
183
184 isInSet(set, cb) {
185 const promise = this.queue._commandable().then((client) => {
186 const memberPromise = helpers.deferred();
187 client.sismember(this.queue.toKey(set), this.id, memberPromise.defer());
188 return memberPromise;
189 }).then((result) => result === 1);
190
191 if (cb) helpers.asCallback(promise, cb);
192 return promise;
193 }
194}
195
196module.exports = Job;