UNPKG

96.2 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.logUrl = exports.googlePacker = exports.cleanup = exports.getRequestSubscription = exports.getResponseSubscription = exports.getResponseQueueTopic = exports.initialize = exports.initializeGoogleServices = exports.GoogleImpl = exports.defaults = exports.defaultGcWorker = exports.GoogleMetrics = void 0;
4const abort_controller_1 = require("abort-controller");
5const gaxios_1 = require("gaxios");
6const googleapis_1 = require("googleapis");
7const https = require("https");
8const util = require("util");
9const cache_1 = require("../cache");
10const cost_1 = require("../cost");
11const error_1 = require("../error");
12const log_1 = require("../log");
13const packer_1 = require("../packer");
14const provider_1 = require("../provider");
15const serialize_1 = require("../serialize");
16const shared_1 = require("../shared");
17const throttle_1 = require("../throttle");
18const google_queue_1 = require("./google-queue");
19const google_shared_1 = require("./google-shared");
20const googleTrampolineHttps = require("./google-trampoline-https");
21const googleTrampolineQueue = require("./google-trampoline-queue");
// HTTP client used for requests made outside the googleapis client (zip
// upload and direct https invocation). Retries are decided by the predicate
// returned from shouldRetryRequest, which is handed the info logger.
const gaxios = new gaxios_1.Gaxios({
    retryConfig: {
        retry: 3,
        noResponseRetries: 3,
        shouldRetry: (0, google_shared_1.shouldRetryRequest)(log_1.log.info)
    }
});
// Memory sizes that initialize() accepts without logging a warning.
const GoogleCloudFunctionsMemorySizes = [128, 256, 512, 1024, 2048];
/**
 * Byte counters accumulated while a function instance is in use. Both
 * counters start at zero; costSnapshot() reads them to compute data charges.
 */
class GoogleMetrics {
    outboundBytes = 0;
    pubSubBytes = 0;
}
36exports.GoogleMetrics = GoogleMetrics;
/**
 * Default garbage-collection worker: deletes the given resources and reports
 * progress through the gc logger.
 *
 * @param resources Resource names to delete.
 * @param services Google API clients used to perform the deletions.
 * @returns The promise returned by deleteResources.
 */
function defaultGcWorker(resources, services) {
    const gcLogger = log_1.log.gc;
    return deleteResources(services, resources, gcLogger);
}
40exports.defaultGcWorker = defaultGcWorker;
41exports.defaults = {
42 ...provider_1.commonDefaults,
43 region: "us-central1",
44 googleCloudFunctionOptions: {},
45 _gcWorker: defaultGcWorker
46};
47exports.GoogleImpl = {
48 name: "google",
49 initialize,
50 defaults: exports.defaults,
51 cleanup,
52 costSnapshot,
53 logUrl,
54 invoke,
55 poll,
56 responseQueueId
57};
/**
 * Obtains an auth client with the cloud-platform scope, installs it (together
 * with a retry policy) as the global googleapis options, and returns the API
 * clients used by this module.
 *
 * @returns An object with `cloudFunctions`, `pubsub`, `cloudBilling` (all
 *   "v1" clients) and the `google` namespace itself.
 */
async function initializeGoogleServices() {
    const scopes = ["https://www.googleapis.com/auth/cloud-platform"];
    const auth = await googleapis_1.google.auth.getClient({ scopes });
    const retryConfig = {
        retry: 8,
        retryDelay: 250,
        noResponseRetries: 3,
        shouldRetry: (0, google_shared_1.shouldRetryRequest)(log_1.log.info)
    };
    googleapis_1.google.options({ auth, retryConfig });
    const cloudFunctions = googleapis_1.google.cloudfunctions("v1");
    const pubsub = googleapis_1.google.pubsub("v1");
    const cloudBilling = googleapis_1.google.cloudbilling("v1");
    return {
        cloudFunctions,
        pubsub,
        cloudBilling,
        google: googleapis_1.google
    };
}
78exports.initializeGoogleServices = initializeGoogleServices;
/**
 * Default back-off between polls: sleeps (retries + 1) * 500 ms, preceded by
 * an additional 5 second sleep once more than 5 retries have happened.
 *
 * @param retries Number of retries performed so far.
 */
