1 |
|
2 | var util = require('util');
|
3 | var EE = require('events').EventEmitter;
|
4 | var ETA = require('node-eta');
|
5 |
|
/**
 * Runs one batch of tasks through a user-supplied processing function and
 * reports the outcome through events.
 *
 * @param {Object} opts
 * @param {Function} opts.fn - Processing function; `start()` invokes it with
 *   the task(s) and a node-style `(err, result)` callback.
 * @param {Object} opts.batch - Map of task id to task payload.
 * @param {boolean} [opts.single] - When truthy, `start()` passes only the
 *   first task instead of an array of all tasks.
 * @param {boolean} [opts.failTaskOnProcessException] - When truthy, an
 *   exception thrown synchronously by `fn` fails the batch instead of being
 *   rethrown from `start()`.
 */
function Worker(opts) {
  // Run the EventEmitter constructor so the emitter's own state is created
  // up front instead of lazily on the first `on()`/`emit()` call.
  EE.call(this);

  this.fn = opts.fn;
  this.batch = opts.batch;
  this.single = opts.single;
  this.active = false;
  this.cancelled = false;
  this.failTaskOnProcessException = opts.failTaskOnProcessException;
}

util.inherits(Worker, EE);
|
16 |
|
/**
 * Resets all per-run bookkeeping for the current batch.
 *
 * Tasks are tracked by their position in `Object.keys(this.batch)`, not by
 * their batch key: `_waiting` and `progress.tasks` are both keyed by that
 * index. Every task starts out waiting with zero progress.
 */
Worker.prototype.setup = function () {
  var worker = this;
  var ids = Object.keys(worker.batch);
  var taskCount = ids.length;

  worker._taskIds = ids;
  worker._process = {};
  worker._waiting = {};
  worker._eta = new ETA();

  worker.counts = { finished: 0, failed: 0, completed: 0, total: taskCount };

  worker.status = 'ready';
  worker.progress = { tasks: {}, complete: 0, total: taskCount, eta: '' };

  for (var index = 0; index < taskCount; index++) {
    worker._waiting[index] = true;
    worker.progress.tasks[index] = { pct: 0, complete: 0, total: 1 };
  }
};
|
53 |
|
/**
 * Starts processing the batch. Does nothing if the worker is already active.
 *
 * Calls `setup()`, starts the ETA tracker, then invokes `this.fn` with the
 * worker as `this`, the task list (or only the first task when `single` is
 * set) and a node-style callback. Whatever `fn` returns is stored as
 * `_process` so that `pause()`, `resume()` and `cancel()` can forward to it.
 *
 * @throws {Error} When `fn` throws synchronously and
 *   `failTaskOnProcessException` is falsy. A thrown `Error` is rethrown
 *   unchanged; any other thrown value is wrapped in a new `Error`.
 */
Worker.prototype.start = function () {
  var self = this;
  if (self.active) return;

  self.setup();
  self._eta.count = self.progress.total;
  self._eta.start();

  self.active = true;
  self.status = 'in-progress';
  var tasks = self._taskIds.map(function (taskId) { return self.batch[taskId]; });
  if (self.single) {
    tasks = tasks[0];
  }
  try {
    self._process = self.fn.call(self, tasks, function (err, result) {
      // Ignore late callbacks once the batch has already ended.
      if (!self.active) return;
      if (err) {
        self.failedBatch(err.message || err);
      } else {
        self.finishBatch(result);
      }
    });
  } catch (err) {
    if (self.failTaskOnProcessException) {
      // `err` may be any thrown value, including null/undefined.
      self.failedBatch((err && err.message) || err);
    } else {
      // Rethrow real errors untouched so their type, message and stack
      // survive; only wrap non-Error throwables.
      throw err instanceof Error ? err : new Error(err);
    }
  }
  // `fn` may return nothing; keep `_process` an object for the control methods.
  self._process = self._process || {};
};
|
86 |
|
/**
 * Marks an active worker as finished and emits `end`.
 * Calling it on an inactive worker has no effect.
 */
Worker.prototype.end = function () {
  var worker = this;
  if (worker.active) {
    worker.status = 'finished';
    worker.active = false;
    worker.emit('end');
  }
};
|
93 |
|
/**
 * Resumes processing: forwards to the process handle's `resume()` when it
 * has one, then sets the status to `in-progress`.
 */
Worker.prototype.resume = function () {
  // `_process` is only assigned by setup()/start(); guard so that resuming a
  // worker that was never started does not throw.
  var proc = this._process;
  if (proc && typeof proc.resume === 'function') {
    proc.resume();
  }
  this.status = 'in-progress';
};
|
100 |
|
/**
 * Pauses processing: forwards to the process handle's `pause()` when it has
 * one, then sets the status to `paused`.
 */
Worker.prototype.pause = function () {
  // `_process` is only assigned by setup()/start(); guard so that pausing a
  // worker that was never started does not throw.
  var proc = this._process;
  if (proc && typeof proc.pause === 'function') {
    proc.pause();
  }
  this.status = 'paused';
};
|
107 |
|
/**
 * Cancels the batch: sets `cancelled`, forwards to the process handle's
 * `cancel()` and/or `abort()` when present, then fails the batch with the
 * message `'cancelled'`.
 */
