UNPKG

4.63 kBJavaScriptView Raw
1
2var util = require('util');
3var EE = require('events').EventEmitter;
4var ETA = require('node-eta');
5
/**
 * Runs one batch of tasks through a user-supplied process function and
 * reports what happened through events.
 *
 * Events emitted by the methods below:
 *   'task_finish'   (id, result)   - one task finished
 *   'task_failed'   (id, msg)      - one task failed
 *   'task_progress' (id, progress) - one task reported progress
 *   'finish'        (result)       - the batch finished
 *   'failed'        (msg)          - the batch failed
 *   'progress'      (progress)     - batch-level progress
 *   'end'           ()             - the worker became inactive
 *
 * Task ids used by the per-task methods and events are positional indexes
 * into Object.keys(opts.batch), not the batch keys themselves.
 *
 * @param {Object} opts
 * @param {Function} opts.fn Process function, invoked as
 *   fn.call(worker, tasks, callback(err, result)).
 * @param {Object} opts.batch Map of task key to task payload.
 * @param {boolean} [opts.single] When truthy, fn receives only the first
 *   payload instead of an array of payloads.
 * @param {boolean} [opts.failTaskOnProcessException] When truthy, a
 *   synchronous throw from fn fails the batch instead of propagating.
 */
function Worker(opts) {
  this.fn = opts.fn;
  this.batch = opts.batch;
  this.single = opts.single;
  this.active = false;
  this.cancelled = false;
  this.failTaskOnProcessException = opts.failTaskOnProcessException;
  // Created here (and reset by setup) so that pause/resume/cancel can be
  // called safely on a worker that has not been started yet.
  this._process = {};
}

util.inherits(Worker, EE);

/**
 * Resets all bookkeeping for a fresh run: task list, waiting flags,
 * counters, progress records and the ETA tracker.
 */
Worker.prototype.setup = function () {
  var self = this;

  // Internal
  self._taskIds = Object.keys(self.batch);
  self._process = {};
  self._waiting = {};
  self._eta = new ETA();

  // Task counts
  self.counts = {
    finished: 0,
    failed: 0,
    completed: 0,
    total: self._taskIds.length,
  };

  // Progress
  self.status = 'ready';
  self.progress = {
    tasks: {},
    complete: 0,
    total: self._taskIds.length,
    eta: '',
  };

  // Setup: bookkeeping is keyed by the task's position, not its batch key.
  self._taskIds.forEach(function (taskId, id) {
    self._waiting[id] = true;
    self.progress.tasks[id] = {
      pct: 0,
      complete: 0,
      total: 1,
    };
  });
};

/**
 * Starts processing the batch. Does nothing if the worker is already active.
 *
 * @throws {Error} When fn throws synchronously and
 *   failTaskOnProcessException is falsy. The thrown error wraps the original
 *   value, which is available as its `cause` property.
 */
Worker.prototype.start = function () {
  var self = this;
  if (self.active) return;

  self.setup();
  self._eta.count = self.progress.total;
  self._eta.start();

  self.active = true;
  self.status = 'in-progress';
  var tasks = self._taskIds.map(function (taskId) { return self.batch[taskId]; });
  if (self.single) {
    tasks = tasks[0];
  }
  try {
    self._process = self.fn.call(self, tasks, function (err, result) {
      // Ignore late callbacks once the worker has ended (e.g. after cancel).
      if (!self.active) return;
      if (err) {
        self.failedBatch(err.message || err);
      } else {
        self.finishBatch(result);
      }
    });
  } catch (err) {
    if (self.failTaskOnProcessException) {
      // The thrown value may be null/undefined, so guard the property read.
      self.failedBatch((err && err.message) || err);
    } else {
      // Keep the historical wrapper (and its message) for compatibility,
      // but attach the original so its stack and type are not lost.
      var wrapped = new Error(err);
      wrapped.cause = err;
      throw wrapped;
    }
  }
  // fn may return nothing; keep _process an object for pause/resume/cancel.
  self._process = self._process || {};
};

/**
 * Marks the worker as finished and emits 'end'. Does nothing if inactive.
 */
Worker.prototype.end = function () {
  if (!this.active) return;
  this.status = 'finished';
  this.active = false;
  this.emit('end');
};

/**
 * Calls the process handle's resume() if it has one and sets the status
 * to 'in-progress'.
 */
Worker.prototype.resume = function () {
  if (typeof this._process.resume === 'function') {
    this._process.resume();
  }
  this.status = 'in-progress';
};

