1 | "use strict";
|
/**
 * Generator-based stand-in for async/await: drives a generator function to
 * completion, waiting on every yielded value before resuming it, and returns
 * a promise for the generator's return value. If `this.__awaiter` already
 * exists at module level, that implementation is reused instead.
 *
 * @param {*} thisArg - `this` binding used when invoking the generator function.
 * @param {Array|IArguments} [_arguments] - arguments for the generator function; defaults to none.
 * @param {Function} [P] - promise constructor to use; falls back to the global Promise.
 * @param {Function} generator - generator function whose yields are awaited.
 * @returns {Promise<*>} settles with the generator's return value, or rejects
 *   with whatever error escapes the generator.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    if (!P) {
        P = Promise;
    }
    return new P(function (settle, fail) {
        // Either finish with the generator's return value or wait on the yielded one.
        function advance(state) {
            if (state.done) {
                settle(state.value);
                return;
            }
            new P(function (pass) { pass(state.value); }).then(onFulfilled, onRejected);
        }
        // Resume the generator with the awaited value.
        function onFulfilled(value) {
            try {
                advance(generator.next(value));
            }
            catch (err) {
                fail(err);
            }
        }
        // Re-raise the rejection inside the generator so its try/catch can see it.
        function onRejected(reason) {
            try {
                advance(generator["throw"](reason));
            }
            catch (err) {
                fail(err);
            }
        }
        // `generator` is rebound from the generator function to the running iterator.
        advance((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
|
10 | Object.defineProperty(exports, "__esModule", { value: true });
|
11 | const winston = require("winston");
|
12 | const aws_sdk_1 = require("aws-sdk");
|
13 | const sns = new aws_sdk_1.SNS({ region: 'us-east-1' });
|
14 | exports.sqs = new aws_sdk_1.SQS({ region: 'us-east-1' });
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
/**
 * Creates the named SQS queue, subscribes it to each named SNS topic, and
 * installs a queue policy allowing `sqs:SendMessage` from those topics.
 *
 * Steps, in order: createQueue (with MessageRetentionPeriod '60'), getQueueUrl,
 * getQueueAttributes (QueueArn), then createTopic + subscribe for every topic
 * name sequentially, and finally setQueueAttributes with the policy.
 *
 * @param {string} queueName - name of the queue; also used as the policy Sid prefix.
 * @param {string[]} [topicNames=[]] - SNS topic names to subscribe the queue to.
 * @returns {Promise<{queueName: string, queueUrl: string, queueArn: string,
 *   topicArns: string[], topicNames: string[]}>} identifiers gathered along the way.
 *   Rejects with the first error raised by any of the AWS calls.
 */
