1 | import xs, {Stream} from 'xstream';
|
2 | import quicktask from 'quicktask';
|
3 | import {adapt} from './adapt';
|
4 | import {
|
5 | DevToolEnabledSource,
|
6 | DisposeFunction,
|
7 | Drivers,
|
8 | SinkProxies,
|
9 | Sources,
|
10 | } from './types';
|
11 |
|
// Scheduling function produced by quicktask(). replicateMany routes every
// forwarded sink event through it instead of calling the sink proxy directly.
// NOTE(review): presumably defers the callback to a microtask -- confirm
// against the quicktask package.
12 | const scheduleMicrotask = quicktask();
|
13 |
|
/**
 * Creates one empty proxy stream (`xs.create()`) per own enumerable driver
 * name.
 *
 * @param drivers Object whose own keys name the drivers.
 * @returns An object with the same own keys as `drivers`, each holding a
 *   freshly created stream.
 */
export function makeSinkProxies<D extends Drivers>(drivers: D): SinkProxies<D> {
  const sinkProxies: SinkProxies<D> = {} as SinkProxies<D>;
  for (const name in drivers) {
    // Go through Object.prototype so that driver maps created with
    // Object.create(null), or containing a driver literally named
    // "hasOwnProperty", are handled correctly.
    if (Object.prototype.hasOwnProperty.call(drivers, name)) {
      sinkProxies[name] = xs.create<any>();
    }
  }
  return sinkProxies;
}
|
23 |
|
/**
 * Invokes every own driver function with its sink proxy and its name, and
 * collects the returned sources under the same keys.
 *
 * Any truthy source of type `object` is tagged in place with
 * `_isCycleSource = <driver name>`.
 *
 * @param drivers Object mapping driver names to driver functions.
 * @param sinkProxies Object holding, per driver name, the value passed as the
 *   driver's first argument.
 * @returns An object mapping each own driver name to what its driver returned.
 */
export function callDrivers<D extends Drivers>(
  drivers: D,
  sinkProxies: SinkProxies<D>
): Sources<D> {
  const sources: Sources<D> = {} as Sources<D>;
  for (const name in drivers) {
    // Go through Object.prototype so that driver maps created with
    // Object.create(null), or containing a driver literally named
    // "hasOwnProperty", are handled correctly.
    if (!Object.prototype.hasOwnProperty.call(drivers, name)) {
      continue;
    }
    const source = (drivers[name] as any)(sinkProxies[name], name);
    sources[name as any] = source;
    // Only objects can carry the tag; primitives and functions are left as is.
    if (source && typeof source === 'object') {
      (source as DevToolEnabledSource)._isCycleSource = name;
    }
  }
  return sources;
}
|
39 |
|
40 |
|
/**
 * Replaces, in place, every own source that exposes a `shamefullySendNext`
 * function with the result of passing it through `adapt()`.
 *
 * Falsy sources and sources without that method are left untouched.
 * NOTE(review): the `shamefullySendNext` check presumably identifies xstream
 * streams -- confirm against xstream.
 *
 * @param sources Object of sources; mutated in place.
 * @returns The same `sources` object.
 */
export function adaptSources<So>(sources: So): So {
  for (const name in sources) {
    // Go through Object.prototype so that objects created with
    // Object.create(null), or having a key named "hasOwnProperty", work.
    if (!Object.prototype.hasOwnProperty.call(sources, name)) {
      continue;
    }
    const source = sources[name] as any;
    if (source && typeof source.shamefullySendNext === 'function') {
      sources[name] = adapt(source as Stream<any>);
    }
  }
  return sources;
}
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
59 |
|
60 |
|
/**
 * Listener object used to replicate a single sink. `_n` and `_e` are optional
 * aliases that are assigned alongside `next` and `error`.
 */
interface SinkReplicator {
  next(x: any): void;
  _n?(x: any): void;
  error(err: any): void;
  _e?(err: any): void;
  complete(): void;
}

/** One replicating listener per key of `Si`. */
type SinkReplicators<Si> = {
  [P in keyof Si]: SinkReplicator
};
|
70 |
|
/** Pending events of a single sink: `_n` holds values, `_e` holds errors. */
interface ReplicationBuffer {
  _n: Array<any>;
  _e: Array<any>;
}