async function defaultPollDelay(retries) {
    const linearDelayMs = (retries + 1) * 500;
    if (retries > 5) {
        const extraDelayMs = 5 * 1000;
        await (0, shared_1.sleep)(extraDelayMs);
    }
    await (0, shared_1.sleep)(linearDelayMs);
}
/**
 * Repeatedly issues `request` until `checkDone` accepts its result.
 *
 * An initial `delay(0)` precedes the first request; after the k-th
 * unsuccessful request `delay(k)` is awaited before trying again.
 *
 * @param request Function returning a promise for the current result.
 * @param checkDone Predicate deciding whether a result is final.
 * @param delay Async back-off function receiving the retry count.
 * @param maxRetries Number of retries allowed after the first request.
 * @returns The first result accepted by `checkDone`.
 * @throws FaastError once maxRetries + 1 requests were made without success.
 */
async function pollOperation({ request, checkDone, delay = defaultPollDelay, maxRetries = 50 }) {
    await delay(0);
    for (let attempt = 1;; attempt++) {
        log_1.log.info(`Polling...`);
        const outcome = await request();
        if (checkDone(outcome)) {
            log_1.log.info(`Done.`);
            return outcome;
        }
        if (attempt > maxRetries) {
            throw new error_1.FaastError(`Timed out after ${attempt} attempts.`);
        }
        await delay(attempt);
    }
}
/**
 * Awaits `promise` and returns the `data` property of its result. Any failure
 * (a rejection, or a result from which `data` cannot be read) is swallowed
 * and yields `undefined` instead.
 */
async function quietly(promise) {
    try {
        const { data } = await promise;
        return data;
    }
    catch {
        return undefined;
    }
}
// Throttled runner for mutating Google API calls. An operation is retried
// while fewer than 6 attempts have been made and the error message matches
// one of the patterns listed in the retry predicate.
const throttleGoogleWrite = (0, throttle_1.throttle)({
    concurrency: 4,
    rate: 3,
    retry: (err, n) => {
        const { message } = err;
        if (!(n < 6)) {
            return false;
        }
        const retryablePatterns = [
            /Build failed/,
            /Quota/,
            /load attempt timed out/,
            /ECONNRESET/,
            /failed on loading user code/
        ];
        return retryablePatterns.some(pattern => message.match(pattern) !== null);
    }
}, (op) => op());
/**
 * Starts a long-running Google API operation and polls it until it is done.
 * The whole sequence runs through throttleGoogleWrite.
 *
 * @param api Client exposing `operations.get({ name })`.
 * @param response Function that issues the request creating the operation;
 *   its result must carry the operation name at `data.name`.
 * @returns The final operation status returned by pollOperation.
 * @throws FaastError "could not get operation" if starting the operation
 *   fails, or "poll operation failed" if polling fails, times out, or the
 *   operation reports an error.
 */
async function waitFor(api, response) {
    return throttleGoogleWrite(async () => {
        let operation;
        try {
            operation = await response();
        }
        catch (err) {
            throw new error_1.FaastError(err, "could not get operation");
        }
        const operationName = operation.data.name;
        try {
            // The await is essential: returning the bare promise would leave
            // the try block before it settles, so rejections from polling
            // would never reach the catch below.
            return await pollOperation({
                // Failed status lookups are swallowed by quietly() and show
                // up here as an undefined result, i.e. "not done yet".
                request: () => quietly(api.operations.get({ name: operationName })),
                checkDone: result => {
                    /* istanbul ignore if */
                    if (!result) {
                        return false;
                    }
                    /* istanbul ignore if */
                    if (result.error) {
                        const underlying = new error_1.FaastError(result.error.message ?? undefined);
                        // Clear the stack on the synthetic error that carries the remote message.
                        underlying.stack = "";
                        throw new error_1.FaastError(underlying, "Error polling operation");
                    }
                    return result.done || false;
                }
            });
        }
        catch (err) {
            throw new error_1.FaastError(err, "poll operation failed");
        }
    });
}
/**
 * Deletes the cloud function at `path` and waits for the delete operation to
 * finish. An error whose message contains "does not exist" is treated as
 * success and resolves to undefined; every other error is rethrown.
 *
 * @param api Cloud Functions client.
 * @param path Full resource name of the function.
 */
