UNPKG

5.55 kBJavaScriptView Raw
1"use strict";
/**
 * Compiler-emitted helper that drives a generator function as if it were an
 * async function: each yielded value is wrapped in a promise, its fulfilment
 * is fed back through `next`, and its rejection is fed back through `throw`.
 * An already-installed `this.__awaiter` is reused when present.
 *
 * @param {*} thisArg `this` value the generator function is invoked with
 * @param {Array|undefined} _arguments arguments for the generator function
 * @param {Function|undefined} P promise constructor; defaults to `Promise`
 * @param {Function} generator generator function to run
 * @return {Promise<*>} settles with the generator's return value, or rejects
 *   with whatever it throws
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    if (!P) {
        P = Promise;
    }
    return new P(function (resolve, reject) {
        // Either finish with the generator's return value or wait on the
        // yielded value and resume the generator once it settles.
        function advance(outcome) {
            if (outcome.done) {
                resolve(outcome.value);
                return;
            }
            new P(function (settle) { settle(outcome.value); }).then(onFulfilled, onRejected);
        }
        function onFulfilled(value) {
            try {
                advance(generator.next(value));
            }
            catch (err) {
                reject(err);
            }
        }
        function onRejected(reason) {
            try {
                advance(generator["throw"](reason));
            }
            catch (err) {
                reject(err);
            }
        }
        // From here on `generator` holds the iterator, not the function.
        generator = generator.apply(thisArg, _arguments || []);
        advance(generator.next());
    });
};
10Object.defineProperty(exports, "__esModule", { value: true });
11const winston = require("winston");
12const aws_sdk_1 = require("aws-sdk");
// Both service clients are pinned to the same AWS region.
const AWS_REGION = 'us-east-1';
// SNS client used internally for topic creation and subscription.
const sns = new aws_sdk_1.SNS({ region: AWS_REGION });
// SQS client shared by every function in this module; also exported.
exports.sqs = new aws_sdk_1.SQS({ region: AWS_REGION });
/**
 * Ensures an SQS queue exists and is subscribed to each of the given SNS topics.
 *
 * Steps, in order: create the queue (MessageRetentionPeriod of 60 seconds),
 * look up its URL and ARN, create every topic and subscribe the queue to it,
 * then install a queue policy allowing those topics to send to the queue.
 *
 * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html
 * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SNS.html
 * @param {string} queueName name of the queue to create or reuse
 * @param {string[]} [topicNames=[]] names of the SNS topics to subscribe the queue to
 * @return {Promise<EnsureQueueResult>} resolves to
 *   `{ queueName, queueUrl, queueArn, topicArns, topicNames }`; rejects with
 *   the first AWS call that fails
 */