Worker.prototype.cancel = function () {
  this.cancelled = true;
  // `_process` is only assigned by setup()/start(); guard each access so that
  // cancelling a worker that was never started does not throw.
  if (this._process && typeof this._process.cancel === 'function') {
    this._process.cancel();
  }
  if (this._process && typeof this._process.abort === 'function') {
    this._process.abort();
  }
  this.failedBatch('cancelled');
};
|
118 |
|
/**
 * Fails every task that is still waiting, emits `failed` with the message,
 * and ends the worker. Ignored when the worker is not active.
 *
 * @param {*} msg - Failure reason passed to each task and to the `failed` event.
 */
Worker.prototype.failedBatch = function (msg) {
  var worker = this;
  if (!worker.active) return;

  var ids = Object.keys(worker._waiting);
  for (var i = 0; i < ids.length; i++) {
    if (worker._waiting[ids[i]]) {
      worker.failedTask(ids[i], msg);
    }
  }

  worker.emit('failed', msg);
  worker.end();
};
|
129 |
|
/**
 * Marks a single waiting task as failed, bumps the `failed` and `completed`
 * counters, and emits `task_failed`. Ignored when the worker is inactive or
 * the task is no longer waiting.
 *
 * @param {number|string} id - Key of the task in `_waiting`.
 * @param {*} msg - Failure reason forwarded with the event.
 */
Worker.prototype.failedTask = function (id, msg) {
  var worker = this;
  if (!worker.active || !worker._waiting[id]) return;

  worker._waiting[id] = false;
  worker.counts.failed++;
  worker.counts.completed++;
  worker.emit('task_failed', id, msg);
};
|
140 |
|
/**
 * Finishes every task that is still waiting with the same result, emits
 * `finish`, and ends the worker. Ignored when the worker is not active.
 *
 * @param {*} result - Value passed to each task and to the `finish` event.
 */
Worker.prototype.finishBatch = function (result) {
  var worker = this;
  if (!worker.active) return;

  var ids = Object.keys(worker._waiting);
  for (var i = 0; i < ids.length; i++) {
    if (worker._waiting[ids[i]]) {
      worker.finishTask(ids[i], result);
    }
  }

  worker.emit('finish', result);
  worker.end();
};
|
151 |
|
/**
 * Marks a single waiting task as finished, bumps the `finished` and
 * `completed` counters, and emits `task_finish`. Ignored when the worker is
 * inactive or the task is no longer waiting.
 *
 * @param {number|string} id - Key of the task in `_waiting`.
 * @param {*} result - Task result forwarded with the event.
 */
Worker.prototype.finishTask = function (id, result) {
  var worker = this;
  if (!worker.active || !worker._waiting[id]) return;

  worker._waiting[id] = false;
  worker.counts.finished++;
  worker.counts.completed++;
  worker.emit('task_finish', id, result);
};
|
162 |
|
/**
 * Applies the same progress to every task that is still waiting, then
 * recomputes the batch-level progress and emits `progress`.
 *
 * The batch `complete` value is the sum of the per-task `pct` values, which
 * is also fed to the ETA tracker as its `done` amount. The message is stored
 * on the batch progress only; it is not forwarded to `progressTask`.
 *
 * @param {number} complete - Units completed, forwarded to each waiting task.
 * @param {number} total - Total units, forwarded to each waiting task.
 * @param {string} [msg] - Batch-level message; stored as `''` when falsy.
 */
Worker.prototype.progressBatch = function (complete, total, msg) {
  var worker = this;
  if (!worker.active) return;

  var pending = Object.keys(worker._waiting);
  for (var i = 0; i < pending.length; i++) {
    if (worker._waiting[pending[i]]) {
      worker.progressTask(pending[i], complete, total);
    }
  }

  // Tasks are stored under their index in `_taskIds`, so walk by position.
  worker.progress.complete = 0;
  for (var index = 0; index < worker._taskIds.length; index++) {
    worker.progress.complete += worker.progress.tasks[index].pct;
  }

  worker._eta.done = worker.progress.complete;
  worker.progress.eta = worker._eta.format('{{etah}}');
  worker.progress.message = msg || '';
  worker.emit('progress', worker.progress);
};
|
179 |
|
/**
 * Records progress for a single waiting task and emits `task_progress`.
 * Ignored when the worker is inactive or the task is no longer waiting.
 *
 * @param {number|string} id - Key of the task in `progress.tasks`.
 * @param {number} complete - Units completed so far.
 * @param {number} [total] - Total units; when omitted (or falsy) the
 *   previously stored total is kept.
 * @param {string} [msg] - Progress message; when omitted the previous
 *   message is kept.
 */
Worker.prototype.progressTask = function (id, complete, total, msg) {
  var self = this;
  if (!self.active) return;
  if (!self._waiting[id]) return;

  var task = self.progress.tasks[id];
  task.complete = complete;
  // Prefer the values supplied by the caller. The stored total is seeded to 1
  // by setup(), so checking the stored value first would pin it forever.
  task.total = total || task.total;
  task.message = msg || task.message;

  // Derive pct from the effective total so it always agrees with `total`;
  // NaN (e.g. missing `complete`) and negative ratios collapse to 0.
  var ratio = complete / task.total;
  task.pct = ratio > 0 ? Math.min(1, ratio) : 0;

  self.emit('task_progress', id, task);
};
|
191 |
|
192 | module.exports = Worker;
|