async function deleteFunction(api, path) {
    const requestDeletion = () => api.projects.locations.functions.delete({ name: path });
    try {
        return await waitFor(api, requestDeletion);
    }
    catch (err) {
        const alreadyGone = err.message.match(/does not exist/);
        if (!alreadyGone) {
            throw err;
        }
        return;
    }
}
/**
 * Creates a google cloud function named `faast-<nonce>` together with its
 * supporting Pub/Sub resources and returns the provider state.
 *
 * Steps, in order: initialize API clients and resolve the project id;
 * optionally start garbage collection; start the pricing lookup and the
 * response topic/subscription creation in the background; in "queue" mode
 * start creating the request topic; pack and upload the code bundle; create
 * the function (up to 3 retries via retryOp, deleting the partially created
 * function after each failure); in "https"/"auto" mode look up the trigger
 * URL; finally wait for the background work.
 *
 * @param fmodule Module to package into the function.
 * @param nonce Unique suffix for the function name.
 * @param options Provider options (region, timeout, mode, gc, memorySize, ...).
 * @returns State with `resources`, `services`, `project`, `functionName`,
 *   `metrics`, `options`, and, where applicable, `gcPromise` and `url`.
 * @throws FaastError (ECREATE) when the function cannot be created, or a
 *   FaastError when its https trigger URL cannot be determined.
 */
async function initialize(fmodule, nonce, options) {
    log_1.log.info(`Create google cloud function`);
    const services = await initializeGoogleServices();
    const project = await googleapis_1.google.auth.getProjectId();
    const { cloudFunctions, pubsub } = services;
    const {
        region,
        timeout,
        mode,
        gc,
        retentionInDays,
        _gcWorker: gcWorker,
        debugOptions: { wrapperVerbose }
    } = options;
    const location = `projects/${project}/locations/${region}`;
    const functionName = `faast-${nonce}`;
    const trampoline = `${location}/functions/${functionName}`;
    const isQueueMode = mode === "queue";
    // Packs the module, uploads the archive to a freshly generated upload
    // URL, and returns that URL for use as the function's source.
    const createCodeBundle = async () => {
        const wrapperOptions = {
            // At least one second, otherwise 5 seconds less than the function timeout.
            childProcessTimeoutMs: Math.max(1000, (timeout - 5) * 1000),
            wrapperVerbose,
            childProcessMemoryLimitMb: options.childProcessMemoryMb
        };
        const { archive } = await googlePacker(fmodule, options, wrapperOptions, functionName);
        const uploadUrlResponse = await throttleGoogleWrite(() => cloudFunctions.projects.locations.functions.generateUploadUrl({
            parent: location
        }));
        const { uploadUrl } = uploadUrlResponse.data;
        const uploadResult = await uploadZip(uploadUrl, archive);
        log_1.log.info(`Upload zip file response: ${uploadResult?.statusText}`);
        return uploadUrlResponse.data.uploadUrl;
    };
    const resources = { trampoline, region };
    const state = {
        resources,
        services,
        project,
        functionName,
        metrics: new GoogleMetrics(),
        options
    };
    if (gc === "auto" || gc === "force") {
        log_1.log.gc(`Starting garbage collector`);
        // Garbage collection failures are logged, never propagated.
        state.gcPromise = collectGarbage(gcWorker, services, project, retentionInDays).catch(err => {
            log_1.log.gc(`Garbage collection error: ${err}`);
        });
    }
    // Started now, awaited at the very end; the resolved prices are not used here.
    const pricingPromise = getGoogleCloudFunctionsPricing(services.cloudBilling, region);
    const responseQueuePromise = (async () => {
        const topic = await pubsub.projects.topics.create({
            name: getResponseQueueTopic(project, functionName)
        });
        resources.responseQueueTopic = topic.data.name ?? undefined;
        resources.responseSubscription = getResponseSubscription(project, functionName);
        log_1.log.info(`Creating response queue subscription`);
        await pubsub.projects.subscriptions.create({
            name: resources.responseSubscription,
            requestBody: {
                topic: resources.responseQueueTopic
            }
        });
    })();
    let requestQueuePromise;
    if (isQueueMode) {
        log_1.log.info(`Initializing queue`);
        resources.requestQueueTopic = getRequestQueueTopic(project, functionName);
        requestQueuePromise = pubsub.projects.topics.create({
            name: resources.requestQueueTopic
        });
        resources.requestSubscription = getRequestSubscription(project, functionName, region);
    }
    const sourceUploadUrl = await createCodeBundle();
    const { memorySize, googleCloudFunctionOptions, env } = options;
    if (!GoogleCloudFunctionsMemorySizes.includes(memorySize)) {
        log_1.log.warn(`Invalid memorySize ${memorySize} for Google Cloud Functions`);
    }
    const requestBody = {
        name: trampoline,
        entryPoint: "trampoline",
        timeout: `${timeout}s`,
        availableMemoryMb: memorySize,
        sourceUploadUrl,
        environmentVariables: env,
        runtime: "nodejs14",
        // Caller-supplied options are spread last and so override the fields above.
        ...googleCloudFunctionOptions
    };
    if (isQueueMode) {
        // The request topic must exist before it can be used as the trigger.
        await requestQueuePromise;
        requestBody.eventTrigger = {
            eventType: "providers/cloud.pubsub/eventTypes/topic.publish",
            resource: resources.requestQueueTopic
        };
    }
    else {
        requestBody.httpsTrigger = {};
    }
    log_1.log.info(`Create function at ${location}`);
    log_1.log.info(`Request body: %O`, requestBody);
    const createFunction = async () => {
        try {
            log_1.log.info(`create function ${requestBody.name} [${options.description}]`);
            await waitFor(cloudFunctions, () => cloudFunctions.projects.locations.functions.create({
                location,
                requestBody
            }));
            await cloudFunctions.projects.locations.functions.setIamPolicy({
                resource: trampoline,
                requestBody: {
                    policy: {
                        bindings: [
                            {
                                members: ["allUsers"],
                                role: "roles/cloudfunctions.invoker"
                            }
                        ]
                    }
                }
            });
        }
        catch (err) {
            // Best-effort removal of whatever was created before reporting the failure.
            /* istanbul ignore next */
            await deleteFunction(cloudFunctions, trampoline).catch(() => { });
            throw new error_1.FaastError({ cause: err, name: error_1.FaastErrorNames.ECREATE }, "failed to create google cloud function");
        }
    };
    await (0, throttle_1.retryOp)(3, createFunction);
    if (mode === "https" || mode === "auto") {
        try {
            const func = await cloudFunctions.projects.locations.functions.get({
                name: trampoline
            });
            const url = func.data.httpsTrigger?.url;
            if (!url) {
                throw new error_1.FaastError("Could not get http trigger url");
            }
            log_1.log.info(`Function URL: ${url}`);
            state.url = url;
        }
        catch (err) {
            throw new error_1.FaastError(err, `Could not get function ${trampoline} or its url, despite it being created`);
        }
    }
    await pricingPromise;
    await responseQueuePromise;
    return state;
}
316exports.initialize = initialize;
/** Returns the Pub/Sub topic name used for requests to `functionName`. */
function getRequestQueueTopic(project, functionName) {
    const parent = `projects/${project}/topics`;
    return `${parent}/${functionName}-Requests`;
}
/** Returns the Pub/Sub topic name used for responses from `functionName`. */
function getResponseQueueTopic(project, functionName) {
    const parent = `projects/${project}/topics`;
    return `${parent}/${functionName}-Responses`;
}
323exports.getResponseQueueTopic = getResponseQueueTopic;
/** Returns the Pub/Sub subscription name for responses from `functionName`. */
function getResponseSubscription(project, functionName) {
    const parent = `projects/${project}/subscriptions`;
    return `${parent}/${functionName}-Responses`;
}
327exports.getResponseSubscription = getResponseSubscription;
/**
 * Returns the name of the request subscription for `functionName` in
 * `region`, in the form `gcf-<function>-<region>-<function>-Requests`.
 */
