1 | import * as winston from 'winston';
|
2 | import { SQS, SNS } from 'aws-sdk';
|
3 | import { SendMessageBatchRequestEntry } from 'aws-sdk/clients/sqs';
|
/** AWS region shared by every client created in this module. */
const AWS_REGION = 'us-east-1';

/** SNS client used to create topics and subscribe queues to them. */
const sns = new SNS({ region: AWS_REGION });

/** SQS client used by all queue helpers below; exported so callers can reuse it. */
export const sqs = new SQS({ region: AWS_REGION });
|
7 |
|
/**
 * Description of a queue returned by `ensureQueue`, together with the topics
 * it was subscribed to.
 */
export interface EnsureQueueResult {
  /** Name of the queue, exactly as passed to `ensureQueue`. */
  queueName: string;

  /** URL of the queue, as reported by SQS `getQueueUrl`. */
  queueUrl: string;

  /** ARN of the queue, as reported by SQS `getQueueAttributes`. */
  queueArn: string;

  /** ARNs of the subscribed topics, in the same order as `topicNames`. */
  topicArns: string[];

  /** Names of the topics, exactly as passed to `ensureQueue`. */
  topicNames: string[];
}
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
/**
 * Builds the SQS access policy that allows the given SNS topics to send
 * messages to the queue.
 *
 * The document is serialized with `JSON.stringify` so that any character in
 * the queue name or ARNs is escaped correctly instead of being spliced into a
 * hand-written JSON string.
 */
function buildQueuePolicy(queueName: string, queueArn: string, topicArns: string[]): string {
  // The previous hand-written template rendered `[""]` when there were no
  // topics; keep that value so the no-topic policy is unchanged.
  const sourceArns = topicArns.length > 0 ? topicArns : [''];

  return JSON.stringify({
    Version: '2012-10-17',
    Statement: [
      {
        Sid: `${queueName}Policy`,
        Effect: 'Allow',
        Principal: '*',
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: {
          'ForAnyValue:ArnEquals': {
            'aws:SourceArn': sourceArns,
          },
        },
      },
    ],
  });
}

/**
 * Creates the queue, subscribes it to each named SNS topic, and installs a
 * queue policy that lets those topics send messages to it.
 *
 * Steps, in order:
 *  1. `createQueue` with `MessageRetentionPeriod` set to `'60'`.
 *  2. Look up the queue URL and ARN.
 *  3. For every topic name: `createTopic`, then subscribe the queue to it.
 *  4. `setQueueAttributes` with a policy restricted to the collected topic ARNs.
 *
 * @param queueName Name of the queue to create or look up.
 * @param topicNames Names of the SNS topics to subscribe the queue to; defaults to none.
 * @returns The queue's name, URL and ARN plus the topic names and their ARNs.
 * @throws Error if AWS omits the queue URL, queue ARN or a topic ARN from a
 *   response; any error raised by the AWS SDK calls is propagated unchanged.
 */
