1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.receiveMessages = exports.processAwsErrorMessage = exports.createSQSQueue = exports.publishFunctionCallMessage = exports.sendResponseQueueMessage = exports.createSNSTopic = void 0;
|
4 | const error_1 = require("../error");
|
5 | const log_1 = require("../log");
|
6 | const serialize_1 = require("../serialize");
|
7 | const shared_1 = require("../shared");
|
8 | const throttle_1 = require("../throttle");
|
9 | const wrapper_1 = require("../wrapper");
|
/**
 * Creates an SNS topic named `Name` and resolves to the `TopicArn` field of
 * the service response.
 *
 * @param sns - SNS client; must expose `createTopic(...).promise()`.
 * @param Name - Name passed to `createTopic`.
 * @returns The `TopicArn` from the response.
 */
async function createSNSTopic(sns, Name) {
    const { TopicArn } = await sns.createTopic({ Name }).promise();
    return TopicArn;
}
|
14 | exports.createSNSTopic = createSNSTopic;
|
/**
 * Returns how many 64 KiB chunks are needed to hold `bytes` bytes,
 * rounding any partial chunk up. Zero bytes yields zero chunks.
 */
function countRequests(bytes) {
    const chunkSize = 64 * 1024;
    return Math.ceil(bytes / chunkSize);
}
|
/**
 * Serializes `message` and sends it to the SQS queue at `QueueUrl`.
 *
 * This is a best-effort send: a failure while serializing or sending is
 * passed to `log.warn` and not propagated to the caller.
 *
 * @param sqs - SQS client; must expose `sendMessage(...).promise()`.
 * @param QueueUrl - URL of the destination queue.
 * @param message - Value to serialize into the message body.
 */
async function sendResponseQueueMessage(sqs, QueueUrl, message) {
    try {
        const { serialize } = serialize_1;
        const MessageBody = serialize(message);
        await sqs.sendMessage({ QueueUrl, MessageBody }).promise();
    }
    catch (err) {
        log_1.log.warn(err);
    }
}
|
27 | exports.sendResponseQueueMessage = sendResponseQueueMessage;
|
/**
 * Serializes `message` and publishes it to the SNS topic `TopicArn`.
 *
 * Before publishing, `metrics.sns64kRequests` is incremented by the number
 * of 64 KiB chunks the payload occupies. The size is measured in UTF-8 bytes
 * rather than string length, so payloads containing non-ASCII characters are
 * no longer undercounted.
 *
 * The publish runs through `retryOp`. The retry predicate is truthy while its
 * counter argument `n` is below 6 and the error message contains
 * "does not exist".
 *
 * @param sns - SNS client; must expose `publish(...).promise()`.
 * @param TopicArn - ARN of the topic to publish to.
 * @param message - Value to serialize into the SNS message.
 * @param metrics - Object whose `sns64kRequests` counter is updated in place.
 * @returns Whatever `retryOp` returns for the publish operation.
 */
function publishFunctionCallMessage(sns, TopicArn, message, metrics) {
    const serialized = (0, serialize_1.serialize)(message);
    // countRequests() expects a byte count; String#length counts UTF-16 code units.
    const payloadBytes = Buffer.byteLength(serialized, "utf8");
    metrics.sns64kRequests += countRequests(payloadBytes);
    const shouldRetry = (err, n) => n < 6 && err?.message?.match(/does not exist/);
    const publish = () => sns
        .publish({
            TopicArn,
            Message: serialized
        })
        .promise();
    return (0, throttle_1.retryOp)(shouldRetry, publish);
}
|
38 | exports.publishFunctionCallMessage = publishFunctionCallMessage;
|
/**
 * Creates the SQS queue `QueueName` with `VTimeout` as its
 * `VisibilityTimeout` attribute, then fetches the queue's `QueueArn`
 * attribute.
 *
 * @param QueueName - Name of the queue to create.
 * @param VTimeout - Visibility timeout; sent to SQS as a string.
 * @param sqs - SQS client; must expose `createQueue` and `getQueueAttributes`.
 * @returns `{ QueueUrl, QueueArn }` taken from the two service responses.
 *   `QueueArn` is `undefined` when the attributes response has no `Attributes`.
 * @throws FaastError wrapping any failure, with the message "create sqs queue".
 */
