import * as winston from 'winston'; import { SQS, SNS } from 'aws-sdk'; import { SendMessageBatchRequestEntry } from 'aws-sdk/clients/sqs'; const sns = new SNS({ region: 'us-east-1' }); export const sqs = new SQS({ region: 'us-east-1' }); export interface EnsureQueueResult { queueName: string; queueUrl: string; queueArn: string; topicArns: string[]; topicNames: string[]; } /** * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html * http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SNS.html * @param {string} queueName [description] * @param {string[] = []} topicNames [description] * @return {Promise} [description] */ export async function ensureQueue( queueName: string, topicNames: string[] = [] ): Promise { // idempotent call to create the queue winston.info('ensure queue', queueName); await sqs .createQueue({ QueueName: queueName, Attributes: { MessageRetentionPeriod: '60' } }) .promise(); // retrieve the url let queueUrlResult = await sqs.getQueueUrl({ QueueName: queueName }).promise(); let queueUrl = queueUrlResult.QueueUrl; winston.info('queueUrl', queueUrl); // retrive the arn for use with topics let queueAttributes = await sqs .getQueueAttributes({ QueueUrl: queueUrl, AttributeNames: ['QueueArn'] }) .promise(); let queueArn = queueAttributes.Attributes.QueueArn; winston.info('queueArn', queueArn); // attach to each topic let topicArns = []; for (let topicName of topicNames) { winston.info('subscribing to', topicName); // idempotent call to create the sns topic let topicResult = await sns.createTopic({ Name: topicName }).promise(); let topicArn = topicResult.TopicArn; topicArns.push(topicArn); // subscribe to topic await sns .subscribe({ TopicArn: topicArn, Protocol: 'sqs', Endpoint: queueArn, }) .promise(); } // attach policy to queue // refer to: http://docs.aws.amazon.com/sns/latest/dg/SendMessageToSQS.html await sqs .setQueueAttributes({ QueueUrl: queueUrl, Attributes: { Policy: `{ "Version":"2012-10-17", "Statement":[ { "Sid":"${queueName}Policy", "Effect":"Allow", "Principal":"*", "Action":"sqs:SendMessage", "Resource":"${queueArn}", "Condition":{ "ForAnyValue:ArnEquals":{ "aws:SourceArn":["${topicArns.join('","')}"] } } } ] }`, }, }) .promise(); return { queueName, queueUrl, queueArn, topicArns, topicNames, }; } /** * [receiveMessages description] * @param {string} queueUrl [description] * @param {SQS.Message[]) => void} callback [description] * @return {Promise} [description] */ export async function receiveMessages( queueUrl: string, callback: (messages: SQS.Message[]) => void ): Promise { winston.info('receive messages', queueUrl); let keepRunning = true; while (keepRunning) { try { let results = await sqs .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 10 }) .promise(); callback(results.Messages); } catch (ex) { // this needs to be here incase we have an issue with the AWS-SDK // which occassionally happens. we want this loop to continue. winston.error(ex); } } } /** * [deleteMessage description] * @param {string} queueUrl [description] * @param {string} receiptHandle [description] * @return {Promise} [description] */ export async function deleteMessage(queueUrl: string, receiptHandle: string): Promise { await sqs .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: receiptHandle, }) .promise(); } //////////////////// /** * Publishes a number of messages to the SQS queue * @param queueUrl * @param entries */ export async function sendMessageBatch(queueUrl: string, entries: SendMessageBatchRequestEntry[]) { entries = entries.slice(); let chain = Promise.resolve(); while (entries.length) { let id = 0; let chunk = entries.splice(0, 10); chain.then(() => { return sqs .sendMessageBatch({ QueueUrl: queueUrl, Entries: chunk, }) .promise(); }); } return chain; }