function getRequestSubscription(project, functionName, region) {
    const parent = `projects/${project}/subscriptions`;
    return `${parent}/gcf-${functionName}-${region}-${functionName}-Requests`;
}
331exports.getRequestSubscription = getRequestSubscription;
// Keep-alive HTTPS agent passed to every direct https invocation.
const agent = new https.Agent({
    keepAlive: true,
    timeout: 0,
    maxSockets: 1000
});
/**
 * Invokes the cloud function by POSTing the serialized call to `url`.
 *
 * The request races against `cancel`; if `cancel` settles first with a falsy
 * value the request is aborted. On success only the response size is
 * recorded in `metrics.outboundBytes`; nothing is returned.
 *
 * @param url Https trigger URL of the function.
 * @param call Call description, serialized into the request body.
 * @param metrics Metrics object whose `outboundBytes` is incremented.
 * @param cancel Promise used to cancel the invocation.
 * @throws FaastError named EMEMORY for a 503 response; a generic FaastError
 *   for any other failure.
 */
async function callFunctionHttps(url, call, metrics, cancel) {
    const aborter = new abort_controller_1.AbortController();
    try {
        const requestConfig = {
            method: "POST",
            url,
            headers: { "Content-Type": "application/json" },
            body: (0, serialize_1.serialize)(call),
            signal: aborter.signal,
            responseType: "json",
            retry: false,
            agent
        };
        const pendingRequest = gaxios.request(requestConfig);
        const rawResponse = await Promise.race([pendingRequest, cancel]);
        if (!rawResponse) {
            log_1.log.info(`cancelling gcp invoke`);
            aborter.abort();
            return;
        }
        try {
            const responseBytes = (0, shared_1.computeHttpResponseBytes)(rawResponse.headers);
            metrics.outboundBytes += responseBytes;
        }
        catch (err) {
            throw new error_1.FaastError(err, `Could not parse ${util.inspect(rawResponse.data)}`);
        }
    }
    catch (err) {
        const { response } = err;
        if (!response) {
            throw new error_1.FaastError(err, `when invoking google cloud function`);
        }
        if (response.status === 503) {
            throw new error_1.FaastError({ cause: err, name: error_1.FaastErrorNames.EMEMORY }, "google cloud function: possibly out of memory");
        }
        throw new error_1.FaastError(err, `when invoking google cloud function: %s\nDetails: %s`, response.statusText, response.data);
    }
}
/**
 * Sends `call` to the cloud function using the transport selected by
 * `state.options.mode`: a direct https request for "auto"/"https", a Pub/Sub
 * publish for "queue". Any other mode resolves to undefined.
 */