async function createSQSQueue(QueueName, VTimeout, sqs) {
    try {
        const { QueueUrl } = await sqs
            .createQueue({
                QueueName,
                Attributes: { VisibilityTimeout: `${VTimeout}` }
            })
            .promise();
        const attributeQuery = { QueueUrl, AttributeNames: ["QueueArn"] };
        const { Attributes } = await sqs.getQueueAttributes(attributeQuery).promise();
        return { QueueUrl, QueueArn: Attributes?.QueueArn };
    }
    catch (err) {
        throw new error_1.FaastError(err, "create sqs queue");
    }
}
|
59 | exports.createSQSQueue = createSQSQueue;
|
/**
 * Builds a FaastError from an AWS-reported error message and, when the text
 * matches a known pattern, wraps it in a second FaastError carrying a
 * specific error name. Patterns are checked in this order:
 *
 * - "Process exited before completing" or "signal: killed": EMEMORY
 * - "time": ETIMEOUT
 * - "EventAgeExceeded": ECONCURRENCY
 *
 * A message matching none of them (or a nullish message) yields the plain
 * FaastError.
 *
 * NOTE(review): the /time/ pattern is broad (it also matches e.g. "Runtime");
 * confirm that is intended.
 *
 * @param message - Error text reported by AWS; may be nullish.
 * @returns The classified (or plain) FaastError.
 */
/* istanbul ignore next */
function processAwsErrorMessage(message) {
    const original = new error_1.FaastError(message);
    // Wraps the original error so it stays available as the `cause`.
    const classify = (name, description) => new error_1.FaastError({ cause: original, name }, description);
    if (message?.match(/Process exited before completing/) ||
        message?.match(/signal: killed/)) {
        return classify(error_1.FaastErrorNames.EMEMORY, "possibly out of memory");
    }
    if (message?.match(/time/)) {
        return classify(error_1.FaastErrorNames.ETIMEOUT, "timeout");
    }
    if (message?.match(/EventAgeExceeded/)) {
        return classify(error_1.FaastErrorNames.ECONCURRENCY, "concurrency limit exceeded");
    }
    return original;
}
|
75 | exports.processAwsErrorMessage = processAwsErrorMessage;
|
/**
 * Issues one `receiveMessage` call against the response queue (up to 10
 * messages, `WaitTimeSeconds` of 20), updates the metrics, deletes the
 * received messages from the queue and returns them in processed form.
 *
 * The call is raced against `cancel`: if the race yields a falsy value, the
 * in-flight request is aborted and an empty result is returned.
 *
 * Metrics updated in place:
 * - `outboundBytes` grows by the size computed from the HTTP response headers.
 * - `sqs64kRequests` grows by the 64 KiB chunks of the response, plus the
 *   chunks of each message body, plus one for the batch delete when any
 *   message was received. Body sizes are measured in UTF-8 bytes rather than
 *   string length, so non-ASCII bodies are no longer undercounted. A message
 *   without a body counts as one chunk.
 *
 * @param sqs - SQS client; must expose `receiveMessage` and `deleteMessageBatch`.
 * @param ResponseQueueUrl - URL of the queue to poll.
 * @param metrics - Object whose counters are updated in place.
 * @param cancel - Promise that ends the poll early when it settles first.
 * @returns `{ Messages, isFullMessageBatch }`, or `{ Messages: [] }` when cancelled.
 * @throws FaastError wrapping any failure, with the message "receiveMessages".
 */
