All files / src/provider dynamodb.ts

100% Statements 40/40
100% Branches 12/12
100% Functions 8/8
100% Lines 35/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102    4x 4x                 4x         12x   12x 12x                 4x 4x 4x 4x 4x               4x         8x   4x             8x   6x           6x 6x 3x     6x   7x 7x 7x 7x 7x   6x 28x                 6x     4x 1x     4x 1x     4x 10x   4x  
'use strict';
 
import AWS = require('aws-sdk');
import { DocumentClient, ItemList, QueryOutput } from 'aws-sdk/clients/dynamodb';
import { Config } from '../dynamodb/dynamodb-config';
import { Event, EventType } from '../model/event';
import { Stream } from '../model/stream';
import { PersistenceProvider } from './provider';
 
/**
 * A persistence provider that stores and reads events in DynamoDB.
 *
 * Every event is written as one item whose `aggregation_streamid` attribute is
 * `<aggregation>:<id>` of the stream it belongs to; reads query on that attribute.
 */
export class DynamodbProvider implements PersistenceProvider {
    private readonly documentClient: DocumentClient;
    private readonly config: Config;

    /**
     * Applies `config.awsConfig` to the global AWS SDK configuration and creates the
     * document client from the `config.dynamodb` connection settings.
     *
     * @param config provider configuration (AWS settings, table name, optional ttl)
     */
    constructor(config: Config) {
        this.config = config;

        AWS.config.update(config.awsConfig);
        this.documentClient = new DocumentClient(
            {
                convertEmptyValues: true,
                endpoint: config.dynamodb.endpointUrl,
                httpOptions: config.dynamodb.httpOptions,
                maxRetries: config.dynamodb.maxRetries
            });
    }

    /**
     * Stores `data` as a new event of `stream`.
     *
     * @param stream the stream the event belongs to
     * @param data the event payload; its `eventType` is also stored as a top-level attribute
     * @returns the commit timestamp (epoch milliseconds), event type and payload that were stored
     */
    public async addEvent(stream: Stream, data: any): Promise<EventType> {
        const commitTimestamp = new Date().getTime();
        const commitTimestampSeconds = Math.floor(commitTimestamp / 1000);
        const configuredTtl = this.config.dynamodb.ttl;
        const event = {
            aggregation_streamid: this.getKey(stream),
            commitTimestamp: commitTimestamp,
            eventType: data.eventType,
            payload: data,
            stream: stream,
            // Expiry is expressed in epoch seconds; left undefined when no ttl is configured.
            ttl: configuredTtl ? (commitTimestampSeconds + configuredTtl) : undefined,
        };
        const record = {
            Item: event,
            TableName: this.config.dynamodb.tableName,
        };

        await this.documentClient.put(record).promise();

        return {
            commitTimestamp: commitTimestamp,
            eventType: data.eventType,
            payload: data,
        };
    }

    /**
     * Reads the events of `stream`, in descending key order (`ScanIndexForward: false`;
     * presumably newest first - confirm against the table's sort key).
     *
     * Result pages are followed through `LastEvaluatedKey` until either the stream is
     * exhausted or, for a bounded read, `offset + limit` items have been collected.
     *
     * @param stream the stream to read
     * @param offset number of leading events to skip (applied after the read)
     * @param limit maximum number of events to return; zero or negative means "no limit"
     * @returns the selected events; `sequence` is the position within the full descending read
     */
    public async getEvents(stream: Stream, offset: number = 0, limit: number = -1): Promise<Array<Event>> {
        const pageSize = offset + limit;
        // A non-positive limit can never be sent to DynamoDB, so such a read is unbounded.
        const bounded = limit > 0 && pageSize > 0;
        const baseQuery = {
            ExpressionAttributeValues: { ':key': this.getKey(stream) },
            KeyConditionExpression: 'aggregation_streamid = :key',
            ScanIndexForward: false,
            TableName: this.config.dynamodb.tableName,
        };

        let items: ItemList = [];
        let exclusiveStartKey: QueryOutput['LastEvaluatedKey'];
        do {
            const queryOutput: QueryOutput = await this.documentClient.query({
                ...baseQuery,
                ExclusiveStartKey: exclusiveStartKey,
                // Ask only for what is still missing so a bounded read needs as few round trips as possible.
                Limit: bounded ? pageSize - items.length : undefined,
            }).promise();
            items = items.concat(queryOutput.Items || []);
            // An absent key means the last page was reached; querying again would restart from the top.
            exclusiveStartKey = queryOutput.LastEvaluatedKey;
        } while (exclusiveStartKey && (!bounded || items.length < pageSize));

        const events = items.map((data, index) => {
            return {
                commitTimestamp: data.commitTimestamp,
                eventType: data.eventType || (data.payload as any).eventType,
                payload: data.payload,
                sequence: index,
                ttl: data.ttl,
            } as Event;
        });

        return bounded ? events.slice(offset, pageSize) : events.slice(offset);
    }

    /**
     * Not supported by this provider.
     *
     * @throws Error always (as a rejected promise)
     */
    public async getAggregations(offset: number, limit: number): Promise<Array<string>> {
        throw new Error('Method not supported');
    }

    /**
     * Not supported by this provider.
     *
     * @throws Error always (as a rejected promise)
     */
    public async getStreams(aggregation: string, offset: number, limit: number): Promise<Array<string>> {
        throw new Error('Method not supported');
    }

    /** Builds the `aggregation_streamid` value of a stream: `<aggregation>:<id>`. */
    private getKey(stream: Stream): string {
        return `${stream.aggregation}:${stream.id}`;
    }
}