UNPKG

95.8 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.logUrl = exports.googlePacker = exports.cleanup = exports.getRequestSubscription = exports.getResponseSubscription = exports.getResponseQueueTopic = exports.initialize = exports.initializeGoogleServices = exports.GoogleImpl = exports.defaults = exports.defaultGcWorker = exports.GoogleMetrics = void 0;
4const abort_controller_1 = require("abort-controller");
5const gaxios_1 = require("gaxios");
6const googleapis_1 = require("googleapis");
7const https = require("https");
8const util = require("util");
9const cache_1 = require("../cache");
10const cost_1 = require("../cost");
11const error_1 = require("../error");
12const log_1 = require("../log");
13const packer_1 = require("../packer");
14const provider_1 = require("../provider");
15const serialize_1 = require("../serialize");
16const shared_1 = require("../shared");
17const throttle_1 = require("../throttle");
18const google_queue_1 = require("./google-queue");
19const google_shared_1 = require("./google-shared");
20const googleTrampolineHttps = require("./google-trampoline-https");
21const googleTrampolineQueue = require("./google-trampoline-queue");
/**
 * Shared Gaxios client used by this module for direct HTTP requests
 * (`callFunctionHttps` and `uploadZip`). Its default retry configuration asks
 * for 3 retries, 3 no-response retries, and defers the retry decision to the
 * predicate built by `shouldRetryRequest`.
 */
const gaxios = new gaxios_1.Gaxios({
    retryConfig: {
        retry: 3,
        noResponseRetries: 3,
        shouldRetry: (0, google_shared_1.shouldRetryRequest)(log_1.log.info)
    }
});
/** Memory sizes (in MB) that `initialize` accepts without logging a warning. */
const GoogleCloudFunctionsMemorySizes = [128, 256, 512, 1024, 2048];
/**
 * Byte counters accumulated while a faast.js instance runs on Google Cloud.
 * `outboundBytes` is incremented by HTTPS invocations; both counters are read
 * by `costSnapshot` to estimate data-transfer costs.
 */
class GoogleMetrics {
    constructor() {
        // Both counters start at zero and are mutated in place by callers.
        Object.assign(this, { outboundBytes: 0, pubSubBytes: 0 });
    }
}
36exports.GoogleMetrics = GoogleMetrics;
/**
 * Default garbage-collection worker: deletes the given resources and reports
 * each deletion through the `gc` logger.
 *
 * @param resources Resource names to delete (see `deleteResources`).
 * @param services Google API clients returned by `initializeGoogleServices`.
 * @returns The promise returned by `deleteResources`.
 */
function defaultGcWorker(resources, services) {
    const reportDeletion = log_1.log.gc;
    return deleteResources(services, resources, reportDeletion);
}
40exports.defaultGcWorker = defaultGcWorker;
41exports.defaults = {
42 ...provider_1.commonDefaults,
43 region: "us-central1",
44 googleCloudFunctionOptions: {},
45 _gcWorker: defaultGcWorker
46};
47exports.GoogleImpl = {
48 name: "google",
49 initialize,
50 defaults: exports.defaults,
51 cleanup,
52 costSnapshot,
53 logUrl,
54 invoke,
55 poll,
56 responseQueueId
57};
/**
 * Authenticates against Google Cloud and builds the API clients used by this
 * provider.
 *
 * Side effect: installs the auth client and a retry configuration as global
 * options on the `googleapis` `google` object.
 *
 * @returns An object holding the Cloud Functions, Pub/Sub and Cloud Billing
 * v1 clients, plus the `google` object itself.
 */
async function initializeGoogleServices() {
    const { google } = googleapis_1;
    const { shouldRetryRequest } = google_shared_1;
    const scopes = ["https://www.googleapis.com/auth/cloud-platform"];
    const auth = await google.auth.getClient({ scopes });
    const retryConfig = {
        retry: 8,
        retryDelay: 250,
        noResponseRetries: 3,
        shouldRetry: shouldRetryRequest(log_1.log.info)
    };
    google.options({ auth, retryConfig });
    const cloudFunctions = google.cloudfunctions("v1");
    const pubsub = google.pubsub("v1");
    const cloudBilling = google.cloudbilling("v1");
    return { cloudFunctions, pubsub, cloudBilling, google };
}
78exports.initializeGoogleServices = initializeGoogleServices;
/**
 * Default back-off between polls of a long-running operation.
 *
 * Sleeps `(retries + 1) * 500` ms; once more than 5 retries have happened an
 * additional 5 second sleep is inserted first.
 *
 * @param retries Number of retries performed so far.
 */
