All files / src event-store.ts

100% Statements 22/22
100% Branches 4/4
100% Functions 8/8
100% Lines 21/21

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 1144x               4x           17x 17x     4x 32x 1x   31x     4x 33x                   4x 23x                   4x 3x 2x   1x                   4x 4x                   4x 3x   4x                                                                            
import { EventStreamImpl } from './event-stream';
import { Event } from './model/event';
import { PersistenceProvider } from './provider/provider';
import { HasSubscribers, Publisher, Subscriber, Subscription } from './publisher/publisher';
 
/**
 * The EventStore itself. To create EventStore instances, use the {@link EventStoreBuilder}.
 *
 * The store is a thin facade: aggregation and stream listings are delegated to the
 * configured {@link PersistenceProvider}, and subscriptions are delegated to the
 * configured {@link Publisher} when it also implements {@link HasSubscribers}.
 */
export class EventStore implements HasSubscribers {

    /** Backend used for every read issued by this store. */
    private readonly persistenceProvider: PersistenceProvider;
    /** Optional publisher; stays undefined when none is passed to the constructor. */
    private readonly storePublisher: Publisher;

    /**
     * @param provider The persistence provider used to read aggregations and streams
     * @param publisher An optional publisher. It must implement {@link HasSubscribers}
     * for {@link EventStore.subscribe} to work.
     */
    public constructor(provider: PersistenceProvider, publisher?: Publisher) {
        this.persistenceProvider = provider;
        this.storePublisher = publisher;
    }

    /**
     * The configured persistence provider.
     * @throws Error when the store was created without a provider
     */
    public get provider(): PersistenceProvider {
        if (!this.persistenceProvider) {
            throw new Error('No Provider configured in EventStore.');
        }
        return this.persistenceProvider;
    }

    /**
     * The configured publisher, exactly as it was given to the constructor
     * (undefined when none was given).
     */
    public get publisher(): Publisher | HasSubscribers {
        return this.storePublisher;
    }

    /**
     * Retrieve an event stream.
     * @param aggregation The parent aggregation for the event stream
     * @param streamId The stream identifier. Can be any string
     * @return A stream handle bound to this store. If no stream exists for the given id,
     * a new one will be created when the first event is added to the stream.
     */
    public getEventStream(aggregation: string, streamId: string): EventStream {
        return new EventStreamImpl(this, { aggregation, id: streamId });
    }

    /**
     * Add a new subscription to notifications channel associated with the given aggregation.
     * It is necessary to have a valid {@link Publisher} configured that supports subscriptions.
     * @param aggregation The aggregation for the stream events
     * @param subscriber Declares the function to be called to handle new messages
     * @return A subscription. Can be used to remove the subscription to the publisher channel.
     * @throws Error synchronously (not as a rejected promise) when no publisher is configured
     * or the configured publisher does not expose a `subscribe` function
     */
    public subscribe(aggregation: string, subscriber: Subscriber): Promise<Subscription> {
        const target = this.publisher as HasSubscribers;
        // Require a callable member so a misconfigured publisher yields the
        // descriptive error below instead of an opaque "is not a function".
        if (target && typeof target.subscribe === 'function') {
            return target.subscribe(aggregation, subscriber);
        }
        throw new Error('There is no valid Publisher configured. '
            + 'Configure a Publisher that implements HasSubscribers interface');
    }

    /**
     * Retrieves a ranged aggregation list
     * @param offset The start position in the aggregation list
     * @param limit The desired quantity aggregations
     * @return The aggregation list, as returned by the provider
     * @throws Error (as a rejected promise) when no provider is configured
     */
    public async getAggregations(offset?: number, limit?: number) {
        return this.provider.getAggregations(offset, limit);
    }

    /**
     * Retrieves a ranged stream list
     * @param aggregation The aggregation
     * @param offset The start position in the stream list
     * @param limit The desired quantity streams
     * @return The stream list, as returned by the provider
     * @throws Error (as a rejected promise) when no provider is configured
     */
    public async getStreams(aggregation: string, offset?: number, limit?: number) {
        return this.provider.getStreams(aggregation, offset, limit);
    }
}
 
/**
 * An Event Stream: the ordered sequence of events identified by a stream id
 * inside a parent aggregation. This is the type returned by
 * `EventStore.getEventStream`.
 */
export interface EventStream {
    /**
     * The event stream identifier
     */
    streamId: string;
    /**
     * The parent aggregation for this event stream
     */
    aggregation: string;
    /**
     * Retrieve a list containing the events in the stream, in order.
     * @param offset The start position in the stream list
     * @param limit The desired quantity of events
     * @return A promise for the requested events
     */
    getEvents(offset?: number, limit?: number): Promise<Array<Event>>;

    /**
     * Retrieve a single object containing the data of the events in the stream, in order.
     * @param offset The start position in the stream list
     * @param limit The desired quantity of events
     * @return A promise for one event object that carries the data of the selected events
     */
    loadFromHistory(offset?: number, limit?: number): Promise<Event>;

    /**
     * Add a new event to the end of the event stream.
     * @param data The event data
     * @param type The Event type
     * @return The event, updated with information such as its sequence order and commitTimestamp
     */
    addEvent(data: any, type?: string): Promise<Event>;
}