function ensureQueue(queueName, topicNames = []) {
    return __awaiter(this, void 0, void 0, function* () {
        // idempotent call to create the queue
        winston.info('ensure queue', queueName);
        yield exports.sqs
            .createQueue({ QueueName: queueName, Attributes: { MessageRetentionPeriod: '60' } })
            .promise();
        // retrieve the url
        const queueUrlResult = yield exports.sqs.getQueueUrl({ QueueName: queueName }).promise();
        const queueUrl = queueUrlResult.QueueUrl;
        winston.info('queueUrl', queueUrl);
        // retrieve the arn for use with topics
        const queueAttributes = yield exports.sqs
            .getQueueAttributes({ QueueUrl: queueUrl, AttributeNames: ['QueueArn'] })
            .promise();
        const queueArn = queueAttributes.Attributes.QueueArn;
        winston.info('queueArn', queueArn);
        // attach to each topic, one at a time
        const topicArns = [];
        for (const topicName of topicNames) {
            winston.info('subscribing to', topicName);
            // idempotent call to create the sns topic
            const topicResult = yield sns.createTopic({ Name: topicName }).promise();
            const topicArn = topicResult.TopicArn;
            topicArns.push(topicArn);
            // subscribe to topic
            yield sns
                .subscribe({
                    TopicArn: topicArn,
                    Protocol: 'sqs',
                    Endpoint: queueArn,
                })
                .promise();
        }
        // attach policy to queue
        // refer to: http://docs.aws.amazon.com/sns/latest/dg/SendMessageToSQS.html
        // The document is built as an object and serialised, so every value is
        // escaped correctly and the result is always well-formed JSON.
        const policy = {
            Version: '2012-10-17',
            Statement: [
                {
                    Sid: `${queueName}Policy`,
                    Effect: 'Allow',
                    Principal: '*',
                    Action: 'sqs:SendMessage',
                    Resource: queueArn,
                    Condition: {
                        'ForAnyValue:ArnEquals': {
                            // With no topics the previous text template produced
                            // [""]; keep sending that same value.
                            'aws:SourceArn': topicArns.length > 0 ? topicArns : [''],
                        },
                    },
                },
            ],
        };
        yield exports.sqs
            .setQueueAttributes({
                QueueUrl: queueUrl,
                Attributes: {
                    Policy: JSON.stringify(policy),
                },
            })
            .promise();
        return {
            queueName,
            queueUrl,
            queueArn,
            topicArns,
            topicNames,
        };
    });
}
91exports.ensureQueue = ensureQueue;
/**
 * Polls a queue in an endless loop, handing each received batch to `callback`.
 *
 * Each iteration requests up to 10 messages and calls `callback` synchronously
 * with the result; whatever `callback` returns is not awaited. The loop has no
 * exit condition, so the returned promise never fulfils: it only rejects, when
 * a receive call fails or `callback` throws.
 *
 * @param {string} queueUrl URL of the queue to poll
 * @param {function(SQS.Message[]): void} callback invoked once per receive
 *   call with the batch; an empty array when the response carries no messages
 * @return {Promise<void>} rejects with the first error that ends the loop
 */
function receiveMessages(queueUrl, callback) {
    return __awaiter(this, void 0, void 0, function* () {
        winston.info('receive messages', queueUrl);
        while (true) {
            const results = yield exports.sqs
                .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10 })
                .promise();
            // A response may have no `Messages` property (e.g. an empty
            // receive); always give the callback the array it is documented
            // to take.
            callback(results.Messages || []);
        }
    });
}
110exports.receiveMessages = receiveMessages;
/**
 * Removes a single message from a queue.
 * @param {string} queueUrl URL of the queue holding the message
 * @param {string} receiptHandle receipt handle identifying the message to delete
 * @return {Promise<void>} fulfils with no value once the delete call
 *   completes; rejects if the call fails
 */
function deleteMessage(queueUrl, receiptHandle) {
    return __awaiter(this, void 0, void 0, function* () {
        const request = { QueueUrl: queueUrl, ReceiptHandle: receiptHandle };
        const pending = exports.sqs.deleteMessage(request).promise();
        // The service response is intentionally dropped; callers get no value.
        yield pending;
    });
}
127exports.deleteMessage = deleteMessage;
128////////////////////
/**
 * Publishes a number of messages to the SQS queue.
 *
 * Entries are sent in groups of at most 10, one group after another. The
 * returned promise settles only after the last group has been sent, and
 * rejects with the first send call that fails (later groups are then not
 * attempted). The caller's array is not modified.
 *
 * @param {string} queueUrl URL of the destination queue
 * @param {Array<Object>} entries batch entries, passed through unchanged as
 *   the `Entries` of each request
 * @return {Promise<void>} fulfils with no value once every group has been sent
 */
function sendMessageBatch(queueUrl, entries) {
    return __awaiter(this, void 0, void 0, function* () {
        // Largest group handed to a single sendMessageBatch request.
        const BATCH_LIMIT = 10;
        for (let start = 0; start < entries.length; start += BATCH_LIMIT) {
            // Waiting on each request keeps the groups ordered and lets a
            // failure reject the caller's promise instead of being lost.
            yield exports.sqs
                .sendMessageBatch({
                    QueueUrl: queueUrl,
                    Entries: entries.slice(start, start + BATCH_LIMIT),
                })
                .promise();
        }
    });
}
153exports.sendMessageBatch = sendMessageBatch;
154//# sourceMappingURL=sqs.js.map
\No newline at end of file