import xs, {Stream} from 'xstream'; import quicktask from 'quicktask'; import {adapt} from './adapt'; import { DevToolEnabledSource, DisposeFunction, Drivers, SinkProxies, Sources, } from './types'; export function makeSinkProxies(drivers: D): SinkProxies { const sinkProxies: SinkProxies = {} as SinkProxies; for (const name in drivers) { if (drivers.hasOwnProperty(name)) { sinkProxies[name] = xs.create(); } } return sinkProxies; } export function callDrivers( drivers: D, sinkProxies: SinkProxies ): Sources { const sources: Sources = {} as Sources; for (const name in drivers) { if (drivers.hasOwnProperty(name)) { sources[name as any] = (drivers[name] as any)(sinkProxies[name], name); if (sources[name as any] && typeof sources[name as any] === 'object') { (sources[name as any] as DevToolEnabledSource)._isCycleSource = name; } } } return sources; } // NOTE: this will mutate `sources`. export function adaptSources(sources: So): So { for (const name in sources) { if ( sources.hasOwnProperty(name) && sources[name] && typeof ((sources[name] as any) as Stream).shamefullySendNext === 'function' ) { sources[name] = adapt((sources[name] as any) as Stream); } } return sources; } /** * Notice that we do not replicate 'complete' from real sinks, in * SinksReplicators and ReplicationBuffers. * Complete is triggered only on disposeReplication. See discussion in #425 * for details. */ type SinkReplicators = { [P in keyof Si]: { next(x: any): void; _n?(x: any): void; error(err: any): void; _e?(err: any): void; complete(): void; }; }; type ReplicationBuffers = { [P in keyof Si]: { _n: Array; _e: Array; }; }; export function replicateMany( sinks: Si, sinkProxies: SinkProxies ): DisposeFunction { const scheduleMicrotask = quicktask(); const sinkNames: Array = Object.keys(sinks).filter( name => !!sinkProxies[name] ); let buffers: ReplicationBuffers = {} as ReplicationBuffers; const replicators: SinkReplicators = {} as SinkReplicators; sinkNames.forEach(name => { buffers[name] = {_n: [], _e: []}; replicators[name] = { next: (x: any) => buffers[name]._n.push(x), error: (err: any) => buffers[name]._e.push(err), complete: () => {}, }; }); const subscriptions = sinkNames.map(name => xs.fromObservable(sinks[name] as any).subscribe(replicators[name]) ); sinkNames.forEach(name => { const listener = sinkProxies[name]; const next = (x: any) => { scheduleMicrotask(() => listener._n(x)); }; const error = (err: any) => { scheduleMicrotask(() => { (console.error || console.log)(err); listener._e(err); }); }; buffers[name]._n.forEach(next); buffers[name]._e.forEach(error); replicators[name].next = next; replicators[name].error = error; // because sink.subscribe(replicator) had mutated replicator to add // _n, _e, _c, we must also update these: replicators[name]._n = next; replicators[name]._e = error; }); buffers = null as any; // free up for GC return function disposeReplication() { subscriptions.forEach(s => s.unsubscribe()); }; } export function disposeSinkProxies(sinkProxies: SinkProxies) { Object.keys(sinkProxies).forEach(name => sinkProxies[name]._c()); } export function disposeSources(sources: So) { for (const k in sources) { if ( sources.hasOwnProperty(k) && sources[k] && (sources[k] as any).dispose ) { (sources[k] as any).dispose(); } } } export function isObjectEmpty(obj: any): boolean { return Object.keys(obj).length === 0; }