1 | import { ConnectableObservable, FactoryOrValue, MonoTypeOperatorFunction, OperatorFunction, Observable, Subject} from 'rxjs';
|
2 | import { multicast as higherOrder } from 'rxjs/operators';
|
3 |
|
4 | /* tslint:disable:max-line-length */
|
// Overload 1: a Subject instance or a Subject factory, without a selector.
// The result is a ConnectableObservable carrying the source's value type.
export function multicast<T>(
  this: Observable<T>,
  subjectOrSubjectFactory: FactoryOrValue<Subject<T>>
): ConnectableObservable<T>;

// Overload 2: a Subject factory only, without a selector.
// Note that this overload (like the ones below) declares no `this` parameter.
export function multicast<T>(
  SubjectFactory: (this: Observable<T>) => Subject<T>
): ConnectableObservable<T>;

// Overload 3: a Subject factory plus a selector that keeps the value type.
// With a selector the result is typed as a plain Observable.
export function multicast<T>(
  SubjectFactory: (this: Observable<T>) => Subject<T>,
  selector: MonoTypeOperatorFunction<T>
): Observable<T>;

// Overload 4: a Subject factory only, with the result type `R` chosen by the
// caller. `R` does not appear in the parameters, so it cannot be inferred from
// the arguments.
export function multicast<T, R>(
  SubjectFactory: (this: Observable<T>) => Subject<T>
): ConnectableObservable<R>;