async function defaultPollDelay(retries) {
    const { sleep } = shared_1;
    const linearBackoffMs = (retries + 1) * 500;
    if (retries > 5) {
        await sleep(5 * 1000);
    }
    await sleep(linearBackoffMs);
}
/**
 * Repeatedly issues `request` until `checkDone` accepts its result.
 *
 * @param request Function returning a promise for the latest result.
 * @param checkDone Predicate deciding whether the result is final.
 * @param delay Async back-off invoked with the retry count before each
 * request (defaults to `defaultPollDelay`).
 * @param maxRetries Number of retries allowed after the first request
 * (defaults to 50).
 * @returns The first result accepted by `checkDone`.
 * @throws FaastError when the retry budget is exhausted.
 */
async function pollOperation({ request, checkDone, delay = defaultPollDelay, maxRetries = 50 }) {
    const { log } = log_1;
    let attempt = 0;
    await delay(attempt);
    for (;;) {
        log.info(`Polling...`);
        const result = await request();
        if (checkDone(result)) {
            log.info(`Done.`);
            return result;
        }
        const budgetExhausted = attempt >= maxRetries;
        attempt += 1;
        if (budgetExhausted) {
            throw new error_1.FaastError(`Timed out after ${attempt} attempts.`);
        }
        await delay(attempt);
    }
}
/**
 * Awaits `promise` and returns the `data` property of its result. Any failure
 * (a rejection, or a result that has no readable `data`) is deliberately
 * swallowed and reported as `undefined`.
 *
 * @param promise Promise (or plain value) resolving to an object with `data`.
 * @returns The `data` value, or `undefined` on any error.
 */
async function quietly(promise) {
    try {
        const { data } = await promise;
        return data;
    }
    catch {
        // Best effort by design: callers treat `undefined` as "no answer yet".
        return undefined;
    }
}
/**
 * Throttled executor for mutating Google API calls. It runs the supplied
 * operation under the configured concurrency and rate limits, and asks for a
 * retry (while the attempt count is below 6) when the error message matches
 * one of the known transient failure patterns.
 */
const throttleGoogleWrite = (0, throttle_1.throttle)({
    concurrency: 4,
    rate: 3,
    retry: (err, n) => {
        const { message } = err;
        if (!(n < 6)) {
            return false;
        }
        const transientFailures = [
            /Build failed/,
            /Quota/,
            /load attempt timed out/,
            /ECONNRESET/,
            /failed on loading user code/
        ];
        return transientFailures.some(pattern => message.match(pattern) !== null);
    }
}, (op) => op());
/**
 * Starts a long-running Cloud Functions operation and waits for it to finish.
 *
 * The whole sequence (start + poll) runs inside `throttleGoogleWrite`, so it
 * is rate limited and retried according to that throttle's retry predicate.
 *
 * @param api Cloud Functions client; `api.operations.get` is used for polling.
 * @param response Function that issues the request and resolves to a response
 * whose `data.name` is the operation name.
 * @returns The final operation result accepted by the poll loop.
 * @throws FaastError "could not get operation" when the initial request fails.
 * @throws FaastError "poll operation failed" when polling fails, times out, or
 * the operation reports an error.
 */
async function waitFor(api, response) {
    return throttleGoogleWrite(async () => {
        let operation;
        try {
            operation = await response();
        }
        catch (err) {
            throw new error_1.FaastError(err, "could not get operation");
        }
        const operationName = operation.data.name;
        try {
            // `await` is required here: returning the bare promise would leave
            // the try block before it settles, and the catch below would never
            // see a polling failure.
            return await pollOperation({
                // Transient lookup failures are swallowed by `quietly` and
                // simply lead to another poll.
                request: () => quietly(api.operations.get({ name: operationName })),
                checkDone: result => {
                    /* istanbul ignore if */
                    if (!result) {
                        return false;
                    }
                    /* istanbul ignore if */
                    if (result.error) {
                        const underlying = new error_1.FaastError(result.error.message ?? undefined);
                        // The stack of this synthetic error carries no useful information.
                        underlying.stack = "";
                        throw new error_1.FaastError(underlying, "Error polling operation");
                    }
                    return result.done || false;
                }
            });
        }
        catch (err) {
            throw new error_1.FaastError(err, "poll operation failed");
        }
    });
}
/**
 * Deletes a cloud function and waits for the deletion operation to finish.
 * An error whose message says the function "does not exist" is treated as
 * success.
 *
 * @param api Cloud Functions client.
 * @param path Full resource name of the function to delete.
 * @returns The finished operation, or `undefined` if the function was
 * already gone.
 */
