// UNPKG
//
// 3.27 kB · JavaScript · View Raw
1import { Subject, from, queueScheduler } from 'rxjs';
2import { map, mergeMap, observeOn, subscribeOn } from 'rxjs/operators';
3import { StateObservable } from './StateObservable';
4import { warn } from './utils/console';
/**
 * Creates a Redux middleware that runs epics: functions called with an action
 * stream, a state stream and optional dependencies, whose returned stream is
 * subscribed to and whose emissions are passed to `store.dispatch`.
 *
 * @param {Object|null} [options={}] - Middleware options. `null` is treated
 *   the same as an empty options object.
 * @param {*} [options.dependencies] - Passed as the third argument to every
 *   root epic handed to `run`. Read each time an epic is started.
 * @returns {Function} The middleware function, with a `run(rootEpic)` method
 *   attached that starts the given root epic.
 * @throws {TypeError} Outside production builds, when `options` is a function
 *   (the no-longer-supported `createEpicMiddleware(rootEpic)` call style).
 */
export function createEpicMiddleware(options = {}) {
  // This isn't great. RxJS doesn't publicly export the constructor for
  // QueueScheduler nor QueueAction, so we reach in. We need to do this because
  // we don't want our internal queuing mechanism to be on the same queue as any
  // other RxJS code outside of redux-observable internals.
  const QueueScheduler = queueScheduler.constructor;
  const uniqueQueueScheduler = new QueueScheduler(queueScheduler.schedulerActionCtor);

  if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
    throw new TypeError('Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware');
  }

  // The default parameter only covers `undefined`; an explicit `null` would
  // otherwise blow up later on `options.dependencies` when an epic is started.
  // A caller-supplied object is kept by reference so `dependencies` is still
  // looked up lazily on that same object.
  const settings = options === null ? {} : options;

  // Root epics handed to `run` are pushed through this subject.
  const epic$ = new Subject();
  let store;

  const epicMiddleware = _store => {
    if (process.env.NODE_ENV !== 'production' && store) {
      // https://github.com/redux-observable/redux-observable/issues/389
      warn('this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da');
    }
    store = _store;

    const actionSubject$ = new Subject();
    const stateSubject$ = new Subject();
    const action$ = actionSubject$
      .asObservable()
      .pipe(observeOn(uniqueQueueScheduler));
    const state$ = new StateObservable(
      stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
      store.getState()
    );

    // Each root epic is invoked once per `run` call; its output stream is
    // merged into a single stream of actions to dispatch.
    const result$ = epic$.pipe(
      map(epic => {
        const output$ = epic(action$, state$, settings.dependencies);
        if (!output$) {
          throw new TypeError(`Your root Epic "${epic.name || '<anonymous>'}" does not return a stream. Double check you're not missing a return statement!`);
        }
        return output$;
      }),
      mergeMap(output$ =>
        from(output$).pipe(
          subscribeOn(uniqueQueueScheduler),
          observeOn(uniqueQueueScheduler)
        )
      )
    );
    result$.subscribe(store.dispatch);

    return next => {
      return action => {
        // Downstream middleware gets the action first,
        // which includes their reducers, so state is
        // updated before epics receive the action
        const result = next(action);
        // It's important to update the state$ before we emit
        // the action because otherwise it would be stale
        stateSubject$.next(store.getState());
        actionSubject$.next(action);
        return result;
      };
    };
  };

  /**
   * Starts a root epic by pushing it onto the internal epic stream.
   * Outside production builds, warns when called before the middleware has
   * been given a store.
   *
   * @param {Function} rootEpic - Called as `rootEpic(action$, state$, dependencies)`.
   */
  epicMiddleware.run = rootEpic => {
    if (process.env.NODE_ENV !== 'production' && !store) {
      warn('epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.');
    }
    epic$.next(rootEpic);
  };

  return epicMiddleware;
}