// Overload 5: a Subject factory plus a selector that maps values of type `T`
// to values of type `R`; the result is typed as a plain Observable of `R`.
export function multicast<T, R>(
  SubjectFactory: (this: Observable<T>) => Subject<T>,
  selector: OperatorFunction<T, R>
): Observable<R>;
|
10 | /* tslint:enable:max-line-length */
|
11 |
|
12 | /**
|
13 | * Allows source Observable to be subscribed only once with a Subject of choice,
|
14 | * while still sharing its values between multiple subscribers.
|
15 | *
|
16 | * <span class="informal">Subscribe to Observable once, but send its values to multiple subscribers.</span>
|
17 | *
|
18 | * <img src="./img/multicast.png" width="100%">
|
19 | *
|
20 | * `multicast` is an operator that works in two modes.
|
21 | *
|
22 | * In the first mode you provide a single argument to it, which can be either an initialized Subject or a Subject
|
 * factory. As a result you will get a special kind of Observable - a {@link ConnectableObservable}. It can be
 * subscribed multiple times, just like a regular Observable, but it won't subscribe to the source Observable at that
|
25 | * moment. It will do it only if you call its `connect` method. This means you can essentially control by hand, when
|
26 | * source Observable will be actually subscribed. What is more, ConnectableObservable will share this one subscription
|
27 | * between all of its subscribers. This means that, for example, `ajax` Observable will only send a request once,
|
28 | * even though usually it would send a request per every subscriber. Since it sends a request at the moment of
|
29 | * subscription, here request would be sent when the `connect` method of a ConnectableObservable is called.
|
30 | *
|
31 | * The most common pattern of using ConnectableObservable is calling `connect` when the first consumer subscribes,
|
32 | * keeping the subscription alive while several consumers come and go and finally unsubscribing from the source
|
33 | * Observable, when the last consumer unsubscribes. To not implement that logic over and over again,
|
34 | * ConnectableObservable has a special operator, `refCount`. When called, it returns an Observable, which will count
|
35 | * the number of consumers subscribed to it and keep ConnectableObservable connected as long as there is at least
|
36 | * one consumer. So if you don't actually need to decide yourself when to connect and disconnect a
|
37 | * ConnectableObservable, use `refCount`.
|
38 | *
|
39 | * The second mode is invoked by calling `multicast` with an additional, second argument - selector function.
|
40 | * This function accepts an Observable - which basically mirrors the source Observable - and returns Observable
|
41 | * as well, which should be the input stream modified by any operators you want. Note that in this
|
42 | * mode you cannot provide initialized Subject as a first argument - it has to be a Subject factory. If
|
43 | * you provide selector function, `multicast` returns just a regular Observable, instead of ConnectableObservable.
|
44 | * Thus, as usual, each subscription to this stream triggers subscription to the source Observable. However,
|
45 | * if inside the selector function you subscribe to the input Observable multiple times, actual source stream
|
46 | * will be subscribed only once. So if you have a chain of operators that use some Observable many times,
|
47 | * but you want to subscribe to that Observable only once, this is the mode you would use.
|
48 | *
|
49 | * Subject provided as a first parameter of `multicast` is used as a proxy for the single subscription to the
|
50 | * source Observable. It means that all values from the source stream go through that Subject. Thus, if a Subject
|
51 | * has some special properties, Observable returned by `multicast` will have them as well. If you want to use
|
52 | * `multicast` with a Subject that is one of the ones included in RxJS by default - {@link Subject},
|
53 | * {@link AsyncSubject}, {@link BehaviorSubject}, or {@link ReplaySubject} - simply use {@link publish},
|
54 | * {@link publishLast}, {@link publishBehavior} or {@link publishReplay} respectively. These are actually
|
55 | * just wrappers around `multicast`, with a specific Subject hardcoded inside.
|
56 | *
|
 * Also, if you use {@link publish} or {@link publishReplay} with a ConnectableObservable's `refCount` operator,
|
58 | * you can simply use {@link share} and {@link shareReplay} respectively, which chain these two.
|
59 | *
|
60 | * @example <caption>Use ConnectableObservable</caption>
|
61 | * const seconds = Rx.Observable.interval(1000);
|
62 | * const connectableSeconds = seconds.multicast(new Subject());
|
63 | *
|
64 | * connectableSeconds.subscribe(value => console.log('first: ' + value));
|
65 | * connectableSeconds.subscribe(value => console.log('second: ' + value));
|
66 | *
|
67 | * // At this point still nothing happens, even though we subscribed twice.
|
68 | *
|
69 | * connectableSeconds.connect();
|
70 | *
|
71 | * // From now on `seconds` are being logged to the console,
|
72 | * // twice per every second. `seconds` Observable was however only subscribed once,
|
73 | * // so under the hood Observable.interval had only one clock started.
|
74 | *
|
75 | * @example <caption>Use selector</caption>
|
76 | * const seconds = Rx.Observable.interval(1000);
|
77 | *
|
78 | * seconds
|
79 | * .multicast(
|
80 | * () => new Subject(),
|
81 | * seconds => seconds.zip(seconds) // Usually zip would subscribe to `seconds` twice.
|
82 | * // Because we are inside selector, `seconds` is subscribed once,
|
83 | * ) // thus starting only one clock used internally by Observable.interval.
|
84 | * .subscribe();
|
85 | *
|
86 | * @see {@link publish}
|
87 | * @see {@link publishLast}
|
88 | * @see {@link publishBehavior}
|
89 | * @see {@link publishReplay}
|
90 | * @see {@link share}
|
91 | * @see {@link shareReplay}
|
92 | *
|
 * @param {Function|Subject} subjectOrSubjectFactory - A Subject instance, or a factory function that creates one,
 * to serve as the intermediate Subject through which the source sequence's elements will be multicast to the
 * selector function's input Observable or to the ConnectableObservable returned by the operator.
|
96 | * @param {Function} [selector] - Optional selector function that can use the input stream
|
97 | * as many times as needed, without causing multiple subscriptions to the source stream.
|
98 | * Subscribers to the input source will receive all notifications of the source from the
|
99 | * time of the subscription forward.
|
100 | * @return {Observable<T>|ConnectableObservable<T>} An Observable that emits the results of invoking the selector
|
101 | * on the source stream or a special {@link ConnectableObservable}, if selector was not provided.
|
102 | *
|
103 | * @method multicast
|
104 | * @owner Observable
|
105 | */
|
export function multicast<T, R>(
  this: Observable<T>,
  subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
  selector?: (source: Observable<T>) => Observable<R>
): ConnectableObservable<R> | Observable<R> {
  // Build the pipeable operator (imported from 'rxjs/operators' as
  // `higherOrder`) from the given arguments.
  // NOTE(review): the `any` assertion is carried over from the original call;
  // presumably it lets the union-typed first argument pass the pipeable
  // operator's overloads, which are not visible in this file - confirm.
  const applyMulticast = higherOrder(subjectOrSubjectFactory as any, selector);

  // Apply the operator to the Observable this function was invoked on.
  return applyMulticast(this);
}
|