async function receiveMessages(sqs, ResponseQueueUrl, metrics, cancel) {
    try {
        const MaxNumberOfMessages = 10;
        const request = sqs.receiveMessage({
            QueueUrl: ResponseQueueUrl,
            WaitTimeSeconds: 20,
            MaxNumberOfMessages,
            MessageAttributeNames: ["All"],
            AttributeNames: ["SentTimestamp"]
        });
        const response = await Promise.race([request.promise(), cancel]);
        if (!response) {
            // `cancel` won the race: stop the outstanding receive call.
            request.abort();
            return { Messages: [] };
        }
        const { Messages = [] } = response;
        const { httpResponse } = response.$response;
        const receivedBytes = (0, shared_1.computeHttpResponseBytes)(httpResponse.headers);
        metrics.outboundBytes += receivedBytes;
        const inferredSqsRequestsReceived = countRequests(receivedBytes);
        // countRequests() expects bytes; String#length counts UTF-16 code units.
        const inferredSqsRequestsSent = (0, shared_1.sum)(Messages.map(m => countRequests(m.Body == null ? 1 : Buffer.byteLength(m.Body, "utf8"))));
        metrics.sqs64kRequests += inferredSqsRequestsSent + inferredSqsRequestsReceived;
        if (Messages.length > 0) {
            // Fire-and-forget: the delete is not awaited and its failure is
            // deliberately ignored.
            sqs.deleteMessageBatch({
                QueueUrl: ResponseQueueUrl,
                Entries: Messages.map(m => ({
                    Id: m.MessageId,
                    ReceiptHandle: m.ReceiptHandle
                }))
            })
                .promise()
                .catch(_ => { });
            metrics.sqs64kRequests++;
        }
        return {
            Messages: Messages.map(processIncomingQueueMessage).filter(shared_1.defined),
            isFullMessageBatch: Messages.length === MaxNumberOfMessages
        };
    }
    catch (err) {
        throw new error_1.FaastError(err, "receiveMessages");
    }
}
|
119 | exports.receiveMessages = receiveMessages;
|
/**
 * Converts one raw SQS message from the response queue into a response
 * message.
 *
 * Two body shapes are handled after deserializing `m.Body`:
 * - A body with `responseContext` is treated as an AWS Lambda Destinations
 *   record. The original function call is recovered from the first SNS record
 *   in `requestPayload`, and an error response is built from
 *   `responsePayload` (or from `requestContext.condition` when there is no
 *   payload).
 * - Any other body is returned as-is; for the "promise" and "iterator" kinds
 *   its `timestamp` is first set from the SQS `SentTimestamp` attribute.
 *
 * @param m - SQS message with a serialized `Body`.
 * @returns The error response or the deserialized message.
 */