/** One event buffer per key of `Si`. */
type ReplicationBuffers<Si> = {
  [P in keyof Si]: ReplicationBuffer
};
|
77 |
|
/**
 * Subscribes to every sink that has a matching (truthy) sink proxy and
 * forwards its `next` and `error` events to that proxy.
 *
 * The work happens in three ordered phases:
 *  1. each participating sink gets a listener that merely records events
 *     in a buffer;
 *  2. all sinks are subscribed with those listeners;
 *  3. the buffered events are replayed into the proxies and the listeners are
 *     switched over to forward directly.
 *
 * Forwarding always goes through `scheduleMicrotask`. Errors are additionally
 * printed with `console.error` (or `console.log` when that is unavailable).
 * Completion is not forwarded: the listeners' `complete` is a no-op.
 *
 * @param sinks Object of sinks; each value is wrapped with
 *   `xs.fromObservable`.
 * @param sinkProxies Object of proxies keyed like `sinks`.
 * @returns A function that unsubscribes from every sink subscribed here.
 */
export function replicateMany<Si extends any>(
  sinks: Si,
  sinkProxies: SinkProxies<Si>
): DisposeFunction {
  // Sinks without a proxy under the same key are ignored.
  const sinkNames: Array<keyof Si> = Object.keys(sinks).filter(
    key => !!sinkProxies[key]
  );

  // Phase 1: listeners that only buffer what they receive.
  let buffers: ReplicationBuffers<Si> = {} as ReplicationBuffers<Si>;
  const replicators: SinkReplicators<Si> = {} as SinkReplicators<Si>;
  for (const name of sinkNames) {
    buffers[name] = {_n: [], _e: []};
    replicators[name] = {
      next: (value: any) => buffers[name]._n.push(value),
      error: (failure: any) => buffers[name]._e.push(failure),
      complete: () => {},
    };
  }

  // Phase 2: subscribe to all sinks before anything is forwarded.
  const subscriptions = sinkNames.map(name => {
    const sink$ = xs.fromObservable(sinks[name] as any);
    return sink$.subscribe(replicators[name]);
  });

  // Phase 3: replay what was buffered, then forward live events.
  for (const name of sinkNames) {
    const proxy = sinkProxies[name];
    const forwardNext = (value: any) => {
      scheduleMicrotask(() => proxy._n(value));
    };
    const forwardError = (failure: any) => {
      scheduleMicrotask(() => {
        (console.error || console.log)(failure);
        proxy._e(failure);
      });
    };

    const pending = buffers[name];
    pending._n.forEach(forwardNext);
    pending._e.forEach(forwardError);

    const replicator = replicators[name];
    replicator.next = forwardNext;
    replicator.error = forwardError;
    // NOTE(review): the underscore aliases are overwritten as well, presumably
    // because subscribe() copies next/error onto _n/_e of the listener --
    // confirm against xstream.
    replicator._n = forwardNext;
    replicator._e = forwardError;
  }
  // Drop the buffers; nothing assigned above refers to them any more.
  buffers = null as any;

  return function disposeReplication() {
    for (const subscription of subscriptions) {
      subscription.unsubscribe();
    }
  };
}
|
127 |
|
/**
 * Calls `_c()` on every own enumerable entry of `sinkProxies`.
 * NOTE(review): `_c` is presumably the stream's internal "complete" hook --
 * confirm against xstream.
 *
 * @param sinkProxies Object whose values each expose a `_c` method.
 */
export function disposeSinkProxies<Si>(sinkProxies: SinkProxies<Si>) {
  for (const name of Object.keys(sinkProxies)) {
    sinkProxies[name]._c();
  }
}
|
131 |
|
/**
 * Calls `dispose()` on every own source that provides such a method.
 *
 * Falsy sources, sources without `dispose`, and sources whose `dispose` is not
 * a function are skipped.
 *
 * @param sources Object of sources keyed by name.
 */
export function disposeSources<So>(sources: So) {
  for (const k in sources) {
    // Go through Object.prototype so that objects created with
    // Object.create(null), or having a key named "hasOwnProperty", work.
    if (!Object.prototype.hasOwnProperty.call(sources, k)) {
      continue;
    }
    const source = sources[k] as any;
    // Only invoke callable `dispose` members, mirroring the
    // `typeof ... === 'function'` check used by adaptSources.
    if (source && typeof source.dispose === 'function') {
      source.dispose();
    }
  }
}
|
143 |
|
/**
 * Tells whether `obj` has no own enumerable string-keyed properties.
 *
 * @param obj Value handed to `Object.keys`.
 * @returns `true` when `Object.keys(obj)` yields no keys.
 */
export function isObjectEmpty(obj: any): boolean {
  const ownKeys = Object.keys(obj);
  return ownKeys.length === 0;
}
|