Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | 4x 4x 4x 12x 12x 12x 4x 4x 4x 4x 4x 4x 8x 4x 8x 6x 6x 6x 3x 6x 7x 7x 7x 7x 7x 6x 28x 6x 4x 1x 4x 1x 4x 10x 4x | 'use strict';
import AWS = require('aws-sdk');
import { DocumentClient, ItemList, QueryOutput } from 'aws-sdk/clients/dynamodb';
import { Config } from '../dynamodb/dynamodb-config';
import { Event, EventType } from '../model/event';
import { Stream } from '../model/stream';
import { PersistenceProvider } from './provider';
/**
* A Persistence Provider that handle all the data in Dynamodb.
*/
export class DynamodbProvider implements PersistenceProvider {
private documentClient: DocumentClient;
private config: Config;
constructor(config: Config) {
this.config = config;
AWS.config.update(config.awsConfig);
this.documentClient = new DocumentClient(
{
convertEmptyValues: true,
endpoint: config.dynamodb.endpointUrl,
httpOptions: config.dynamodb.httpOptions,
maxRetries: config.dynamodb.maxRetries
});
}
public async addEvent(stream: Stream, data: any): Promise<EventType> {
const now = new Date();
const commitTimestamp = now.getTime();
const commitTimetampSeconds = Math.floor(commitTimestamp / 1000);
const event = {
aggregation_streamid: `${this.getKey(stream)}`,
commitTimestamp: commitTimestamp,
eventType: data.eventType,
payload: data,
stream: stream,
ttl: this.config.dynamodb.ttl ? (commitTimetampSeconds + this.config.dynamodb.ttl) : undefined,
};
const record = {
Item: event,
TableName: this.config.dynamodb.tableName,
};
await this.documentClient.put(record).promise();
return {
commitTimestamp: commitTimestamp,
eventType: data.eventType,
payload: data,
};
}
public async getEvents(stream: Stream, offset: number = 0, limit: number = -1): Promise<Array<Event>> {
let exclusiveStartKey: any;
let filter = {
ExpressionAttributeValues: { ':key': this.getKey(stream) },
KeyConditionExpression: 'aggregation_streamid = :key',
ScanIndexForward: false,
TableName: this.config.dynamodb.tableName,
};
const pageSize = offset + limit;
if (pageSize > 0) {
filter = { ...filter, ...{ Limit: limit } };
}
let items: ItemList = [];
do {
filter = { ...filter, ...{ ExclusiveStartKey: exclusiveStartKey } };
const queryOutput: QueryOutput = (await this.documentClient.query(filter).promise());
exclusiveStartKey = queryOutput.LastEvaluatedKey || null;
items = items.concat(queryOutput.Items);
} while (items.length < pageSize);
const events = items.map((data, index) => {
return {
commitTimestamp: data.commitTimestamp,
eventType: data.eventType || (data.payload as any).eventType,
payload: data.payload,
sequence: index,
ttl: data.ttl,
} as Event;
});
return pageSize <= 0 ? events.slice(offset) : events.slice(offset, pageSize);
}
public async getAggregations(offset: number, limit: number): Promise<Array<string>> {
throw new Error('Method not supported');
}
public async getStreams(aggregation: string, offset: number, limit: number): Promise<Array<string>> {
throw new Error('Method not supported');
}
private getKey(stream: Stream): string {
return `${stream.aggregation}:${stream.id}`;
}
}
|