UNPKG

22.1 kBJavaScriptView Raw
1"use strict";
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6exports.default = void 0;
7
8var _events = _interopRequireDefault(require("events"));
9
10var _autoBind = _interopRequireDefault(require("auto-bind"));
11
12var _AsyncQueue = _interopRequireWildcard(require("./AsyncQueue"));
13
14var _asyncSyncUtils = require("./asyncSyncUtils");
15
16var _errorUtils = require("./errorUtils");
17
/**
 * Babel interop helper for `import * as ns from '...'`.
 *
 * Modules already flagged with `__esModule` are returned untouched. Any other
 * value is wrapped in a fresh namespace object: its own enumerable properties
 * are copied over (accessors are copied as accessors) and the original value
 * is exposed as `default`.
 *
 * @param {*} obj - the value returned by `require`.
 * @returns {Object} `obj` itself, or a new namespace-like wrapper around it.
 */
function _interopRequireWildcard(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  const namespace = {};

  if (obj != null) {
    // Legacy feature check kept from the original helper.
    const canReadDescriptors = Object.defineProperty && Object.getOwnPropertyDescriptor;

    for (const key in obj) {
      if (!Object.prototype.hasOwnProperty.call(obj, key)) {
        continue;
      }

      const descriptor = canReadDescriptors ? Object.getOwnPropertyDescriptor(obj, key) : {};

      if (descriptor.get || descriptor.set) {
        // Preserve getters/setters instead of snapshotting their value.
        Object.defineProperty(namespace, key, descriptor);
      } else {
        namespace[key] = obj[key];
      }
    }
  }

  namespace.default = obj;
  return namespace;
}
19
/**
 * Babel interop helper for default imports.
 *
 * @param {*} obj - the value returned by `require`.
 * @returns {Object} `obj` itself when it is flagged `__esModule`, otherwise
 *   a wrapper of the form `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return { default: obj };
}
21
/**
 * Babel helper used to initialise class fields.
 *
 * When `key` is not yet reachable on `obj` a plain assignment is used.
 * When it already exists (own or inherited) the property is (re)defined as an
 * own, enumerable, configurable, writable data property, so an inherited
 * setter is not invoked.
 *
 * @param {Object} obj - target object.
 * @param {string|symbol} key - property key.
 * @param {*} value - value to store.
 * @returns {Object} `obj`.
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}
23
/**
 * A thread-pool abstraction for ES6 async operations.
 *
 * Items are queued and handed to the `task` option, with at most `threads`
 * tasks in flight at the same time. Each task's resolved value is stored in
 * the results array at the position in which its item was queued.
 *
 * Emits a `'progress'` event (`{ endedCount }`) every time a task ends,
 * whether it succeeded or failed.
 *
 * @export
 * @class ThreadPool
 * @extends {EventEmitter}
 */
class ThreadPool extends _events.default {
  /**
   * Creates a pool from `options`, queues `options.items` (when given),
   * closes the pool and runs every queued item.
   *
   * The `options` object is not modified.
   *
   * @param {Object} options - constructor options, plus an optional iterable `items`.
   * @returns {Promise<Array>} the results array, indexed by queue order.
   * @throws {ExtendedError} when at least one task error was not handled by `errorHandler`.
   */
  static async run(options) {
    // The constructor applies the default (re-throwing) error handler itself,
    // so there is no need to write one onto the caller's object here.
    const tp = new ThreadPool(options);

    if (options.items) {
      await tp.queueItems(options.items);
    }

    return tp.runAllQueued();
  }

  /**
   * Shorthand for `ThreadPool.run({ task, threads, items })`.
   *
   * @param {Iterable} items - items to process; may be omitted to run an empty pool.
   * @param {Function} task - called once per item; may return a promise.
   * @param {number} [threads] - maximum number of concurrently running tasks.
   * @returns {Promise<Array>} the results array, indexed by queue order.
   */
  static async all(items, task, threads) {
    return ThreadPool.run({
      task: task,
      threads: threads,
      items: items
    });
  }

