UNPKG

10.1 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.publishResponseMessage = exports.publishPubSub = exports.receiveMessages = void 0;
4const abort_controller_1 = require("abort-controller");
5const serialize_1 = require("../serialize");
6const shared_1 = require("../shared");
7const throttle_1 = require("../throttle");
/**
 * Pulls one batch of messages from a Pub/Sub subscription, acknowledges the
 * ones that carry an ack id, and returns the processed messages.
 *
 * @param {object} pubsub - API client exposing `projects.subscriptions.pull`
 *     and `projects.subscriptions.acknowledge`.
 * @param {string} subscription - Subscription name passed to both API calls.
 * @param {object} metrics - Mutable counters; `outboundBytes` and
 *     `pubSubBytes` are incremented from the pull response headers.
 * @param {Promise<void>} cancel - Raced against the pull request; when the
 *     race yields a falsy value the pull is aborted.
 * @returns {Promise<object>} `{ Messages, isFullMessageBatch }` after a pull,
 *     or `{ Messages: [] }` when the race yielded no response.
 */
async function receiveMessages(pubsub, subscription, metrics, cancel) {
    // Does higher message batching lead to better throughput? 10 is the max that AWS SQS allows.
    const maxMessages = 10;
    const source = new abort_controller_1.AbortController();
    const request = pubsub.projects.subscriptions.pull({
        subscription,
        requestBody: { returnImmediately: false, maxMessages }
    }, { signal: source.signal });
    const response = await Promise.race([request, cancel]);
    if (!response) {
        // No response means the race was not won by the pull: abort the
        // in-flight request and report an empty batch.
        source.abort();
        return { Messages: [] };
    }
    metrics.outboundBytes += (0, shared_1.computeHttpResponseBytes)(response.headers);
    metrics.pubSubBytes +=
        (0, shared_1.computeHttpResponseBytes)(response.headers, { httpHeaders: false, min: 1024 }) * 2;
    const Messages = response.data.receivedMessages || [];
    // Only entries with a non-empty ack id can be acknowledged. An acknowledge
    // request with an empty id list is rejected by the service, so skip the
    // call entirely when nothing usable remains.
    const ackIds = Messages.map(m => m.ackId || "").filter(id => id !== "");
    if (ackIds.length > 0) {
        // Fire-and-forget: acknowledgement failures are deliberately ignored.
        pubsub.projects.subscriptions
            .acknowledge({
                subscription,
                requestBody: { ackIds }
            })
            .catch(_ => { });
    }
    return {
        Messages: Messages.map(m => m.message)
            .map(processMessage)
            .filter(shared_1.defined),
        // A completely full batch is reported to the caller via this flag.
        isFullMessageBatch: Messages.length === maxMessages
    };
}
42exports.receiveMessages = receiveMessages;
/**
 * Converts a timestamp string to milliseconds since the Unix epoch.
 *
 * @param {string|undefined} timestampStr - Text handed to `Date.parse`; a
 *     falsy value is treated as the empty string.
 * @returns {number} The parsed time in milliseconds, or 0 when the text
 *     cannot be parsed.
 */
function parseTimestamp(timestampStr) {
    const text = timestampStr ? timestampStr : "";
    const millis = Date.parse(text);
    // Date.parse yields NaN for unparseable input; report that as 0.
    if (Number.isNaN(millis)) {
        return 0;
    }
    return millis;
}
/**
 * Decodes one Pub/Sub message payload into a deserialized message object.
 *
 * The `data` field is base64-decoded to a string and passed to `deserialize`.
 * When the result has `kind === "response"`, its `timestamp` is set from the
 * Pub/Sub `publishTime` field.
 *
 * @param {object|undefined} m - Pub/Sub message with optional `data`
 *     (base64 text) and `publishTime` fields.
 * @returns {object|undefined} The deserialized message, or `undefined` when
 *     no message payload was supplied.
 */
