UNPKG

4.45 kBPlain TextView Raw
1import * as winston from 'winston';
2import { SQS, SNS } from 'aws-sdk';
3import { SendMessageBatchRequestEntry } from 'aws-sdk/clients/sqs';
// Region shared by both AWS clients created in this module.
const awsRegion = 'us-east-1';

/** SNS client used by this module for topic creation and subscription. */
const sns = new SNS({ region: awsRegion });

/** SQS client used by every queue operation in this module; also exported. */
export const sqs = new SQS({ region: awsRegion });
7
/**
 * Result of `ensureQueue`: the identifiers of the queue and of the SNS topics
 * that were handled during the call.
 */
export interface EnsureQueueResult {
  /** Queue name exactly as it was passed to `ensureQueue`. */
  queueName: string;
  /** Queue URL returned by SQS `getQueueUrl`. */
  queueUrl: string;
  /** Queue ARN read from the queue's `QueueArn` attribute. */
  queueArn: string;
  /** ARNs returned by SNS `createTopic`, one per requested topic name. */
  topicArns: string[];
  /** Topic names exactly as they were passed to `ensureQueue`. */
  topicNames: string[];
}
15
/**
 * Ensures an SQS queue exists and is subscribed to each of the given SNS topics.
 *
 * Steps, in order: create the queue (message retention of 60 seconds), look up
 * its URL and ARN, create each topic and subscribe the queue to it, then set the
 * queue's access policy so that the collected topic ARNs are the only allowed
 * sources for `sqs:SendMessage`.
 *
 * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html
 * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SNS.html
 *
 * @param queueName name of the queue to create or reuse
 * @param topicNames names of the SNS topics to create and subscribe to; defaults to none
 * @returns the queue name, URL and ARN, the topic ARNs (in `topicNames` order) and `topicNames`
 * @throws Error when an AWS response lacks the queue URL, the queue ARN or a
 *   topic ARN; failures of the SDK requests themselves propagate unchanged
 */
export async function ensureQueue(
  queueName: string,
  topicNames: string[] = []
): Promise<EnsureQueueResult> {
  // idempotent call to create the queue
  winston.info('ensure queue', queueName);
  await sqs
    .createQueue({ QueueName: queueName, Attributes: { MessageRetentionPeriod: '60' } })
    .promise();

  // retrieve the url
  const { QueueUrl: queueUrl } = await sqs.getQueueUrl({ QueueName: queueName }).promise();
  if (!queueUrl) {
    throw new Error(`SQS returned no URL for queue "${queueName}"`);
  }
  winston.info('queueUrl', queueUrl);

  // retrieve the arn for use with topics
  const queueAttributes = await sqs
    .getQueueAttributes({ QueueUrl: queueUrl, AttributeNames: ['QueueArn'] })
    .promise();
  const queueArn = queueAttributes.Attributes && queueAttributes.Attributes.QueueArn;
  if (!queueArn) {
    throw new Error(`SQS returned no ARN for queue "${queueName}"`);
  }
  winston.info('queueArn', queueArn);

  // attach to each topic, one at a time and in the order given
  const topicArns: string[] = [];
  for (const topicName of topicNames) {
    winston.info('subscribing to', topicName);

    // idempotent call to create the sns topic
    const { TopicArn: topicArn } = await sns.createTopic({ Name: topicName }).promise();
    if (!topicArn) {
      throw new Error(`SNS returned no ARN for topic "${topicName}"`);
    }
    topicArns.push(topicArn);

    // subscribe to topic
    await sns
      .subscribe({
        TopicArn: topicArn,
        Protocol: 'sqs',
        Endpoint: queueArn,
      })
      .promise();
  }

  // attach policy to queue
  // refer to: http://docs.aws.amazon.com/sns/latest/dg/SendMessageToSQS.html
  // Built as an object and serialised so the document is always well-formed JSON.
  const policy = {
    Version: '2012-10-17',
    Statement: [
      {
        Sid: `${queueName}Policy`,
        Effect: 'Allow',
        Principal: '*',
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: {
          'ForAnyValue:ArnEquals': {
            // With no topics, keep a single empty source ARN in the list
            // rather than emitting an empty list.
            'aws:SourceArn': topicArns.length > 0 ? topicArns : [''],
          },
        },
      },
    ],
  };
  await sqs
    .setQueueAttributes({
      QueueUrl: queueUrl,
      Attributes: { Policy: JSON.stringify(policy) },
    })
    .promise();

  return {
    queueName,
    queueUrl,
    queueArn,
    topicArns,
    topicNames,
  };
}
100
/**
 * Polls an SQS queue in an endless loop, handing each poll's messages to `callback`.
 *
 * Each iteration requests up to 10 messages. The loop has no exit condition and
 * every error raised inside it is logged and swallowed, so the returned promise
 * is not expected to settle; callers should not `await` it before doing other work.
 *
 * @param queueUrl URL of the queue to poll
 * @param callback invoked once per poll with the received messages; it gets an
 *   empty array when the poll returned none. Its return value is ignored (a
 *   returned promise is not awaited) and anything it throws synchronously is
 *   logged without stopping the loop.
 * @returns a promise that stays pending while polling continues
 */
export async function receiveMessages(
  queueUrl: string,
  callback: (messages: SQS.Message[]) => void
): Promise<void> {
  winston.info('receive messages', queueUrl);

  while (true) {
    try {
      const results = await sqs
        .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10 })
        .promise();
      // `Messages` is optional on the response and missing when nothing was
      // received; normalise so the callback always gets an array, as its type promises.
      callback(results.Messages || []);
    } catch (ex) {
      // this needs to be here in case we have an issue with the AWS-SDK,
      // which occasionally happens. we want this loop to continue.
      winston.error(ex);
    }
  }
}
127
/**
 * Deletes a single message from an SQS queue.
 *
 * @param queueUrl URL of the queue that holds the message
 * @param receiptHandle receipt handle identifying the message to delete
 * @returns a promise that resolves, with no value, once the SDK request completes
 */
export async function deleteMessage(queueUrl: string, receiptHandle: string): Promise<void> {
  const request = { QueueUrl: queueUrl, ReceiptHandle: receiptHandle };
  await sqs.deleteMessage(request).promise();
}
142
143////////////////////
144
/**
 * Publishes a number of messages to the SQS queue.
 *
 * Entries are sent in chunks of at most 10 (the SQS per-request batch limit),
 * one request after another. The returned promise resolves only after the last
 * chunk has been sent and rejects with the first request error, in which case
 * the remaining chunks are not sent. The caller's array is not modified.
 *
 * NOTE(review): the responses are discarded, so any per-entry failures SQS
 * reports inside a successful response are not surfaced to the caller.
 *
 * @param queueUrl URL of the destination queue
 * @param entries batch entries to send; may be empty, in which case no request is made
 * @returns a promise that resolves once every chunk has been sent
 */
export async function sendMessageBatch(
  queueUrl: string,
  entries: SendMessageBatchRequestEntry[]
): Promise<void> {
  const batchSize = 10;

  for (let start = 0; start < entries.length; start += batchSize) {
    // Await each request so chunks go out in order and a failure rejects the
    // promise handed back to the caller instead of being lost.
    await sqs
      .sendMessageBatch({
        QueueUrl: queueUrl,
        Entries: entries.slice(start, start + batchSize),
      })
      .promise();
  }
}