// UNPKG

// 3.9 kB · Plain Text · View Raw
1import xs, {Stream} from 'xstream';
2import quicktask from 'quicktask';
3import {adapt} from './adapt';
4import {
5 DevToolEnabledSource,
6 DisposeFunction,
7 Drivers,
8 SinkProxies,
9 Sources,
10} from './types';
11
/**
 * Creates one fresh proxy stream per driver.
 *
 * Every own enumerable key of `drivers` receives a new stream produced by
 * `xs.create()`, stored under the same key in the returned object.
 *
 * @param drivers Object whose own keys name the drivers.
 * @returns A new object mapping each driver name to its proxy stream.
 */
export function makeSinkProxies<D extends Drivers>(drivers: D): SinkProxies<D> {
  const sinkProxies: SinkProxies<D> = {} as SinkProxies<D>;
  for (const name in drivers) {
    // Borrow hasOwnProperty from Object.prototype instead of calling it on
    // `drivers`: the map may have been created with Object.create(null), or
    // may contain a driver that is itself registered as `hasOwnProperty`.
    if (Object.prototype.hasOwnProperty.call(drivers, name)) {
      sinkProxies[name] = xs.create<any>();
    }
  }
  return sinkProxies;
}
21
/**
 * Invokes every driver with its sink proxy and collects the results.
 *
 * Each own enumerable key of `drivers` is called as
 * `driver(sinkProxies[name], name)`. The returned value is stored under the
 * same key in the result. When that value is a non-null object it is tagged
 * with `_isCycleSource = name`.
 *
 * @param drivers Object mapping driver names to driver functions.
 * @param sinkProxies Object holding, per driver name, the value passed to the
 *   driver as its first argument.
 * @returns A new object mapping each driver name to what its driver returned.
 */
export function callDrivers<D extends Drivers>(
  drivers: D,
  sinkProxies: SinkProxies<D>
): Sources<D> {
  const sources: Sources<D> = {} as Sources<D>;
  for (const name in drivers) {
    // Borrow hasOwnProperty from Object.prototype: calling it on `drivers`
    // breaks for prototype-less maps and would invoke the driver itself if a
    // driver happens to be registered under the key `hasOwnProperty`.
    if (!Object.prototype.hasOwnProperty.call(drivers, name)) {
      continue;
    }
    const source = (drivers[name] as any)(sinkProxies[name], name);
    sources[name as any] = source;
    // Only objects can carry the tag; primitives, functions and
    // null/undefined results are stored untouched.
    if (source && typeof source === 'object') {
      (source as DevToolEnabledSource)._isCycleSource = name;
    }
  }
  return sources;
}
37
/**
 * Replaces every stream-like source with the result of `adapt(source)`.
 *
 * A source counts as stream-like when it is truthy and exposes a
 * `shamefullySendNext` function. All other sources are left as they are.
 *
 * NOTE: this mutates `sources` in place and returns that same object.
 *
 * @param sources Object mapping names to sources.
 * @returns The same `sources` object, after mutation.
 */
export function adaptSources<So>(sources: So): So {
  for (const name in sources) {
    // Borrow hasOwnProperty from Object.prototype so that prototype-less maps
    // and maps containing a `hasOwnProperty` key are handled correctly.
    if (!Object.prototype.hasOwnProperty.call(sources, name)) {
      continue;
    }
    const source = sources[name] as any;
    if (
      source &&
      typeof (source as Stream<any>).shamefullySendNext === 'function'
    ) {
      sources[name] = adapt(source as Stream<any>);
    }
  }
  return sources;
}
52
/**
 * Shape of the listener that `replicateMany` subscribes to a single sink.
 *
 * 'complete' coming from the real sinks is deliberately not replicated,
 * neither in SinkReplicators nor in ReplicationBuffers: the replicators are
 * created with a no-op `complete`. See discussion in #425 for details.
 */
type SinkReplicator = {
  next(x: any): void;
  _n?(x: any): void;
  error(err: any): void;
  _e?(err: any): void;
  complete(): void;
};

/** One replicator per key of the sinks object `Si`. */
type SinkReplicators<Si> = {[P in keyof Si]: SinkReplicator};
68
/**
 * Temporary queues for a single sink: `_n` collects the values passed to the
 * replicator's `next`, `_e` collects the errors passed to its `error`.
 */
type ReplicationBuffer = {
  _n: any[];
  _e: any[];
};

