UNPKG

8.07 kBPlain TextView Raw
1import xs, {Stream, MemoryStream} from 'xstream';
2import {adapt} from './adapt';
3import {
4 CycleProgram,
5 DevToolEnabledSource,
6 DisposeFunction,
7 Drivers,
8 SinkProxies,
9 Sources,
10 Sinks,
11 FantasySinks,
12} from './types';
13
14export {
15 FantasyObserver,
16 FantasySubscription,
17 FantasyObservable,
18 DevToolEnabledSource,
19 Sources,
20 Sinks,
21 SinkProxies,
22 FantasySinks,
23 Driver,
24 Drivers,
25 DisposeFunction,
26 CycleProgram,
27} from './types';
28
/**
 * Reports an error on the console, preferring its stack trace when it has one.
 *
 * Writes through `console.error`, falls back to `console.log`, and silently
 * does nothing when no console is available. Any value may be passed,
 * including `null` and `undefined`, which are logged as-is.
 *
 * @param err the value to report.
 */
function logToConsoleError(err: any): void {
  // Guard the property access: `null`/`undefined` have no `stack` to read and
  // would otherwise raise a TypeError while we are trying to report an error.
  const target = (err != null && err.stack) || err;
  // `typeof` is safe even when `console` is not declared at all.
  if (typeof console === 'undefined' || !console) {
    return;
  }
  if (console.error) {
    console.error(target);
  } else if (console.log) {
    console.log(target);
  }
}
37
/**
 * Builds the collection of sink proxies: one stream created with
 * `xs.createWithMemory()` for every own property of `drivers`, stored under
 * the same name.
 *
 * @param drivers object whose own keys are the driver names.
 * @return an object mapping each driver name to its proxy stream.
 */
function makeSinkProxies<So extends Sources, Si extends Sinks>(
  drivers: Drivers<So, Si>,
): SinkProxies<Si> {
  const proxies = {} as SinkProxies<Si>;
  for (const driverName in drivers) {
    // Skip anything inherited through the prototype chain.
    if (!drivers.hasOwnProperty(driverName)) {
      continue;
    }
    proxies[driverName] = xs.createWithMemory<any>();
  }
  return proxies;
}
49
/**
 * Invokes every own driver function with its sink proxy and its own name, and
 * collects the returned values as the sources object.
 *
 * A returned value that is a non-null object is additionally tagged with an
 * `_isCycleSource` property holding the driver name.
 *
 * @param drivers object whose own keys are driver names and values are driver
 * functions.
 * @param sinkProxies the proxy handed to each driver, looked up by name.
 * @return an object mapping each driver name to what its driver returned.
 */
function callDrivers<So extends Sources, Si extends Sinks>(
  drivers: Drivers<So, Si>,
  sinkProxies: SinkProxies<Si>,
): So {
  const collected = {} as So;
  for (const driverName in drivers) {
    if (!drivers.hasOwnProperty(driverName)) {
      continue;
    }
    const key = driverName as any;
    collected[key] = drivers[driverName](sinkProxies[driverName], driverName);
    const source = collected[key];
    // Only object sources can carry the tag; primitives and functions are
    // stored untouched.
    if (source && typeof source === 'object') {
      (source as DevToolEnabledSource)._isCycleSource = driverName;
    }
  }
  return collected;
}
65
/**
 * Replaces each own, truthy source that exposes a `shamefullySendNext`
 * function with the result of passing it through `adapt()`.
 *
 * NOTE: `sources` is modified in place and the very same object is returned.
 *
 * @param sources the collection of driver sources.
 * @return the same `sources` object, after the in-place replacement.
 */
function adaptSources<So extends Sources>(sources: So): So {
  for (const key in sources) {
    if (!sources.hasOwnProperty(key)) {
      continue;
    }
    const candidate: any = sources[key];
    if (!candidate) {
      continue;
    }
    // Sources without this method are left exactly as the driver returned them.
    if (typeof candidate['shamefullySendNext'] !== 'function') {
      continue;
    }
    sources[key] = adapt(candidate as Stream<any>);
  }
  return sources;
}
79
/**
 * Listener that receives the events of a single sink. The optional `_n`/`_e`
 * members mirror `next`/`error`.
 */
interface SinkReplicator {
  next(x: any): void;
  _n?(x: any): void;
  error(err: any): void;
  _e?(err: any): void;
  complete(): void;
}

/**
 * One replicator per sink name.
 *
 * Notice that 'complete' from the real sinks is deliberately not replicated,
 * neither in SinkReplicators nor in ReplicationBuffers. Complete is triggered
 * only on disposeReplication. See the discussion in #425 for details.
 */