  /**
   * Creates an instance of ThreadPool.
   *
   * @param {Object} options
   * @param {Function} options.task - called once per queued item; its awaited result is stored.
   * @param {number} [options.threads=Infinity] - maximum concurrency (any falsy value means unlimited).
   * @param {Function} [options.errorHandler] - called with every task error; when it throws,
   *   the thrown value is recorded as an uncaught error. Defaults to re-throwing.
   * @param {string} [options.name] - passed to the internal queue and attached to aggregated errors.
   * @param {number} [options.queueMaxSize] - passed to the internal queue as `maxSize`.
   */
  constructor(options) {
    super();

    _defineProperty(this, "queuedCount", 0);

    _defineProperty(this, "startedCount", 0);

    _defineProperty(this, "endedCount", 0);

    _defineProperty(this, "_options", void 0);

    _defineProperty(this, "_errorHandler", void 0);

    _defineProperty(this, "_uncaughtErrors", []);

    _defineProperty(this, "_closed", false);

    _defineProperty(this, "_queuedTasks", void 0);

    _defineProperty(this, "_threadsSemaphore", void 0);

    _defineProperty(this, "_completeEvent", void 0);

    _defineProperty(this, "_allTasksCompleteOrSomeFailedEvent", void 0);

    _defineProperty(this, "_res", []);

    // Keep a private copy so that normalising `threads` never mutates the
    // caller's object (which may be shared, reused or frozen).
    this._options = Object.assign({}, options, {
      threads: options.threads || Infinity
    });

    this._errorHandler = this._options.errorHandler || (err => {
      throw err;
    });

    this._queuedTasks = new _AsyncQueue.default({
      name: options.name,
      maxSize: options.queueMaxSize
    });
    this._threadsSemaphore = new _asyncSyncUtils.Semaphore(this._options.threads);
    this._completeEvent = new _asyncSyncUtils.OneTimeBroadcastEvent(false);
    this._allTasksCompleteOrSomeFailedEvent = new _asyncSyncUtils.OneTimeBroadcastEvent(false);
    (0, _autoBind.default)(this);
  }

  /**
   * Queues a single item on the ThreadPool.
   *
   * @param {*} item - value that will be passed to the `task` option.
   * @returns {Promise<void>} resolves once the item has been enqueued.
   * @throws {Error} when the pool has already been closed.
   */
  async queueItem(item) {
    if (this._closed) throw new Error(`Trying to queue a job to a closed ThreadPool`);
    // The queue position doubles as the slot of this item's result.
    const index = this.queuedCount++;
    await this._queuedTasks.enqueue({
      func: async () => this._res[index] = await this._options.task(item),
      index: index
    });
  }

  /**
   * Queues several items on the ThreadPool, one after the other.
   *
   * @param {Iterable} queueItem - the items to queue (an iterable, despite the singular name).
   * @returns {Promise<void>} resolves once every item has been enqueued.
   */
  async queueItems(queueItem) {
    for (const item of queueItem) {
      await this.queueItem(item);
    }
  }

  /**
   * Starts executing all queued tasks.
   *
   * This function should be awaited - it will return after the ThreadPool has
   * been closed and all its tasks completed, or after a task threw an error.
   *
   * @returns {Promise<void>}
   * @throws {ExtendedError} aggregating every uncaught error, if there is any.
   */
  async run() {
    try {
      while (true) {
        // wait for an available task
        // TODO: also wait for errors here?
        let task;

        try {
          task = await this._queuedTasks.dequeue();
        } catch (err) {
          if (err instanceof _AsyncQueue.QueueClosedError) break;
          throw err;
        }

        // wait for an available 'thread'
        await this._threadsSemaphore.enter();

        if (this._uncaughtErrors.length) {
          // A previous task failed: the task just dequeued is not started, so
          // hand back the slot that was taken for it before leaving the loop.
          this._threadsSemaphore.exit();

          break;
        }

        this._allTasksCompleteOrSomeFailedEvent.reset();

        // NOTE: no await - the task runs concurrently with this loop
        this._runTask(task);
      }

      // wait for completion
      if (this.startedCount > 0) {
        await this._allTasksCompleteOrSomeFailedEvent.wait();
      }

      this._throwUncaughtErrors();
    } finally {
      this._completeEvent.signal();
    }
  }

