UNPKG

23.9 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.receiveMessages = exports.processAwsErrorMessage = exports.createSQSQueue = exports.publishFunctionCallMessage = exports.sendResponseQueueMessage = exports.createSNSTopic = void 0;
4const error_1 = require("../error");
5const log_1 = require("../log");
6const serialize_1 = require("../serialize");
7const shared_1 = require("../shared");
8const throttle_1 = require("../throttle");
9const wrapper_1 = require("../wrapper");
10async function createSNSTopic(sns, Name) {
11 const topic = await sns.createTopic({ Name }).promise();
12 return topic.TopicArn;
13}
14exports.createSNSTopic = createSNSTopic;
15function countRequests(bytes) {
16 return Math.ceil(bytes / (64 * 1024));
17}
18async function sendResponseQueueMessage(sqs, QueueUrl, message) {
19 try {
20 const request = { QueueUrl, MessageBody: (0, serialize_1.serialize)(message) };
21 await sqs.sendMessage(request).promise();
22 }
23 catch (err) {
24 log_1.log.warn(err);
25 }
26}
27exports.sendResponseQueueMessage = sendResponseQueueMessage;
28function publishFunctionCallMessage(sns, TopicArn, message, metrics) {
29 const serialized = (0, serialize_1.serialize)(message);
30 metrics.sns64kRequests += countRequests(serialized.length);
31 return (0, throttle_1.retryOp)((err, n) => n < 6 && err?.message?.match(/does not exist/), () => sns
32 .publish({
33 TopicArn,
34 Message: serialized
35 })
36 .promise());
37}
38exports.publishFunctionCallMessage = publishFunctionCallMessage;
39async function createSQSQueue(QueueName, VTimeout, sqs) {
40 try {
41 const createQueueRequest = {
42 QueueName,
43 Attributes: {
44 VisibilityTimeout: `${VTimeout}`
45 }
46 };
47 const response = await sqs.createQueue(createQueueRequest).promise();
48 const QueueUrl = response.QueueUrl;
49 const arnResponse = await sqs
50 .getQueueAttributes({ QueueUrl, AttributeNames: ["QueueArn"] })
51 .promise();
52 const QueueArn = arnResponse.Attributes?.QueueArn;
53 return { QueueUrl, QueueArn };
54 }
55 catch (err) {
56 throw new error_1.FaastError(err, "create sqs queue");
57 }
58}
59exports.createSQSQueue = createSQSQueue;
60/* istanbul ignore next */
61function processAwsErrorMessage(message) {
62 let err = new error_1.FaastError(message);
63 if (message?.match(/Process exited before completing/) ||
64 message?.match(/signal: killed/)) {
65 err = new error_1.FaastError({ cause: err, name: error_1.FaastErrorNames.EMEMORY }, "possibly out of memory");
66 }
67 else if (message?.match(/time/)) {
68 err = new error_1.FaastError({ cause: err, name: error_1.FaastErrorNames.ETIMEOUT }, "timeout");
69 }
70 else if (message?.match(/EventAgeExceeded/)) {
71 err = new error_1.FaastError({ cause: err, name: error_1.FaastErrorNames.ECONCURRENCY }, "concurrency limit exceeded");
72 }
73 return err;
74}
75exports.processAwsErrorMessage = processAwsErrorMessage;
76async function receiveMessages(sqs, ResponseQueueUrl, metrics, cancel) {
77 try {
78 const MaxNumberOfMessages = 10;
79 const request = sqs.receiveMessage({
80 QueueUrl: ResponseQueueUrl,
81 WaitTimeSeconds: 20,
82 MaxNumberOfMessages,
83 MessageAttributeNames: ["All"],
84 AttributeNames: ["SentTimestamp"]
85 });
86 const response = await Promise.race([request.promise(), cancel]);
87 if (!response) {
88 request.abort();
89 return { Messages: [] };
90 }
91 const { Messages = [] } = response;
92 const { httpResponse } = response.$response;
93 const receivedBytes = (0, shared_1.computeHttpResponseBytes)(httpResponse.headers);
94 metrics.outboundBytes += receivedBytes;
95 const inferredSqsRequestsReceived = countRequests(receivedBytes);
96 const inferredSqsRequestsSent = (0, shared_1.sum)(Messages.map(m => countRequests(m.Body?.length ?? 1)));
97 metrics.sqs64kRequests += inferredSqsRequestsSent + inferredSqsRequestsReceived;
98 if (Messages.length > 0) {
99 sqs.deleteMessageBatch({
100 QueueUrl: ResponseQueueUrl,
101 Entries: Messages.map(m => ({
102 Id: m.MessageId,
103 ReceiptHandle: m.ReceiptHandle
104 }))
105 })
106 .promise()
107 .catch(_ => { });
108 metrics.sqs64kRequests++;
109 }
110 return {
111 Messages: Messages.map(processIncomingQueueMessage).filter(shared_1.defined),
112 isFullMessageBatch: Messages.length === MaxNumberOfMessages
113 };
114 }
115 catch (err) {
116 throw new error_1.FaastError(err, "receiveMessages");
117 }
118}
119exports.receiveMessages = receiveMessages;
120function processIncomingQueueMessage(m) {
121 // AWS Lambda Destinations
122 // (https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/)
123 // are used to route failures to the response queue. These
124 // messages are generated by AWS Lambda and are constrained to the format it
125 // provides.
126 const raw = (0, serialize_1.deserialize)(m.Body);
127 if (raw.responseContext) {
128 const message = raw;
129 const snsMessage = message.requestPayload;
130 const record = snsMessage.Records[0];
131 const sCall = (0, serialize_1.deserialize)(record.Sns.Message);
132 let error;
133 const destinationError = message.responsePayload;
134 if (destinationError) {
135 error = processAwsErrorMessage(destinationError.errorMessage);
136 error.stack = destinationError.stackTrace?.join("\n");
137 }
138 else {
139 error = processAwsErrorMessage(message.requestContext.condition);
140 }
141 const executionId = message.requestContext.requestId;
142 return {
143 ...(0, wrapper_1.createErrorResponse)(error, {
144 call: sCall,
145 startTime: new Date(record.Sns.Timestamp).getTime(),
146 executionId
147 }),
148 timestamp: new Date(message.timestamp).getTime()
149 };
150 }
151 else {
152 const message = raw;
153 switch (message.kind) {
154 case "promise":
155 case "iterator":
156 message.timestamp = Number(m.Attributes.SentTimestamp);
157 break;
158 case "cpumetrics":
159 break;
160 case "functionstarted":
161 break;
162 default: {
163 console.warn(`Unknown message received from response queue`);
164 }
165 }
166 return raw;
167 }
168}
169//# sourceMappingURL=data:application/json;base64,
\No newline at end of file