async function deleteFunction(api, path) {
    const requestDeletion = () => api.projects.locations.functions.delete({ name: path });
    try {
        return await waitFor(api, requestDeletion);
    }
    catch (err) {
        const alreadyGone = err.message.match(/does not exist/);
        if (!alreadyGone) {
            throw err;
        }
        return undefined;
    }
}
/**
 * Creates the cloud resources for one faast.js instance on Google Cloud
 * Functions and returns the provider state.
 *
 * Steps, in the order they are started:
 *  1. Authenticate and look up the project id.
 *  2. Optionally start garbage collection in the background (`gc` option).
 *  3. Start fetching pricing and creating the response topic/subscription.
 *  4. In "queue" mode, start creating the request topic.
 *  5. Pack and upload the code bundle.
 *  6. Create the function (up to 3 retries via `retryOp`) and set its IAM policy.
 *  7. In "https"/"auto" mode, look up the function's trigger URL.
 *  8. Wait for pricing and the response queue before returning.
 *
 * @param fmodule Module path handed to the packer.
 * @param nonce Unique suffix used to build the function name `faast-<nonce>`.
 * @param options Provider options (region, timeout, mode, memorySize, ...).
 * @returns The state object consumed by `invoke`, `poll`, `cleanup`, etc.
 * @throws FaastError (name ECREATE) when the function cannot be created, or a
 * FaastError when the trigger URL cannot be retrieved.
 */
async function initialize(fmodule, nonce, options) {
    const { log } = log_1;
    const { FaastError, FaastErrorNames } = error_1;
    log.info(`Create google cloud function`);
    const services = await initializeGoogleServices();
    const project = await googleapis_1.google.auth.getProjectId();
    const { cloudFunctions, pubsub } = services;
    const { region, timeout, mode } = options;
    const { wrapperVerbose } = options.debugOptions;
    const location = `projects/${project}/locations/${region}`;
    const functionName = `faast-${nonce}`;
    const trampoline = `${location}/functions/${functionName}`;

    // Packs the user module with the trampoline, uploads the archive to a
    // freshly generated upload URL, and returns that URL.
    const createCodeBundle = async () => {
        const wrapperOptions = {
            // Leave the wrapper 5 seconds less than the function timeout,
            // but never less than 1 second.
            childProcessTimeoutMs: Math.max(1000, (timeout - 5) * 1000),
            wrapperVerbose
        };
        const { archive } = await googlePacker(fmodule, options, wrapperOptions, functionName);
        const uploadUrlResponse = await throttleGoogleWrite(() => cloudFunctions.projects.locations.functions.generateUploadUrl({
            parent: location
        }));
        const { uploadUrl } = uploadUrlResponse.data;
        const uploadResult = await uploadZip(uploadUrl, archive);
        log.info(`Upload zip file response: ${uploadResult?.statusText}`);
        return uploadUrl;
    };

    const resources = { trampoline, region };
    const state = {
        resources,
        services,
        project,
        functionName,
        metrics: new GoogleMetrics(),
        options
    };

    const { gc, retentionInDays, _gcWorker: gcWorker } = options;
    if (gc === "auto" || gc === "force") {
        log.gc(`Starting garbage collector`);
        // Garbage collection runs in the background; failures are only logged.
        state.gcPromise = collectGarbage(gcWorker, services, project, retentionInDays).catch(err => {
            log.gc(`Garbage collection error: ${err}`);
        });
    }

    // Started now, awaited at the very end so it overlaps with deployment.
    const pricingPromise = getGoogleCloudFunctionsPricing(services.cloudBilling, region);

    const createResponseQueue = async () => {
        const topic = await pubsub.projects.topics.create({
            name: getResponseQueueTopic(project, functionName)
        });
        resources.responseQueueTopic = topic.data.name ?? undefined;
        resources.responseSubscription = getResponseSubscription(project, functionName);
        log.info(`Creating response queue subscription`);
        await pubsub.projects.subscriptions.create({
            name: resources.responseSubscription,
            requestBody: {
                topic: resources.responseQueueTopic
            }
        });
    };
    const responseQueuePromise = createResponseQueue();

    let requestQueuePromise;
    if (mode === "queue") {
        log.info(`Initializing queue`);
        resources.requestQueueTopic = getRequestQueueTopic(project, functionName);
        requestQueuePromise = pubsub.projects.topics.create({
            name: resources.requestQueueTopic
        });
        resources.requestSubscription = getRequestSubscription(project, functionName, region);
    }

    const sourceUploadUrl = await createCodeBundle();
    const { memorySize, googleCloudFunctionOptions, env } = options;
    if (!GoogleCloudFunctionsMemorySizes.includes(memorySize)) {
        log.warn(`Invalid memorySize ${memorySize} for Google Cloud Functions`);
    }
    const requestBody = {
        name: trampoline,
        entryPoint: "trampoline",
        timeout: `${timeout}s`,
        availableMemoryMb: memorySize,
        sourceUploadUrl,
        environmentVariables: env,
        runtime: "nodejs14",
        // User-supplied options come last so they can override the above.
        ...googleCloudFunctionOptions
    };
    if (mode === "queue") {
        // The request topic must exist before it can be used as a trigger.
        await requestQueuePromise;
        requestBody.eventTrigger = {
            eventType: "providers/cloud.pubsub/eventTypes/topic.publish",
            resource: resources.requestQueueTopic
        };
    }
    else {
        requestBody.httpsTrigger = {};
    }
    log.info(`Create function at ${location}`);
    log.info(`Request body: %O`, requestBody);

    const { retryOp } = throttle_1;
    await retryOp(3, async () => {
        try {
            log.info(`create function ${requestBody.name} [${options.description}]`);
            await waitFor(cloudFunctions, () => cloudFunctions.projects.locations.functions.create({
                location,
                requestBody
            }));
            await cloudFunctions.projects.locations.functions.setIamPolicy({
                resource: trampoline,
                requestBody: {
                    policy: {
                        bindings: [
                            {
                                members: ["allUsers"],
                                role: "roles/cloudfunctions.invoker"
                            }
                        ]
                    }
                }
            });
        }
        catch (err) {
            // Remove any partially created function before reporting failure;
            // errors from this clean-up attempt are ignored.
            /* istanbul ignore next */
            await deleteFunction(cloudFunctions, trampoline).catch(() => { });
            throw new FaastError({ cause: err, name: FaastErrorNames.ECREATE }, "failed to create google cloud function");
        }
    });

    if (mode === "https" || mode === "auto") {
        try {
            const func = await cloudFunctions.projects.locations.functions.get({
                name: trampoline
            });
            const url = func.data.httpsTrigger?.url;
            if (!url) {
                throw new FaastError("Could not get http trigger url");
            }
            log.info(`Function URL: ${url}`);
            state.url = url;
        }
        catch (err) {
            throw new FaastError(err, `Could not get function ${trampoline} or its url, despite it being created`);
        }
    }

    await pricingPromise;
    await responseQueuePromise;
    return state;
}
314exports.initialize = initialize;
/**
 * Builds the Pub/Sub topic name used for requests to a function.
 *
 * @param project Google Cloud project id.
 * @param functionName Name of the cloud function.
 * @returns `projects/<project>/topics/<functionName>-Requests`.
 */
