UNPKG

3.9 kBPlain TextView Raw
1import xs, {Stream} from 'xstream';
2import quicktask from 'quicktask';
3import {adapt} from './adapt';
4import {
5 DevToolEnabledSource,
6 DisposeFunction,
7 Drivers,
8 SinkProxies,
9 Sources,
10} from './types';
11
12const scheduleMicrotask = quicktask();
13
14export function makeSinkProxies<D extends Drivers>(drivers: D): SinkProxies<D> {
15 const sinkProxies: SinkProxies<D> = {} as SinkProxies<D>;
16 for (const name in drivers) {
17 if (drivers.hasOwnProperty(name)) {
18 sinkProxies[name] = xs.create<any>();
19 }
20 }
21 return sinkProxies;
22}
23
24export function callDrivers<D extends Drivers>(
25 drivers: D,
26 sinkProxies: SinkProxies<D>
27): Sources<D> {
28 const sources: Sources<D> = {} as Sources<D>;
29 for (const name in drivers) {
30 if (drivers.hasOwnProperty(name)) {
31 sources[name as any] = (drivers[name] as any)(sinkProxies[name], name);
32 if (sources[name as any] && typeof sources[name as any] === 'object') {
33 (sources[name as any] as DevToolEnabledSource)._isCycleSource = name;
34 }
35 }
36 }
37 return sources;
38}
39
40// NOTE: this will mutate `sources`.
41export function adaptSources<So>(sources: So): So {
42 for (const name in sources) {
43 if (
44 sources.hasOwnProperty(name) &&
45 sources[name] &&
46 typeof ((sources[name] as any) as Stream<any>).shamefullySendNext ===
47 'function'
48 ) {
49 sources[name] = adapt((sources[name] as any) as Stream<any>);
50 }
51 }
52 return sources;
53}
54
55/**
56 * Notice that we do not replicate 'complete' from real sinks, in
57 * SinksReplicators and ReplicationBuffers.
58 * Complete is triggered only on disposeReplication. See discussion in #425
59 * for details.
60 */
61type SinkReplicators<Si> = {
62 [P in keyof Si]: {
63 next(x: any): void;
64 _n?(x: any): void;
65 error(err: any): void;
66 _e?(err: any): void;
67 complete(): void;
68 }
69};
70
71type ReplicationBuffers<Si> = {
72 [P in keyof Si]: {
73 _n: Array<any>;
74 _e: Array<any>;
75 }
76};
77
78export function replicateMany<Si extends any>(
79 sinks: Si,
80 sinkProxies: SinkProxies<Si>
81): DisposeFunction {
82 const sinkNames: Array<keyof Si> = Object.keys(sinks).filter(
83 name => !!sinkProxies[name]
84 );
85
86 let buffers: ReplicationBuffers<Si> = {} as ReplicationBuffers<Si>;
87 const replicators: SinkReplicators<Si> = {} as SinkReplicators<Si>;
88 sinkNames.forEach(name => {
89 buffers[name] = {_n: [], _e: []};
90 replicators[name] = {
91 next: (x: any) => buffers[name]._n.push(x),
92 error: (err: any) => buffers[name]._e.push(err),
93 complete: () => {},
94 };
95 });
96
97 const subscriptions = sinkNames.map(name =>
98 xs.fromObservable(sinks[name] as any).subscribe(replicators[name])
99 );
100
101 sinkNames.forEach(name => {
102 const listener = sinkProxies[name];
103 const next = (x: any) => {
104 scheduleMicrotask(() => listener._n(x));
105 };
106 const error = (err: any) => {
107 scheduleMicrotask(() => {
108 (console.error || console.log)(err);
109 listener._e(err);
110 });
111 };
112 buffers[name]._n.forEach(next);
113 buffers[name]._e.forEach(error);
114 replicators[name].next = next;
115 replicators[name].error = error;
116 // because sink.subscribe(replicator) had mutated replicator to add
117 // _n, _e, _c, we must also update these:
118 replicators[name]._n = next;
119 replicators[name]._e = error;
120 });
121 buffers = null as any; // free up for GC
122
123 return function disposeReplication() {
124 subscriptions.forEach(s => s.unsubscribe());
125 };
126}
127
128export function disposeSinkProxies<Si>(sinkProxies: SinkProxies<Si>) {
129 Object.keys(sinkProxies).forEach(name => sinkProxies[name]._c());
130}
131
132export function disposeSources<So>(sources: So) {
133 for (const k in sources) {
134 if (
135 sources.hasOwnProperty(k) &&
136 sources[k] &&
137 (sources[k] as any).dispose
138 ) {
139 (sources[k] as any).dispose();
140 }
141 }
142}
143
144export function isObjectEmpty(obj: any): boolean {
145 return Object.keys(obj).length === 0;
146}