UNPKG

56.2 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.Wrapper = exports.WrapperOptionDefaults = exports.createErrorResponse = exports.filename = exports.isGenerator = void 0;
4const childProcess = require("child_process");
5const process = require("process");
6const proctor = require("process-doctor");
7const util_1 = require("util");
8const serialize_1 = require("./serialize");
9const throttle_1 = require("./throttle");
10const error_1 = require("./error");
/**
 * Formats any value as a single-line string for log output.
 *
 * Uses `util.inspect` in compact mode with an unbounded break length so the
 * rendering is not wrapped onto multiple lines because of its width.
 *
 * @param val Value to render.
 * @returns The inspected string.
 */
const p = (val) => {
    const singleLine = { compact: true, breakLength: Infinity };
    return (0, util_1.inspect)(val, singleLine);
};
/**
 * Tells whether `fn` is a generator function, either synchronous
 * (`function*`) or asynchronous (`async function*`).
 *
 * The two constructors are not global bindings, so they are obtained from
 * throwaway generator function literals.
 *
 * @param fn Candidate value; anything that is not such a function yields false.
 * @returns true for sync and async generator functions, false otherwise.
 */
const isGenerator = (fn) => {
    const GeneratorFunction = (function* () { }).constructor;
    if (fn instanceof GeneratorFunction) {
        return true;
    }
    const AsyncGeneratorFunction = (async function* () { }).constructor;
    return fn instanceof AsyncGeneratorFunction;
};
14exports.isGenerator = isGenerator;
15exports.filename = module.filename;
/**
 * Builds the "reject" response message describing a failed call.
 *
 * @param err The thrown value or rejection reason. It is serialized with
 *   validation disabled (third argument `false`).
 * @param context Calling context; `call`, `startTime`, `logUrl` and
 *   `executionId` are read from it.
 * @returns A message with `kind: "promise"` and `type: "reject"`. The end
 *   time is stamped with `Date.now()` at the moment of the call.
 */
function createErrorResponse(err, { call, startTime, logUrl, executionId }) {
    const { serializeReturnValue } = serialize_1;
    const value = serializeReturnValue(call.name, err, false);
    // Lets the receiver know whether the original value was an Error instance.
    const isErrorObject = typeof err === "object" && err instanceof Error;
    const { callId } = call;
    const remoteExecutionEndTime = Date.now();
    return {
        kind: "promise",
        type: "reject",
        value,
        isErrorObject,
        callId,
        remoteExecutionStartTime: startTime,
        remoteExecutionEndTime,
        logUrl,
        executionId
    };
}
29exports.createErrorResponse = createErrorResponse;
30exports.WrapperOptionDefaults = {
31 wrapperLog: console.log,
32 childProcess: true,
33 childProcessMemoryLimitMb: 0,
34 childProcessTimeoutMs: 0,
35 childProcessEnvironment: {},
36 childDir: ".",
37 wrapperVerbose: false,
38 validateSerialization: true
39};
/**
 * Text searched for in the child's stderr. A matching chunk is remembered so
 * that a later SIGABRT exit can be reported together with it.
 */
const oomPattern = /Allocation failed - JavaScript heap out of memory/;
/**
 * Environment variable set to "true" for forked children. A Wrapper that is
 * constructed while it is set serves requests arriving as process messages
 * instead of forking a child of its own.
 */
const FAAST_CHILD_ENV = "FAAST_CHILD";
/**
 * Runs functions of a user module on request, either directly in this process
 * or by forwarding the request to a forked child process, and reports results
 * through an `onMessage` callback.
 */
