1 | 'use strict';
|
2 |
|
3 | const Emitter = require('events').EventEmitter;
|
4 |
|
5 | const helpers = require('./helpers');
|
6 | const strategies = require('./backoff');
|
7 |
|
8 | class Job extends Emitter {
|
9 | constructor(queue, jobId, data, options) {
|
10 | super();
|
11 |
|
12 | this.queue = queue;
|
13 | this.id = jobId;
|
14 | this.progress = 0;
|
15 | this.data = data || {};
|
16 | this.options = options || {};
|
17 | this.options.timestamp = this.options.timestamp || Date.now();
|
18 | this.options.stacktraces = this.options.stacktraces || [];
|
19 | this.status = 'created';
|
20 | }
|
21 |
|
22 | static fromId(queue, jobId, cb) {
|
23 | const promise = queue._commandable().then((client) => {
|
24 | const jobPromise = helpers.deferred();
|
25 | client.hget(queue.toKey('jobs'), jobId, jobPromise.defer());
|
26 | return jobPromise;
|
27 | }).then((data) => data ? Job.fromData(queue, jobId, data) : null);
|
28 |
|
29 | if (cb) helpers.asCallback(promise, cb);
|
30 | return promise;
|
31 | }
|
32 |
|
33 | static fromData(queue, jobId, data) {
|
34 |
|
35 | data = JSON.parse(data);
|
36 | const job = new Job(queue, jobId, data.data, data.options);
|
37 | job.status = data.status;
|
38 | return job;
|
39 | }
|
40 |
|
41 | toData() {
|
42 | return JSON.stringify({
|
43 | data: this.data,
|
44 | options: this.options,
|
45 | status: this.status
|
46 | });
|
47 | }
|
48 |
|
49 | save(cb) {
|
50 | const toKey = this.queue.toKey.bind(this.queue);
|
51 |
|
52 | let promise;
|
53 | if (this.options.delay) {
|
54 | promise = this.queue._evalScript('addDelayedJob', 4,
|
55 | toKey('id'), toKey('jobs'), toKey('delayed'), toKey('earlierDelayed'),
|
56 | this.id || '', this.toData(), this.options.delay);
|
57 |
|
58 | if (this.queue.settings.activateDelayedJobs) {
|
59 | promise = promise.then((jobId) => {
|
60 |
|
61 | if (jobId) {
|
62 | this.queue._delayedTimer.schedule(this.options.delay);
|
63 | }
|
64 | return jobId;
|
65 | });
|
66 | }
|
67 | } else {
|
68 | promise = this.queue._evalScript('addJob', 3,
|
69 | toKey('id'), toKey('jobs'), toKey('waiting'),
|
70 | this.id || '', this.toData());
|
71 | }
|
72 |
|
73 | promise = promise.then((jobId) => {
|
74 | this.id = jobId;
|
75 |
|
76 | if (jobId && this.queue.settings.storeJobs) {
|
77 | this.queue.jobs.set(jobId, this);
|
78 | }
|
79 | return this;
|
80 | });
|
81 |
|
82 | if (cb) helpers.asCallback(promise, cb);
|
83 | return promise;
|
84 | }
|
85 |
|
86 | setId(id) {
|
87 | this.id = id;
|
88 | return this;
|
89 | }
|
90 |
|
91 | retries(n) {
|
92 | if (n < 0) {
|
93 | throw new Error('Retries cannot be negative');
|
94 | }
|
95 | this.options.retries = n;
|
96 | return this;
|
97 | }
|
98 |
|
99 | delayUntil(timestamp) {
|
100 |
|
101 | if (timestamp && typeof timestamp.getTime === 'function') {
|
102 | timestamp = timestamp.getTime();
|
103 | } else {
|
104 | timestamp = parseInt(timestamp, 10);
|
105 | }
|
106 | if (isNaN(timestamp) || timestamp < 0) {
|
107 | throw new Error('invalid delay timestamp');
|
108 | }
|
109 | if (timestamp > Date.now()) {
|
110 | this.options.delay = timestamp;
|
111 | }
|
112 | return this;
|
113 | }
|
114 |
|
115 | timeout(ms) {
|
116 | if (ms < 0) {
|
117 | throw new Error('Timeout cannot be negative');
|
118 | }
|
119 | this.options.timeout = ms;
|
120 | return this;
|
121 | }
|
122 |
|
123 | backoff(strategy, delay) {
|
124 | if (!strategies.has(strategy)) {
|
125 | throw new Error('unknown strategy');
|
126 | }
|
127 | if (!Number.isSafeInteger(delay) || delay <= 0) {
|
128 | throw new Error('delay must be a positive integer');
|
129 | }
|
130 | this.options.backoff = {
|
131 | strategy,
|
132 | delay
|
133 | };
|
134 | return this;
|
135 | }
|
136 |
|
137 | reportProgress(progress, cb) {
|
138 |
|
139 |
|
140 | progress = parseInt(progress, 10);
|
141 |
|
142 | let promise;
|
143 | if (progress >= 0 && progress <= 100) {
|
144 | this.progress = progress;
|
145 | promise = this.queue._commandable().then((client) => {
|
146 | const publishPromise = helpers.deferred();
|
147 | const payload = JSON.stringify({
|
148 | id: this.id,
|
149 | event: 'progress',
|
150 | data: progress
|
151 | });
|
152 | client.publish(this.queue.toKey('events'), payload,
|
153 | publishPromise.defer());
|
154 | return publishPromise;
|
155 | });
|
156 | } else {
|
157 | promise = Promise.reject(new Error('Progress must be between 0 and 100'));
|
158 | }
|
159 |
|
160 | if (cb) helpers.asCallback(promise, cb);
|
161 | return promise;
|
162 | }
|
163 |
|
164 | remove(cb) {
|
165 | const promise = this.queue.removeJob(this.id).then(() => this);
|
166 | if (cb) helpers.asCallback(promise, cb);
|
167 | return promise;
|
168 | }
|
169 |
|
170 | retry(cb) {
|
171 | const promise = this.queue._commandable().then((client) => {
|
172 | const retryPromise = helpers.deferred();
|
173 | client.multi()
|
174 | .srem(this.queue.toKey('failed'), this.id)
|
175 | .lpush(this.queue.toKey('waiting'), this.id)
|
176 | .exec(retryPromise.defer());
|
177 | return retryPromise;
|
178 | });
|
179 |
|
180 | if (cb) helpers.asCallback(promise, cb);
|
181 | return promise;
|
182 | }
|
183 |
|
184 | isInSet(set, cb) {
|
185 | const promise = this.queue._commandable().then((client) => {
|
186 | const memberPromise = helpers.deferred();
|
187 | client.sismember(this.queue.toKey(set), this.id, memberPromise.defer());
|
188 | return memberPromise;
|
189 | }).then((result) => result === 1);
|
190 |
|
191 | if (cb) helpers.asCallback(promise, cb);
|
192 | return promise;
|
193 | }
|
194 | }
|
195 |
|
196 | module.exports = Job;
|