function getRequestQueueTopic(project, functionName) {
    const topicId = `${functionName}-Requests`;
    return `projects/${project}/topics/${topicId}`;
}
/**
 * Builds the Pub/Sub topic name used for responses from a function.
 *
 * @param project Google Cloud project id.
 * @param functionName Name of the cloud function.
 * @returns `projects/<project>/topics/<functionName>-Responses`.
 */
function getResponseQueueTopic(project, functionName) {
    const topicId = `${functionName}-Responses`;
    return `projects/${project}/topics/${topicId}`;
}
321exports.getResponseQueueTopic = getResponseQueueTopic;
/**
 * Builds the Pub/Sub subscription name used to read a function's responses.
 *
 * @param project Google Cloud project id.
 * @param functionName Name of the cloud function.
 * @returns `projects/<project>/subscriptions/<functionName>-Responses`.
 */
function getResponseSubscription(project, functionName) {
    const subscriptionId = `${functionName}-Responses`;
    return `projects/${project}/subscriptions/${subscriptionId}`;
}
325exports.getResponseSubscription = getResponseSubscription;
/**
 * Builds the name of the request subscription associated with a
 * queue-triggered function.
 *
 * @param project Google Cloud project id.
 * @param functionName Name of the cloud function.
 * @param region Region the function is deployed in.
 * @returns `projects/<project>/subscriptions/gcf-<functionName>-<region>-<functionName>-Requests`.
 */
function getRequestSubscription(project, functionName, region) {
    const subscriptionId = `gcf-${functionName}-${region}-${functionName}-Requests`;
    return `projects/${project}/subscriptions/${subscriptionId}`;
}
329exports.getRequestSubscription = getRequestSubscription;
/**
 * HTTPS agent shared by all function invocations: keep-alive connections,
 * socket timeout option set to 0, and at most 1000 sockets per host.
 */
const agent = new https.Agent({
    keepAlive: true,
    timeout: 0,
    maxSockets: 1000
});
/**
 * Invokes the cloud function through its HTTPS trigger.
 *
 * The serialized call is POSTed as JSON. The request races against `cancel`;
 * if `cancel` settles first with a falsy value the request is aborted and the
 * function returns. On a response, the response size derived from its headers
 * is added to `metrics.outboundBytes`. Nothing is returned on success.
 *
 * @param url HTTPS trigger URL of the function.
 * @param call Call description to serialize into the request body.
 * @param metrics Metrics object whose `outboundBytes` is updated.
 * @param cancel Promise used to cancel the invocation.
 * @throws FaastError (name EMEMORY) on HTTP 503; FaastError for any other
 * failure, including response details when a response is available.
 */
