UNPKG

12.6 kBJavaScriptView Raw
1"use strict";
2var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
3 function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
4 return new (P || (P = Promise))(function (resolve, reject) {
5 function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
6 function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
7 function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
8 step((generator = generator.apply(thisArg, _arguments || [])).next());
9 });
10};
11var __importDefault = (this && this.__importDefault) || function (mod) {
12 return (mod && mod.__esModule) ? mod : { "default": mod };
13};
14Object.defineProperty(exports, "__esModule", { value: true });
15exports.Pool = exports.Thread = exports.PoolEventType = void 0;
16const debug_1 = __importDefault(require("debug"));
17const observable_fns_1 = require("observable-fns");
18const ponyfills_1 = require("../ponyfills");
19const implementation_1 = require("./implementation");
20const pool_types_1 = require("./pool-types");
21Object.defineProperty(exports, "PoolEventType", { enumerable: true, get: function () { return pool_types_1.PoolEventType; } });
22const thread_1 = require("./thread");
23Object.defineProperty(exports, "Thread", { enumerable: true, get: function () { return thread_1.Thread; } });
24let nextPoolID = 1;
25function createArray(size) {
26 const array = [];
27 for (let index = 0; index < size; index++) {
28 array.push(index);
29 }
30 return array;
31}
32function delay(ms) {
33 return new Promise(resolve => setTimeout(resolve, ms));
34}
35function flatMap(array, mapper) {
36 return array.reduce((flattened, element) => [...flattened, ...mapper(element)], []);
37}
38function slugify(text) {
39 return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-");
40}
41function spawnWorkers(spawnWorker, count) {
42 return createArray(count).map(() => ({
43 init: spawnWorker(),
44 runningTasks: []
45 }));
46}
47class WorkerPool {
48 constructor(spawnWorker, optionsOrSize) {
49 this.eventSubject = new observable_fns_1.Subject();
50 this.initErrors = [];
51 this.isClosing = false;
52 this.nextTaskID = 1;
53 this.taskQueue = [];
54 const options = typeof optionsOrSize === "number"
55 ? { size: optionsOrSize }
56 : optionsOrSize || {};
57 const { size = implementation_1.defaultPoolSize } = options;
58 this.debug = debug_1.default(`threads:pool:${slugify(options.name || String(nextPoolID++))}`);
59 this.options = options;
60 this.workers = spawnWorkers(spawnWorker, size);
61 this.eventObservable = observable_fns_1.multicast(observable_fns_1.Observable.from(this.eventSubject));
62 Promise.all(this.workers.map(worker => worker.init)).then(() => this.eventSubject.next({
63 type: pool_types_1.PoolEventType.initialized,
64 size: this.workers.length
65 }), error => {
66 this.debug("Error while initializing pool worker:", error);
67 this.eventSubject.error(error);
68 this.initErrors.push(error);
69 });
70 }
71 findIdlingWorker() {
72 const { concurrency = 1 } = this.options;
73 return this.workers.find(worker => worker.runningTasks.length < concurrency);
74 }
75 runPoolTask(worker, task) {
76 return __awaiter(this, void 0, void 0, function* () {
77 const workerID = this.workers.indexOf(worker) + 1;
78 this.debug(`Running task #${task.id} on worker #${workerID}...`);
79 this.eventSubject.next({
80 type: pool_types_1.PoolEventType.taskStart,
81 taskID: task.id,
82 workerID
83 });
84 try {
85 const returnValue = yield task.run(yield worker.init);
86 this.debug(`Task #${task.id} completed successfully`);
87 this.eventSubject.next({
88 type: pool_types_1.PoolEventType.taskCompleted,
89 returnValue,
90 taskID: task.id,
91 workerID
92 });
93 }
94 catch (error) {
95 this.debug(`Task #${task.id} failed`);
96 this.eventSubject.next({
97 type: pool_types_1.PoolEventType.taskFailed,
98 taskID: task.id,
99 error,
100 workerID
101 });
102 }
103 });
104 }
105 run(worker, task) {
106 return __awaiter(this, void 0, void 0, function* () {
107 const runPromise = (() => __awaiter(this, void 0, void 0, function* () {
108 const removeTaskFromWorkersRunningTasks = () => {
109 worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise);
110 };
111 // Defer task execution by one tick to give handlers time to subscribe
112 yield delay(0);
113 try {
114 yield this.runPoolTask(worker, task);
115 }
116 finally {
117 removeTaskFromWorkersRunningTasks();
118 if (!this.isClosing) {
119 this.scheduleWork();
120 }
121 }
122 }))();
123 worker.runningTasks.push(runPromise);
124 });
125 }
126 scheduleWork() {
127 this.debug(`Attempt de-queueing a task in order to run it...`);
128 const availableWorker = this.findIdlingWorker();
129 if (!availableWorker)
130 return;
131 const nextTask = this.taskQueue.shift();
132 if (!nextTask) {
133 this.debug(`Task queue is empty`);
134 this.eventSubject.next({ type: pool_types_1.PoolEventType.taskQueueDrained });
135 return;
136 }
137 this.run(availableWorker, nextTask);
138 }
139 taskCompletion(taskID) {
140 return new Promise((resolve, reject) => {
141 const eventSubscription = this.events().subscribe(event => {
142 if (event.type === pool_types_1.PoolEventType.taskCompleted && event.taskID === taskID) {
143 eventSubscription.unsubscribe();
144 resolve(event.returnValue);
145 }
146 else if (event.type === pool_types_1.PoolEventType.taskFailed && event.taskID === taskID) {
147 eventSubscription.unsubscribe();
148 reject(event.error);
149 }
150 else if (event.type === pool_types_1.PoolEventType.terminated) {
151 eventSubscription.unsubscribe();
152 reject(Error("Pool has been terminated before task was run."));
153 }
154 });
155 });
156 }
157 settled(allowResolvingImmediately = false) {
158 return __awaiter(this, void 0, void 0, function* () {
159 const getCurrentlyRunningTasks = () => flatMap(this.workers, worker => worker.runningTasks);
160 const taskFailures = [];
161 const failureSubscription = this.eventObservable.subscribe(event => {
162 if (event.type === pool_types_1.PoolEventType.taskFailed) {
163 taskFailures.push(event.error);
164 }
165 });
166 if (this.initErrors.length > 0) {
167 return Promise.reject(this.initErrors[0]);
168 }
169 if (allowResolvingImmediately && this.taskQueue.length === 0) {
170 yield ponyfills_1.allSettled(getCurrentlyRunningTasks());
171 return taskFailures;
172 }
173 yield new Promise((resolve, reject) => {
174 const subscription = this.eventObservable.subscribe({
175 next(event) {
176 if (event.type === pool_types_1.PoolEventType.taskQueueDrained) {
177 subscription.unsubscribe();
178 resolve(void 0);
179 }
180 },
181 error: reject // make a pool-wide error reject the completed() result promise
182 });
183 });
184 yield ponyfills_1.allSettled(getCurrentlyRunningTasks());
185 failureSubscription.unsubscribe();
186 return taskFailures;
187 });
188 }
189 completed(allowResolvingImmediately = false) {
190 return __awaiter(this, void 0, void 0, function* () {
191 const settlementPromise = this.settled(allowResolvingImmediately);
192 const earlyExitPromise = new Promise((resolve, reject) => {
193 const subscription = this.eventObservable.subscribe({
194 next(event) {
195 if (event.type === pool_types_1.PoolEventType.taskQueueDrained) {
196 subscription.unsubscribe();
197 resolve(settlementPromise);
198 }
199 else if (event.type === pool_types_1.PoolEventType.taskFailed) {
200 subscription.unsubscribe();
201 reject(event.error);
202 }
203 },
204 error: reject // make a pool-wide error reject the completed() result promise
205 });
206 });
207 const errors = yield Promise.race([
208 settlementPromise,
209 earlyExitPromise
210 ]);
211 if (errors.length > 0) {
212 throw errors[0];
213 }
214 });
215 }
216 events() {
217 return this.eventObservable;
218 }
219 queue(taskFunction) {
220 const { maxQueuedJobs = Infinity } = this.options;
221 if (this.isClosing) {
222 throw Error(`Cannot schedule pool tasks after terminate() has been called.`);
223 }
224 if (this.initErrors.length > 0) {
225 throw this.initErrors[0];
226 }
227 const taskID = this.nextTaskID++;
228 const taskCompletion = this.taskCompletion(taskID);
229 taskCompletion.catch((error) => {
230 // Prevent unhandled rejections here as we assume the user will use
231 // `pool.completed()`, `pool.settled()` or `task.catch()` to handle errors
232 this.debug(`Task #${taskID} errored:`, error);
233 });
234 const task = {
235 id: taskID,
236 run: taskFunction,
237 cancel: () => {
238 if (this.taskQueue.indexOf(task) === -1)
239 return;
240 this.taskQueue = this.taskQueue.filter(someTask => someTask !== task);
241 this.eventSubject.next({
242 type: pool_types_1.PoolEventType.taskCanceled,
243 taskID: task.id
244 });
245 },
246 then: taskCompletion.then.bind(taskCompletion)
247 };
248 if (this.taskQueue.length >= maxQueuedJobs) {
249 throw Error("Maximum number of pool tasks queued. Refusing to queue another one.\n" +
250 "This usually happens for one of two reasons: We are either at peak " +
251 "workload right now or some tasks just won't finish, thus blocking the pool.");
252 }
253 this.debug(`Queueing task #${task.id}...`);
254 this.taskQueue.push(task);
255 this.eventSubject.next({
256 type: pool_types_1.PoolEventType.taskQueued,
257 taskID: task.id
258 });
259 this.scheduleWork();
260 return task;
261 }
262 terminate(force) {
263 return __awaiter(this, void 0, void 0, function* () {
264 this.isClosing = true;
265 if (!force) {
266 yield this.completed(true);
267 }
268 this.eventSubject.next({
269 type: pool_types_1.PoolEventType.terminated,
270 remainingQueue: [...this.taskQueue]
271 });
272 this.eventSubject.complete();
273 yield Promise.all(this.workers.map((worker) => __awaiter(this, void 0, void 0, function* () { return thread_1.Thread.terminate(yield worker.init); })));
274 });
275 }
276}
277WorkerPool.EventType = pool_types_1.PoolEventType;
278/**
279 * Thread pool constructor. Creates a new pool and spawns its worker threads.
280 */
281function PoolConstructor(spawnWorker, optionsOrSize) {
282 // The function exists only so we don't need to use `new` to create a pool (we still can, though).
283 // If the Pool is a class or not is an implementation detail that should not concern the user.
284 return new WorkerPool(spawnWorker, optionsOrSize);
285}
286PoolConstructor.EventType = pool_types_1.PoolEventType;
287/**
288 * Thread pool constructor. Creates a new pool and spawns its worker threads.
289 */
290exports.Pool = PoolConstructor;