export class EventSourcing { eventNames; eventLog; snapshots; snapshotInterval; lastSnapshotIndex; constructor(options = {}) { this.eventNames = new Map(); this.eventLog = []; this.snapshots = []; this.snapshotInterval = options.snapshotInterval || 100; this.lastSnapshotIndex = 0; this.persistence = options.persistence || null; this.maxLogSize = options.maxLogSize || Infinity; this.sequenceNumber = 0; // Load persisted events if persistence is enabled if (this.persistence) { this.loadEvents(); } } on(event, subscriber) { if (!this.eventNames.has(event)) { this.eventNames.set(event, new Set()); } this.eventNames.get(event).add(subscriber); return () => this.off(event, subscriber); } off(event, subscriber) { this.eventNames.get(event)?.delete(subscriber); } emit(event, data, options = {}) { const timestamp = new Date().toISOString(); const eventEntry = { id: this.generateEventId(), sequence: ++this.sequenceNumber, event, data, timestamp, metadata: options.metadata || {}, version: options.version || 1 }; // Add to event log this.eventLog.push(eventEntry); // Persist event if persistence is enabled if (this.persistence && !options.skipPersistence) { this.persistEvent(eventEntry); } // Emit to current subscribers (live projection) if (!options.skipEmit) { const subscribers = this.eventNames.get(event) ?? new Set(); for (const subscriber of subscribers) { try { subscriber(data, eventEntry); } catch (error) { console.error(`Error in event subscriber for ${event}:`, error); } } } // Create snapshot if needed if (this.shouldCreateSnapshot()) { this.createSnapshot(); } // Trim log if it exceeds max size if (this.eventLog.length > this.maxLogSize) { this.trimEventLog(); } return eventEntry.id; } // Event Sourcing Core Methods replay(fromSequence = 0, toSequence = Infinity) { const events = this.getEventsBySequenceRange(fromSequence, toSequence); for (const eventEntry of events) { const subscribers = this.eventNames.get(eventEntry.event) ?? new Set(); for (const subscriber of subscribers) { try { subscriber(eventEntry.data, eventEntry); } catch (error) { console.error(`Error during replay for ${eventEntry.event}:`, error); } } } return events.length; } replayFromSnapshot(snapshotIndex = -1) { const snapshot = snapshotIndex === -1 ? this.getLatestSnapshot() : this.snapshots[snapshotIndex]; if (!snapshot) { return this.replay(); } // Restore state from snapshot if it has state data if (snapshot.state) { this.restoreFromSnapshot(snapshot); } // Replay events after the snapshot return this.replay(snapshot.lastSequence + 1); } // Query Methods getEvents(filter = {}) { let events = [...this.eventLog]; if (filter.event) { events = events.filter(e => e.event === filter.event); } if (filter.fromTimestamp) { events = events.filter(e => new Date(e.timestamp) >= new Date(filter.fromTimestamp)); } if (filter.toTimestamp) { events = events.filter(e => new Date(e.timestamp) <= new Date(filter.toTimestamp)); } if (filter.fromSequence !== undefined) { events = events.filter(e => e.sequence >= filter.fromSequence); } if (filter.toSequence !== undefined) { events = events.filter(e => e.sequence <= filter.toSequence); } if (filter.metadata) { events = events.filter(e => { return Object.entries(filter.metadata).every(([key, value]) => e.metadata[key] === value ); }); } if (filter.limit) { events = events.slice(0, filter.limit); } return events; } getEventsBySequenceRange(from, to) { return this.eventLog.filter(e => e.sequence >= from && e.sequence <= to); } getEventsByTimeRange(fromTimestamp, toTimestamp) { const from = new Date(fromTimestamp); const to = new Date(toTimestamp); return this.eventLog.filter(e => { const eventTime = new Date(e.timestamp); return eventTime >= from && eventTime <= to; }); } getLastEvent(eventType) { for (let i = this.eventLog.length - 1; i >= 0; i--) { if (!eventType || this.eventLog[i].event === eventType) { return this.eventLog[i]; } } return null; } // Projection Methods project(projectionFn, filter = {}) { const events = this.getEvents(filter); let state = {}; for (const event of events) { state = projectionFn(state, event); } return state; } projectFromSnapshot(projectionFn, snapshotIndex = -1) { const snapshot = snapshotIndex === -1 ? this.getLatestSnapshot() : this.snapshots[snapshotIndex]; let state = snapshot?.projectionState || {}; const fromSequence = snapshot ? snapshot.lastSequence + 1 : 0; const events = this.getEventsBySequenceRange(fromSequence, Infinity); for (const event of events) { state = projectionFn(state, event); } return state; } // Snapshot Methods createSnapshot(customState = null, options = {}) { const snapshot = { id: this.generateSnapshotId(), timestamp: new Date().toISOString(), lastSequence: this.sequenceNumber, eventCount: this.eventLog.length, state: customState, projectionState: null }; this.snapshots.push(snapshot); this.lastSnapshotIndex = this.eventLog.length; if (this.persistence) { this.persistSnapshot(snapshot); } // Clear event log if requested if (options.clearLog) { this.clearEventLog(options.keepEvents || 0); } return snapshot; } clearEventLog(keepRecentCount = 0) { if (keepRecentCount > 0) { this.eventLog = this.eventLog.slice(-keepRecentCount); } else { this.eventLog = []; } // Update last snapshot index this.lastSnapshotIndex = this.eventLog.length; if (this.persistence) { this.persistence.clearEvents?.(keepRecentCount); } return this.eventLog.length; } createSnapshotAndClear(customState = null, keepRecentEvents = 0) { const snapshot = this.createSnapshot(customState, { clearLog: true, keepEvents: keepRecentEvents }); return { snapshot, eventsCleared: snapshot.eventCount - this.eventLog.length, eventsKept: this.eventLog.length }; } shouldCreateSnapshot() { return this.eventLog.length - this.lastSnapshotIndex >= this.snapshotInterval; } getLatestSnapshot() { return this.snapshots[this.snapshots.length - 1] || null; } restoreFromSnapshot(snapshot) { // Override this method in subclasses to restore specific state if (snapshot.state && typeof snapshot.state === 'object') { Object.assign(this, snapshot.state); } } // Persistence Methods async loadEvents() { if (!this.persistence) return; try { const data = await this.persistence.load(); if (data.events) { this.eventLog = data.events; this.sequenceNumber = Math.max(...this.eventLog.map(e => e.sequence), 0); } if (data.snapshots) { this.snapshots = data.snapshots; } } catch (error) { console.error('Failed to load events:', error); } } async persistEvent(event) { if (!this.persistence) return; try { await this.persistence.saveEvent(event); } catch (error) { console.error('Failed to persist event:', error); } } async persistSnapshot(snapshot) { if (!this.persistence) return; try { await this.persistence.saveSnapshot(snapshot); } catch (error) { console.error('Failed to persist snapshot:', error); } } async saveAll() { if (!this.persistence) return; try { await this.persistence.saveAll({ events: this.eventLog, snapshots: this.snapshots }); } catch (error) { console.error('Failed to save all data:', error); } } // Utility Methods generateEventId() { return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } generateSnapshotId() { return `snap_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } trimEventLog() { const latestSnapshot = this.getLatestSnapshot(); if (latestSnapshot) { // Keep events after the latest snapshot const keepFromIndex = this.eventLog.findIndex(e => e.sequence > latestSnapshot.lastSequence); if (keepFromIndex > 0) { this.eventLog = this.eventLog.slice(keepFromIndex); } } else { // No snapshots, keep only the most recent events const keepCount = Math.floor(this.maxLogSize * 0.8); this.eventLog = this.eventLog.slice(-keepCount); } } // Statistics and Monitoring getStats() { const eventTypes = new Map(); this.eventLog.forEach(e => { eventTypes.set(e.event, (eventTypes.get(e.event) || 0) + 1); }); return { totalEvents: this.eventLog.length, currentSequence: this.sequenceNumber, eventTypes: Object.fromEntries(eventTypes), subscriberCount: Array.from(this.eventNames.values()).reduce((sum, set) => sum + set.size, 0), snapshotCount: this.snapshots.length, lastEventTimestamp: this.eventLog.length > 0 ? this.eventLog[this.eventLog.length - 1].timestamp : null, memoryUsage: this.estimateMemoryUsage() }; } estimateMemoryUsage() { const eventSize = JSON.stringify(this.eventLog).length; const snapshotSize = JSON.stringify(this.snapshots).length; return { events: `${Math.round(eventSize / 1024)}KB`, snapshots: `${Math.round(snapshotSize / 1024)}KB`, total: `${Math.round((eventSize + snapshotSize) / 1024)}KB` }; } // Advanced Features createProjection(name, projectionFn, options = {}) { const projection = { name, fn: projectionFn, state: options.initialState || {}, lastProcessedSequence: 0, ...options }; // Process existing events const events = this.eventLog.filter(e => e.sequence > projection.lastProcessedSequence); for (const event of events) { projection.state = projectionFn(projection.state, event); projection.lastProcessedSequence = event.sequence; } // Subscribe to new events this.on('*', (data, event) => { if (event.sequence > projection.lastProcessedSequence) { projection.state = projectionFn(projection.state, event); projection.lastProcessedSequence = event.sequence; } }); return projection; } // Batch operations emitBatch(events, options = {}) { const results = []; const batchId = this.generateEventId(); for (const { event, data, metadata = {} } of events) { const eventId = this.emit(event, data, { ...options, metadata: { ...metadata, batchId } }); results.push(eventId); } return results; } terminate() { // Save all data before terminating if (this.persistence) { this.saveAll(); } this.eventNames.forEach((subscribers) => subscribers.clear()); this.eventNames.clear(); this.eventLog = []; this.snapshots = []; } } class EventSourcingStream extends EventSourcing { name = "StreamSource"; source; // source emitter constructor(...argv) { super(...argv); this.unsubscribe = new Set(); } emitValue(value) { this.emit("value", value); } subscribe(subscriber) { this.on("value", (v) => subscriber(v, this)); return () => this.off("value", subscriber); } get path() { const sources = []; const getNextItem = (node) => node.source; let next = this; while (next) { sources.unshift(next); next = getNextItem(next); } return sources; } terminate() { super.terminate(); this.unsubscribe.forEach((subscriber) => subscriber()); this.unsubscribe.clear(); this.path.forEach(fragment.terminate()); } } // Simple in-memory persistence adapter example export class EventSourcingMemoryPersistence { constructor() { this.data = { events: [], snapshots: [] }; } async load() { return { ...this.data }; } async saveEvent(event) { this.data.events.push(event); } async saveSnapshot(snapshot) { this.data.snapshots.push(snapshot); } async saveAll(data) { this.data = { ...data }; } async clearEvents(keepRecentCount) { this.data.events = keepRecentCount > 0 ? this.data.events.slice(-keepRecentCount) : []; } } // LocalStorage persistence adapter example export class EventSourcingLocalStoragePersistence { constructor(key = 'event_store') { this.key = key; } async load() { const data = localStorage.getItem(this.key); return data ? JSON.parse(data) : { events: [], snapshots: [] }; } async saveEvent(event) { const data = await this.load(); data.events.push(event); localStorage.setItem(this.key, JSON.stringify(data)); } async saveSnapshot(snapshot) { const data = await this.load(); data.snapshots.push(snapshot); localStorage.setItem(this.key, JSON.stringify(data)); } async saveAll(data) { localStorage.setItem(this.key, JSON.stringify(data)); } async clearEvents(keepRecentCount) { const data = await this.load(); data.events = keepRecentCount > 0 ? data.events.slice(-keepRecentCount) : []; localStorage.setItem(this.key, JSON.stringify(data)); } } export class ReactiveEventSourcingStream extends EventSourcingStream { name = "EventSourcingEngine"; fromEvent(...argv) { return fromEvent(this, ...argv); } iterate(...argv) { return iterate(this, ...argv); } map(...argv) { return map(this, ...argv); } filter(...argv) { return filter(this, ...argv); } debounce(...argv) { return debounce(this, ...argv); } distinctUntilChanged(...argv) { return distinctUntilChanged(this, ...argv); } scan(...argv) { return scan(this, ...argv); } delay(...argv) { return delay(this, ...argv); } throttle(...argv) { return throttle(this, ...argv); } withLatestFrom(...argv) { return withLatestFrom(this, ...argv); } merge(...argv) { return merge(this, ...argv); } log(...argv) { return log(this, ...argv); } } // Usage Examples: /* // Basic usage with event sourcing const emitter = new EventSourcing({ snapshotInterval: 50, maxLogSize: 1000, persistence: new EventSourcingMemoryPersistence() }); // Listen to events emitter.on('user.created', (data, event) => { console.log('User created:', data.name, 'at', event.timestamp); }); // Emit events with metadata emitter.emit('user.created', { id: 1, name: 'John' }, { metadata: { source: 'api', userId: 'admin' } } ); // Query events const userEvents = emitter.getEvents({ event: 'user.created', fromTimestamp: '2023-01-01' }); // Create projections const userProjection = emitter.project((state, event) => { if (event.event === 'user.created') { state.users = state.users || []; state.users.push(event.data); } return state; }); // Create snapshots emitter.createSnapshot({ customData: 'state' }); // Replay events emitter.replay(0, 100); // Replay events 0-100 // Get statistics console.log(emitter.getStats()); */