async function callFunctionHttps(url, call, metrics, cancel) {
    const { FaastError, FaastErrorNames } = error_1;
    const { computeHttpResponseBytes } = shared_1;
    const { serialize } = serialize_1;
    const abortController = new abort_controller_1.AbortController();
    try {
        const requestConfig = {
            method: "POST",
            url,
            headers: { "Content-Type": "application/json" },
            body: serialize(call),
            signal: abortController.signal,
            responseType: "json",
            // Retries are disabled for invocations.
            retry: false,
            agent
        };
        const rawResponse = await Promise.race([
            gaxios.request(requestConfig),
            cancel
        ]);
        if (!rawResponse) {
            log_1.log.info(`cancelling gcp invoke`);
            abortController.abort();
            return;
        }
        try {
            metrics.outboundBytes += computeHttpResponseBytes(rawResponse.headers);
        }
        catch (err) {
            throw new FaastError(err, `Could not parse ${util.inspect(rawResponse.data)}`);
        }
    }
    catch (err) {
        const { response } = err;
        if (!response) {
            throw new FaastError(err, `when invoking google cloud function`);
        }
        if (response.status === 503) {
            throw new FaastError({ cause: err, name: FaastErrorNames.EMEMORY }, "google cloud function: possibly out of memory");
        }
        throw new FaastError(err, `when invoking google cloud function: %s\nDetails: %s`, response.statusText, response.data);
    }
}
/**
 * Sends one call to the cloud function using the transport selected by
 * `options.mode`: the HTTPS trigger for "auto"/"https", or a Pub/Sub publish
 * to the request topic for "queue". Any other mode resolves to `undefined`
 * without doing anything.
 *
 * @param state Provider state returned by `initialize`.
 * @param call Call description to deliver.
 * @param cancel Promise used to cancel an HTTPS invocation.
 */
async function invoke(state, call, cancel) {
    const { options, resources, services, url, metrics } = state;
    const { mode } = options;
    if (mode === "auto" || mode === "https") {
        return callFunctionHttps(url, call, metrics, cancel);
    }
    if (mode === "queue") {
        const { requestQueueTopic } = resources;
        const { pubsub } = services;
        const { serialize } = serialize_1;
        const { publishPubSub } = google_queue_1;
        const serialized = serialize(call);
        return publishPubSub(pubsub, requestQueueTopic, serialized);
    }
    return undefined;
}
/**
 * Receives messages from the response subscription of this instance.
 *
 * @param state Provider state returned by `initialize`.
 * @param cancel Cancellation promise forwarded to `receiveMessages`.
 * @returns Whatever `receiveMessages` returns.
 */
function poll(state, cancel) {
    const { receiveMessages } = google_queue_1;
    const { services, resources, metrics } = state;
    return receiveMessages(services.pubsub, resources.responseSubscription, metrics, cancel);
}
/**
 * Returns the identifier of the response queue, which for this provider is
 * the response Pub/Sub topic name stored in the state's resources.
 *
 * @param state Provider state returned by `initialize`.
 */
function responseQueueId(state) {
    const { responseQueueTopic } = state.resources;
    return responseQueueTopic;
}
/**
 * Deletes the cloud resources belonging to one faast.js instance.
 *
 * Resources are removed strictly in this order: response subscription,
 * response topic, request subscription, request topic, and finally the
 * function itself. Only resources present in `resources` are touched.
 *
 * @param services Object providing the `cloudFunctions` and `pubsub` clients.
 * @param resources Resource names; any subset of `trampoline`,
 * `requestQueueTopic`, `requestSubscription`, `responseSubscription` and
 * `responseQueueTopic`.
 * @param output Logger called with one message per deleted resource
 * (defaults to the info logger).
 * @throws Any deletion error other than "Resource not found"; later
 * deletions are then skipped.
 */
async function deleteResources(services, resources, output = log_1.log.info) {
    const { trampoline, requestQueueTopic, requestSubscription, responseSubscription, responseQueueTopic } = resources;
    const { cloudFunctions, pubsub } = services;
    // We deliberately rethrow transient errors here, so only if all prior
    // deletes succeed do we proceed. If there's a transient error then future
    // garbage collection will clean up. The order is important; the function
    // itself must be deleted last.
    const check = async (request) => {
        try {
            await request;
        }
        catch (err) {
            // A resource that is already gone counts as deleted. The optional
            // chaining keeps a non-Error rejection from masking the original
            // failure with a TypeError.
            /* istanbul ignore next */
            if (/Resource not found/.test(err?.message ?? "")) {
                return;
            }
            throw err;
        }
    };
    if (responseSubscription) {
        await check(pubsub.projects.subscriptions.delete({ subscription: responseSubscription }));
        output(`Deleted response subscription: ${responseSubscription}`);
    }
    if (responseQueueTopic) {
        await check(pubsub.projects.topics.delete({ topic: responseQueueTopic }));
        output(`Deleted response queue topic: ${responseQueueTopic}`);
    }
    if (requestSubscription) {
        await check(pubsub.projects.subscriptions.delete({ subscription: requestSubscription }));
        output(`Deleted request subscription: ${requestSubscription}`);
    }
    if (requestQueueTopic) {
        await check(pubsub.projects.topics.delete({ topic: requestQueueTopic }));
        output(`Deleted request queue topic: ${requestQueueTopic}`);
    }
    if (trampoline) {
        await check(deleteFunction(cloudFunctions, trampoline));
        output(`Deleted function ${trampoline}`);
    }
}
/**
 * Shuts down a faast.js Google instance.
 *
 * Waits for a garbage collection started by `initialize` (if any) and then,
 * when `options.deleteResources` is set, deletes this instance's resources.
 *
 * @param state Provider state returned by `initialize`.
 * @param options Cleanup options; only `deleteResources` is read here.
 * @throws FaastError "delete resources failed" when deletion fails.
 */