/**
 * Calls the process handle's pause() if it has one and sets the status
 * to 'paused'.
 */
Worker.prototype.pause = function () {
  if (typeof this._process.pause === 'function') {
    this._process.pause();
  }
  this.status = 'paused';
};

/**
 * Flags the worker as cancelled, calls the process handle's cancel() and
 * abort() when present, then fails the batch with the message 'cancelled'.
 */
Worker.prototype.cancel = function () {
  this.cancelled = true;
  if (typeof this._process.cancel === 'function') {
    this._process.cancel();
  }
  if (typeof this._process.abort === 'function') {
    this._process.abort();
  }
  this.failedBatch('cancelled');
};

/**
 * Fails every task that is still waiting, emits 'failed' and ends the worker.
 *
 * @param {*} msg Failure message passed to each 'task_failed' and to 'failed'.
 */
Worker.prototype.failedBatch = function (msg) {
  var self = this;
  if (!self.active) return;
  Object.keys(self._waiting).forEach(function (id) {
    if (!self._waiting[id]) return;
    self.failedTask(id, msg);
  });
  self.emit('failed', msg);
  self.end();
};

/**
 * Fails a single waiting task: updates the counters and emits 'task_failed'.
 * Tasks that already completed are ignored.
 *
 * @param {number|string} id Positional task id.
 * @param {*} msg Failure message.
 */
Worker.prototype.failedTask = function (id, msg) {
  var self = this;
  if (!self.active) return;
  if (self._waiting[id]) {
    self._waiting[id] = false;
    self.counts.failed++;
    self.counts.completed++;
    self.emit('task_failed', id, msg);
  }
};

/**
 * Finishes every task that is still waiting with the same result, emits
 * 'finish' and ends the worker.
 *
 * @param {*} result Result passed to each 'task_finish' and to 'finish'.
 */
Worker.prototype.finishBatch = function (result) {
  var self = this;
  if (!self.active) return;
  Object.keys(self._waiting).forEach(function (id) {
    if (!self._waiting[id]) return;
    self.finishTask(id, result);
  });
  self.emit('finish', result);
  self.end();
};

/**
 * Finishes a single waiting task: updates the counters and emits
 * 'task_finish'. Tasks that already completed are ignored.
 *
 * @param {number|string} id Positional task id.
 * @param {*} result Task result.
 */
Worker.prototype.finishTask = function (id, result) {
  var self = this;
  if (!self.active) return;
  if (self._waiting[id]) {
    self._waiting[id] = false;
    self.counts.finished++;
    self.counts.completed++;
    self.emit('task_finish', id, result);
  }
};

/**
 * Applies the same progress to every waiting task, recomputes the batch
 * totals and ETA text, and emits 'progress'.
 *
 * @param {number} complete Units completed.
 * @param {number} total Units in total.
 * @param {string} [msg] Batch-level message; stored as '' when omitted.
 */
Worker.prototype.progressBatch = function (complete, total, msg) {
  var self = this;
  if (!self.active) return;
  Object.keys(self._waiting).forEach(function (id) {
    if (!self._waiting[id]) return;
    self.progressTask(id, complete, total);
  });
  // Batch completion is the sum of the per-task fractions.
  self.progress.complete = 0;
  self._taskIds.forEach(function (taskId, id) {
    self.progress.complete += self.progress.tasks[id].pct;
  });
  self._eta.done = self.progress.complete;
  self.progress.eta = self._eta.format('{{etah}}');
  self.progress.message = msg || '';
  self.emit('progress', self.progress);
};

/**
 * Records progress for one waiting task and emits 'task_progress'.
 *
 * @param {number|string} id Positional task id.
 * @param {number} complete Units completed.
 * @param {number} total Units in total; the previous total is kept when
 *   this is falsy.
 * @param {string} [msg] Task message; the previous message is kept when
 *   this is omitted.
 */
Worker.prototype.progressTask = function (id, complete, total, msg) {
  var self = this;
  if (!self.active) return;
  if (self._waiting[id]) {
    var task = self.progress.tasks[id];
    var ratio = complete / total;
    task.complete = complete;
    // Prefer the freshly reported values; the stored ones are only a fallback.
    task.total = total || task.total;
    task.message = msg || task.message;
    // 0/0 or a missing total gives NaN, which would poison the batch sum.
    task.pct = isNaN(ratio) ? 0 : Math.max(0, Math.min(1, ratio));
    self.emit('task_progress', id, task);
  }
};
191
192module.exports = Worker;