function processMessage(m) {
    // A received entry may lack its message payload; returning undefined lets
    // the caller drop it instead of failing the whole batch on a TypeError.
    if (m == null) {
        return undefined;
    }
    const data = m.data || "";
    const raw = Buffer.from(data, "base64").toString();
    const message = (0, serialize_1.deserialize)(raw);
    if (message.kind === "response") {
        message.timestamp = parseTimestamp(m.publishTime);
    }
    return message;
}
/**
 * Publishes a single message to a Pub/Sub topic.
 *
 * The message text is base64-encoded and sent, together with the optional
 * attributes, through `projects.topics.publish`. The publish call is wrapped
 * in `retryOp` with a first argument of 6 (presumably the retry limit; confirm
 * against the throttle module).
 *
 * @param {object} pubsub - API client exposing `projects.topics.publish`.
 * @param {string} topic - Topic name passed to the publish call.
 * @param {string} message - Payload to encode and publish.
 * @param {object} [attributes] - Attributes attached to the message.
 * @returns {Promise<void>} Settles once `retryOp` settles.
 */
async function publishPubSub(pubsub, topic, message, attributes) {
    const { retryOp } = throttle_1;
    const payload = Buffer.from(message).toString("base64");
    // A fresh request object is built for every attempt.
    const publishOnce = () => {
        const requestBody = { messages: [{ data: payload, attributes }] };
        return pubsub.projects.topics.publish({ topic, requestBody });
    };
    await retryOp(6, publishOnce);
}
62exports.publishPubSub = publishPubSub;
/**
 * Serializes a message and publishes it to the given response queue topic.
 *
 * @param {object} pubsub - API client forwarded to `publishPubSub`.
 * @param {string} ResponseQueue - Topic name forwarded to `publishPubSub`.
 * @param {object} message - Message passed through `serialize` before sending.
 * @returns {Promise<void>} Whatever `publishPubSub` returns.
 */