async function invoke(state, call, cancel) {
    const { options, resources, services, url, metrics } = state;
    const { mode } = options;
    if (mode === "auto" || mode === "https") {
        return callFunctionHttps(url, call, metrics, cancel);
    }
    if (mode === "queue") {
        const { requestQueueTopic } = resources;
        const { pubsub } = services;
        const payload = (0, serialize_1.serialize)(call);
        return (0, google_queue_1.publishPubSub)(pubsub, requestQueueTopic, payload);
    }
}
/**
 * Receives messages from the response subscription recorded in `state`,
 * passing along the state's metrics and the `cancel` promise.
 */
function poll(state, cancel) {
    const { services, resources, metrics } = state;
    return (0, google_queue_1.receiveMessages)(services.pubsub, resources.responseSubscription, metrics, cancel);
}
/** Returns the response queue topic name stored in the state's resources. */
function responseQueueId(state) {
    const { responseQueueTopic } = state.resources;
    return responseQueueTopic;
}
/**
 * Deletes the Pub/Sub resources and the cloud function listed in `resources`.
 *
 * Deletion order: response subscription, response topic, request
 * subscription, request topic, and finally the function. Entries that are
 * not set are skipped. A "Resource not found" error counts as success.
 *
 * @param services Object providing the `pubsub` and `cloudFunctions` clients.
 * @param resources Resource names; every field is optional.
 * @param output Logger invoked with one message per deleted resource.
 * @throws Rethrows the first deletion error that is not "Resource not found";
 *   later resources are then left in place.
 */
async function deleteResources(services, resources, output = log_1.log.info) {
    const { trampoline, requestQueueTopic, requestSubscription, responseSubscription, responseQueueTopic } = resources;
    const { cloudFunctions, pubsub } = services;
    // We deliberately rethrow transient errors here, so only if all prior
    // deletes succeed do we proceed. If there's a transient error then future
    // garbage collection will clean up. The order is important; the function
    // itself must be deleted last.
    const check = async (request) => {
        try {
            await request;
        }
        catch (err) {
            // Guard the message access so that an error without a string
            // message is rethrown as-is instead of causing a TypeError here.
            const message = typeof err?.message === "string" ? err.message : "";
            /* istanbul ignore next */
            if (/Resource not found/.test(message)) {
                return;
            }
            throw err;
        }
    };
    if (responseSubscription) {
        await check(pubsub.projects.subscriptions.delete({ subscription: responseSubscription }));
        output(`Deleted response subscription: ${responseSubscription}`);
    }
    if (responseQueueTopic) {
        await check(pubsub.projects.topics.delete({ topic: responseQueueTopic }));
        output(`Deleted response queue topic: ${responseQueueTopic}`);
    }
    if (requestSubscription) {
        await check(pubsub.projects.subscriptions.delete({ subscription: requestSubscription }));
        output(`Deleted request subscription: ${requestSubscription}`);
    }
    if (requestQueueTopic) {
        await check(pubsub.projects.topics.delete({ topic: requestQueueTopic }));
        output(`Deleted request queue topic: ${requestQueueTopic}`);
    }
    if (trampoline) {
        await check(deleteFunction(cloudFunctions, trampoline));
        output(`Deleted function ${trampoline}`);
    }
}
/**
 * Finishes use of a function instance: waits for a garbage collection run
 * started by initialize (if any) and, when `options.deleteResources` is set,
 * deletes the resources recorded in `state`.
 *
 * @throws FaastError "delete resources failed" wrapping any deletion error.
 */
