UNPKG

23.6 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
4 var objectTypes = {
5 'function': true,
6 'object': true
7 };
8
9 function checkGlobal(value) {
10 return (value && value.Object === Object) ? value : null;
11 }
12
13 var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
14 var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
15 var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
16 var freeSelf = checkGlobal(objectTypes[typeof self] && self);
17 var freeWindow = checkGlobal(objectTypes[typeof window] && window);
18 var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
19 var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
20 var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
21
22 // Because of build optimizers
23 if (typeof define === 'function' && define.amd) {
24 define(['./rx'], function (Rx, exports) {
25 return factory(root, exports, Rx);
26 });
27 } else if (typeof module === 'object' && module && module.exports === freeExports) {
28 module.exports = factory(root, module.exports, require('./rx'));
29 } else {
30 root.Rx = factory(root, {}, root.Rx);
31 }
32}.call(this, function (root, exp, Rx, undefined) {
33
34 var Observable = Rx.Observable,
35 observableProto = Observable.prototype,
36 AnonymousObservable = Rx.AnonymousObservable,
37 ObservableBase = Rx.ObservableBase,
38 Subject = Rx.Subject,
39 AsyncSubject = Rx.AsyncSubject,
40 Observer = Rx.Observer,
41 ScheduledObserver = Rx.internals.ScheduledObserver,
42 disposableCreate = Rx.Disposable.create,
43 disposableEmpty = Rx.Disposable.empty,
44 BinaryDisposable = Rx.BinaryDisposable,
45 currentThreadScheduler = Rx.Scheduler.currentThread,
46 isFunction = Rx.helpers.isFunction,
47 inherits = Rx.internals.inherits,
48 addProperties = Rx.internals.addProperties,
49 checkDisposed = Rx.Disposable.checkDisposed;
50
51 // Utilities
52 function cloneArray(arr) {
53 var len = arr.length, a = new Array(len);
54 for(var i = 0; i < len; i++) { a[i] = arr[i]; }
55 return a;
56 }
57
58 var MulticastObservable = (function (__super__) {
59 inherits(MulticastObservable, __super__);
60 function MulticastObservable(source, fn1, fn2) {
61 this.source = source;
62 this._fn1 = fn1;
63 this._fn2 = fn2;
64 __super__.call(this);
65 }
66
67 MulticastObservable.prototype.subscribeCore = function (o) {
68 var connectable = this.source.multicast(this._fn1());
69 return new BinaryDisposable(this._fn2(connectable).subscribe(o), connectable.connect());
70 };
71
72 return MulticastObservable;
73 }(ObservableBase));
74
75 /**
76 * Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each
77 * subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's
78 * invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
79 *
80 * @example
81 * 1 - res = source.multicast(observable);
82 * 2 - res = source.multicast(function () { return new Subject(); }, function (x) { return x; });
83 *
84 * @param {Function|Subject} subjectOrSubjectSelector
85 * Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function.
86 * Or:
87 * Subject to push source elements into.
88 *
89 * @param {Function} [selector] Optional selector function which can use the multicasted source sequence subject to the policies enforced by the created subject. Specified only if <paramref name="subjectOrSubjectSelector" is a factory function.
90 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
91 */
92 observableProto.multicast = function (subjectOrSubjectSelector, selector) {
93 return isFunction(subjectOrSubjectSelector) ?
94 new MulticastObservable(this, subjectOrSubjectSelector, selector) :
95 new ConnectableObservable(this, subjectOrSubjectSelector);
96 };
97
98 /**
99 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
100 * This operator is a specialization of Multicast using a regular Subject.
101 *
102 * @example
103 * var resres = source.publish();
104 * var res = source.publish(function (x) { return x; });
105 *
106 * @param {Function} [selector] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on.
107 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
108 */
109 observableProto.publish = function (selector) {
110 return selector && isFunction(selector) ?
111 this.multicast(function () { return new Subject(); }, selector) :
112 this.multicast(new Subject());
113 };
114
115 /**
116 * Returns an observable sequence that shares a single subscription to the underlying sequence.
117 * This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
118 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
119 */
120 observableProto.share = function () {
121 return this.publish().refCount();
122 };
123
124 /**
125 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification.
126 * This operator is a specialization of Multicast using a AsyncSubject.
127 *
128 * @example
129 * var res = source.publishLast();
130 * var res = source.publishLast(function (x) { return x; });
131 *
132 * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will only receive the last notification of the source.
133 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
134 */
135 observableProto.publishLast = function (selector) {
136 return selector && isFunction(selector) ?
137 this.multicast(function () { return new AsyncSubject(); }, selector) :
138 this.multicast(new AsyncSubject());
139 };
140
141 /**
142 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
143 * This operator is a specialization of Multicast using a BehaviorSubject.
144 *
145 * @example
146 * var res = source.publishValue(42);
147 * var res = source.publishValue(function (x) { return x.select(function (y) { return y * y; }) }, 42);
148 *
149 * @param {Function} [selector] Optional selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive immediately receive the initial value, followed by all notifications of the source from the time of the subscription on.
150 * @param {Mixed} initialValue Initial value received by observers upon subscription.
151 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
152 */
153 observableProto.publishValue = function (initialValueOrSelector, initialValue) {
154 return arguments.length === 2 ?
155 this.multicast(function () {
156 return new BehaviorSubject(initialValue);
157 }, initialValueOrSelector) :
158 this.multicast(new BehaviorSubject(initialValueOrSelector));
159 };
160
161 /**
162 * Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an initialValue.
163 * This operator is a specialization of publishValue which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
164 * @param {Mixed} initialValue Initial value received by observers upon subscription.
165 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
166 */
167 observableProto.shareValue = function (initialValue) {
168 return this.publishValue(initialValue).refCount();
169 };
170
171 /**
172 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
173 * This operator is a specialization of Multicast using a ReplaySubject.
174 *
175 * @example
176 * var res = source.replay(null, 3);
177 * var res = source.replay(null, 3, 500);
178 * var res = source.replay(null, 3, 500, scheduler);
179 * var res = source.replay(function (x) { return x.take(6).repeat(); }, 3, 500, scheduler);
180 *
181 * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
182 * @param bufferSize [Optional] Maximum element count of the replay buffer.
183 * @param windowSize [Optional] Maximum time length of the replay buffer.
184 * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
185 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
186 */
187 observableProto.replay = function (selector, bufferSize, windowSize, scheduler) {
188 return selector && isFunction(selector) ?
189 this.multicast(function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
190 this.multicast(new ReplaySubject(bufferSize, windowSize, scheduler));
191 };
192
193 /**
194 * Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
195 * This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
196 *
197 * @example
198 * var res = source.shareReplay(3);
199 * var res = source.shareReplay(3, 500);
200 * var res = source.shareReplay(3, 500, scheduler);
201 *
202
203 * @param bufferSize [Optional] Maximum element count of the replay buffer.
204 * @param window [Optional] Maximum time length of the replay buffer.
205 * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
206 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
207 */
208 observableProto.shareReplay = function (bufferSize, windowSize, scheduler) {
209 return this.replay(null, bufferSize, windowSize, scheduler).refCount();
210 };
211
212 var InnerSubscription = function (s, o) {
213 this._s = s;
214 this._o = o;
215 };
216
217 InnerSubscription.prototype.dispose = function () {
218 if (!this._s.isDisposed && this._o !== null) {
219 var idx = this._s.observers.indexOf(this._o);
220 this._s.observers.splice(idx, 1);
221 this._o = null;
222 }
223 };
224
225 /**
226 * Represents a value that changes over time.
227 * Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
228 */
229 var BehaviorSubject = Rx.BehaviorSubject = (function (__super__) {
230 inherits(BehaviorSubject, __super__);
231 function BehaviorSubject(value) {
232 __super__.call(this);
233 this.value = value;
234 this.observers = [];
235 this.isDisposed = false;
236 this.isStopped = false;
237 this.hasError = false;
238 }
239
240 addProperties(BehaviorSubject.prototype, Observer.prototype, {
241 _subscribe: function (o) {
242 checkDisposed(this);
243 if (!this.isStopped) {
244 this.observers.push(o);
245 o.onNext(this.value);
246 return new InnerSubscription(this, o);
247 }
248 if (this.hasError) {
249 o.onError(this.error);
250 } else {
251 o.onCompleted();
252 }
253 return disposableEmpty;
254 },
255 /**
256 * Gets the current value or throws an exception.
257 * Value is frozen after onCompleted is called.
258 * After onError is called always throws the specified exception.
259 * An exception is always thrown after dispose is called.
260 * @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
261 */
262 getValue: function () {
263 checkDisposed(this);
264 if (this.hasError) { thrower(this.error); }
265 return this.value;
266 },
267 /**
268 * Indicates whether the subject has observers subscribed to it.
269 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
270 */
271 hasObservers: function () { return this.observers.length > 0; },
272 /**
273 * Notifies all subscribed observers about the end of the sequence.
274 */
275 onCompleted: function () {
276 checkDisposed(this);
277 if (this.isStopped) { return; }
278 this.isStopped = true;
279 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
280 os[i].onCompleted();
281 }
282
283 this.observers.length = 0;
284 },
285 /**
286 * Notifies all subscribed observers about the exception.
287 * @param {Mixed} error The exception to send to all observers.
288 */
289 onError: function (error) {
290 checkDisposed(this);
291 if (this.isStopped) { return; }
292 this.isStopped = true;
293 this.hasError = true;
294 this.error = error;
295
296 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
297 os[i].onError(error);
298 }
299
300 this.observers.length = 0;
301 },
302 /**
303 * Notifies all subscribed observers about the arrival of the specified element in the sequence.
304 * @param {Mixed} value The value to send to all observers.
305 */
306 onNext: function (value) {
307 checkDisposed(this);
308 if (this.isStopped) { return; }
309 this.value = value;
310 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
311 os[i].onNext(value);
312 }
313 },
314 /**
315 * Unsubscribe all observers and release resources.
316 */
317 dispose: function () {
318 this.isDisposed = true;
319 this.observers = null;
320 this.value = null;
321 this.error = null;
322 }
323 });
324
325 return BehaviorSubject;
326 }(Observable));
327
328 /**
329 * Represents an object that is both an observable sequence as well as an observer.
330 * Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
331 */
332 var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
333
334 var maxSafeInteger = Math.pow(2, 53) - 1;
335
336 function createRemovableDisposable(subject, observer) {
337 return disposableCreate(function () {
338 observer.dispose();
339 !subject.isDisposed && subject.observers.splice(subject.observers.indexOf(observer), 1);
340 });
341 }
342
343 inherits(ReplaySubject, __super__);
344
345 /**
346 * Initializes a new instance of the ReplaySubject class with the specified buffer size, window size and scheduler.
347 * @param {Number} [bufferSize] Maximum element count of the replay buffer.
348 * @param {Number} [windowSize] Maximum time length of the replay buffer.
349 * @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
350 */
351 function ReplaySubject(bufferSize, windowSize, scheduler) {
352 this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize;
353 this.windowSize = windowSize == null ? maxSafeInteger : windowSize;
354 this.scheduler = scheduler || currentThreadScheduler;
355 this.q = [];
356 this.observers = [];
357 this.isStopped = false;
358 this.isDisposed = false;
359 this.hasError = false;
360 this.error = null;
361 __super__.call(this);
362 }
363
364 addProperties(ReplaySubject.prototype, Observer.prototype, {
365 _subscribe: function (o) {
366 checkDisposed(this);
367 var so = new ScheduledObserver(this.scheduler, o), subscription = createRemovableDisposable(this, so);
368
369 this._trim(this.scheduler.now());
370 this.observers.push(so);
371
372 for (var i = 0, len = this.q.length; i < len; i++) {
373 so.onNext(this.q[i].value);
374 }
375
376 if (this.hasError) {
377 so.onError(this.error);
378 } else if (this.isStopped) {
379 so.onCompleted();
380 }
381
382 so.ensureActive();
383 return subscription;
384 },
385 /**
386 * Indicates whether the subject has observers subscribed to it.
387 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
388 */
389 hasObservers: function () {
390 return this.observers.length > 0;
391 },
392 _trim: function (now) {
393 while (this.q.length > this.bufferSize) {
394 this.q.shift();
395 }
396 while (this.q.length > 0 && (now - this.q[0].interval) > this.windowSize) {
397 this.q.shift();
398 }
399 },
400 /**
401 * Notifies all subscribed observers about the arrival of the specified element in the sequence.
402 * @param {Mixed} value The value to send to all observers.
403 */
404 onNext: function (value) {
405 checkDisposed(this);
406 if (this.isStopped) { return; }
407 var now = this.scheduler.now();
408 this.q.push({ interval: now, value: value });
409 this._trim(now);
410
411 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
412 var observer = os[i];
413 observer.onNext(value);
414 observer.ensureActive();
415 }
416 },
417 /**
418 * Notifies all subscribed observers about the exception.
419 * @param {Mixed} error The exception to send to all observers.
420 */
421 onError: function (error) {
422 checkDisposed(this);
423 if (this.isStopped) { return; }
424 this.isStopped = true;
425 this.error = error;
426 this.hasError = true;
427 var now = this.scheduler.now();
428 this._trim(now);
429 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
430 var observer = os[i];
431 observer.onError(error);
432 observer.ensureActive();
433 }
434 this.observers.length = 0;
435 },
436 /**
437 * Notifies all subscribed observers about the end of the sequence.
438 */
439 onCompleted: function () {
440 checkDisposed(this);
441 if (this.isStopped) { return; }
442 this.isStopped = true;
443 var now = this.scheduler.now();
444 this._trim(now);
445 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
446 var observer = os[i];
447 observer.onCompleted();
448 observer.ensureActive();
449 }
450 this.observers.length = 0;
451 },
452 /**
453 * Unsubscribe all observers and release resources.
454 */
455 dispose: function () {
456 this.isDisposed = true;
457 this.observers = null;
458 }
459 });
460
461 return ReplaySubject;
462 }(Observable));
463
464 var RefCountObservable = (function (__super__) {
465 inherits(RefCountObservable, __super__);
466 function RefCountObservable(source) {
467 this.source = source;
468 this._count = 0;
469 this._connectableSubscription = null;
470 __super__.call(this);
471 }
472
473 RefCountObservable.prototype.subscribeCore = function (o) {
474 var subscription = this.source.subscribe(o);
475 ++this._count === 1 && (this._connectableSubscription = this.source.connect());
476 return new RefCountDisposable(this, subscription);
477 };
478
479 function RefCountDisposable(p, s) {
480 this._p = p;
481 this._s = s;
482 this.isDisposed = false;
483 }
484
485 RefCountDisposable.prototype.dispose = function () {
486 if (!this.isDisposed) {
487 this.isDisposed = true;
488 this._s.dispose();
489 --this._p._count === 0 && this._p._connectableSubscription.dispose();
490 }
491 };
492
493 return RefCountObservable;
494 }(ObservableBase));
495
496 var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) {
497 inherits(ConnectableObservable, __super__);
498 function ConnectableObservable(source, subject) {
499 this.source = source;
500 this._connection = null;
501 this._source = source.asObservable();
502 this._subject = subject;
503 __super__.call(this);
504 }
505
506 function ConnectDisposable(parent, subscription) {
507 this._p = parent;
508 this._s = subscription;
509 }
510
511 ConnectDisposable.prototype.dispose = function () {
512 if (this._s) {
513 this._s.dispose();
514 this._s = null;
515 this._p._connection = null;
516 }
517 };
518
519 ConnectableObservable.prototype.connect = function () {
520 if (!this._connection) {
521 var subscription = this._source.subscribe(this._subject);
522 this._connection = new ConnectDisposable(this, subscription);
523 }
524 return this._connection;
525 };
526
527 ConnectableObservable.prototype._subscribe = function (o) {
528 return this._subject.subscribe(o);
529 };
530
531 ConnectableObservable.prototype.refCount = function () {
532 return new RefCountObservable(this);
533 };
534
535 return ConnectableObservable;
536 }(Observable));
537
538 /**
539 * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
540 * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
541 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
542 */
543 observableProto.singleInstance = function() {
544 var source = this, hasObservable = false, observable;
545
546 function getObservable() {
547 if (!hasObservable) {
548 hasObservable = true;
549 observable = source['finally'](function() { hasObservable = false; }).publish().refCount();
550 }
551 return observable;
552 }
553
554 return new AnonymousObservable(function(o) {
555 return getObservable().subscribe(o);
556 });
557 };
558
559 return Rx;
560}));