/** One pair of queues per key of the sinks object `Si`. */
type ReplicationBuffers<Si> = {[P in keyof Si]: ReplicationBuffer};
75
/**
 * Forwards the events of every sink to the sink proxy stored under the same
 * key.
 *
 * Only keys of `sinks` that have a truthy entry in `sinkProxies` take part.
 * The work happens in three ordered phases:
 *
 *  1. A buffering replicator is created for every participating sink.
 *  2. Every sink is subscribed to; anything emitted during this phase lands
 *     in the buffers.
 *  3. Per sink, the buffered values and then the buffered errors are replayed
 *     through the forwarding handlers, which afterwards replace the buffering
 *     handlers on the replicator.
 *
 * Forwarding always goes through the scheduler returned by `quicktask()`.
 * Errors are printed with `console.error` (or `console.log` as a fallback)
 * right before they are handed to the proxy. `complete` is never forwarded.
 *
 * @param sinks Object mapping names to observable sinks.
 * @param sinkProxies Object mapping names to the proxies that receive events.
 * @returns A function that unsubscribes from every sink subscribed to here.
 */
export function replicateMany<Si extends any>(
  sinks: Si,
  sinkProxies: SinkProxies<Si>
): DisposeFunction {
  const scheduleMicrotask = quicktask();

  const sinkNames: Array<keyof Si> = Object.keys(sinks).filter(key =>
    Boolean(sinkProxies[key])
  );

  // Phase 1: replicators that merely record what they receive.
  let buffers: ReplicationBuffers<Si> = {} as ReplicationBuffers<Si>;
  const replicators: SinkReplicators<Si> = {} as SinkReplicators<Si>;
  for (const name of sinkNames) {
    buffers[name] = {_n: [], _e: []};
    replicators[name] = {
      next: (x: any) => buffers[name]._n.push(x),
      error: (err: any) => buffers[name]._e.push(err),
      complete: () => {},
    };
  }

  // Phase 2: subscribe to all sinks before anything is forwarded.
  const subscriptions = sinkNames.map(name => {
    const sink$ = xs.fromObservable(sinks[name] as any);
    return sink$.subscribe(replicators[name]);
  });

  // Phase 3: replay the buffers, then switch to direct forwarding.
  for (const name of sinkNames) {
    const proxy = sinkProxies[name];
    const forwardNext = (x: any) => {
      scheduleMicrotask(() => proxy._n(x));
    };
    const forwardError = (err: any) => {
      scheduleMicrotask(() => {
        (console.error || console.log)(err);
        proxy._e(err);
      });
    };

    const pending = buffers[name];
    for (const value of pending._n) {
      forwardNext(value);
    }
    for (const failure of pending._e) {
      forwardError(failure);
    }

    const replicator = replicators[name];
    replicator.next = forwardNext;
    replicator.error = forwardError;
    // sink.subscribe(replicator) had mutated the replicator to add _n, _e
    // and _c, so those aliases have to be redirected as well.
    replicator._n = forwardNext;
    replicator._e = forwardError;
  }
  buffers = null as any; // free up for GC

  return function disposeReplication() {
    for (const subscription of subscriptions) {
      subscription.unsubscribe();
    }
  };
}
127
/**
 * Calls `_c()` on every proxy stored under an own enumerable key of
 * `sinkProxies`, in key order.
 *
 * @param sinkProxies Object mapping names to sink proxies.
 */
export function disposeSinkProxies<Si>(sinkProxies: SinkProxies<Si>) {
  const proxyNames = Object.keys(sinkProxies);
  for (const proxyName of proxyNames) {
    sinkProxies[proxyName]._c();
  }
}
131
/**
 * Calls `dispose()` on every own source that provides such a method.
 *
 * Sources that are falsy, or whose `dispose` is missing or not callable, are
 * skipped so that one malformed source cannot prevent the remaining sources
 * from being disposed.
 *
 * @param sources Object mapping names to sources.
 */
export function disposeSources<So>(sources: So) {
  for (const k in sources) {
    // Borrow hasOwnProperty from Object.prototype so that prototype-less maps
    // and maps containing a `hasOwnProperty` key are handled correctly.
    if (!Object.prototype.hasOwnProperty.call(sources, k)) {
      continue;
    }
    const source = sources[k] as any;
    if (source && typeof source.dispose === 'function') {
      source.dispose();
    }
  }
}
143
/**
 * Tells whether `obj` has no own enumerable string-keyed properties.
 *
 * @param obj Value handed to `Object.keys`.
 * @returns `true` when `Object.keys(obj)` is empty, `false` otherwise.
 */
export function isObjectEmpty(obj: any): boolean {
  const ownKeys = Object.keys(obj);
  return ownKeys.length < 1;
}