async function cleanup(state, options) {
    log_1.log.info(`google cleanup starting.`);
    const gcInProgress = Boolean(state.gcPromise);
    if (gcInProgress) {
        log_1.log.info(`Waiting for garbage collection...`);
        await state.gcPromise;
        log_1.log.info(`Garbage collection done.`);
    }
    const shouldDelete = Boolean(options.deleteResources);
    if (shouldDelete) {
        try {
            await deleteResources(state.services, state.resources);
        }
        catch (cause) {
            throw new error_1.FaastError(cause, "delete resources failed");
        }
    }
    log_1.log.info(`google cleanup done.`);
}
450exports.cleanup = cleanup;
// True while a collection using the default worker is in progress.
let garbageCollectorRunning = false;
/**
 * Finds expired faast cloud functions in every location of project `proj`
 * and hands each one, together with the resource names derived from it, to
 * `gcWorker`.
 *
 * With the default worker only one collection runs at a time; a call made
 * while another is in progress returns immediately.
 *
 * @param gcWorker Called as `gcWorker(resources, services)` per expired function.
 * @param services API clients; `cloudFunctions` is used to list functions.
 * @param proj Project id to scan.
 * @param retentionInDays Passed to hasExpired together with each function's
 *   update time.
 * @returns Resolves once the workers scheduled for ALL listed pages are done.
 */
async function collectGarbage(gcWorker, services, proj, retentionInDays) {
    const usesDefaultWorker = gcWorker === defaultGcWorker;
    if (usesDefaultWorker) {
        if (garbageCollectorRunning) {
            return;
        }
        garbageCollectorRunning = true;
    }
    try {
        const { cloudFunctions } = services;
        // Accumulates deletions across every page. Reassigning this array
        // per page would leave the deletions of earlier pages un-awaited.
        const promises = [];
        const scheduleDeleteResources = (0, throttle_1.throttle)({
            concurrency: 5,
            rate: 5,
            burst: 2
        }, async (gServices, fn) => {
            const { region, name, project } = parseFunctionName(fn.name);
            const resources = {
                region,
                trampoline: fn.name,
                requestQueueTopic: getRequestQueueTopic(project, name),
                requestSubscription: getRequestSubscription(project, name, region),
                responseQueueTopic: getResponseQueueTopic(project, name),
                responseSubscription: getResponseSubscription(project, name)
            };
            await gcWorker(resources, gServices);
        });
        const fnPattern = new RegExp(`/functions/faast-${shared_1.uuidv4Pattern}$`);
        let pageToken;
        do {
            const funcListResponse = await cloudFunctions.projects.locations.functions.list({
                parent: `projects/${proj}/locations/-`,
                pageToken
            });
            pageToken = funcListResponse.data.nextPageToken ?? undefined;
            const garbageFunctions = (funcListResponse.data.functions || [])
                .filter(fn => (0, shared_1.hasExpired)(fn.updateTime, retentionInDays))
                .filter(fn => fn.name.match(fnPattern));
            for (const fn of garbageFunctions) {
                promises.push(scheduleDeleteResources(services, fn));
            }
        } while (pageToken);
        await Promise.all(promises);
    }
    finally {
        if (usesDefaultWorker) {
            garbageCollectorRunning = false;
        }
    }
}
/**
 * Splits a full function resource name of the form
 * `projects/<project>/locations/<region>/functions/<name>` into its parts.
 *
 * @returns `{ project, region, name }`, or null when `path` does not have
 *   that form.
 */
function parseFunctionName(path) {
    const match = path.match(/^projects\/(.*)\/locations\/(.*)\/functions\/(.*)$/);
    if (!match) {
        return match;
    }
    const [, project, region, name] = match;
    return { project, region, name };
}
/**
 * Uploads the zip archive to `url` with an HTTP PUT.
 *
 * @param url Upload URL.
 * @param zipStream Archive contents, used as the request body.
 * @returns The response of the PUT request.
 */