async function cleanup(state, options) {
    const { log } = log_1;
    log.info(`google cleanup starting.`);
    const { gcPromise } = state;
    if (gcPromise) {
        log.info(`Waiting for garbage collection...`);
        await gcPromise;
        log.info(`Garbage collection done.`);
    }
    if (options.deleteResources) {
        const { services, resources } = state;
        try {
            await deleteResources(services, resources);
        }
        catch (err) {
            throw new error_1.FaastError(err, "delete resources failed");
        }
    }
    log.info(`google cleanup done.`);
}
448exports.cleanup = cleanup;
// Guards against running more than one default garbage collector at a time
// within this process. Custom workers bypass the guard.
let garbageCollectorRunning = false;
/**
 * Finds expired faast.js cloud functions in the project and hands the
 * resources of each one to `gcWorker` for deletion.
 *
 * All pages of the function listing are scanned. A function is garbage when
 * its `updateTime` has expired according to `retentionInDays` and its name
 * ends in `/functions/faast-<uuid>`. Deletions are scheduled through a
 * throttle and all of them, across every page, are awaited before returning.
 *
 * @param gcWorker Worker called as `gcWorker(resources, services)` per function.
 * @param services Object providing the `cloudFunctions` client.
 * @param proj Google Cloud project id to scan.
 * @param retentionInDays Retention period passed to `hasExpired`.
 * @returns Resolves when every scheduled deletion has finished; resolves
 * immediately if the default collector is already running.
 * @throws The first error raised by listing or by any worker.
 */
async function collectGarbage(gcWorker, services, proj, retentionInDays) {
    const usesDefaultWorker = gcWorker === defaultGcWorker;
    if (usesDefaultWorker) {
        if (garbageCollectorRunning) {
            return;
        }
        garbageCollectorRunning = true;
    }
    try {
        const { cloudFunctions } = services;
        // Accumulates deletions from ALL pages. Reassigning this per page
        // would drop (and never await) the deletions of earlier pages.
        const promises = [];
        const scheduleDeleteResources = (0, throttle_1.throttle)({
            concurrency: 5,
            rate: 5,
            burst: 2
        }, async (gServices, fn) => {
            const { region, name, project } = parseFunctionName(fn.name);
            // Resource names are derived from the function name; resources
            // that no longer exist are tolerated by the worker.
            const resources = {
                region,
                trampoline: fn.name,
                requestQueueTopic: getRequestQueueTopic(project, name),
                requestSubscription: getRequestSubscription(project, name, region),
                responseQueueTopic: getResponseQueueTopic(project, name),
                responseSubscription: getResponseSubscription(project, name)
            };
            await gcWorker(resources, gServices);
        });
        const fnPattern = new RegExp(`/functions/faast-${shared_1.uuidv4Pattern}$`);
        let pageToken;
        do {
            const funcListResponse = await cloudFunctions.projects.locations.functions.list({
                // "-" lists functions in every location of the project.
                parent: `projects/${proj}/locations/-`,
                pageToken
            });
            pageToken = funcListResponse.data.nextPageToken ?? undefined;
            const garbageFunctions = (funcListResponse.data.functions || [])
                .filter(fn => (0, shared_1.hasExpired)(fn.updateTime, retentionInDays))
                .filter(fn => fn.name.match(fnPattern));
            promises.push(...garbageFunctions.map(fn => scheduleDeleteResources(services, fn)));
        } while (pageToken);
        await Promise.all(promises);
    }
    finally {
        if (usesDefaultWorker) {
            garbageCollectorRunning = false;
        }
    }
}
/**
 * Splits a full cloud function resource name of the form
 * `projects/<project>/locations/<region>/functions/<name>` into its parts.
 *
 * @param path Full resource name.
 * @returns `{ project, region, name }`, or `null` when `path` does not have
 * the expected form.
 */
function parseFunctionName(path) {
    const match = path.match(/^projects\/(.*)\/locations\/(.*)\/functions\/(.*)$/);
    if (match === null) {
        return null;
    }
    const [, project, region, name] = match;
    return { project, region, name };
}
/**
 * Uploads the code archive to a signed upload URL with an HTTP PUT.
 *
 * @param url Upload URL obtained from `generateUploadUrl`.
 * @param zipStream Archive contents used as the request body.
 * @returns The Gaxios response of the upload request.
 */