type SinkReplicators<Si extends Sinks> = {[P in keyof Si]: SinkReplicator};
95
/**
 * Values (`_n`) and errors (`_e`) held for a single sink. There is no slot
 * for completion.
 */
interface ReplicationBuffer {
  _n: any[];
  _e: any[];
}

/** One buffer per sink name. */
type ReplicationBuffers<Si extends Sinks> = {[P in keyof Si]: ReplicationBuffer};
102
/**
 * Connects the sinks returned by `main` to the sink proxies given to drivers.
 *
 * Steps, in order:
 *  1. keep only the sink names that have a matching proxy;
 *  2. give every such sink a replicator that buffers values and errors;
 *  3. subscribe each replicator to its sink (via `xs.fromObservable`);
 *  4. per sink: flush buffered values, then buffered errors, into the proxy,
 *     and switch the replicator over to forwarding directly (errors are also
 *     logged with `logToConsoleError`);
 *  5. drop the buffers.
 *
 * 'complete' from the sinks is ignored; proxies are completed only by the
 * returned dispose function.
 *
 * @param sinks the sinks object returned by `main`.
 * @param sinkProxies the proxies created for the drivers.
 * @return a function that unsubscribes every sink subscription and then calls
 * `_c()` on every replicated proxy.
 */
function replicateMany<So extends Sources, Si extends Sinks>(
  sinks: Si,
  sinkProxies: SinkProxies<Si>,
): DisposeFunction {
  const names: Array<keyof Si> = Object.keys(sinks).filter(
    key => !!sinkProxies[key],
  );

  let pending: ReplicationBuffers<Si> = {} as ReplicationBuffers<Si>;
  const replicators: SinkReplicators<Si> = {} as SinkReplicators<Si>;
  for (const name of names) {
    pending[name] = {_n: [], _e: []};
    // The buffer is looked up at call time, exactly like the forwarding phase
    // below expects.
    replicators[name] = {
      next: (x: any) => pending[name]._n.push(x),
      error: (err: any) => pending[name]._e.push(err),
      complete: () => {},
    };
  }

  // Every sink is subscribed before any buffered event is forwarded.
  const subscriptions = names.map(name =>
    xs.fromObservable(sinks[name] as any).subscribe(replicators[name]),
  );

  for (const name of names) {
    const proxy = sinkProxies[name];
    const forwardNext = (x: any) => {
      proxy._n(x);
    };
    const forwardError = (err: any) => {
      logToConsoleError(err);
      proxy._e(err);
    };

    const {_n: bufferedValues, _e: bufferedErrors} = pending[name];
    bufferedValues.forEach(forwardNext);
    bufferedErrors.forEach(forwardError);

    const replicator = replicators[name];
    replicator.next = forwardNext;
    replicator.error = forwardError;
    // Per the original author's note, sink.subscribe(replicator) mutates the
    // replicator to add _n, _e, _c, so those must be redirected as well.
    replicator._n = forwardNext;
    replicator._e = forwardError;
  }
  pending = null as any; // release the buffers for GC

  return function disposeReplication() {
    for (const subscription of subscriptions) {
      subscription.unsubscribe();
    }
    for (const name of names) {
      sinkProxies[name]._c();
    }
  };
}
151
/**
 * Calls `dispose()` on every own source that provides it.
 *
 * A source is disposed only when it is truthy and its `dispose` member is a
 * function; anything else (missing sources, primitives, a non-callable
 * `dispose` property) is skipped instead of raising a TypeError that would
 * leave the remaining sources undisposed. Inherited properties are ignored.
 *
 * @param sources the collection of driver sources.
 */
function disposeSources<So extends Sources>(sources: So) {
  for (const name in sources) {
    // Borrow the method from Object.prototype so that objects without a
    // prototype, or with a key named `hasOwnProperty`, still work.
    if (!Object.prototype.hasOwnProperty.call(sources, name)) {
      continue;
    }
    const source: any = sources[name];
    if (source && typeof source.dispose === 'function') {
      source.dispose();
    }
  }
}
163
/**
 * Tells whether `obj` has no own enumerable string-keyed properties.
 *
 * @param obj the value to inspect; `null`/`undefined` make `Object.keys` throw.
 * @return `true` when `Object.keys(obj)` is empty.
 */