function publishResponseMessage(pubsub, ResponseQueue, message) {
    const { serialize } = serialize_1;
    const payload = serialize(message);
    return publishPubSub(pubsub, ResponseQueue, payload);
}
66exports.publishResponseMessage = publishResponseMessage;
67//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiZ29vZ2xlLXF1ZXVlLmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vLi4vLi4vc3JjL2dvb2dsZS9nb29nbGUtcXVldWUudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7O0FBQUEsdURBQW1EO0FBR25ELDRDQUFzRDtBQUN0RCxzQ0FBOEQ7QUFDOUQsMENBQXNDO0FBTS9CLEtBQUssVUFBVSxlQUFlLENBQ2pDLE1BQXdCLEVBQ3hCLFlBQW9CLEVBQ3BCLE9BQXNCLEVBQ3RCLE1BQXFCO0lBRXJCLDZGQUE2RjtJQUM3RixNQUFNLFdBQVcsR0FBRyxFQUFFLENBQUM7SUFDdkIsTUFBTSxNQUFNLEdBQUcsSUFBSSxrQ0FBZSxFQUFFLENBQUM7SUFDckMsTUFBTSxPQUFPLEdBQUcsTUFBTSxDQUFDLFFBQVEsQ0FBQyxhQUFhLENBQUMsSUFBSSxDQUM5QztRQUNJLFlBQVk7UUFDWixXQUFXLEVBQUUsRUFBRSxpQkFBaUIsRUFBRSxLQUFLLEVBQUUsV0FBVyxFQUFFO0tBQ3pELEVBQ0QsRUFBRSxNQUFNLEVBQUUsTUFBTSxDQUFDLE1BQU0sRUFBRSxDQUM1QixDQUFDO0lBRUYsTUFBTSxRQUFRLEdBQUcsTUFBTSxPQUFPLENBQUMsSUFBSSxDQUFDLENBQUMsT0FBTyxFQUFFLE1BQU0sQ0FBQyxDQUFDLENBQUM7SUFDdkQsSUFBSSxDQUFDLFFBQVEsRUFBRTtRQUNYLE1BQU0sQ0FBQyxLQUFLLEVBQUUsQ0FBQztRQUNmLE9BQU8sRUFBRSxRQUFRLEVBQUUsRUFBRSxFQUFFLENBQUM7S0FDM0I7SUFFRCxPQUFPLENBQUMsYUFBYSxJQUFJLElBQUEsaUNBQXdCLEVBQUMsUUFBUSxDQUFDLE9BQU8sQ0FBQyxDQUFDO0lBQ3BFLE9BQU8sQ0FBQyxXQUFXO1FBQ2YsSUFBQSxpQ0FBd0IsRUFBQyxRQUFRLENBQUMsT0FBTyxFQUFFLEVBQUUsV0FBVyxFQUFFLEtBQUssRUFBRSxHQUFHLEVBQUUsSUFBSSxFQUFFLENBQUMsR0FBRyxDQUFDLENBQUM7SUFDdEYsTUFBTSxRQUFRLEdBQUcsUUFBUSxDQUFDLElBQUksQ0FBQyxnQkFBZ0IsSUFBSSxFQUFFLENBQUM7SUFDdEQsSUFBSSxRQUFRLENBQUMsTUFBTSxHQUFHLENBQUMsRUFBRTtRQUNyQixNQUFNLENBQUMsUUFBUSxDQUFDLGFBQWE7YUFDeEIsV0FBVyxDQUFDO1lBQ1QsWUFBWTtZQUNaLFdBQVcsRUFBRTtnQkFDVCxNQUFNLEVBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBQyxDQUFDLENBQUMsRUFBRSxDQUFDLENBQUMsQ0FBQyxLQUFLLElBQUksRUFBRSxDQUFDLENBQUMsTUFBTSxDQUFDLENBQUMsQ0FBQyxFQUFFLENBQUMsQ0FBQyxLQUFLLEVBQUUsQ0FBQzthQUNqRTtTQUNKLENBQUM7YUFDRCxLQUFLLENBQUMsQ0FBQyxDQUFDLEVBQUUsR0FBRSxDQUFDLENBQUMsQ0FBQztLQUN2QjtJQUNELE9BQU87UUFDSCxRQUFRLEVBQUUsUUFBUSxDQUFDLEdBQUcsQ0FBQyxDQUFDLENBQUMsRUFBRSxDQUFDLENBQUMsQ0FBQyxPQUFRLENBQUM7YUFDbEMsR0FBRyxDQUFDLGNBQWMsQ0FBQzthQUNuQixNQUFNLENBQUMsZ0JBQU8sQ0FBQztRQUNwQixrQkFBa0IsRUFBRSxRQUFRLENBQUMsTUFBTSxLQUFLLFdB
QVc7S0FDdEQsQ0FBQztBQUNOLENBQUM7QUEzQ0QsMENBMkNDO0FBRUQsU0FBUyxjQUFjLENBQUMsWUFBZ0M7SUFDcEQsT0FBTyxJQUFJLENBQUMsS0FBSyxDQUFDLFlBQVksSUFBSSxFQUFFLENBQUMsSUFBSSxDQUFDLENBQUM7QUFDL0MsQ0FBQztBQUVELFNBQVMsY0FBYyxDQUFDLENBQWdCO0lBQ3BDLE1BQU0sSUFBSSxHQUFHLENBQUMsQ0FBQyxJQUFJLElBQUksRUFBRSxDQUFDO0lBQzFCLE1BQU0sR0FBRyxHQUFHLE1BQU0sQ0FBQyxJQUFJLENBQUMsSUFBSSxFQUFFLFFBQVEsQ0FBQyxDQUFDLFFBQVEsRUFBRSxDQUFDO0lBQ25ELE1BQU0sT0FBTyxHQUFHLElBQUEsdUJBQVcsRUFBQyxHQUFHLENBQUMsQ0FBQztJQUNqQyxJQUFJLE9BQU8sQ0FBQyxJQUFJLEtBQUssVUFBVSxFQUFFO1FBQzdCLE9BQU8sQ0FBQyxTQUFTLEdBQUcsY0FBYyxDQUFDLENBQUMsQ0FBQyxXQUFZLENBQUMsQ0FBQztLQUN0RDtJQUNELE9BQU8sT0FBTyxDQUFDO0FBQ25CLENBQUM7QUFFTSxLQUFLLFVBQVUsYUFBYSxDQUMvQixNQUF3QixFQUN4QixLQUFhLEVBQ2IsT0FBZSxFQUNmLFVBQXVCO0lBRXZCLE1BQU0sSUFBSSxHQUFHLE1BQU0sQ0FBQyxJQUFJLENBQUMsT0FBTyxDQUFDLENBQUMsUUFBUSxDQUFDLFFBQVEsQ0FBQyxDQUFDO0lBRXJELE1BQU0sSUFBQSxrQkFBTyxFQUFDLENBQUMsRUFBRSxHQUFHLEVBQUUsQ0FDbEIsTUFBTSxDQUFDLFFBQVEsQ0FBQyxNQUFNLENBQUMsT0FBTyxDQUFDO1FBQzNCLEtBQUs7UUFDTCxXQUFXLEVBQUUsRUFBRSxRQUFRLEVBQUUsQ0FBQyxFQUFFLElBQUksRUFBRSxVQUFVLEVBQUUsQ0FBQyxFQUFFO0tBQ3BELENBQUMsQ0FDTCxDQUFDO0FBQ04sQ0FBQztBQWRELHNDQWNDO0FBRUQsU0FBZ0Isc0JBQXNCLENBQ2xDLE1BQXdCLEVBQ3hCLGFBQXFCLEVBQ3JCLE9BQWdCO0lBRWhCLE9BQU8sYUFBYSxDQUFDLE1BQU0sRUFBRSxhQUFhLEVBQUUsSUFBQSxxQkFBUyxFQUFDLE9BQU8sQ0FBQyxDQUFDLENBQUM7QUFDcEUsQ0FBQztBQU5ELHdEQU1DIiwic291cmNlc0NvbnRlbnQiOlsiaW1wb3J0IHsgQWJvcnRDb250cm9sbGVyIH0gZnJvbSBcImFib3J0LWNvbnRyb2xsZXJcIjtcbmltcG9ydCB7IHB1YnN1Yl92MSB9IGZyb20gXCJnb29nbGVhcGlzXCI7XG5pbXBvcnQgeyBNZXNzYWdlLCBQb2xsUmVzdWx0IH0gZnJvbSBcIi4uL3Byb3ZpZGVyXCI7XG5pbXBvcnQgeyBkZXNlcmlhbGl6ZSwgc2VyaWFsaXplIH0gZnJvbSBcIi4uL3NlcmlhbGl6ZVwiO1xuaW1wb3J0IHsgY29tcHV0ZUh0dHBSZXNwb25zZUJ5dGVzLCBkZWZpbmVkIH0gZnJvbSBcIi4uL3NoYXJlZFwiO1xuaW1wb3J0IHsgcmV0cnlPcCB9IGZyb20gXCIuLi90aHJvdHRsZVwiO1xuaW1wb3J0IHsgQXR0cmlidXRlcyB9IGZyb20gXCIuLi90eXBlc1wiO1xuaW1wb3J0IHsgR29vZ2xlTWV0cmljcyB9IGZyb20gXCIuL2dvb2dsZS1mYWFzdFwiO1xuaW1wb3J0IFB1YlN1YkFwaSA9IHB1YnN1Yl92MTtcbmltcG9ydCBQdWJTdWJNZXNzYWdlID0gcHVic3ViX3YxLlNj
aGVtYSRQdWJzdWJNZXNzYWdlO1xuXG5leHBvcnQgYXN5bmMgZnVuY3Rpb24gcmVjZWl2ZU1lc3NhZ2VzKFxuICAgIHB1YnN1YjogUHViU3ViQXBpLlB1YnN1YixcbiAgICBzdWJzY3JpcHRpb246IHN0cmluZyxcbiAgICBtZXRyaWNzOiBHb29nbGVNZXRyaWNzLFxuICAgIGNhbmNlbDogUHJvbWlzZTx2b2lkPlxuKTogUHJvbWlzZTxQb2xsUmVzdWx0PiB7XG4gICAgLy8gRG9lcyBoaWdoZXIgbWVzc2FnZSBiYXRjaGluZyBsZWFkIHRvIGJldHRlciB0aHJvdWdocHV0PyAxMCBpcyB0aGUgbWF4IHRoYXQgQVdTIFNRUyBhbGxvd3MuXG4gICAgY29uc3QgbWF4TWVzc2FnZXMgPSAxMDtcbiAgICBjb25zdCBzb3VyY2UgPSBuZXcgQWJvcnRDb250cm9sbGVyKCk7XG4gICAgY29uc3QgcmVxdWVzdCA9IHB1YnN1Yi5wcm9qZWN0cy5zdWJzY3JpcHRpb25zLnB1bGwoXG4gICAgICAgIHtcbiAgICAgICAgICAgIHN1YnNjcmlwdGlvbixcbiAgICAgICAgICAgIHJlcXVlc3RCb2R5OiB7IHJldHVybkltbWVkaWF0ZWx5OiBmYWxzZSwgbWF4TWVzc2FnZXMgfVxuICAgICAgICB9LFxuICAgICAgICB7IHNpZ25hbDogc291cmNlLnNpZ25hbCB9XG4gICAgKTtcblxuICAgIGNvbnN0IHJlc3BvbnNlID0gYXdhaXQgUHJvbWlzZS5yYWNlKFtyZXF1ZXN0LCBjYW5jZWxdKTtcbiAgICBpZiAoIXJlc3BvbnNlKSB7XG4gICAgICAgIHNvdXJjZS5hYm9ydCgpO1xuICAgICAgICByZXR1cm4geyBNZXNzYWdlczogW10gfTtcbiAgICB9XG5cbiAgICBtZXRyaWNzLm91dGJvdW5kQnl0ZXMgKz0gY29tcHV0ZUh0dHBSZXNwb25zZUJ5dGVzKHJlc3BvbnNlLmhlYWRlcnMpO1xuICAgIG1ldHJpY3MucHViU3ViQnl0ZXMgKz1cbiAgICAgICAgY29tcHV0ZUh0dHBSZXNwb25zZUJ5dGVzKHJlc3BvbnNlLmhlYWRlcnMsIHsgaHR0cEhlYWRlcnM6IGZhbHNlLCBtaW46IDEwMjQgfSkgKiAyO1xuICAgIGNvbnN0IE1lc3NhZ2VzID0gcmVzcG9uc2UuZGF0YS5yZWNlaXZlZE1lc3NhZ2VzIHx8IFtdO1xuICAgIGlmIChNZXNzYWdlcy5sZW5ndGggPiAwKSB7XG4gICAgICAgIHB1YnN1Yi5wcm9qZWN0cy5zdWJzY3JpcHRpb25zXG4gICAgICAgICAgICAuYWNrbm93bGVkZ2Uoe1xuICAgICAgICAgICAgICAgIHN1YnNjcmlwdGlvbixcbiAgICAgICAgICAgICAgICByZXF1ZXN0Qm9keToge1xuICAgICAgICAgICAgICAgICAgICBhY2tJZHM6IE1lc3NhZ2VzLm1hcChtID0+IG0uYWNrSWQgfHwgXCJcIikuZmlsdGVyKG0gPT4gbSAhPT0gXCJcIilcbiAgICAgICAgICAgICAgICB9XG4gICAgICAgICAgICB9KVxuICAgICAgICAgICAgLmNhdGNoKF8gPT4ge30pO1xuICAgIH1cbiAgICByZXR1cm4ge1xuICAgICAgICBNZXNzYWdlczogTWVzc2FnZXMubWFwKG0gPT4gbS5tZXNzYWdlISlcbiAgICAgICAgICAgIC5tYXAocHJvY2Vzc01lc3NhZ2UpXG4gICAgICAgICAgICAuZmlsdGVyKGRlZmluZWQpLFxuICAgICAgICBpc0Z1bGxNZXNzYWdlQmF0Y2g6IE1lc3NhZ2VzLmxlbmd0aCA9PT0gbWF4
TWVzc2FnZXNcbiAgICB9O1xufVxuXG5mdW5jdGlvbiBwYXJzZVRpbWVzdGFtcCh0aW1lc3RhbXBTdHI6IHN0cmluZyB8IHVuZGVmaW5lZCkge1xuICAgIHJldHVybiBEYXRlLnBhcnNlKHRpbWVzdGFtcFN0ciB8fCBcIlwiKSB8fCAwO1xufVxuXG5mdW5jdGlvbiBwcm9jZXNzTWVzc2FnZShtOiBQdWJTdWJNZXNzYWdlKTogTWVzc2FnZSB8IHZvaWQge1xuICAgIGNvbnN0IGRhdGEgPSBtLmRhdGEgfHwgXCJcIjtcbiAgICBjb25zdCByYXcgPSBCdWZmZXIuZnJvbShkYXRhLCBcImJhc2U2NFwiKS50b1N0cmluZygpO1xuICAgIGNvbnN0IG1lc3NhZ2UgPSBkZXNlcmlhbGl6ZShyYXcpO1xuICAgIGlmIChtZXNzYWdlLmtpbmQgPT09IFwicmVzcG9uc2VcIikge1xuICAgICAgICBtZXNzYWdlLnRpbWVzdGFtcCA9IHBhcnNlVGltZXN0YW1wKG0ucHVibGlzaFRpbWUhKTtcbiAgICB9XG4gICAgcmV0dXJuIG1lc3NhZ2U7XG59XG5cbmV4cG9ydCBhc3luYyBmdW5jdGlvbiBwdWJsaXNoUHViU3ViKFxuICAgIHB1YnN1YjogUHViU3ViQXBpLlB1YnN1YixcbiAgICB0b3BpYzogc3RyaW5nLFxuICAgIG1lc3NhZ2U6IHN0cmluZyxcbiAgICBhdHRyaWJ1dGVzPzogQXR0cmlidXRlc1xuKSB7XG4gICAgY29uc3QgZGF0YSA9IEJ1ZmZlci5mcm9tKG1lc3NhZ2UpLnRvU3RyaW5nKFwiYmFzZTY0XCIpO1xuXG4gICAgYXdhaXQgcmV0cnlPcCg2LCAoKSA9PlxuICAgICAgICBwdWJzdWIucHJvamVjdHMudG9waWNzLnB1Ymxpc2goe1xuICAgICAgICAgICAgdG9waWMsXG4gICAgICAgICAgICByZXF1ZXN0Qm9keTogeyBtZXNzYWdlczogW3sgZGF0YSwgYXR0cmlidXRlcyB9XSB9XG4gICAgICAgIH0pXG4gICAgKTtcbn1cblxuZXhwb3J0IGZ1bmN0aW9uIHB1Ymxpc2hSZXNwb25zZU1lc3NhZ2UoXG4gICAgcHVic3ViOiBQdWJTdWJBcGkuUHVic3ViLFxuICAgIFJlc3BvbnNlUXVldWU6IHN0cmluZyxcbiAgICBtZXNzYWdlOiBNZXNzYWdlXG4pIHtcbiAgICByZXR1cm4gcHVibGlzaFB1YlN1YihwdWJzdWIsIFJlc3BvbnNlUXVldWUsIHNlcmlhbGl6ZShtZXNzYWdlKSk7XG59XG4iXX0=
\No newline at end of file