async function uploadZip(url, zipStream) {
    const headers = {
        "content-type": "application/zip",
        "x-goog-content-length-range": "0,104857600"
    };
    return gaxios.request({
        method: "PUT",
        url,
        body: zipStream,
        headers
    });
}
/**
 * Packs the user's module together with the trampoline matching the
 * configured mode: the queue trampoline for "queue", the HTTPS trampoline
 * otherwise.
 *
 * @param functionModule Module path of the user's functions.
 * @param options Provider options; `mode` selects the trampoline.
 * @param wrapperOptions Options forwarded to the packer for the wrapper.
 * @param FunctionName Name of the cloud function being packed.
 * @returns The result of `packer`.
 */
async function googlePacker(functionModule, options, wrapperOptions, FunctionName) {
    const { packer } = packer_1;
    let trampolineModule = googleTrampolineHttps;
    if (options.mode === "queue") {
        trampolineModule = googleTrampolineQueue;
    }
    return packer(trampolineModule, functionModule, options, wrapperOptions, FunctionName);
}
518exports.googlePacker = googlePacker;
// Throttled, memoized price lookup; created lazily by ensureGooglePriceCache.
let getGooglePrice;
/**
 * Lazily creates `getGooglePrice`, a throttled and memoized lookup of a unit
 * price from the Cloud Billing SKU catalog. Calling this again after the
 * lookup exists is a no-op.
 *
 * The created function takes `(region, serviceName, description,
 * conversionFactor)` and resolves to the highest tiered rate of the SKU whose
 * description matches, preferring a SKU served in `region` over a "global"
 * one, scaled by `conversionFactor / baseUnitConversionFactor`. It rejects
 * with a FaastError when the price cannot be determined.
 *
 * @param cloudBilling Cloud Billing client used to list SKUs.
 */
function ensureGooglePriceCache(cloudBilling) {
    if (getGooglePrice) {
        return;
    }
    getGooglePrice = (0, throttle_1.throttle)({
        concurrency: 1,
        rate: 3,
        memoize: true,
        cache: cache_1.caches.googlePrices
    }, async (region, serviceName, description, conversionFactor) => {
        try {
            const skusResponse = await cloudBilling.services.skus.list({
                parent: serviceName
            });
            const { skus = [] } = skusResponse.data;
            const matchingSkus = skus.filter(sku => sku.description === description);
            log_1.log.provider(`matching SKUs: ${util.inspect(matchingSkus, { depth: null })}`);
            const servedIn = where => sku => sku.serviceRegions?.includes(where);
            const regionOrGlobalSku = matchingSkus.find(servedIn(region)) ??
                matchingSkus.find(servedIn("global"));
            if (!regionOrGlobalSku) {
                // Fail with a readable reason instead of a TypeError on undefined.
                throw new error_1.FaastError(`no SKU found for region "${region}" or "global"`);
            }
            const pexp = regionOrGlobalSku.pricingInfo[0].pricingExpression;
            // A price is units + nanos/1e9. Either part may be missing from
            // the response, so both default to zero; a missing `nanos` would
            // otherwise turn the whole price into NaN.
            const prices = pexp.tieredRates.map(rate => Number(rate.unitPrice.units ?? "0") +
                (rate.unitPrice.nanos ?? 0) / 1e9);
            const price = Math.max(...prices) *
                (conversionFactor / pexp.baseUnitConversionFactor);
            log_1.log.provider(`Found price for ${serviceName}, ${description}, ${region}: ${price}`);
            return price;
        }
        catch (err) {
            throw new error_1.FaastError(err, `failed to get google pricing for "${description}"`);
        }
    });
}
// Cached result of the billing service listing; filled on first use.
let googleServices;
/**
 * Returns the list of billable Google services, fetching it from the Cloud
 * Billing API on first use and serving the cached list afterwards. The
 * throttle's concurrency of 1 means callers run one at a time.
 */
const listGoogleServices = (0, throttle_1.throttle)({ concurrency: 1 }, async (cloudBilling) => {
    if (!googleServices) {
        const { data } = await cloudBilling.services.list();
        googleServices = data.services;
    }
    return googleServices;
});
/**
 * Looks up the unit prices needed to estimate the cost of running on Google
 * Cloud Functions in `region`. The five prices are fetched one after another.
 *
 * @param cloudBilling Cloud Billing client.
 * @param region Region used to select region-specific SKUs.
 * @returns An object with `perInvocation`, `perGhzSecond`, `perGbSecond`,
 * `perGbOutboundData` and `perGbPubSub`.
 */