function isObjectEmpty(obj: any): boolean {
  const ownKeys = Object.keys(obj);
  return !ownKeys.length;
}
167
/**
 * A function that prepares the Cycle application to be executed. Takes a `main`
 * function and prepares to circularly connect it to the given collection of
 * driver functions. As an output, `setup()` returns an object with three
 * properties: `sources`, `sinks` and `run`. Only when `run()` is called will
 * the application actually execute. Refer to the documentation of `run()` for
 * more details.
 *
 * Note that the drivers and `main` are called right away, during `setup()`;
 * `run()` only starts replicating the sinks into the drivers. When a global
 * `window` exists, the sinks are also exposed as `window.Cyclejs.sinks`.
 *
 * **Example:**
 * ```js
 * import {setup} from '@cycle/run';
 * const {sources, sinks, run} = setup(main, drivers);
 * // ...
 * const dispose = run(); // Executes the application
 * // ...
 * dispose();
 * ```
 *
 * @param {Function} main a function that takes `sources` as input and outputs
 * `sinks`.
 * @param {Object} drivers an object where keys are driver names and values
 * are driver functions.
 * @return {Object} an object with three properties: `sources`, `sinks` and
 * `run`. `sources` is the collection of driver sources, `sinks` is the
 * collection of driver sinks, these can be used for debugging or testing. `run`
 * is the function that once called will execute the application.
 * @throws {Error} if `main` is not a function, or if `drivers` is not a
 * non-null object with at least one property.
 * @function setup
 */
export function setup<So extends Sources, Si extends FantasySinks<Si>>(
  main: (sources: So) => Si,
  drivers: Drivers<So, Si>,
): CycleProgram<So, Si> {
  const mainIsFunction = typeof main === `function`;
  if (!mainIsFunction) {
    throw new Error(
      `First argument given to Cycle must be the 'main' function.`,
    );
  }
  const driversIsObject = typeof drivers === `object` && drivers !== null;
  if (!driversIsObject) {
    throw new Error(
      `Second argument given to Cycle must be an object ` +
        `with driver functions as properties.`,
    );
  }
  if (isObjectEmpty(drivers)) {
    throw new Error(
      `Second argument given to Cycle must be an object ` +
        `with at least one driver function declared as a property.`,
    );
  }

  const sinkProxies = makeSinkProxies<So, Si>(drivers);
  const sources = callDrivers<So, Si>(drivers, sinkProxies);
  // adaptSources mutates and returns its argument, so `sources` and
  // `sourcesForMain` are the same object.
  const sourcesForMain = adaptSources(sources);
  const sinks = main(sourcesForMain);

  if (typeof window !== 'undefined') {
    const globalScope = window as any;
    globalScope.Cyclejs = globalScope.Cyclejs || {};
    globalScope.Cyclejs.sinks = sinks;
  }

  function _run(): DisposeFunction {
    const disposeReplication = replicateMany(sinks, sinkProxies);
    return function dispose() {
      // Sources are disposed first, then the sink replication is torn down.
      disposeSources(sources);
      disposeReplication();
    };
  }

  return {sinks, sources, run: _run};
}
235
/**
 * Takes a `main` function and circularly connects it to the given collection
 * of driver functions.
 *
 * **Example:**
 * ```js
 * import run from '@cycle/run';
 * const dispose = run(main, drivers);
 * // ...
 * dispose();
 * ```
 *
 * The `main` function expects a collection of "source" streams (returned from
 * drivers) as input, and should return a collection of "sink" streams (to be
 * given to drivers). A "collection of streams" is a JavaScript object where
 * keys match the driver names registered by the `drivers` object, and values
 * are the streams. Refer to the documentation of each driver to see more
 * details on what types of sources it outputs and sinks it receives.
 *
 * When a global `window` provides `CyclejsDevTool_startGraphSerializer`, it is
 * called with the program's sinks before the program is started.
 *
 * @param {Function} main a function that takes `sources` as input and outputs
 * `sinks`.
 * @param {Object} drivers an object where keys are driver names and values
 * are driver functions.
 * @return {Function} a dispose function, used to terminate the execution of the
 * Cycle.js program, cleaning up resources used.
 * @function run
 */
export function run<So extends Sources, Si extends FantasySinks<Si>>(
  main: (sources: So) => Si,
  drivers: Drivers<So, Si>,
): DisposeFunction {
  const program = setup(main, drivers);
  const hasWindow = typeof window !== 'undefined';
  // The hook is invoked as a member of `window`, as in the original call.
  if (hasWindow && window['CyclejsDevTool_startGraphSerializer']) {
    window['CyclejsDevTool_startGraphSerializer'](program.sinks);
  }
  return program.run();
}
276
277export default run;