UNPKG

6.3 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, '__esModule', {
4 value: true
5});
6exports.default = void 0;
7
8function _stream() {
9 const data = require('stream');
10
11 _stream = function () {
12 return data;
13 };
14
15 return data;
16}
17
18function _worker_threads() {
19 const data = require('worker_threads');
20
21 _worker_threads = function () {
22 return data;
23 };
24
25 return data;
26}
27
28function _mergeStream() {
29 const data = _interopRequireDefault(require('merge-stream'));
30
31 _mergeStream = function () {
32 return data;
33 };
34
35 return data;
36}
37
38var _types = require('../types');
39
40function _interopRequireDefault(obj) {
41 return obj && obj.__esModule ? obj : {default: obj};
42}
43
44/**
45 * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
46 *
47 * This source code is licensed under the MIT license found in the
48 * LICENSE file in the root directory of this source tree.
49 */
50class ExperimentalWorker {
51 _worker;
52 _options;
53 _request;
54 _retries;
55 _onProcessEnd;
56 _onCustomMessage;
57 _fakeStream;
58 _stdout;
59 _stderr;
60 _exitPromise;
61 _resolveExitPromise;
62 _forceExited;
63
64 constructor(options) {
65 this._options = options;
66 this._request = null;
67 this._fakeStream = null;
68 this._stdout = null;
69 this._stderr = null;
70 this._exitPromise = new Promise(resolve => {
71 this._resolveExitPromise = resolve;
72 });
73 this._forceExited = false;
74 this.initialize();
75 }
76
77 initialize() {
78 this._worker = new (_worker_threads().Worker)(
79 require.resolve('./threadChild'),
80 {
81 eval: false,
82 resourceLimits: this._options.resourceLimits,
83 stderr: true,
84 stdout: true,
85 workerData: this._options.workerData,
86 ...this._options.forkOptions
87 }
88 );
89
90 if (this._worker.stdout) {
91 if (!this._stdout) {
92 // We need to add a permanent stream to the merged stream to prevent it
93 // from ending when the subprocess stream ends
94 this._stdout = (0, _mergeStream().default)(this._getFakeStream());
95 }
96
97 this._stdout.add(this._worker.stdout);
98 }
99
100 if (this._worker.stderr) {
101 if (!this._stderr) {
102 // We need to add a permanent stream to the merged stream to prevent it
103 // from ending when the subprocess stream ends
104 this._stderr = (0, _mergeStream().default)(this._getFakeStream());
105 }
106
107 this._stderr.add(this._worker.stderr);
108 }
109
110 this._worker.on('message', this._onMessage.bind(this));
111
112 this._worker.on('exit', this._onExit.bind(this));
113
114 this._worker.postMessage([
115 _types.CHILD_MESSAGE_INITIALIZE,
116 false,
117 this._options.workerPath,
118 this._options.setupArgs,
119 String(this._options.workerId + 1) // 0-indexed workerId, 1-indexed JEST_WORKER_ID
120 ]);
121
122 this._retries++; // If we exceeded the amount of retries, we will emulate an error reply
123 // coming from the child. This avoids code duplication related with cleaning
124 // the queue, and scheduling the next call.
125
126 if (this._retries > this._options.maxRetries) {
127 const error = new Error('Call retries were exceeded');
128
129 this._onMessage([
130 _types.PARENT_MESSAGE_CLIENT_ERROR,
131 error.name,
132 error.message,
133 error.stack,
134 {
135 type: 'WorkerError'
136 }
137 ]);
138 }
139 }
140
141 _shutdown() {
142 // End the permanent stream so the merged stream end too
143 if (this._fakeStream) {
144 this._fakeStream.end();
145
146 this._fakeStream = null;
147 }
148
149 this._resolveExitPromise();
150 }
151
152 _onMessage(response) {
153 let error;
154
155 switch (response[0]) {
156 case _types.PARENT_MESSAGE_OK:
157 this._onProcessEnd(null, response[1]);
158
159 break;
160
161 case _types.PARENT_MESSAGE_CLIENT_ERROR:
162 error = response[4];
163
164 if (error != null && typeof error === 'object') {
165 const extra = error; // @ts-expect-error: no index
166
167 const NativeCtor = globalThis[response[1]];
168 const Ctor = typeof NativeCtor === 'function' ? NativeCtor : Error;
169 error = new Ctor(response[2]);
170 error.type = response[1];
171 error.stack = response[3];
172
173 for (const key in extra) {
174 // @ts-expect-error: no index
175 error[key] = extra[key];
176 }
177 }
178
179 this._onProcessEnd(error, null);
180
181 break;
182
183 case _types.PARENT_MESSAGE_SETUP_ERROR:
184 error = new Error(`Error when calling setup: ${response[2]}`); // @ts-expect-error: adding custom properties to errors.
185
186 error.type = response[1];
187 error.stack = response[3];
188
189 this._onProcessEnd(error, null);
190
191 break;
192
193 case _types.PARENT_MESSAGE_CUSTOM:
194 this._onCustomMessage(response[1]);
195
196 break;
197
198 default:
199 throw new TypeError(`Unexpected response from worker: ${response[0]}`);
200 }
201 }
202
203 _onExit(exitCode) {
204 if (exitCode !== 0 && !this._forceExited) {
205 this.initialize();
206
207 if (this._request) {
208 this._worker.postMessage(this._request);
209 }
210 } else {
211 this._shutdown();
212 }
213 }
214
215 waitForExit() {
216 return this._exitPromise;
217 }
218
219 forceExit() {
220 this._forceExited = true;
221
222 this._worker.terminate();
223 }
224
225 send(request, onProcessStart, onProcessEnd, onCustomMessage) {
226 onProcessStart(this);
227
228 this._onProcessEnd = (...args) => {
229 var _onProcessEnd;
230
231 // Clean the request to avoid sending past requests to workers that fail
232 // while waiting for a new request (timers, unhandled rejections...)
233 this._request = null;
234 const res =
235 (_onProcessEnd = onProcessEnd) === null || _onProcessEnd === void 0
236 ? void 0
237 : _onProcessEnd(...args); // Clean up the reference so related closures can be garbage collected.
238
239 onProcessEnd = null;
240 return res;
241 };
242
243 this._onCustomMessage = (...arg) => onCustomMessage(...arg);
244
245 this._request = request;
246 this._retries = 0;
247
248 this._worker.postMessage(request);
249 }
250
251 getWorkerId() {
252 return this._options.workerId;
253 }
254
255 getStdout() {
256 return this._stdout;
257 }
258
259 getStderr() {
260 return this._stderr;
261 }
262
263 _getFakeStream() {
264 if (!this._fakeStream) {
265 this._fakeStream = new (_stream().PassThrough)();
266 }
267
268 return this._fakeStream;
269 }
270}
271
272exports.default = ExperimentalWorker;