async function uploadZip(url, zipStream) {
    const headers = {
        "content-type": "application/zip",
        "x-goog-content-length-range": "0,104857600"
    };
    return gaxios.request({ method: "PUT", url, body: zipStream, headers });
}
/**
 * Packs `functionModule` together with the trampoline that matches
 * `options.mode`: the queue trampoline for "queue", the https trampoline for
 * every other mode.
 *
 * @returns The result of the shared packer.
 */
async function googlePacker(functionModule, options, wrapperOptions, FunctionName) {
    let trampolineModule = googleTrampolineHttps;
    if (options.mode === "queue") {
        trampolineModule = googleTrampolineQueue;
    }
    return (0, packer_1.packer)(trampolineModule, functionModule, options, wrapperOptions, FunctionName);
}
520exports.googlePacker = googlePacker;
// Throttled, memoized price lookup; created on first use by ensureGooglePriceCache.
let getGooglePrice;
/**
 * Creates `getGooglePrice` on the first call; later calls do nothing.
 *
 * `getGooglePrice(region, serviceName, description, conversionFactor)` lists
 * the SKUs of `serviceName`, keeps those whose description equals
 * `description`, prefers one served in `region` over one served in "global",
 * and returns the highest tiered rate scaled by
 * `conversionFactor / baseUnitConversionFactor`.
 *
 * @param cloudBilling Cloud Billing client captured by the lookup function.
 * @throws (from getGooglePrice) FaastError `failed to get google pricing for
 *   "<description>"` wrapping the underlying cause.
 */
function ensureGooglePriceCache(cloudBilling) {
    if (getGooglePrice) {
        return;
    }
    getGooglePrice = (0, throttle_1.throttle)({
        concurrency: 1,
        rate: 3,
        memoize: true,
        cache: cache_1.caches.googlePrices
    }, async (region, serviceName, description, conversionFactor) => {
        try {
            const skusResponse = await cloudBilling.services.skus.list({
                parent: serviceName
            });
            const skus = skusResponse.data.skus ?? [];
            const matchingSkus = skus.filter(sku => sku.description === description);
            log_1.log.provider(`matching SKUs: ${util.inspect(matchingSkus, { depth: null })}`);
            const servedIn = target => sku => (sku.serviceRegions ?? []).includes(target);
            const regionOrGlobalSku = matchingSkus.find(servedIn(region)) ??
                matchingSkus.find(servedIn("global"));
            if (!regionOrGlobalSku) {
                // Fail with a descriptive cause instead of a TypeError on undefined.
                throw new error_1.FaastError(`no SKU found for region "${region}" or "global"`);
            }
            const pexp = regionOrGlobalSku.pricingInfo[0].pricingExpression;
            // A unit price is units + nanos / 1e9. Either part may be absent
            // from the response; a missing part counts as zero so that it
            // cannot turn the whole price into NaN.
            const prices = pexp.tieredRates.map(rate => Number(rate.unitPrice.units ?? "0") +
                (rate.unitPrice.nanos ?? 0) / 1e9);
            const price = Math.max(...prices) *
                (conversionFactor / pexp.baseUnitConversionFactor);
            log_1.log.provider(`Found price for ${serviceName}, ${description}, ${region}: ${price}`);
            return price;
        }
        catch (err) {
            throw new error_1.FaastError(err, `failed to get google pricing for "${description}"`);
        }
    });
}
// Result of the first successful billing services listing; reused afterwards.
let googleServices;
// Lists the Cloud Billing services, one call at a time (concurrency 1). The
// API is only queried while no truthy result has been stored yet.
const listGoogleServices = (0, throttle_1.throttle)({ concurrency: 1 }, async (cloudBilling) => {
    if (!googleServices) {
        const response = await cloudBilling.services.list();
        googleServices = response.data.services;
    }
    return googleServices;
});
/**
 * Looks up the unit prices used for cost estimates in `region`.
 *
 * The five lookups are awaited one after another, in the order of the
 * returned fields.
 *
 * @param cloudBilling Cloud Billing client.
 * @param region Region whose prices are wanted.
 * @returns `{ perInvocation, perGhzSecond, perGbSecond, perGbOutboundData,
 *   perGbPubSub }`.
 */
