1 | "use strict";
|
2 | var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
|
3 | function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
|
4 | return new (P || (P = Promise))(function (resolve, reject) {
|
5 | function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
|
6 | function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
|
7 | function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
|
8 | step((generator = generator.apply(thisArg, _arguments || [])).next());
|
9 | });
|
10 | };
|
11 | var __importDefault = (this && this.__importDefault) || function (mod) {
|
12 | return (mod && mod.__esModule) ? mod : { "default": mod };
|
13 | };
|
14 | Object.defineProperty(exports, "__esModule", { value: true });
|
15 | exports.Pool = exports.Thread = exports.PoolEventType = void 0;
|
16 | const debug_1 = __importDefault(require("debug"));
|
17 | const observable_fns_1 = require("observable-fns");
|
18 | const ponyfills_1 = require("../ponyfills");
|
19 | const implementation_1 = require("./implementation");
|
20 | const pool_types_1 = require("./pool-types");
|
21 | Object.defineProperty(exports, "PoolEventType", { enumerable: true, get: function () { return pool_types_1.PoolEventType; } });
|
22 | const thread_1 = require("./thread");
|
23 | Object.defineProperty(exports, "Thread", { enumerable: true, get: function () { return thread_1.Thread; } });
|
24 | let nextPoolID = 1;
|
25 | function createArray(size) {
|
26 | const array = [];
|
27 | for (let index = 0; index < size; index++) {
|
28 | array.push(index);
|
29 | }
|
30 | return array;
|
31 | }
|
32 | function delay(ms) {
|
33 | return new Promise(resolve => setTimeout(resolve, ms));
|
34 | }
|
35 | function flatMap(array, mapper) {
|
36 | return array.reduce((flattened, element) => [...flattened, ...mapper(element)], []);
|
37 | }
|
38 | function slugify(text) {
|
39 | return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-");
|
40 | }
|
41 | function spawnWorkers(spawnWorker, count) {
|
42 | return createArray(count).map(() => ({
|
43 | init: spawnWorker(),
|
44 | runningTasks: []
|
45 | }));
|
46 | }
|
47 | class WorkerPool {
|
48 | constructor(spawnWorker, optionsOrSize) {
|
49 | this.eventSubject = new observable_fns_1.Subject();
|
50 | this.initErrors = [];
|
51 | this.isClosing = false;
|
52 | this.nextTaskID = 1;
|
53 | this.taskQueue = [];
|
54 | const options = typeof optionsOrSize === "number"
|
55 | ? { size: optionsOrSize }
|
56 | : optionsOrSize || {};
|
57 | const { size = implementation_1.defaultPoolSize } = options;
|
58 | this.debug = debug_1.default(`threads:pool:${slugify(options.name || String(nextPoolID++))}`);
|
59 | this.options = options;
|
60 | this.workers = spawnWorkers(spawnWorker, size);
|
61 | this.eventObservable = observable_fns_1.multicast(observable_fns_1.Observable.from(this.eventSubject));
|
62 | Promise.all(this.workers.map(worker => worker.init)).then(() => this.eventSubject.next({
|
63 | type: pool_types_1.PoolEventType.initialized,
|
64 | size: this.workers.length
|
65 | }), error => {
|
66 | this.debug("Error while initializing pool worker:", error);
|
67 | this.eventSubject.error(error);
|
68 | this.initErrors.push(error);
|
69 | });
|
70 | }
|
71 | findIdlingWorker() {
|
72 | const { concurrency = 1 } = this.options;
|
73 | return this.workers.find(worker => worker.runningTasks.length < concurrency);
|
74 | }
|
75 | runPoolTask(worker, task) {
|
76 | return __awaiter(this, void 0, void 0, function* () {
|
77 | const workerID = this.workers.indexOf(worker) + 1;
|
78 | this.debug(`Running task #${task.id} on worker #${workerID}...`);
|
79 | this.eventSubject.next({
|
80 | type: pool_types_1.PoolEventType.taskStart,
|
81 | taskID: task.id,
|
82 | workerID
|
83 | });
|
84 | try {
|
85 | const returnValue = yield task.run(yield worker.init);
|
86 | this.debug(`Task #${task.id} completed successfully`);
|
87 | this.eventSubject.next({
|
88 | type: pool_types_1.PoolEventType.taskCompleted,
|
89 | returnValue,
|
90 | taskID: task.id,
|
91 | workerID
|
92 | });
|
93 | }
|
94 | catch (error) {
|
95 | this.debug(`Task #${task.id} failed`);
|
96 | this.eventSubject.next({
|
97 | type: pool_types_1.PoolEventType.taskFailed,
|
98 | taskID: task.id,
|
99 | error,
|
100 | workerID
|
101 | });
|
102 | }
|
103 | });
|
104 | }
|
105 | run(worker, task) {
|
106 | return __awaiter(this, void 0, void 0, function* () {
|
107 | const runPromise = (() => __awaiter(this, void 0, void 0, function* () {
|
108 | const removeTaskFromWorkersRunningTasks = () => {
|
109 | worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise);
|
110 | };
|
111 |
|
112 | yield delay(0);
|
113 | try {
|
114 | yield this.runPoolTask(worker, task);
|
115 | }
|
116 | finally {
|
117 | removeTaskFromWorkersRunningTasks();
|
118 | if (!this.isClosing) {
|
119 | this.scheduleWork();
|
120 | }
|
121 | }
|
122 | }))();
|
123 | worker.runningTasks.push(runPromise);
|
124 | });
|
125 | }
|
126 | scheduleWork() {
|
127 | this.debug(`Attempt de-queueing a task in order to run it...`);
|
128 | const availableWorker = this.findIdlingWorker();
|
129 | if (!availableWorker)
|
130 | return;
|
131 | const nextTask = this.taskQueue.shift();
|
132 | if (!nextTask) {
|
133 | this.debug(`Task queue is empty`);
|
134 | this.eventSubject.next({ type: pool_types_1.PoolEventType.taskQueueDrained });
|
135 | return;
|
136 | }
|
137 | this.run(availableWorker, nextTask);
|
138 | }
|
139 | taskCompletion(taskID) {
|
140 | return new Promise((resolve, reject) => {
|
141 | const eventSubscription = this.events().subscribe(event => {
|
142 | if (event.type === pool_types_1.PoolEventType.taskCompleted && event.taskID === taskID) {
|
143 | eventSubscription.unsubscribe();
|
144 | resolve(event.returnValue);
|
145 | }
|
146 | else if (event.type === pool_types_1.PoolEventType.taskFailed && event.taskID === taskID) {
|
147 | eventSubscription.unsubscribe();
|
148 | reject(event.error);
|
149 | }
|
150 | else if (event.type === pool_types_1.PoolEventType.terminated) {
|
151 | eventSubscription.unsubscribe();
|
152 | reject(Error("Pool has been terminated before task was run."));
|
153 | }
|
154 | });
|
155 | });
|
156 | }
|
157 | settled(allowResolvingImmediately = false) {
|
158 | return __awaiter(this, void 0, void 0, function* () {
|
159 | const getCurrentlyRunningTasks = () => flatMap(this.workers, worker => worker.runningTasks);
|
160 | const taskFailures = [];
|
161 | const failureSubscription = this.eventObservable.subscribe(event => {
|
162 | if (event.type === pool_types_1.PoolEventType.taskFailed) {
|
163 | taskFailures.push(event.error);
|
164 | }
|
165 | });
|
166 | if (this.initErrors.length > 0) {
|
167 | return Promise.reject(this.initErrors[0]);
|
168 | }
|
169 | if (allowResolvingImmediately && this.taskQueue.length === 0) {
|
170 | yield ponyfills_1.allSettled(getCurrentlyRunningTasks());
|
171 | return taskFailures;
|
172 | }
|
173 | yield new Promise((resolve, reject) => {
|
174 | const subscription = this.eventObservable.subscribe({
|
175 | next(event) {
|
176 | if (event.type === pool_types_1.PoolEventType.taskQueueDrained) {
|
177 | subscription.unsubscribe();
|
178 | resolve(void 0);
|
179 | }
|
180 | },
|
181 | error: reject
|
182 | });
|
183 | });
|
184 | yield ponyfills_1.allSettled(getCurrentlyRunningTasks());
|
185 | failureSubscription.unsubscribe();
|
186 | return taskFailures;
|
187 | });
|
188 | }
|
189 | completed(allowResolvingImmediately = false) {
|
190 | return __awaiter(this, void 0, void 0, function* () {
|
191 | const settlementPromise = this.settled(allowResolvingImmediately);
|
192 | const earlyExitPromise = new Promise((resolve, reject) => {
|
193 | const subscription = this.eventObservable.subscribe({
|
194 | next(event) {
|
195 | if (event.type === pool_types_1.PoolEventType.taskQueueDrained) {
|
196 | subscription.unsubscribe();
|
197 | resolve(settlementPromise);
|
198 | }
|
199 | else if (event.type === pool_types_1.PoolEventType.taskFailed) {
|
200 | subscription.unsubscribe();
|
201 | reject(event.error);
|
202 | }
|
203 | },
|
204 | error: reject
|
205 | });
|
206 | });
|
207 | const errors = yield Promise.race([
|
208 | settlementPromise,
|
209 | earlyExitPromise
|
210 | ]);
|
211 | if (errors.length > 0) {
|
212 | throw errors[0];
|
213 | }
|
214 | });
|
215 | }
|
216 | events() {
|
217 | return this.eventObservable;
|
218 | }
|
219 | queue(taskFunction) {
|
220 | const { maxQueuedJobs = Infinity } = this.options;
|
221 | if (this.isClosing) {
|
222 | throw Error(`Cannot schedule pool tasks after terminate() has been called.`);
|
223 | }
|
224 | if (this.initErrors.length > 0) {
|
225 | throw this.initErrors[0];
|
226 | }
|
227 | const taskID = this.nextTaskID++;
|
228 | const taskCompletion = this.taskCompletion(taskID);
|
229 | taskCompletion.catch((error) => {
|
230 |
|
231 |
|
232 | this.debug(`Task #${taskID} errored:`, error);
|
233 | });
|
234 | const task = {
|
235 | id: taskID,
|
236 | run: taskFunction,
|
237 | cancel: () => {
|
238 | if (this.taskQueue.indexOf(task) === -1)
|
239 | return;
|
240 | this.taskQueue = this.taskQueue.filter(someTask => someTask !== task);
|
241 | this.eventSubject.next({
|
242 | type: pool_types_1.PoolEventType.taskCanceled,
|
243 | taskID: task.id
|
244 | });
|
245 | },
|
246 | then: taskCompletion.then.bind(taskCompletion)
|
247 | };
|
248 | if (this.taskQueue.length >= maxQueuedJobs) {
|
249 | throw Error("Maximum number of pool tasks queued. Refusing to queue another one.\n" +
|
250 | "This usually happens for one of two reasons: We are either at peak " +
|
251 | "workload right now or some tasks just won't finish, thus blocking the pool.");
|
252 | }
|
253 | this.debug(`Queueing task #${task.id}...`);
|
254 | this.taskQueue.push(task);
|
255 | this.eventSubject.next({
|
256 | type: pool_types_1.PoolEventType.taskQueued,
|
257 | taskID: task.id
|
258 | });
|
259 | this.scheduleWork();
|
260 | return task;
|
261 | }
|
262 | terminate(force) {
|
263 | return __awaiter(this, void 0, void 0, function* () {
|
264 | this.isClosing = true;
|
265 | if (!force) {
|
266 | yield this.completed(true);
|
267 | }
|
268 | this.eventSubject.next({
|
269 | type: pool_types_1.PoolEventType.terminated,
|
270 | remainingQueue: [...this.taskQueue]
|
271 | });
|
272 | this.eventSubject.complete();
|
273 | yield Promise.all(this.workers.map((worker) => __awaiter(this, void 0, void 0, function* () { return thread_1.Thread.terminate(yield worker.init); })));
|
274 | });
|
275 | }
|
276 | }
|
277 | WorkerPool.EventType = pool_types_1.PoolEventType;
|
278 |
|
279 |
|
280 |
|
281 | function PoolConstructor(spawnWorker, optionsOrSize) {
|
282 |
|
283 |
|
284 | return new WorkerPool(spawnWorker, optionsOrSize);
|
285 | }
|
286 | PoolConstructor.EventType = pool_types_1.PoolEventType;
|
287 |
|
288 |
|
289 |
|
290 | exports.Pool = PoolConstructor;
|