export async function ensureQueue(
  queueName: string,
  topicNames: string[] = []
): Promise<EnsureQueueResult> {
  winston.info('ensure queue', queueName);
  await sqs
    .createQueue({ QueueName: queueName, Attributes: { MessageRetentionPeriod: '60' } })
    .promise();

  const urlResult = await sqs.getQueueUrl({ QueueName: queueName }).promise();
  const queueUrl = urlResult.QueueUrl;
  if (!queueUrl) {
    throw new Error(`SQS did not return a URL for queue "${queueName}"`);
  }
  winston.info('queueUrl', queueUrl);

  const attributeResult = await sqs
    .getQueueAttributes({ QueueUrl: queueUrl, AttributeNames: ['QueueArn'] })
    .promise();
  // `Attributes` is optional in the SDK response type; guard before reading it.
  const queueArn = attributeResult.Attributes && attributeResult.Attributes.QueueArn;
  if (!queueArn) {
    throw new Error(`SQS did not return an ARN for queue "${queueName}"`);
  }
  winston.info('queueArn', queueArn);

  const topicArns: string[] = [];
  // Topics are processed one at a time so log lines and `topicArns` follow the
  // order of `topicNames`.
  for (const topicName of topicNames) {
    winston.info('subscribing to', topicName);

    const topicResult = await sns.createTopic({ Name: topicName }).promise();
    const topicArn = topicResult.TopicArn;
    if (!topicArn) {
      throw new Error(`SNS did not return an ARN for topic "${topicName}"`);
    }
    topicArns.push(topicArn);

    await sns
      .subscribe({
        TopicArn: topicArn,
        Protocol: 'sqs',
        Endpoint: queueArn,
      })
      .promise();
  }

  await sqs
    .setQueueAttributes({
      QueueUrl: queueUrl,
      Attributes: {
        Policy: buildQueuePolicy(queueName, queueArn, topicArns),
      },
    })
    .promise();

  return {
    queueName,
    queueUrl,
    queueArn,
    topicArns,
    topicNames,
  };
}
|
100 |
|
101 |
|
102 |
|
103 |
|
104 |
|
105 |
|
106 |
|
/**
 * Polls the queue forever, handing each received batch to `callback`.
 *
 * Each iteration requests up to 10 messages. The callback is invoked after
 * every successful poll; when the poll returns no messages it receives an
 * empty array rather than `undefined`. The callback's return value is not
 * awaited.
 *
 * Errors from the receive call or thrown synchronously by the callback are
 * logged and the loop continues after a short pause, so a persistent failure
 * does not turn into a tight retry loop.
 *
 * The returned promise never resolves; this function does not delete messages.
 *
 * @param queueUrl URL of the queue to poll.
 * @param callback Invoked with the messages of each poll (possibly empty).
 */
export async function receiveMessages(
  queueUrl: string,
  callback: (messages: SQS.Message[]) => void
): Promise<void> {
  winston.info('receive messages', queueUrl);

  // Pause applied after a failed iteration before polling again.
  const retryDelayMs = 1000;

  for (;;) {
    try {
      const results = await sqs
        .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10 })
        .promise();
      // `Messages` is absent from the response when nothing was received.
      callback(results.Messages || []);
    } catch (ex) {
      winston.error(ex);
      await new Promise<void>(resolve => setTimeout(resolve, retryDelayMs));
    }
  }
}
|
127 |
|
128 |
|
129 |
|
130 |
|
131 |
|
132 |
|
133 |
|
/**
 * Removes a single message from the queue.
 *
 * @param queueUrl URL of the queue that holds the message.
 * @param receiptHandle Receipt handle identifying the message to delete.
 * @returns A promise that settles once the SQS `deleteMessage` call has finished.
 */
export async function deleteMessage(queueUrl: string, receiptHandle: string): Promise<void> {
  const request = { QueueUrl: queueUrl, ReceiptHandle: receiptHandle };
  await sqs.deleteMessage(request).promise();
}
|
142 |
|
143 |
|
144 |
|
145 |
|
146 |
|
147 |
|
148 |
|
149 |
|
/**
 * Sends the entries to the queue in batches of at most 10, one batch after
 * another.
 *
 * The returned promise resolves only after every batch has been sent, and
 * rejects with the SDK error if any batch request fails (later batches are
 * then not attempted). Entries that SQS reports as failed inside an otherwise
 * successful batch response are logged, not thrown.
 *
 * The caller's array is not modified.
 *
 * @param queueUrl URL of the destination queue.
 * @param entries Batch entries to send; may be empty, in which case nothing is sent.
 */
export async function sendMessageBatch(
  queueUrl: string,
  entries: SendMessageBatchRequestEntry[]
): Promise<void> {
  // Largest number of entries placed in a single sendMessageBatch request.
  const maxBatchSize = 10;
  // Snapshot so changes the caller makes while we are awaiting have no effect.
  const pending = entries.slice();

  for (let start = 0; start < pending.length; start += maxBatchSize) {
    const chunk = pending.slice(start, start + maxBatchSize);
    const result = await sqs
      .sendMessageBatch({
        QueueUrl: queueUrl,
        Entries: chunk,
      })
      .promise();

    if (result.Failed && result.Failed.length > 0) {
      winston.error('sendMessageBatch failed entries', result.Failed);
    }
  }
}
|