class Wrapper {
    /**
     * @param fModule Object whose properties are the callable functions,
     *   looked up by `call.name`.
     * @param options Overrides merged over `exports.WrapperOptionDefaults`.
     */
    constructor(fModule, options = {}) {
        this.executing = false;
        this.verbose = false;
        this.funcs = {};
        // Kept as an instance property so the very same function reference can
        // be attached to, and later removed from, the child's output streams.
        this.logLines = (msg) => {
            const lines = msg.split("\n");
            // A trailing newline yields one empty final element; drop it.
            if (lines[lines.length - 1] === "") {
                lines.pop();
            }
            for (const line of lines) {
                this.log(`[${this.childPid}]: ${line}`);
            }
        };
        this.options = { ...exports.WrapperOptionDefaults, ...options };
        this.log = this.options.wrapperLog;
        this.verbose = this.options.wrapperVerbose;
        this.funcs = fModule;
        this.queue = new throttle_1.AsyncIterableQueue();
        /* istanbul ignore if */
        if (process.env[FAAST_CHILD_ENV]) {
            // Running as the forked child: never fork again, and answer each
            // incoming message by executing it and streaming results back.
            this.options.childProcess = false;
            this.log(`faast: started child process for module wrapper.`);
            process.on("message", async (cc) => {
                const startTime = Date.now();
                try {
                    await this.execute({ ...cc, startTime }, {
                        onMessage: async (msg) => {
                            this.log(`Received message ${msg.kind}`);
                            process.send({ done: false, value: msg });
                        }
                    });
                    this.log(`Done with this.execute()`);
                }
                catch (err) {
                    this.log(err);
                }
                finally {
                    // Always tell the parent that this request is finished.
                    process.send({ done: true });
                }
            });
        }
        else if (!process.env.FAAST_SILENT) {
            this.log(`faast: successful cold start.`);
        }
    }
    /**
     * Resolves the function named by a call request.
     *
     * @param request Object carrying `name` and `args`.
     * @returns The matching function from the wrapped module.
     * @throws Error when the name is missing, the function is not found, or
     *   `args` is missing.
     */
    lookupFunction(request) {
        const { name, args } = request;
        if (!name) {
            throw new Error("Invalid function call request: no name");
        }
        const func = this.funcs[name];
        if (!func) {
            throw new Error(`Function named "${name}" not found`);
        }
        if (!args) {
            throw new Error("Invalid arguments to function call");
        }
        return func;
    }
    /** Cancels the CPU sampling timer, if one is running. */
    stopCpuMonitoring() {
        if (this.monitoringTimer) {
            clearInterval(this.monitoringTimer);
        }
        this.monitoringTimer = undefined;
    }
    /**
     * Starts sampling CPU usage of `pid` once per second, replacing any
     * sampler already running. Each sample is pushed onto the queue as a
     * "cpumetrics" message tagged with `callId`.
     */
    startCpuMonitoring(pid, callId) {
        if (this.monitoringTimer) {
            this.stopCpuMonitoring();
        }
        this.monitoringTimer = cpuMonitor(pid, 1000, (err, result) => {
            if (err) {
                this.log(`cpu monitor error: ${err}`);
            }
            if (result) {
                this.queue.push({ kind: "cpumetrics", callId, metrics: result });
            }
        });
    }
    /**
     * Stops CPU monitoring and, when a child process exists, detaches the log
     * listeners, disconnects and kills it, and clears the executing flag.
     */
    stop() {
        this.stopCpuMonitoring();
        if (!this.child) {
            return;
        }
        this.log(`Stopping child process.`);
        this.child.stdout.removeListener("data", this.logLines);
        this.child.stderr.removeListener("data", this.logLines);
        this.child.disconnect();
        this.child.kill();
        this.child = undefined;
        this.executing = false;
    }
    /**
     * Executes one call and reports every result through `onMessage`.
     *
     * Failures are not thrown to the caller: the error (optionally mapped by
     * `errorCallback`) is converted with `createErrorResponse` and delivered
     * through `onMessage`. Only a failure while delivering that error response
     * rejects the returned promise.
     *
     * @param callingContext Object with `call`, `startTime`, `logUrl`,
     *   `executionId` and `instanceId`.
     * @param handlers `onMessage` receives each response message;
     *   `errorCallback`, when given, maps Error instances before they are
     *   reported; `measureCpuUsage` is only consulted in child-process mode.
     */
    async execute(callingContext, { errorCallback, onMessage, measureCpuUsage }) {
        const processError = (err) => err instanceof Error && errorCallback ? errorCallback(err) : err;
        // True only once THIS invocation has claimed the wrapper. A call that
        // is rejected for re-entrancy must not clear the flag that belongs to
        // the call still in flight.
        let ownsExecution = false;
        try {
            /* istanbul ignore if */
            if (this.executing) {
                this.log(`faast: warning: module wrapper execute is not re-entrant`);
                throw new Error(`faast: module wrapper is not re-entrant`);
            }
            this.executing = true;
            ownsExecution = true;
            const { call } = callingContext;
            const { callId } = call;
            this.log(`calling: ${call.name}`);
            this.log(` args: ${call.args}`);
            this.log(` callId: ${callId}`);
            // TODO: send a "functionstarted" message if the call has not
            // produced its first value within a couple of seconds, and cancel
            // that timer once execute returns or yields its first value.
            const memoryUsage = process.memoryUsage();
            const memInfo = p(memoryUsage);
            if (this.options.childProcess) {
                await this.executeInChild(callingContext, memInfo, onMessage, measureCpuUsage);
            }
            else {
                await this.executeInProcess(callingContext, memoryUsage, memInfo, onMessage);
            }
        }
        catch (err) {
            this.log(`faast: wrapped function exception or promise rejection: ${err}`);
            const response = createErrorResponse(processError(err), callingContext);
            this.log(`Error response: ${(0, util_1.inspect)(response)}`);
            await onMessage(response);
        }
        finally {
            if (this.verbose) {
                this.log(`Exiting execute`);
            }
            if (ownsExecution) {
                this.executing = false;
            }
        }
    }
    /**
     * Child-process half of `execute`: sends the request to the (lazily
     * created) child, then relays every queued message to `onMessage` until
     * the queue ends or yields a rejection.
     */
    async executeInChild(callingContext, memInfo, onMessage, measureCpuUsage) {
        const { call, logUrl, executionId, instanceId } = callingContext;
        const detail = { logUrl, executionId, instanceId };
        if (!this.child) {
            this.child = this.setupChildProcess();
        }
        if (this.verbose) {
            this.log(`faast: invoking '${call.name}' in child process, memory: ${memInfo}`);
        }
        this.child.send(callingContext, err => {
            /* istanbul ignore if */
            if (err) {
                this.log(`child send error: rejecting with ${err}`);
                this.queue.push(Promise.reject(err));
            }
        });
        if (measureCpuUsage) {
            if (this.verbose) {
                this.log(`Starting CPU monitor for pid ${this.child.pid}`);
            }
            // XXX CPU Monitoring not enabled for now.
            // this.startCpuMonitoring(this.child.pid, call.callId);
        }
        let timer;
        const timeout = this.options.childProcessTimeoutMs;
        if (timeout) {
            if (this.verbose) {
                this.log(`Setting timeout: ${timeout}`);
            }
            timer = setTimeout(() => {
                const error = new error_1.FaastError({
                    name: error_1.FaastErrorNames.ETIMEOUT,
                    info: { ...detail, functionName: call.name }
                }, `Request exceeded timeout of ${timeout}ms`);
                // Queue the rejection first so the consumer loop below sees
                // the timeout, then tear the child down.
                this.queue.push(Promise.reject(error));
                this.stop();
            }, timeout);
        }
        if (this.verbose) {
            this.log(`awaiting async dequeue`);
        }
        try {
            const pending = [];
            for await (const result of this.queue) {
                if (this.verbose) {
                    this.log(`Dequeuing ${p(result)}`);
                }
                if (result.kind === "promise" || result.kind === "iterator") {
                    result.logUrl = logUrl;
                }
                // Deliveries are started in order but awaited together.
                pending.push(onMessage(result));
            }
            await Promise.all(pending);
        }
        finally {
            if (this.verbose) {
                this.log(`Finalizing queue`);
            }
            this.stopCpuMonitoring();
            if (timer) {
                clearTimeout(timer);
            }
            this.queue.clear();
        }
    }
    /**
     * In-process half of `execute`: calls the function directly. A generator
     * function produces one "iterator" message per step (the final one
     * carrying timing and memory data); any other function produces a single
     * "promise" message.
     */
    async executeInProcess(callingContext, memoryUsage, memInfo, onMessage) {
        const { call, startTime, logUrl, executionId, instanceId } = callingContext;
        const { callId } = call;
        if (this.verbose) {
            this.log(`faast: Invoking '${call.name}', memory: ${memInfo}`);
        }
        // lookupFunction throws when the function is missing, so no further
        // null check is needed here.
        const func = this.lookupFunction(call);
        const args = (0, serialize_1.deserialize)(call.args);
        let value;
        try {
            value = await func.apply(undefined, args);
            if (this.verbose) {
                this.log(`Finished call function`);
            }
        }
        catch (err) {
            this.log(`Function ${call.name} threw error: ${err}`);
            throw err;
        }
        if (this.verbose) {
            this.log(`returned value: ${p(value)}, type: ${typeof value}`);
        }
        const validate = this.options.validateSerialization;
        const context = { type: "fulfill", callId, logUrl, executionId, instanceId };
        const hasValue = value !== null && value !== undefined;
        if (hasValue && (0, exports.isGenerator)(func)) {
            for (let sequence = 0;; sequence++) {
                const next = await value.next();
                if (this.verbose) {
                    this.log(`next: ${p(next)}`);
                }
                const result = {
                    ...context,
                    kind: "iterator",
                    value: (0, serialize_1.serializeReturnValue)(call.name, [next], validate),
                    sequence
                };
                if (next.done) {
                    // Only the terminating message carries execution metrics.
                    result.remoteExecutionStartTime = startTime;
                    result.remoteExecutionEndTime = Date.now();
                    result.memoryUsage = memoryUsage;
                }
                await onMessage(result);
                if (next.done) {
                    return;
                }
            }
        }
        await onMessage({
            ...context,
            kind: "promise",
            value: (0, serialize_1.serializeReturnValue)(call.name, [value], validate),
            remoteExecutionStartTime: startTime,
            remoteExecutionEndTime: Date.now(),
            memoryUsage
        });
    }
    /**
     * Forks "./index.js" as the child process and wires its output, messages,
     * errors and exit into the log and the result queue.
     *
     * @returns The forked child process.
     */
    setupChildProcess() {
        if (this.verbose) {
            this.log(`faast: creating child process`);
        }
        let execArgv = process.execArgv.slice();
        if (this.options.childProcessMemoryLimitMb) {
            // Replace any inherited heap limit with the configured one; the
            // inherited --inspect flags are dropped as well.
            /* istanbul ignore next */
            execArgv = process.execArgv.filter(arg => !arg.match(/^--max-old-space-size/) && !arg.match(/^--inspect/));
            execArgv.push(`--max-old-space-size=${this.options.childProcessMemoryLimitMb}`);
        }
        const { childProcessEnvironment } = this.options;
        const env = {
            ...process.env,
            ...childProcessEnvironment,
            [FAAST_CHILD_ENV]: "true"
        };
        if (this.verbose) {
            this.log(`Env: ${JSON.stringify(env)}`);
        }
        const forkOptions = {
            // silent: the child's stdout/stderr are piped to this process.
            silent: true,
            env,
            cwd: this.options.childDir,
            execArgv
        };
        const child = childProcess.fork("./index.js", [], forkOptions);
        this.childPid = child.pid;
        child.stdout.setEncoding("utf8");
        child.stderr.setEncoding("utf8");
        let oom;
        const detectOom = (chunk) => {
            if (oomPattern.test(chunk)) {
                oom = chunk;
            }
        };
        child.stdout.on("data", this.logLines);
        child.stderr.on("data", this.logLines);
        child.stderr.on("data", detectOom);
        child.on("message", (message) => {
            if (this.verbose) {
                this.log(`child message: resolving with ${p(message)}`);
            }
            if (message.done) {
                this.queue.done();
            }
            else {
                this.queue.push(message.value);
            }
        });
        /* istanbul ignore next */
        child.on("error", err => {
            if (this.verbose) {
                this.log(`child error: rejecting with ${err}`);
            }
            this.child = undefined;
            this.queue.push(Promise.reject(err));
        });
        child.on("exit", (code, signal) => {
            if (this.verbose) {
                this.log(`child exit: code: ${code}, signal: ${signal}`);
            }
            this.child = undefined;
            if (code) {
                this.queue.push(Promise.reject(new Error(`Exited with error code ${code}`)));
            }
            else if (signal !== null && signal !== "SIGTERM") {
                let errorMessage = `Aborted with signal ${signal}`;
                if (signal === "SIGABRT" && oom) {
                    errorMessage += ` (${oom})`;
                    oom = undefined;
                }
                this.queue.push(Promise.reject(new Error(errorMessage)));
            }
            else if (this.verbose) {
                this.log(`child exiting normally`);
            }
        });
        return child;
    }
}
349exports.Wrapper = Wrapper;
/**
 * Periodically samples CPU usage of a process.
 *
 * Every `interval` milliseconds the process is looked up through
 * `proctor.lookup` and `callback` is invoked:
 *  - with `(err)` when the lookup reports an error;
 *  - with `(err, result)` unchanged when the lookup yields no result;
 *  - otherwise with `(err, { stime, utime, elapsed })`, where `stime` and
 *    `utime` are the looked-up values multiplied by 10 and `elapsed` is the
 *    number of milliseconds since monitoring began.
 *
 * NOTE(review): the factor 10 presumably converts 100 Hz clock ticks to
 * milliseconds - confirm against the process-doctor documentation.
 *
 * @param pid Process id to sample.
 * @param interval Sampling period in milliseconds.
 * @param callback Node-style `(err, metrics)` receiver.
 * @returns The interval timer; pass it to `clearInterval` to stop sampling.
 */
function cpuMonitor(pid, interval, callback) {
    const start = Date.now();
    const sample = () => {
        proctor.lookup(pid, (err, result) => {
            if (err) {
                callback(err);
                return;
            }
            // Check before destructuring: a lookup that yields neither an
            // error nor a result would otherwise throw inside the timer.
            if (!result) {
                callback(err, result);
                return;
            }
            const { stime, utime } = result;
            callback(err, {
                stime: stime * 10,
                utime: utime * 10,
                elapsed: Date.now() - start
            });
        });
    };
    return setInterval(sample, interval);
}
366//# sourceMappingURL=data:application/json;base64,
\No newline at end of file