  /**
   * Starts executing all queued tasks.
   *
   * This function should not be awaited - it will return immediately.
   * Anything thrown by `run()` is deliberately ignored here; recorded task
   * errors are re-thrown by `waitComplete()`.
   */
  startRun() {
    (async () => {
      try {
        await this.run();
      } catch (err) {} // eslint-disable-line no-empty

    })();
  }

  /**
   * Closes the ThreadPool for further task queueing; the ThreadPool's
   * completion can be awaited after it is called.
   */
  close() {
    this._closed = true;

    this._queuedTasks.close(); // NOTE: NO AWAIT
    // TODO: await completion

  }

  /**
   * Closes the ThreadPool, runs its tasks and awaits their completion.
   *
   * @returns {Promise<Array>} the results array, indexed by queue order.
   * @throws {ExtendedError} aggregating every uncaught error, if there is any.
   */
  async runAllQueued() {
    this.close();
    await this.run();
    return this._res;
  }

  /**
   * Awaits the completion signal raised when `run()` finishes.
   *
   * @returns {Promise<Array>} the results array, indexed by queue order.
   * @throws {ExtendedError} aggregating every uncaught error, if there is any.
   */
  async waitComplete() {
    await this._completeEvent.wait();

    this._throwUncaughtErrors();

    return this._res;
  }

  /**
   * Closes the ThreadPool and awaits the completion of a run started
   * elsewhere (for example with `startRun()`).
   *
   * @returns {Promise<Array>} the results array, indexed by queue order.
   */
  async closeAndWaitComplete() {
    this.close();
    return this.waitComplete();
  }

  /****************** privates ******************/

  /**
   * Throws one aggregated error when any uncaught error has been recorded.
   */
  _throwUncaughtErrors() {
    if (this._uncaughtErrors.length > 0) {
      throw new _errorUtils.ExtendedError(`Errors were thrown during execution of ThreadPool`, {
        threadPoolName: this._options.name,
        errorCount: this._uncaughtErrors.length,
        errorMessages: this._uncaughtErrors.map(e => e.message),
        uncaughtErrors: this._uncaughtErrors
      });
    }
  }

  /**
   * Runs a single dequeued task and always releases its semaphore slot.
   *
   * Task errors go through the error handler; whatever the handler throws is
   * recorded as an uncaught error. An error thrown by a `'progress'` listener
   * is recorded the same way instead of escaping, because escaping would skip
   * the slot release and leave `run()` waiting.
   *
   * @param {{func: Function}} task - entry produced by `queueItem`.
   */
  async _runTask(task) {
    try {
      this.startedCount++;
      await task.func();
    } catch (err) {
      try {
        this._errorHandler(err);
      } catch (err2) {
        this._uncaughtErrors.push(err2);

        await this._allTasksCompleteOrSomeFailedEvent.signal();
      }
    } finally {
      this.endedCount++;
      let listenerFailed = false;

      try {
        // fire 'progress' event
        this.emit('progress', {
          endedCount: this.endedCount
        });
      } catch (listenerErr) {
        this._uncaughtErrors.push(listenerErr);

        listenerFailed = true;
      } finally {
        this._threadsSemaphore.exit();

        if (listenerFailed || this._threadsSemaphore.takenCount === 0) {
          this._allTasksCompleteOrSomeFailedEvent.signal();
        }
      }
    }
  }

}
275
276exports.default = ThreadPool;
277//# sourceMappingURL=data:application/json;charset=utf-8;base64,
\No newline at end of file