function ensureQueue(queueName, topicNames = []) {
    return __awaiter(this, void 0, void 0, function* () {
        winston.info('ensure queue', queueName);
        yield exports.sqs
            .createQueue({ QueueName: queueName, Attributes: { MessageRetentionPeriod: '60' } })
            .promise();

        const queueUrlResult = yield exports.sqs.getQueueUrl({ QueueName: queueName }).promise();
        const queueUrl = queueUrlResult.QueueUrl;
        winston.info('queueUrl', queueUrl);

        const queueAttributes = yield exports.sqs
            .getQueueAttributes({ QueueUrl: queueUrl, AttributeNames: ['QueueArn'] })
            .promise();
        const queueArn = queueAttributes.Attributes.QueueArn;
        winston.info('queueArn', queueArn);

        const topicArns = [];
        for (const topicName of topicNames) {
            winston.info('subscribing to', topicName);

            const topicResult = yield sns.createTopic({ Name: topicName }).promise();
            const topicArn = topicResult.TopicArn;
            topicArns.push(topicArn);

            yield sns
                .subscribe({
                    TopicArn: topicArn,
                    Protocol: 'sqs',
                    Endpoint: queueArn,
                })
                .promise();
        }

        // Build the policy as data and serialise it, so values are always
        // escaped correctly and the document does not depend on source layout.
        const policy = {
            Version: '2012-10-17',
            Statement: [
                {
                    Sid: `${queueName}Policy`,
                    Effect: 'Allow',
                    Principal: '*',
                    Action: 'sqs:SendMessage',
                    Resource: queueArn,
                    Condition: {
                        'ForAnyValue:ArnEquals': {
                            // With no topics a single empty ARN is sent rather than an
                            // empty list; presumably this matches no sender — confirm
                            // against the SQS policy documentation before changing it.
                            'aws:SourceArn': topicArns.length > 0 ? topicArns : [''],
                        },
                    },
                },
            ],
        };

        yield exports.sqs
            .setQueueAttributes({
                QueueUrl: queueUrl,
                Attributes: {
                    Policy: JSON.stringify(policy),
                },
            })
            .promise();

        return {
            queueName,
            queueUrl,
            queueArn,
            topicArns,
            topicNames,
        };
    });
}
|
91 | exports.ensureQueue = ensureQueue;
|
92 |
|
93 |
|
94 |
|
95 |
|
96 |
|
97 |
|
/**
 * Polls an SQS queue in a loop, handing each response's messages to `callback`.
 *
 * Each iteration requests up to 10 messages and passes `results.Messages`
 * through unchanged (it may be absent when nothing was received — confirm
 * against the SQS documentation). The request sets no wait time and the loop
 * has no delay, so polls are issued back-to-back.
 *
 * @param {string} queueUrl - URL of the queue to poll.
 * @param {function(Array=): (boolean|void)} callback - invoked once per poll;
 *   return exactly `false` to stop polling. Any other return value (including
 *   none) keeps the loop running.
 * @returns {Promise<void>} resolves once the callback has returned `false`;
 *   rejects if a receive call fails or the callback throws.
 */
function receiveMessages(queueUrl, callback) {
    return __awaiter(this, void 0, void 0, function* () {
        winston.info('receive messages', queueUrl);
        let keepRunning = true;
        while (keepRunning) {
            const results = yield exports.sqs
                .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10 })
                .promise();
            // Only an explicit `false` ends the loop, so callbacks that return
            // nothing keep polling indefinitely.
            keepRunning = callback(results.Messages) !== false;
        }
    });
}
|
110 | exports.receiveMessages = receiveMessages;
|
111 |
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
/**
 * Deletes a single message from an SQS queue.
 *
 * @param {string} queueUrl - URL of the queue holding the message.
 * @param {string} receiptHandle - receipt handle identifying the message to delete.
 * @returns {Promise<void>} resolves with no value once the delete call has
 *   completed; rejects if the call fails.
 */
function deleteMessage(queueUrl, receiptHandle) {
    return __awaiter(this, void 0, void 0, function* () {
        const request = { QueueUrl: queueUrl, ReceiptHandle: receiptHandle };
        const pending = exports.sqs.deleteMessage(request).promise();
        yield pending;
    });
}
|
127 | exports.deleteMessage = deleteMessage;
|
128 |
|
129 |
|
130 |
|
131 |
|
132 |
|
133 |
|
/**
 * Sends entries to an SQS queue in batches of at most 10, one batch at a time.
 *
 * Each batch is awaited before the next is sent, so the returned promise only
 * settles after every batch call has completed, and a failed call rejects it.
 * The caller's `entries` array is not modified.
 *
 * @param {string} queueUrl - URL of the destination queue.
 * @param {Array<Object>} entries - batch entries, passed as `Entries` to
 *   `sqs.sendMessageBatch` in slices of up to 10.
 * @returns {Promise<Array<Object>>} the response of each batch call, in send
 *   order (an empty array when `entries` is empty).
 */
function sendMessageBatch(queueUrl, entries) {
    return __awaiter(this, void 0, void 0, function* () {
        const batchLimit = 10;
        const responses = [];
        for (let start = 0; start < entries.length; start += batchLimit) {
            const chunk = entries.slice(start, start + batchLimit);
            const response = yield exports.sqs
                .sendMessageBatch({
                    QueueUrl: queueUrl,
                    Entries: chunk,
                })
                .promise();
            responses.push(response);
        }
        return responses;
    });
}
|
153 | exports.sendMessageBatch = sendMessageBatch;
|
154 |
|
\ | No newline at end of file |