async function getGoogleCloudFunctionsPricing(cloudBilling, region) {
    const services = await listGoogleServices(cloudBilling);
    ensureGooglePriceCache(cloudBilling);
    // Conversion factor used for the per-GB prices below.
    const bytesPerGb = 2 ** 30;
    function getPricing(serviceName, description, conversionFactor = 1) {
        const service = services.find(s => s.displayName === serviceName);
        return getGooglePrice(region, service.name, description, conversionFactor);
    }
    const perInvocation = await getPricing("Cloud Functions", "Invocations");
    const perGhzSecond = await getPricing("Cloud Functions", "CPU Time");
    const perGbSecond = await getPricing("Cloud Functions", "Memory Time", bytesPerGb);
    const perGbOutboundData = await getPricing("Cloud Functions", `Network Egress from ${region}`, bytesPerGb);
    const perGbPubSub = await getPricing("Cloud Pub/Sub", "Message Delivery Basic", bytesPerGb);
    return { perInvocation, perGhzSecond, perGbSecond, perGbOutboundData, perGbPubSub };
}
// Maps each provisionable memory size (MB) to the CPU speed (GHz) used by
// `costSnapshot` for that tier.
// Source: https://cloud.google.com/functions/pricing
const gcfProvisonableMemoryTable = {
    128: 0.2,
    256: 0.4,
    512: 0.8,
    1024: 1.4,
    2048: 2.4
};
/**
 * Builds a cost estimate for the work done so far by this instance.
 *
 * The requested memory size is rounded up to the smallest provisionable tier
 * in `gcfProvisonableMemoryTable`; if none is large enough a warning is
 * logged and the computation continues with undefined tier values. Four
 * metrics are added, in this order: function call duration, function call
 * requests, outbound data transfer, and Pub/Sub traffic.
 *
 * @param state Provider state returned by `initialize`.
 * @param stats Function statistics; `estimatedBilledTime` and `invocations`
 * are read.
 * @returns The populated `CostSnapshot`.
 */
async function costSnapshot(state, stats) {
    const { CostMetric, CostSnapshot } = cost_1;
    const { keysOf } = shared_1;
    const costs = new CostSnapshot("google", state.options, stats);
    const { memorySize = exports.defaults.memorySize } = state.options;
    const tiersAscending = keysOf(gcfProvisonableMemoryTable)
        .map(n => Number(n))
        .sort((a, b) => a - b);
    // Smallest tier that can hold the requested memory size.
    const provisionedMb = tiersAscending.find(size => memorySize <= size);
    if (!provisionedMb) {
        log_1.log.warn(`Could not determine provisioned memory or CPU for requested memory size ${memorySize}`);
    }
    const provisionedGhz = gcfProvisonableMemoryTable[provisionedMb];
    const { mean, samples } = stats.estimatedBilledTime;
    // `mean` is divided by 1000 to get seconds, then scaled by the sample count.
    const seconds = (mean / 1000) * samples;
    const { region } = state.resources;
    const prices = await getGoogleCloudFunctionsPricing(state.services.cloudBilling, region);
    const provisionedGb = provisionedMb / 1024;
    costs.push(new CostMetric({
        name: "functionCallDuration",
        pricing: prices.perGbSecond * provisionedGb + prices.perGhzSecond * provisionedGhz,
        unit: "second",
        measured: seconds,
        comment: `https://cloud.google.com/functions/pricing#compute_time (${provisionedMb} MB, ${provisionedGhz} GHz)`
    }));
    costs.push(new CostMetric({
        name: "functionCallRequests",
        pricing: prices.perInvocation,
        measured: stats.invocations,
        unit: "request",
        comment: "https://cloud.google.com/functions/pricing#invocations"
    }));
    const { outboundBytes, pubSubBytes } = state.metrics;
    costs.push(new CostMetric({
        name: "outboundDataTransfer",
        pricing: prices.perGbOutboundData,
        measured: outboundBytes / 2 ** 30,
        unit: "GB",
        comment: "https://cloud.google.com/functions/pricing#networking"
    }));
    costs.push(new CostMetric({
        name: "pubsub",
        pricing: prices.perGbPubSub,
        measured: pubSubBytes / 2 ** 30,
        unit: "GB",
        comment: "https://cloud.google.com/pubsub/pricing"
    }));
    return costs;
}
/**
 * Builds the Google Cloud Console URL showing the logs of this instance's
 * cloud function.
 *
 * The query string is assembled with `URLSearchParams`, so characters that
 * are special inside a URL are percent-encoded rather than pasted in raw. For
 * ordinary project ids and function names the result is identical to plain
 * interpolation.
 *
 * @param state Provider state; `project` and `functionName` are read.
 * @returns The log viewer URL as a string.
 */
function logUrl(state) {
    const { project, functionName } = state;
    const viewer = new URL("https://console.cloud.google.com/logs/viewer");
    viewer.searchParams.set("project", `${project}`);
    viewer.searchParams.set("resource", `cloud_function/function_name/${functionName}`);
    return viewer.href;
}
638exports.logUrl = logUrl;
639//# sourceMappingURL=data:application/json;base64,
\No newline at end of file