function processIncomingQueueMessage(m) {
    // AWS Lambda Destinations
    // (https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/)
    // are used to route failures to the response queue. These
    // messages are generated by AWS Lambda and are constrained to the format it
    // provides.
    const raw = (0, serialize_1.deserialize)(m.Body);
    if (raw.responseContext) {
        const message = raw;
        const snsMessage = message.requestPayload;
        const record = snsMessage.Records[0];
        const sCall = (0, serialize_1.deserialize)(record.Sns.Message);
        let error;
        const destinationError = message.responsePayload;
        if (destinationError) {
            error = processAwsErrorMessage(destinationError.errorMessage);
            // Only replace the stack when Lambda supplied a trace; assigning
            // the result of an absent trace would set `stack` to undefined.
            if (Array.isArray(destinationError.stackTrace)) {
                error.stack = destinationError.stackTrace.join("\n");
            }
        }
        else {
            error = processAwsErrorMessage(message.requestContext.condition);
        }
        const executionId = message.requestContext.requestId;
        return {
            ...(0, wrapper_1.createErrorResponse)(error, {
                call: sCall,
                startTime: new Date(record.Sns.Timestamp).getTime(),
                executionId
            }),
            timestamp: new Date(message.timestamp).getTime()
        };
    }
    else {
        const message = raw;
        switch (message.kind) {
            case "promise":
            case "iterator":
                message.timestamp = Number(m.Attributes.SentTimestamp);
                break;
            case "cpumetrics":
                break;
            case "functionstarted":
                break;
            default: {
                console.warn(`Unknown message received from response queue`);
            }
        }
        return raw;
    }
}
|
169 | //# sourceMappingURL=data:application/json;base64,{"version":3,"file":"aws-queue.js","sourceRoot":"","sources":["../../../src/aws/aws-queue.ts"],"names":[],"mappings":";;;AAEA,oCAAuD;AACvD,gCAA6B;AAE7B,4CAAsD;AACtD,sCAAmE;AACnE,0CAAsC;AACtC,wCAA+D;AAGxD,KAAK,UAAU,cAAc,CAAC,GAAQ,EAAE,IAAY;IACvD,MAAM,KAAK,GAAG,MAAM,GAAG,CAAC,WAAW,CAAC,EAAE,IAAI,EAAE,CAAC,CAAC,OAAO,EAAE,CAAC;IACxD,OAAO,KAAK,CAAC,QAAS,CAAC;AAC3B,CAAC;AAHD,wCAGC;AAED,SAAS,aAAa,CAAC,KAAa;IAChC,OAAO,IAAI,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,EAAE,GAAG,IAAI,CAAC,CAAC,CAAC;AAC1C,CAAC;AAEM,KAAK,UAAU,wBAAwB,CAC1C,GAAQ,EACR,QAAgB,EAChB,OAAgB;IAEhB,IAAI;QACA,MAAM,OAAO,GAAG,EAAE,QAAQ,EAAE,WAAW,EAAE,IAAA,qBAAS,EAAC,OAAO,CAAC,EAAE,CAAC;QAC9D,MAAM,GAAG,CAAC,WAAW,CAAC,OAAO,CAAC,CAAC,OAAO,EAAE,CAAC;KAC5C;IAAC,OAAO,GAAQ,EAAE;QACf,SAAG,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;KACjB;AACL,CAAC;AAXD,4DAWC;AAED,SAAgB,0BAA0B,CACtC,GAAQ,EACR,QAAgB,EAChB,OAAqB,EACrB,OAAmB;IAEnB,MAAM,UAAU,GAAG,IAAA,qBAAS,EAAC,OAAO,CAAC,CAAC;IACtC,OAAO,CAAC,cAAc,IAAI,aAAa,CAAC,UAAU,CAAC,MAAM,CAAC,CAAC;IAC3D,OAAO,IAAA,kBAAO,EACV,CAAC,GAAG,EAAE,CAAC,EAAE,EAAE,CAAC,CAAC,GAAG,CAAC,IAAI,GAAG,EAAE,OAAO,EAAE,KAAK,CAAC,gBAAgB,CAAC,EAC1D,GAAG,EAAE,CACD,GAAG;SACE,OAAO,CAAC;QACL,QAAQ;QACR,OAAO,EAAE,UAAU;KACtB,CAAC;SACD,OAAO,EAAE,CACrB,CAAC;AACN,CAAC;AAlBD,gEAkBC;AAEM,KAAK,UAAU,cAAc,CAAC,SAAiB,EAAE,QAAgB,EAAE,GAAQ;IAC9E,IAAI;QACA,MAAM,kBAAkB,GAA2B;YAC/C,SAAS;YACT,UAAU,EAAE;gBACR,iBAAiB,EAAE,GAAG,QAAQ,EAAE;aACnC;SACJ,CAAC;QACF,MAAM,QAAQ,GAAG,MAAM,GAAG,CAAC,WAAW,CAAC,kBAAkB,CAAC,CAAC,OAAO,EAAE,CAAC;QACrE,MAAM,QAAQ,GAAG,QAAQ,CAAC,QAAS,CAAC;QACpC,MAAM,WAAW,GAAG,MAAM,GAAG;aACxB,kBAAkB,CAAC,EAAE,QAAQ,EAAE,cAAc,EAAE,CAAC,UAAU,CAAC,EAAE,CAAC;aAC9D,OAAO,EAAE,CAAC;QACf,MAAM,QAAQ,GAAG,WAAW,CAAC,UAAU,EAAE,QAAQ,CAAC;QAClD,OAAO,EAAE,QAAQ,EAAE,QAAQ,EAAE,CAAC;KACjC;IAAC,OAAO,GAAQ,EAAE;QACf,MAAM,IAAI,kBAAU,CAAC,GAAG,EAAE,kBAAkB,CAAC,CAAC;KACjD;AACL,CAAC;AAlBD,wCAkBC;AAED,2BAA2B;AAC3B,SAAgB,sBAAsB,CAAC,OAAe;IAClD,IAAI,GAAG,GAAG,IAAI,kBAAU,CAAC,OAAO,CAAC,CAAC;IAClC,IACI,OAAO,EAAE,KAAK,CAA
C,kCAAkC,CAAC;QAClD,OAAO,EAAE,KAAK,CAAC,gBAAgB,CAAC,EAClC;QACE,GAAG,GAAG,IAAI,kBAAU,CAChB,EAAE,KAAK,EAAE,GAAG,EAAE,IAAI,EAAE,uBAAe,CAAC,OAAO,EAAE,EAC7C,wBAAwB,CAC3B,CAAC;KACL;SAAM,IAAI,OAAO,EAAE,KAAK,CAAC,MAAM,CAAC,EAAE;QAC/B,GAAG,GAAG,IAAI,kBAAU,CAAC,EAAE,KAAK,EAAE,GAAG,EAAE,IAAI,EAAE,uBAAe,CAAC,QAAQ,EAAE,EAAE,SAAS,CAAC,CAAC;KACnF;SAAM,IAAI,OAAO,EAAE,KAAK,CAAC,kBAAkB,CAAC,EAAE;QAC3C,GAAG,GAAG,IAAI,kBAAU,CAChB,EAAE,KAAK,EAAE,GAAG,EAAE,IAAI,EAAE,uBAAe,CAAC,YAAY,EAAE,EAClD,4BAA4B,CAC/B,CAAC;KACL;IACD,OAAO,GAAG,CAAC;AACf,CAAC;AAnBD,wDAmBC;AAEM,KAAK,UAAU,eAAe,CACjC,GAAQ,EACR,gBAAwB,EACxB,OAAmB,EACnB,MAAqB;IAErB,IAAI;QACA,MAAM,mBAAmB,GAAG,EAAE,CAAC;QAC/B,MAAM,OAAO,GAAG,GAAG,CAAC,cAAc,CAAC;YAC/B,QAAQ,EAAE,gBAAiB;YAC3B,eAAe,EAAE,EAAE;YACnB,mBAAmB;YACnB,qBAAqB,EAAE,CAAC,KAAK,CAAC;YAC9B,cAAc,EAAE,CAAC,eAAe,CAAC;SACpC,CAAC,CAAC;QAEH,MAAM,QAAQ,GAAG,MAAM,OAAO,CAAC,IAAI,CAAC,CAAC,OAAO,CAAC,OAAO,EAAE,EAAE,MAAM,CAAC,CAAC,CAAC;QACjE,IAAI,CAAC,QAAQ,EAAE;YACX,OAAO,CAAC,KAAK,EAAE,CAAC;YAChB,OAAO,EAAE,QAAQ,EAAE,EAAE,EAAE,CAAC;SAC3B;QAED,MAAM,EAAE,QAAQ,GAAG,EAAE,EAAE,GAAG,QAAQ,CAAC;QACnC,MAAM,EAAE,YAAY,EAAE,GAAG,QAAQ,CAAC,SAAS,CAAC;QAC5C,MAAM,aAAa,GAAG,IAAA,iCAAwB,EAAC,YAAY,CAAC,OAAO,CAAC,CAAC;QACrE,OAAO,CAAC,aAAa,IAAI,aAAa,CAAC;QACvC,MAAM,2BAA2B,GAAG,aAAa,CAAC,aAAa,CAAC,CAAC;QACjE,MAAM,uBAAuB,GAAG,IAAA,YAAG,EAC/B,QAAQ,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,aAAa,CAAC,CAAC,CAAC,IAAI,EAAE,MAAM,IAAI,CAAC,CAAC,CAAC,CACxD,CAAC;QACF,OAAO,CAAC,cAAc,IAAI,uBAAuB,GAAG,2BAA2B,CAAC;QAChF,IAAI,QAAQ,CAAC,MAAM,GAAG,CAAC,EAAE;YACrB,GAAG,CAAC,kBAAkB,CAAC;gBACnB,QAAQ,EAAE,gBAAiB;gBAC3B,OAAO,EAAE,QAAQ,CAAC,GAAG,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC;oBACxB,EAAE,EAAE,CAAC,CAAC,SAAU;oBAChB,aAAa,EAAE,CAAC,CAAC,aAAc;iBAClC,CAAC,CAAC;aACN,CAAC;iBACG,OAAO,EAAE;iBACT,KAAK,CAAC,CAAC,CAAC,EAAE,GAAE,CAAC,CAAC,CAAC;YACpB,OAAO,CAAC,cAAc,EAAE,CAAC;SAC5B;QACD,OAAO;YACH,QAAQ,EAAE,QAAQ,CAAC,GAAG,CAAC,2BAA2B,CAAC,CAAC,MAAM,CAAC,gBAAO,CAAC;YACnE,kBAAkB,EAAE,QAAQ,CAAC,MAAM,KAAK,mBAAmB;SAC9D,CAAC;KACL;IAAC,OAAO,GAAQ,EAAE;QACf,MAAM,IAAI,k
BAAU,CAAC,GAAG,EAAE,iBAAiB,CAAC,CAAC;KAChD;AACL,CAAC;AAlDD,0CAkDC;AA0BD,SAAS,2BAA2B,CAAC,CAAc;IAC/C,0BAA0B;IAC1B,8EAA8E;IAC9E,0DAA0D;IAC1D,4EAA4E;IAC5E,YAAY;IACZ,MAAM,GAAG,GAAG,IAAA,uBAAW,EAAC,CAAC,CAAC,IAAK,CAAC,CAAC;IACjC,IAAI,GAAG,CAAC,eAAe,EAAE;QACrB,MAAM,OAAO,GAAG,GAA+B,CAAC;QAChD,MAAM,UAAU,GAAG,OAAO,CAAC,cAA0B,CAAC;QACtD,MAAM,MAAM,GAAG,UAAU,CAAC,OAAO,CAAC,CAAC,CAAC,CAAC;QACrC,MAAM,KAAK,GAAiB,IAAA,uBAAW,EAAC,MAAM,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;QAC5D,IAAI,KAAwB,CAAC;QAC7B,MAAM,gBAAgB,GAAG,OAAO,CAAC,eAAyC,CAAC;QAC3E,IAAI,gBAAgB,EAAE;YAClB,KAAK,GAAG,sBAAsB,CAAC,gBAAgB,CAAC,YAAY,CAAC,CAAC;YAC9D,KAAK,CAAC,KAAK,GAAG,gBAAgB,CAAC,UAAU,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;SACzD;aAAM;YACH,KAAK,GAAG,sBAAsB,CAAC,OAAO,CAAC,cAAc,CAAC,SAAS,CAAC,CAAC;SACpE;QACD,MAAM,WAAW,GAAG,OAAO,CAAC,cAAc,CAAC,SAAS,CAAC;QACrD,OAAO;YACH,GAAG,IAAA,6BAAmB,EAAC,KAAK,EAAE;gBAC1B,IAAI,EAAE,KAAK;gBACX,SAAS,EAAE,IAAI,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,SAAS,CAAC,CAAC,OAAO,EAAE;gBACnD,WAAW;aACd,CAAC;YACF,SAAS,EAAE,IAAI,IAAI,CAAC,OAAO,CAAC,SAAS,CAAC,CAAC,OAAO,EAAE;SACnD,CAAC;KACL;SAAM;QACH,MAAM,OAAO,GAAG,GAAc,CAAC;QAC/B,QAAQ,OAAO,CAAC,IAAI,EAAE;YAClB,KAAK,SAAS,CAAC;YACf,KAAK,UAAU;gBACX,OAAO,CAAC,SAAS,GAAG,MAAM,CAAC,CAAC,CAAC,UAAW,CAAC,aAAa,CAAC,CAAC;gBACxD,MAAM;YACV,KAAK,YAAY;gBACb,MAAM;YACV,KAAK,iBAAiB;gBAClB,MAAM;YACV,OAAO,CAAC,CAAC;gBACL,OAAO,CAAC,IAAI,CAAC,8CAA8C,CAAC,CAAC;aAChE;SACJ;QACD,OAAO,GAAG,CAAC;KACd;AACL,CAAC","sourcesContent":["import { SNSEvent } from \"aws-lambda\";\nimport { SNS, SQS } from \"aws-sdk\";\nimport { FaastError, FaastErrorNames } from \"../error\";\nimport { log } from \"../log\";\nimport { Message, PollResult } from \"../provider\";\nimport { deserialize, serialize } from \"../serialize\";\nimport { computeHttpResponseBytes, defined, sum } from \"../shared\";\nimport { retryOp } from \"../throttle\";\nimport { createErrorResponse, FunctionCall } from \"../wrapper\";\nimport { AwsMetrics } from \"./aws-faast\";\n\nexport async function createSNSTopic(sns: SNS, Name: string) {\n    
const topic = await sns.createTopic({ Name }).promise();\n    return topic.TopicArn!;\n}\n\nfunction countRequests(bytes: number) {\n    return Math.ceil(bytes / (64 * 1024));\n}\n\nexport async function sendResponseQueueMessage(\n    sqs: SQS,\n    QueueUrl: string,\n    message: Message\n) {\n    try {\n        const request = { QueueUrl, MessageBody: serialize(message) };\n        await sqs.sendMessage(request).promise();\n    } catch (err: any) {\n        log.warn(err);\n    }\n}\n\nexport function publishFunctionCallMessage(\n    sns: SNS,\n    TopicArn: string,\n    message: FunctionCall,\n    metrics: AwsMetrics\n) {\n    const serialized = serialize(message);\n    metrics.sns64kRequests += countRequests(serialized.length);\n    return retryOp(\n        (err, n) => n < 6 && err?.message?.match(/does not exist/),\n        () =>\n            sns\n                .publish({\n                    TopicArn,\n                    Message: serialized\n                })\n                .promise()\n    );\n}\n\nexport async function createSQSQueue(QueueName: string, VTimeout: number, sqs: SQS) {\n    try {\n        const createQueueRequest: SQS.CreateQueueRequest = {\n            QueueName,\n            Attributes: {\n                VisibilityTimeout: `${VTimeout}`\n            }\n        };\n        const response = await sqs.createQueue(createQueueRequest).promise();\n        const QueueUrl = response.QueueUrl!;\n        const arnResponse = await sqs\n            .getQueueAttributes({ QueueUrl, AttributeNames: [\"QueueArn\"] })\n            .promise();\n        const QueueArn = arnResponse.Attributes?.QueueArn;\n        return { QueueUrl, QueueArn };\n    } catch (err: any) {\n        throw new FaastError(err, \"create sqs queue\");\n    }\n}\n\n/* istanbul ignore next  */\nexport function processAwsErrorMessage(message: string): Error {\n    let err = new FaastError(message);\n    if (\n        message?.match(/Process exited before completing/) ||\n        
message?.match(/signal: killed/)\n    ) {\n        err = new FaastError(\n            { cause: err, name: FaastErrorNames.EMEMORY },\n            \"possibly out of memory\"\n        );\n    } else if (message?.match(/time/)) {\n        err = new FaastError({ cause: err, name: FaastErrorNames.ETIMEOUT }, \"timeout\");\n    } else if (message?.match(/EventAgeExceeded/)) {\n        err = new FaastError(\n            { cause: err, name: FaastErrorNames.ECONCURRENCY },\n            \"concurrency limit exceeded\"\n        );\n    }\n    return err;\n}\n\nexport async function receiveMessages(\n    sqs: SQS,\n    ResponseQueueUrl: string,\n    metrics: AwsMetrics,\n    cancel: Promise<void>\n): Promise<PollResult> {\n    try {\n        const MaxNumberOfMessages = 10;\n        const request = sqs.receiveMessage({\n            QueueUrl: ResponseQueueUrl!,\n            WaitTimeSeconds: 20,\n            MaxNumberOfMessages,\n            MessageAttributeNames: [\"All\"],\n            AttributeNames: [\"SentTimestamp\"]\n        });\n\n        const response = await Promise.race([request.promise(), cancel]);\n        if (!response) {\n            request.abort();\n            return { Messages: [] };\n        }\n\n        const { Messages = [] } = response;\n        const { httpResponse } = response.$response;\n        const receivedBytes = computeHttpResponseBytes(httpResponse.headers);\n        metrics.outboundBytes += receivedBytes;\n        const inferredSqsRequestsReceived = countRequests(receivedBytes);\n        const inferredSqsRequestsSent = sum(\n            Messages.map(m => countRequests(m.Body?.length ?? 
1))\n        );\n        metrics.sqs64kRequests += inferredSqsRequestsSent + inferredSqsRequestsReceived;\n        if (Messages.length > 0) {\n            sqs.deleteMessageBatch({\n                QueueUrl: ResponseQueueUrl!,\n                Entries: Messages.map(m => ({\n                    Id: m.MessageId!,\n                    ReceiptHandle: m.ReceiptHandle!\n                }))\n            })\n                .promise()\n                .catch(_ => {});\n            metrics.sqs64kRequests++;\n        }\n        return {\n            Messages: Messages.map(processIncomingQueueMessage).filter(defined),\n            isFullMessageBatch: Messages.length === MaxNumberOfMessages\n        };\n    } catch (err: any) {\n        throw new FaastError(err, \"receiveMessages\");\n    }\n}\n\ninterface LambdaDestinationError {\n    errorMessage: string;\n    errorType?: string;\n    stackTrace?: string[];\n}\n\ninterface LambdaDestinationMessage {\n    version: string;\n    timestamp: string;\n    requestContext: {\n        requestId: string;\n        functionArn: string;\n        condition: \"RetriesExhausted\" | \"Success\";\n        approximateInvokeCount: number;\n    };\n    requestPayload: object;\n    responseContext: {\n        statusCode: number;\n        executedVersion: string;\n        functionError?: string;\n    };\n    responsePayload: LambdaDestinationError | object;\n}\n\nfunction processIncomingQueueMessage(m: SQS.Message): Message | void {\n    // AWS Lambda Destinations\n    // (https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/)\n    // are used to route failures to the response queue. 
These\n    // messages are generated by AWS Lambda and are constrained to the format it\n    // provides.\n    const raw = deserialize(m.Body!);\n    if (raw.responseContext) {\n        const message = raw as LambdaDestinationMessage;\n        const snsMessage = message.requestPayload as SNSEvent;\n        const record = snsMessage.Records[0];\n        const sCall: FunctionCall = deserialize(record.Sns.Message);\n        let error: Error | undefined;\n        const destinationError = message.responsePayload as LambdaDestinationError;\n        if (destinationError) {\n            error = processAwsErrorMessage(destinationError.errorMessage);\n            error.stack = destinationError.stackTrace?.join(\"\\n\");\n        } else {\n            error = processAwsErrorMessage(message.requestContext.condition);\n        }\n        const executionId = message.requestContext.requestId;\n        return {\n            ...createErrorResponse(error, {\n                call: sCall,\n                startTime: new Date(record.Sns.Timestamp).getTime(),\n                executionId\n            }),\n            timestamp: new Date(message.timestamp).getTime()\n        };\n    } else {\n        const message = raw as Message;\n        switch (message.kind) {\n            case \"promise\":\n            case \"iterator\":\n                message.timestamp = Number(m.Attributes!.SentTimestamp);\n                break;\n            case \"cpumetrics\":\n                break;\n            case \"functionstarted\":\n                break;\n            default: {\n                console.warn(`Unknown message received from response queue`);\n            }\n        }\n        return raw;\n    }\n}\n"]} |
\ | No newline at end of file |