All files / src/publisher sns.ts

100% Statements 26/26
100% Branches 7/7
100% Functions 6/6
100% Lines 23/23

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80  5x                           5x 5x         5x               9x   9x         9x 9x     5x 4x                     4x     5x 5x 1x     4x 4x 3x   1x           2x   1x   1x 1x       5x
 
import { config, HTTPOptions, SNS } from 'aws-sdk';
import { AWSConfig } from '../aws/config';
import { MessageType } from '../model/message';
import { HasSubscribers, Publisher, Subscriber, Subscription } from './publisher';
 
 
export interface SNSOption {
    protocol?: Protocols;
    endpointSubscriber?: string;
    endpointUrl?: string;
    maxRetries?: number;
    httpOptions?: HTTPOptions;
}
 
export enum Protocols {
    HTTP = 'http', HTTPS = 'https'
}
/**
 * A Publisher that use SQS to message communications.
 */
export class SNSPublisher implements Publisher, HasSubscribers {
 
 
    private url: string;
    private sns: SNS;
    private snsOption: SNSOption;
 
    constructor(url: string, awsconfig: AWSConfig, snsOptions?: SNSOption) {
        config.update(awsconfig);
 
        this.sns = new SNS(
            snsOptions ? { endpoint: snsOptions.endpointUrl, 
                httpOptions: snsOptions.httpOptions, 
                maxRetries: snsOptions.maxRetries } : undefined
        );
        this.url = url;
        this.snsOption = snsOptions;
    }
 
    public async publish(message: MessageType): Promise<string> {
        const snsData = {
            Message: JSON.stringify(message),
            MessageAttributes: {
                eventType: {
                    DataType: 'String',
                    StringValue: message.event.eventType,
                }
            },
            TopicArn: this.url,
        };
 
        return (await this.sns.publish(snsData).promise()).MessageId;
    }
 
    public async subscribe(_: string, __: Subscriber): Promise<Subscription> {
        if (this.snsOption === undefined) {
            throw new Error('SNSOption is required to subscriber');
        }
 
        const protocolRegex = RegExp(`^${this.snsOption.protocol}:`);
        if (!protocolRegex.exec(this.snsOption.endpointSubscriber)) {
            throw new Error('Protocol and endpoint subscriber does not match');
        }
        const snsParams = {
            Endpoint: this.snsOption.endpointSubscriber,
            Protocol: this.snsOption.protocol,
            TopicArn: this.url,
        };
 
        await this.sns.subscribe(snsParams).promise();
 
        return Promise.resolve({
            remove: () => {
                this.sns.unsubscribe().promise();
                return Promise.resolve();
            }
        });
    }
}