UNPKG

6.72 kBJavaScriptView Raw
1"use strict";
2var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
3 function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
4 return new (P || (P = Promise))(function (resolve, reject) {
5 function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
6 function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
7 function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
8 step((generator = generator.apply(thisArg, _arguments || [])).next());
9 });
10};
11var __importDefault = (this && this.__importDefault) || function (mod) {
12 return (mod && mod.__esModule) ? mod : { "default": mod };
13};
14Object.defineProperty(exports, "__esModule", { value: true });
15exports.spawn = void 0;
16const debug_1 = __importDefault(require("debug"));
17const observable_fns_1 = require("observable-fns");
18const common_1 = require("../common");
19const promise_1 = require("../promise");
20const symbols_1 = require("../symbols");
21const master_1 = require("../types/master");
22const invocation_proxy_1 = require("./invocation-proxy");
23const debugMessages = debug_1.default("threads:master:messages");
24const debugSpawn = debug_1.default("threads:master:spawn");
25const debugThreadUtils = debug_1.default("threads:master:thread-utils");
26const isInitMessage = (data) => data && data.type === "init";
27const isUncaughtErrorMessage = (data) => data && data.type === "uncaughtError";
28const initMessageTimeout = typeof process !== "undefined" && process.env.THREADS_WORKER_INIT_TIMEOUT
29 ? Number.parseInt(process.env.THREADS_WORKER_INIT_TIMEOUT, 10)
30 : 10000;
31function withTimeout(promise, timeoutInMs, errorMessage) {
32 return __awaiter(this, void 0, void 0, function* () {
33 let timeoutHandle;
34 const timeout = new Promise((resolve, reject) => {
35 timeoutHandle = setTimeout(() => reject(Error(errorMessage)), timeoutInMs);
36 });
37 const result = yield Promise.race([
38 promise,
39 timeout
40 ]);
41 clearTimeout(timeoutHandle);
42 return result;
43 });
44}
45function receiveInitMessage(worker) {
46 return new Promise((resolve, reject) => {
47 const messageHandler = ((event) => {
48 debugMessages("Message from worker before finishing initialization:", event.data);
49 if (isInitMessage(event.data)) {
50 worker.removeEventListener("message", messageHandler);
51 resolve(event.data);
52 }
53 else if (isUncaughtErrorMessage(event.data)) {
54 worker.removeEventListener("message", messageHandler);
55 reject(common_1.deserialize(event.data.error));
56 }
57 });
58 worker.addEventListener("message", messageHandler);
59 });
60}
61function createEventObservable(worker, workerTermination) {
62 return new observable_fns_1.Observable(observer => {
63 const messageHandler = ((messageEvent) => {
64 const workerEvent = {
65 type: master_1.WorkerEventType.message,
66 data: messageEvent.data
67 };
68 observer.next(workerEvent);
69 });
70 const rejectionHandler = ((errorEvent) => {
71 debugThreadUtils("Unhandled promise rejection event in thread:", errorEvent);
72 const workerEvent = {
73 type: master_1.WorkerEventType.internalError,
74 error: Error(errorEvent.reason)
75 };
76 observer.next(workerEvent);
77 });
78 worker.addEventListener("message", messageHandler);
79 worker.addEventListener("unhandledrejection", rejectionHandler);
80 workerTermination.then(() => {
81 const terminationEvent = {
82 type: master_1.WorkerEventType.termination
83 };
84 worker.removeEventListener("message", messageHandler);
85 worker.removeEventListener("unhandledrejection", rejectionHandler);
86 observer.next(terminationEvent);
87 observer.complete();
88 });
89 });
90}
91function createTerminator(worker) {
92 const [termination, resolver] = promise_1.createPromiseWithResolver();
93 const terminate = () => __awaiter(this, void 0, void 0, function* () {
94 debugThreadUtils("Terminating worker");
95 // Newer versions of worker_threads workers return a promise
96 yield worker.terminate();
97 resolver();
98 });
99 return { terminate, termination };
100}
101function setPrivateThreadProps(raw, worker, workerEvents, terminate) {
102 const workerErrors = workerEvents
103 .filter(event => event.type === master_1.WorkerEventType.internalError)
104 .map(errorEvent => errorEvent.error);
105 // tslint:disable-next-line prefer-object-spread
106 return Object.assign(raw, {
107 [symbols_1.$errors]: workerErrors,
108 [symbols_1.$events]: workerEvents,
109 [symbols_1.$terminate]: terminate,
110 [symbols_1.$worker]: worker
111 });
112}
113/**
114 * Spawn a new thread. Takes a fresh worker instance, wraps it in a thin
115 * abstraction layer to provide the transparent API and verifies that
116 * the worker has initialized successfully.
117 *
118 * @param worker Instance of `Worker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker.
119 * @param [options]
120 * @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable.
121 */
122function spawn(worker, options) {
123 return __awaiter(this, void 0, void 0, function* () {
124 debugSpawn("Initializing new thread");
125 const timeout = options && options.timeout ? options.timeout : initMessageTimeout;
126 const initMessage = yield withTimeout(receiveInitMessage(worker), timeout, `Timeout: Did not receive an init message from worker after ${timeout}ms. Make sure the worker calls expose().`);
127 const exposed = initMessage.exposed;
128 const { termination, terminate } = createTerminator(worker);
129 const events = createEventObservable(worker, termination);
130 if (exposed.type === "function") {
131 const proxy = invocation_proxy_1.createProxyFunction(worker);
132 return setPrivateThreadProps(proxy, worker, events, terminate);
133 }
134 else if (exposed.type === "module") {
135 const proxy = invocation_proxy_1.createProxyModule(worker, exposed.methods);
136 return setPrivateThreadProps(proxy, worker, events, terminate);
137 }
138 else {
139 const type = exposed.type;
140 throw Error(`Worker init message states unexpected type of expose(): ${type}`);
141 }
142 });
143}
144exports.spawn = spawn;