UNPKG

32.6 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
4 var objectTypes = {
5 'function': true,
6 'object': true
7 };
8
9 function checkGlobal(value) {
10 return (value && value.Object === Object) ? value : null;
11 }
12
13 var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
14 var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
15 var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
16 var freeSelf = checkGlobal(objectTypes[typeof self] && self);
17 var freeWindow = checkGlobal(objectTypes[typeof window] && window);
18 var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
19 var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
20 var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
21
22 // Because of build optimizers
23 if (typeof define === 'function' && define.amd) {
24 define(['./rx.core'], function (Rx, exports) {
25 return factory(root, exports, Rx);
26 });
27 } else if (typeof module === 'object' && module && module.exports === freeExports) {
28 module.exports = factory(root, module.exports, require('./rx.core'));
29 } else {
30 root.Rx = factory(root, {}, root.Rx);
31 }
32}.call(this, function (root, exp, Rx, undefined) {
33
34 var Observable = Rx.Observable,
35 observableProto = Observable.prototype,
36 AnonymousObservable = Rx.AnonymousObservable,
37 ObservableBase = Rx.ObservableBase,
38 Observer = Rx.Observer,
39 AbstractObserver = Rx.internals.AbstractObserver,
40 disposableCreate = Rx.Disposable.create,
41 disposableEmpty = Rx.Disposable.empty,
42 CompositeDisposable = Rx.CompositeDisposable,
43 BinaryDisposable = Rx.BinaryDisposable,
44 SerialDisposable = Rx.SerialDisposable,
45 currentThreadScheduler = Rx.Scheduler.currentThread,
46 isFunction = Rx.helpers.isFunction,
47 inherits = Rx.internals.inherits,
48 addProperties = Rx.internals.addProperties,
49 checkDisposed = Rx.Disposable.checkDisposed;
50
51 // Utilities
52 function cloneArray(arr) {
53 var len = arr.length, a = new Array(len);
54 for(var i = 0; i < len; i++) { a[i] = arr[i]; }
55 return a;
56 }
57
58 var errorObj = {e: {}};
59
60 function tryCatcherGen(tryCatchTarget) {
61 return function tryCatcher() {
62 try {
63 return tryCatchTarget.apply(this, arguments);
64 } catch (e) {
65 errorObj.e = e;
66 return errorObj;
67 }
68 };
69 }
70
71 var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
72 if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
73 return tryCatcherGen(fn);
74 };
75
76 function thrower(e) {
77 throw e;
78 }
79
80 var MulticastObservable = (function (__super__) {
81 inherits(MulticastObservable, __super__);
82 function MulticastObservable(source, fn1, fn2) {
83 this.source = source;
84 this._fn1 = fn1;
85 this._fn2 = fn2;
86 __super__.call(this);
87 }
88
89 MulticastObservable.prototype.subscribeCore = function (o) {
90 var connectable = this.source.multicast(this._fn1());
91 return new BinaryDisposable(this._fn2(connectable).subscribe(o), connectable.connect());
92 };
93
94 return MulticastObservable;
95 }(ObservableBase));
96
97 /**
98 * Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each
99 * subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's
100 * invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
101 *
102 * @example
103 * 1 - res = source.multicast(observable);
104 * 2 - res = source.multicast(function () { return new Subject(); }, function (x) { return x; });
105 *
106 * @param {Function|Subject} subjectOrSubjectSelector
107 * Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function.
108 * Or:
109 * Subject to push source elements into.
110 *
111 * @param {Function} [selector] Optional selector function which can use the multicasted source sequence subject to the policies enforced by the created subject. Specified only if <paramref name="subjectOrSubjectSelector" is a factory function.
112 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
113 */
114 observableProto.multicast = function (subjectOrSubjectSelector, selector) {
115 return isFunction(subjectOrSubjectSelector) ?
116 new MulticastObservable(this, subjectOrSubjectSelector, selector) :
117 new ConnectableObservable(this, subjectOrSubjectSelector);
118 };
119
120 /**
121 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
122 * This operator is a specialization of Multicast using a regular Subject.
123 *
124 * @example
125 * var resres = source.publish();
126 * var res = source.publish(function (x) { return x; });
127 *
128 * @param {Function} [selector] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on.
129 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
130 */
131 observableProto.publish = function (selector) {
132 return selector && isFunction(selector) ?
133 this.multicast(function () { return new Subject(); }, selector) :
134 this.multicast(new Subject());
135 };
136
137 /**
138 * Returns an observable sequence that shares a single subscription to the underlying sequence.
139 * This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
140 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
141 */
142 observableProto.share = function () {
143 return this.publish().refCount();
144 };
145
146 /**
147 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification.
148 * This operator is a specialization of Multicast using a AsyncSubject.
149 *
150 * @example
151 * var res = source.publishLast();
152 * var res = source.publishLast(function (x) { return x; });
153 *
154 * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will only receive the last notification of the source.
155 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
156 */
157 observableProto.publishLast = function (selector) {
158 return selector && isFunction(selector) ?
159 this.multicast(function () { return new AsyncSubject(); }, selector) :
160 this.multicast(new AsyncSubject());
161 };
162
163 /**
164 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
165 * This operator is a specialization of Multicast using a BehaviorSubject.
166 *
167 * @example
168 * var res = source.publishValue(42);
169 * var res = source.publishValue(function (x) { return x.select(function (y) { return y * y; }) }, 42);
170 *
171 * @param {Function} [selector] Optional selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive immediately receive the initial value, followed by all notifications of the source from the time of the subscription on.
172 * @param {Mixed} initialValue Initial value received by observers upon subscription.
173 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
174 */
175 observableProto.publishValue = function (initialValueOrSelector, initialValue) {
176 return arguments.length === 2 ?
177 this.multicast(function () {
178 return new BehaviorSubject(initialValue);
179 }, initialValueOrSelector) :
180 this.multicast(new BehaviorSubject(initialValueOrSelector));
181 };
182
183 /**
184 * Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an initialValue.
185 * This operator is a specialization of publishValue which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
186 * @param {Mixed} initialValue Initial value received by observers upon subscription.
187 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
188 */
189 observableProto.shareValue = function (initialValue) {
190 return this.publishValue(initialValue).refCount();
191 };
192
193 /**
194 * Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
195 * This operator is a specialization of Multicast using a ReplaySubject.
196 *
197 * @example
198 * var res = source.replay(null, 3);
199 * var res = source.replay(null, 3, 500);
200 * var res = source.replay(null, 3, 500, scheduler);
201 * var res = source.replay(function (x) { return x.take(6).repeat(); }, 3, 500, scheduler);
202 *
203 * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
204 * @param bufferSize [Optional] Maximum element count of the replay buffer.
205 * @param windowSize [Optional] Maximum time length of the replay buffer.
206 * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
207 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
208 */
209 observableProto.replay = function (selector, bufferSize, windowSize, scheduler) {
210 return selector && isFunction(selector) ?
211 this.multicast(function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
212 this.multicast(new ReplaySubject(bufferSize, windowSize, scheduler));
213 };
214
215 /**
216 * Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
217 * This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
218 *
219 * @example
220 * var res = source.shareReplay(3);
221 * var res = source.shareReplay(3, 500);
222 * var res = source.shareReplay(3, 500, scheduler);
223 *
224
225 * @param bufferSize [Optional] Maximum element count of the replay buffer.
226 * @param window [Optional] Maximum time length of the replay buffer.
227 * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
228 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
229 */
230 observableProto.shareReplay = function (bufferSize, windowSize, scheduler) {
231 return this.replay(null, bufferSize, windowSize, scheduler).refCount();
232 };
233
234 var ScheduledObserver = Rx.internals.ScheduledObserver = (function (__super__) {
235 inherits(ScheduledObserver, __super__);
236
237 function ScheduledObserver(scheduler, observer) {
238 __super__.call(this);
239 this.scheduler = scheduler;
240 this.observer = observer;
241 this.isAcquired = false;
242 this.hasFaulted = false;
243 this.queue = [];
244 this.disposable = new SerialDisposable();
245 }
246
247 function enqueueNext(observer, x) { return function () { observer.onNext(x); }; }
248 function enqueueError(observer, e) { return function () { observer.onError(e); }; }
249 function enqueueCompleted(observer) { return function () { observer.onCompleted(); }; }
250
251 ScheduledObserver.prototype.next = function (x) {
252 this.queue.push(enqueueNext(this.observer, x));
253 };
254
255 ScheduledObserver.prototype.error = function (e) {
256 this.queue.push(enqueueError(this.observer, e));
257 };
258
259 ScheduledObserver.prototype.completed = function () {
260 this.queue.push(enqueueCompleted(this.observer));
261 };
262
263
264 function scheduleMethod(state, recurse) {
265 var work;
266 if (state.queue.length > 0) {
267 work = state.queue.shift();
268 } else {
269 state.isAcquired = false;
270 return;
271 }
272 var res = tryCatch(work)();
273 if (res === errorObj) {
274 state.queue = [];
275 state.hasFaulted = true;
276 return thrower(res.e);
277 }
278 recurse(state);
279 }
280
281 ScheduledObserver.prototype.ensureActive = function () {
282 var isOwner = false;
283 if (!this.hasFaulted && this.queue.length > 0) {
284 isOwner = !this.isAcquired;
285 this.isAcquired = true;
286 }
287 isOwner &&
288 this.disposable.setDisposable(this.scheduler.scheduleRecursive(this, scheduleMethod));
289 };
290
291 ScheduledObserver.prototype.dispose = function () {
292 __super__.prototype.dispose.call(this);
293 this.disposable.dispose();
294 };
295
296 return ScheduledObserver;
297 }(AbstractObserver));
298
299 var InnerSubscription = function (s, o) {
300 this._s = s;
301 this._o = o;
302 };
303
304 InnerSubscription.prototype.dispose = function () {
305 if (!this._s.isDisposed && this._o !== null) {
306 var idx = this._s.observers.indexOf(this._o);
307 this._s.observers.splice(idx, 1);
308 this._o = null;
309 }
310 };
311
312 /**
313 * Represents an object that is both an observable sequence as well as an observer.
314 * Each notification is broadcasted to all subscribed observers.
315 */
316 var Subject = Rx.Subject = (function (__super__) {
317 inherits(Subject, __super__);
318 function Subject() {
319 __super__.call(this);
320 this.isDisposed = false;
321 this.isStopped = false;
322 this.observers = [];
323 this.hasError = false;
324 }
325
326 addProperties(Subject.prototype, Observer.prototype, {
327 _subscribe: function (o) {
328 checkDisposed(this);
329 if (!this.isStopped) {
330 this.observers.push(o);
331 return new InnerSubscription(this, o);
332 }
333 if (this.hasError) {
334 o.onError(this.error);
335 return disposableEmpty;
336 }
337 o.onCompleted();
338 return disposableEmpty;
339 },
340 /**
341 * Indicates whether the subject has observers subscribed to it.
342 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
343 */
344 hasObservers: function () { return this.observers.length > 0; },
345 /**
346 * Notifies all subscribed observers about the end of the sequence.
347 */
348 onCompleted: function () {
349 checkDisposed(this);
350 if (!this.isStopped) {
351 this.isStopped = true;
352 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
353 os[i].onCompleted();
354 }
355
356 this.observers.length = 0;
357 }
358 },
359 /**
360 * Notifies all subscribed observers about the exception.
361 * @param {Mixed} error The exception to send to all observers.
362 */
363 onError: function (error) {
364 checkDisposed(this);
365 if (!this.isStopped) {
366 this.isStopped = true;
367 this.error = error;
368 this.hasError = true;
369 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
370 os[i].onError(error);
371 }
372
373 this.observers.length = 0;
374 }
375 },
376 /**
377 * Notifies all subscribed observers about the arrival of the specified element in the sequence.
378 * @param {Mixed} value The value to send to all observers.
379 */
380 onNext: function (value) {
381 checkDisposed(this);
382 if (!this.isStopped) {
383 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
384 os[i].onNext(value);
385 }
386 }
387 },
388 /**
389 * Unsubscribe all observers and release resources.
390 */
391 dispose: function () {
392 this.isDisposed = true;
393 this.observers = null;
394 }
395 });
396
397 /**
398 * Creates a subject from the specified observer and observable.
399 * @param {Observer} observer The observer used to send messages to the subject.
400 * @param {Observable} observable The observable used to subscribe to messages sent from the subject.
401 * @returns {Subject} Subject implemented using the given observer and observable.
402 */
403 Subject.create = function (observer, observable) {
404 return new AnonymousSubject(observer, observable);
405 };
406
407 return Subject;
408 }(Observable));
409
410 var AnonymousSubject = Rx.AnonymousSubject = (function (__super__) {
411 inherits(AnonymousSubject, __super__);
412 function AnonymousSubject(observer, observable) {
413 this.observer = observer;
414 this.observable = observable;
415 __super__.call(this);
416 }
417
418 addProperties(AnonymousSubject.prototype, Observer.prototype, {
419 _subscribe: function (o) {
420 return this.observable.subscribe(o);
421 },
422 onCompleted: function () {
423 this.observer.onCompleted();
424 },
425 onError: function (error) {
426 this.observer.onError(error);
427 },
428 onNext: function (value) {
429 this.observer.onNext(value);
430 }
431 });
432
433 return AnonymousSubject;
434 }(Observable));
435
436 /**
437 * Represents the result of an asynchronous operation.
438 * The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
439 */
440 var AsyncSubject = Rx.AsyncSubject = (function (__super__) {
441 inherits(AsyncSubject, __super__);
442
443 /**
444 * Creates a subject that can only receive one value and that value is cached for all future observations.
445 * @constructor
446 */
447 function AsyncSubject() {
448 __super__.call(this);
449 this.isDisposed = false;
450 this.isStopped = false;
451 this.hasValue = false;
452 this.observers = [];
453 this.hasError = false;
454 }
455
456 addProperties(AsyncSubject.prototype, Observer.prototype, {
457 _subscribe: function (o) {
458 checkDisposed(this);
459
460 if (!this.isStopped) {
461 this.observers.push(o);
462 return new InnerSubscription(this, o);
463 }
464
465 if (this.hasError) {
466 o.onError(this.error);
467 } else if (this.hasValue) {
468 o.onNext(this.value);
469 o.onCompleted();
470 } else {
471 o.onCompleted();
472 }
473
474 return disposableEmpty;
475 },
476 /**
477 * Indicates whether the subject has observers subscribed to it.
478 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
479 */
480 hasObservers: function () {
481 checkDisposed(this);
482 return this.observers.length > 0;
483 },
484 /**
485 * Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).
486 */
487 onCompleted: function () {
488 var i, len;
489 checkDisposed(this);
490 if (!this.isStopped) {
491 this.isStopped = true;
492 var os = cloneArray(this.observers), len = os.length;
493
494 if (this.hasValue) {
495 for (i = 0; i < len; i++) {
496 var o = os[i];
497 o.onNext(this.value);
498 o.onCompleted();
499 }
500 } else {
501 for (i = 0; i < len; i++) {
502 os[i].onCompleted();
503 }
504 }
505
506 this.observers.length = 0;
507 }
508 },
509 /**
510 * Notifies all subscribed observers about the error.
511 * @param {Mixed} error The Error to send to all observers.
512 */
513 onError: function (error) {
514 checkDisposed(this);
515 if (!this.isStopped) {
516 this.isStopped = true;
517 this.hasError = true;
518 this.error = error;
519
520 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
521 os[i].onError(error);
522 }
523
524 this.observers.length = 0;
525 }
526 },
527 /**
528 * Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers.
529 * @param {Mixed} value The value to store in the subject.
530 */
531 onNext: function (value) {
532 checkDisposed(this);
533 if (this.isStopped) { return; }
534 this.value = value;
535 this.hasValue = true;
536 },
537 /**
538 * Unsubscribe all observers and release resources.
539 */
540 dispose: function () {
541 this.isDisposed = true;
542 this.observers = null;
543 this.error = null;
544 this.value = null;
545 }
546 });
547
548 return AsyncSubject;
549 }(Observable));
550
551 /**
552 * Represents a value that changes over time.
553 * Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
554 */
555 var BehaviorSubject = Rx.BehaviorSubject = (function (__super__) {
556 inherits(BehaviorSubject, __super__);
557 function BehaviorSubject(value) {
558 __super__.call(this);
559 this.value = value;
560 this.observers = [];
561 this.isDisposed = false;
562 this.isStopped = false;
563 this.hasError = false;
564 }
565
566 addProperties(BehaviorSubject.prototype, Observer.prototype, {
567 _subscribe: function (o) {
568 checkDisposed(this);
569 if (!this.isStopped) {
570 this.observers.push(o);
571 o.onNext(this.value);
572 return new InnerSubscription(this, o);
573 }
574 if (this.hasError) {
575 o.onError(this.error);
576 } else {
577 o.onCompleted();
578 }
579 return disposableEmpty;
580 },
581 /**
582 * Gets the current value or throws an exception.
583 * Value is frozen after onCompleted is called.
584 * After onError is called always throws the specified exception.
585 * An exception is always thrown after dispose is called.
586 * @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
587 */
588 getValue: function () {
589 checkDisposed(this);
590 if (this.hasError) { thrower(this.error); }
591 return this.value;
592 },
593 /**
594 * Indicates whether the subject has observers subscribed to it.
595 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
596 */
597 hasObservers: function () { return this.observers.length > 0; },
598 /**
599 * Notifies all subscribed observers about the end of the sequence.
600 */
601 onCompleted: function () {
602 checkDisposed(this);
603 if (this.isStopped) { return; }
604 this.isStopped = true;
605 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
606 os[i].onCompleted();
607 }
608
609 this.observers.length = 0;
610 },
611 /**
612 * Notifies all subscribed observers about the exception.
613 * @param {Mixed} error The exception to send to all observers.
614 */
615 onError: function (error) {
616 checkDisposed(this);
617 if (this.isStopped) { return; }
618 this.isStopped = true;
619 this.hasError = true;
620 this.error = error;
621
622 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
623 os[i].onError(error);
624 }
625
626 this.observers.length = 0;
627 },
628 /**
629 * Notifies all subscribed observers about the arrival of the specified element in the sequence.
630 * @param {Mixed} value The value to send to all observers.
631 */
632 onNext: function (value) {
633 checkDisposed(this);
634 if (this.isStopped) { return; }
635 this.value = value;
636 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
637 os[i].onNext(value);
638 }
639 },
640 /**
641 * Unsubscribe all observers and release resources.
642 */
643 dispose: function () {
644 this.isDisposed = true;
645 this.observers = null;
646 this.value = null;
647 this.error = null;
648 }
649 });
650
651 return BehaviorSubject;
652 }(Observable));
653
654 /**
655 * Represents an object that is both an observable sequence as well as an observer.
656 * Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
657 */
658 var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
659
660 var maxSafeInteger = Math.pow(2, 53) - 1;
661
662 function createRemovableDisposable(subject, observer) {
663 return disposableCreate(function () {
664 observer.dispose();
665 !subject.isDisposed && subject.observers.splice(subject.observers.indexOf(observer), 1);
666 });
667 }
668
669 inherits(ReplaySubject, __super__);
670
671 /**
672 * Initializes a new instance of the ReplaySubject class with the specified buffer size, window size and scheduler.
673 * @param {Number} [bufferSize] Maximum element count of the replay buffer.
674 * @param {Number} [windowSize] Maximum time length of the replay buffer.
675 * @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
676 */
677 function ReplaySubject(bufferSize, windowSize, scheduler) {
678 this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize;
679 this.windowSize = windowSize == null ? maxSafeInteger : windowSize;
680 this.scheduler = scheduler || currentThreadScheduler;
681 this.q = [];
682 this.observers = [];
683 this.isStopped = false;
684 this.isDisposed = false;
685 this.hasError = false;
686 this.error = null;
687 __super__.call(this);
688 }
689
690 addProperties(ReplaySubject.prototype, Observer.prototype, {
691 _subscribe: function (o) {
692 checkDisposed(this);
693 var so = new ScheduledObserver(this.scheduler, o), subscription = createRemovableDisposable(this, so);
694
695 this._trim(this.scheduler.now());
696 this.observers.push(so);
697
698 for (var i = 0, len = this.q.length; i < len; i++) {
699 so.onNext(this.q[i].value);
700 }
701
702 if (this.hasError) {
703 so.onError(this.error);
704 } else if (this.isStopped) {
705 so.onCompleted();
706 }
707
708 so.ensureActive();
709 return subscription;
710 },
711 /**
712 * Indicates whether the subject has observers subscribed to it.
713 * @returns {Boolean} Indicates whether the subject has observers subscribed to it.
714 */
715 hasObservers: function () {
716 return this.observers.length > 0;
717 },
718 _trim: function (now) {
719 while (this.q.length > this.bufferSize) {
720 this.q.shift();
721 }
722 while (this.q.length > 0 && (now - this.q[0].interval) > this.windowSize) {
723 this.q.shift();
724 }
725 },
726 /**
727 * Notifies all subscribed observers about the arrival of the specified element in the sequence.
728 * @param {Mixed} value The value to send to all observers.
729 */
730 onNext: function (value) {
731 checkDisposed(this);
732 if (this.isStopped) { return; }
733 var now = this.scheduler.now();
734 this.q.push({ interval: now, value: value });
735 this._trim(now);
736
737 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
738 var observer = os[i];
739 observer.onNext(value);
740 observer.ensureActive();
741 }
742 },
743 /**
744 * Notifies all subscribed observers about the exception.
745 * @param {Mixed} error The exception to send to all observers.
746 */
747 onError: function (error) {
748 checkDisposed(this);
749 if (this.isStopped) { return; }
750 this.isStopped = true;
751 this.error = error;
752 this.hasError = true;
753 var now = this.scheduler.now();
754 this._trim(now);
755 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
756 var observer = os[i];
757 observer.onError(error);
758 observer.ensureActive();
759 }
760 this.observers.length = 0;
761 },
762 /**
763 * Notifies all subscribed observers about the end of the sequence.
764 */
765 onCompleted: function () {
766 checkDisposed(this);
767 if (this.isStopped) { return; }
768 this.isStopped = true;
769 var now = this.scheduler.now();
770 this._trim(now);
771 for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) {
772 var observer = os[i];
773 observer.onCompleted();
774 observer.ensureActive();
775 }
776 this.observers.length = 0;
777 },
778 /**
779 * Unsubscribe all observers and release resources.
780 */
781 dispose: function () {
782 this.isDisposed = true;
783 this.observers = null;
784 }
785 });
786
787 return ReplaySubject;
788 }(Observable));
789
790 var RefCountObservable = (function (__super__) {
791 inherits(RefCountObservable, __super__);
792 function RefCountObservable(source) {
793 this.source = source;
794 this._count = 0;
795 this._connectableSubscription = null;
796 __super__.call(this);
797 }
798
799 RefCountObservable.prototype.subscribeCore = function (o) {
800 var subscription = this.source.subscribe(o);
801 ++this._count === 1 && (this._connectableSubscription = this.source.connect());
802 return new RefCountDisposable(this, subscription);
803 };
804
805 function RefCountDisposable(p, s) {
806 this._p = p;
807 this._s = s;
808 this.isDisposed = false;
809 }
810
811 RefCountDisposable.prototype.dispose = function () {
812 if (!this.isDisposed) {
813 this.isDisposed = true;
814 this._s.dispose();
815 --this._p._count === 0 && this._p._connectableSubscription.dispose();
816 }
817 };
818
819 return RefCountObservable;
820 }(ObservableBase));
821
822 var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) {
823 inherits(ConnectableObservable, __super__);
824 function ConnectableObservable(source, subject) {
825 this.source = source;
826 this._connection = null;
827 this._source = source.asObservable();
828 this._subject = subject;
829 __super__.call(this);
830 }
831
832 function ConnectDisposable(parent, subscription) {
833 this._p = parent;
834 this._s = subscription;
835 }
836
837 ConnectDisposable.prototype.dispose = function () {
838 if (this._s) {
839 this._s.dispose();
840 this._s = null;
841 this._p._connection = null;
842 }
843 };
844
845 ConnectableObservable.prototype.connect = function () {
846 if (!this._connection) {
847 var subscription = this._source.subscribe(this._subject);
848 this._connection = new ConnectDisposable(this, subscription);
849 }
850 return this._connection;
851 };
852
853 ConnectableObservable.prototype._subscribe = function (o) {
854 return this._subject.subscribe(o);
855 };
856
857 ConnectableObservable.prototype.refCount = function () {
858 return new RefCountObservable(this);
859 };
860
861 return ConnectableObservable;
862 }(Observable));
863
864 return Rx;
865}));