async function getGoogleCloudFunctionsPricing(cloudBilling, region) {
    const services = await listGoogleServices(cloudBilling);
    ensureGooglePriceCache(cloudBilling);
    // Resolves the billing service by display name and asks the price cache.
    function getPricing(serviceName, description, conversionFactor = 1) {
        const service = services.find(s => s.displayName === serviceName);
        return getGooglePrice(region, service.name, description, conversionFactor);
    }
    const bytesPerGb = 2 ** 30;
    const perInvocation = await getPricing("Cloud Functions", "Invocations");
    const perGhzSecond = await getPricing("Cloud Functions", "CPU Time");
    const perGbSecond = await getPricing("Cloud Functions", "Memory Time", bytesPerGb);
    const perGbOutboundData = await getPricing("Cloud Functions", `Network Egress from ${region}`, bytesPerGb);
    const perGbPubSub = await getPricing("Cloud Pub/Sub", "Message Delivery Basic", bytesPerGb);
    return { perInvocation, perGhzSecond, perGbSecond, perGbOutboundData, perGbPubSub };
}
// Maps each provisionable memory size (keys) to the GHz figure (values) that
// costSnapshot() uses for it.
// Source: https://cloud.google.com/functions/pricing
const gcfProvisonableMemoryTable = {
    128: 0.2,
    256: 0.4,
    512: 0.8,
    1024: 1.4,
    2048: 2.4
};
/**
 * Builds a cost snapshot with four metrics, pushed in this order:
 * functionCallDuration, functionCallRequests, outboundDataTransfer, pubsub.
 *
 * The requested memory size is rounded up to the next size in
 * gcfProvisonableMemoryTable; a warning is logged when no entry is large
 * enough, and the duration pricing is then computed from undefined values.
 *
 * @param state Provider state (options, resources, services, metrics).
 * @param stats Function statistics; `estimatedBilledTime.mean` (divided by
 *   1000 to obtain seconds), `estimatedBilledTime.samples` and `invocations`
 *   are read.
 * @returns The populated CostSnapshot.
 */
async function costSnapshot(state, stats) {
    const costs = new cost_1.CostSnapshot("google", state.options, stats);
    const { memorySize = exports.defaults.memorySize } = state.options;
    const ascendingSizes = (0, shared_1.keysOf)(gcfProvisonableMemoryTable)
        .map(n => Number(n))
        .sort((a, b) => a - b);
    // Smallest provisionable size that can hold the requested memory.
    const provisionedMb = ascendingSizes.find(size => memorySize <= size);
    if (!provisionedMb) {
        log_1.log.warn(`Could not determine provisioned memory or CPU for requested memory size ${memorySize}`);
    }
    const provisionedGhz = gcfProvisonableMemoryTable[provisionedMb];
    const { mean, samples } = stats.estimatedBilledTime;
    const seconds = (mean / 1000) * samples;
    const { region } = state.resources;
    const prices = await getGoogleCloudFunctionsPricing(state.services.cloudBilling, region);
    const provisionedGb = provisionedMb / 1024;
    costs.push(new cost_1.CostMetric({
        name: "functionCallDuration",
        pricing: prices.perGbSecond * provisionedGb + prices.perGhzSecond * provisionedGhz,
        unit: "second",
        measured: seconds,
        comment: `https://cloud.google.com/functions/pricing#compute_time (${provisionedMb} MB, ${provisionedGhz} GHz)`
    }));
    costs.push(new cost_1.CostMetric({
        name: "functionCallRequests",
        pricing: prices.perInvocation,
        measured: stats.invocations,
        unit: "request",
        comment: "https://cloud.google.com/functions/pricing#invocations"
    }));
    costs.push(new cost_1.CostMetric({
        name: "outboundDataTransfer",
        pricing: prices.perGbOutboundData,
        measured: state.metrics.outboundBytes / 2 ** 30,
        unit: "GB",
        comment: "https://cloud.google.com/functions/pricing#networking"
    }));
    costs.push(new cost_1.CostMetric({
        name: "pubsub",
        pricing: prices.perGbPubSub,
        measured: state.metrics.pubSubBytes / 2 ** 30,
        unit: "GB",
        comment: "https://cloud.google.com/pubsub/pricing"
    }));
    return costs;
}
/**
 * Returns the Google Cloud console URL of the log viewer, filtered to the
 * state's project and cloud function name.
 */
function logUrl(state) {
    const { project, functionName } = state;
    const resourceFilter = `cloud_function%2Ffunction_name%2F${functionName}`;
    return `https://console.cloud.google.com/logs/viewer?project=${project}&resource=${resourceFilter}`;
}
640exports.logUrl = logUrl;
641//# sourceMappingURL=data:application/json;base64,
\No newline at end of file