UNPKG

29.5 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
4 var objectTypes = {
5 'function': true,
6 'object': true
7 };
8
9 function checkGlobal(value) {
10 return (value && value.Object === Object) ? value : null;
11 }
12
13 var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
14 var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
15 var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
16 var freeSelf = checkGlobal(objectTypes[typeof self] && self);
17 var freeWindow = checkGlobal(objectTypes[typeof window] && window);
18 var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
19 var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
20 var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
21
22 // Because of build optimizers
23 if (typeof define === 'function' && define.amd) {
24 define(['./rx.lite.compat'], function (Rx, exports) {
25 return factory(root, exports, Rx);
26 });
27 } else if (typeof module === 'object' && module && module.exports === freeExports) {
28 module.exports = factory(root, module.exports, require('rx-lite-compat'));
29 } else {
30 root.Rx = factory(root, {}, root.Rx);
31 }
32}.call(this, function (root, exp, Rx, undefined) {
33
34 // References
35 var Observable = Rx.Observable,
36 observableProto = Observable.prototype,
37 observableNever = Observable.never,
38 observableThrow = Observable['throw'],
39 AnonymousObservable = Rx.AnonymousObservable,
40 ObservableBase = Rx.ObservableBase,
41 AnonymousObserver = Rx.AnonymousObserver,
42 notificationCreateOnNext = Rx.Notification.createOnNext,
43 notificationCreateOnError = Rx.Notification.createOnError,
44 notificationCreateOnCompleted = Rx.Notification.createOnCompleted,
45 Observer = Rx.Observer,
46 observerCreate = Observer.create,
47 AbstractObserver = Rx.internals.AbstractObserver,
48 Subject = Rx.Subject,
49 internals = Rx.internals,
50 helpers = Rx.helpers,
51 ScheduledObserver = internals.ScheduledObserver,
52 SerialDisposable = Rx.SerialDisposable,
53 SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
54 CompositeDisposable = Rx.CompositeDisposable,
55 BinaryDisposable = Rx.BinaryDisposable,
56 RefCountDisposable = Rx.RefCountDisposable,
57 disposableEmpty = Rx.Disposable.empty,
58 immediateScheduler = Rx.Scheduler.immediate,
59 defaultKeySerializer = helpers.defaultKeySerializer,
60 addRef = Rx.internals.addRef,
61 identity = helpers.identity,
62 isPromise = helpers.isPromise,
63 isFunction = helpers.isFunction,
64 inherits = internals.inherits,
65 bindCallback = internals.bindCallback,
66 noop = helpers.noop,
67 isScheduler = Rx.Scheduler.isScheduler,
68 observableFromPromise = Observable.fromPromise,
69 ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError;
70
71 var errorObj = {e: {}};
72
73 function tryCatcherGen(tryCatchTarget) {
74 return function tryCatcher() {
75 try {
76 return tryCatchTarget.apply(this, arguments);
77 } catch (e) {
78 errorObj.e = e;
79 return errorObj;
80 }
81 };
82 }
83
84 var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
85 if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
86 return tryCatcherGen(fn);
87 };
88
89 function thrower(e) {
90 throw e;
91 }
92
93 function ScheduledDisposable(scheduler, disposable) {
94 this.scheduler = scheduler;
95 this.disposable = disposable;
96 this.isDisposed = false;
97 }
98
99 function scheduleItem(s, self) {
100 if (!self.isDisposed) {
101 self.isDisposed = true;
102 self.disposable.dispose();
103 }
104 }
105
106 ScheduledDisposable.prototype.dispose = function () {
107 this.scheduler.schedule(this, scheduleItem);
108 };
109
110 var CheckedObserver = (function (__super__) {
111 inherits(CheckedObserver, __super__);
112
113 function CheckedObserver(observer) {
114 __super__.call(this);
115 this._observer = observer;
116 this._state = 0; // 0 - idle, 1 - busy, 2 - done
117 }
118
119 var CheckedObserverPrototype = CheckedObserver.prototype;
120
121 CheckedObserverPrototype.onNext = function (value) {
122 this.checkAccess();
123 var res = tryCatch(this._observer.onNext).call(this._observer, value);
124 this._state = 0;
125 res === errorObj && thrower(res.e);
126 };
127
128 CheckedObserverPrototype.onError = function (err) {
129 this.checkAccess();
130 var res = tryCatch(this._observer.onError).call(this._observer, err);
131 this._state = 2;
132 res === errorObj && thrower(res.e);
133 };
134
135 CheckedObserverPrototype.onCompleted = function () {
136 this.checkAccess();
137 var res = tryCatch(this._observer.onCompleted).call(this._observer);
138 this._state = 2;
139 res === errorObj && thrower(res.e);
140 };
141
142 CheckedObserverPrototype.checkAccess = function () {
143 if (this._state === 1) { throw new Error('Re-entrancy detected'); }
144 if (this._state === 2) { throw new Error('Observer completed'); }
145 if (this._state === 0) { this._state = 1; }
146 };
147
148 return CheckedObserver;
149 }(Observer));
150
151 var ObserveOnObserver = (function (__super__) {
152 inherits(ObserveOnObserver, __super__);
153
154 function ObserveOnObserver(scheduler, observer, cancel) {
155 __super__.call(this, scheduler, observer);
156 this._cancel = cancel;
157 }
158
159 ObserveOnObserver.prototype.next = function (value) {
160 __super__.prototype.next.call(this, value);
161 this.ensureActive();
162 };
163
164 ObserveOnObserver.prototype.error = function (e) {
165 __super__.prototype.error.call(this, e);
166 this.ensureActive();
167 };
168
169 ObserveOnObserver.prototype.completed = function () {
170 __super__.prototype.completed.call(this);
171 this.ensureActive();
172 };
173
174 ObserveOnObserver.prototype.dispose = function () {
175 __super__.prototype.dispose.call(this);
176 this._cancel && this._cancel.dispose();
177 this._cancel = null;
178 };
179
180 return ObserveOnObserver;
181 })(ScheduledObserver);
182
183 /**
184 * Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods.
185 * If a violation is detected, an Error is thrown from the offending observer method call.
186 *
187 * @returns An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
188 */
189 Observer.prototype.checked = function () { return new CheckedObserver(this); };
190
191 /**
192 * Schedules the invocation of observer methods on the given scheduler.
193 * @param {Scheduler} scheduler Scheduler to schedule observer messages on.
194 * @returns {Observer} Observer whose messages are scheduled on the given scheduler.
195 */
196 Observer.notifyOn = function (scheduler) {
197 return new ObserveOnObserver(scheduler, this);
198 };
199
200 /**
201 * Creates an observer from a notification callback.
202 * @param {Function} handler Action that handles a notification.
203 * @returns The observer object that invokes the specified handler using a notification corresponding to each message it receives.
204 */
205 Observer.fromNotifier = function (handler, thisArg) {
206 var handlerFunc = bindCallback(handler, thisArg, 1);
207 return new AnonymousObserver(function (x) {
208 return handlerFunc(notificationCreateOnNext(x));
209 }, function (e) {
210 return handlerFunc(notificationCreateOnError(e));
211 }, function () {
212 return handlerFunc(notificationCreateOnCompleted());
213 });
214 };
215
216 /**
217 * Creates a notification callback from an observer.
218 * @returns The action that forwards its input notification to the underlying observer.
219 */
220 Observer.prototype.toNotifier = function () {
221 var observer = this;
222 return function (n) { return n.accept(observer); };
223 };
224
225 /**
226 * Hides the identity of an observer.
227 * @returns An observer that hides the identity of the specified observer.
228 */
229 Observer.prototype.asObserver = function () {
230 var source = this;
231 return new AnonymousObserver(
232 function (x) { source.onNext(x); },
233 function (e) { source.onError(e); },
234 function () { source.onCompleted(); }
235 );
236 };
237
238var ObserveOnObservable = (function (__super__) {
239 inherits(ObserveOnObservable, __super__);
240 function ObserveOnObservable(source, s) {
241 this.source = source;
242 this._s = s;
243 __super__.call(this);
244 }
245
246 ObserveOnObservable.prototype.subscribeCore = function (o) {
247 return this.source.subscribe(new ObserveOnObserver(this._s, o));
248 };
249
250 return ObserveOnObservable;
251}(ObservableBase));
252
253 /**
254 * Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
255 *
256 * This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
257 * that require to be run on a scheduler, use subscribeOn.
258 *
259 * @param {Scheduler} scheduler Scheduler to notify observers on.
260 * @returns {Observable} The source sequence whose observations happen on the specified scheduler.
261 */
262 observableProto.observeOn = function (scheduler) {
263 return new ObserveOnObservable(this, scheduler);
264 };
265
266 var SubscribeOnObservable = (function (__super__) {
267 inherits(SubscribeOnObservable, __super__);
268 function SubscribeOnObservable(source, s) {
269 this.source = source;
270 this._s = s;
271 __super__.call(this);
272 }
273
274 function scheduleMethod(scheduler, state) {
275 var source = state[0], d = state[1], o = state[2];
276 d.setDisposable(new ScheduledDisposable(scheduler, source.subscribe(o)));
277 }
278
279 SubscribeOnObservable.prototype.subscribeCore = function (o) {
280 var m = new SingleAssignmentDisposable(), d = new SerialDisposable();
281 d.setDisposable(m);
282 m.setDisposable(this._s.schedule([this.source, d, o], scheduleMethod));
283 return d;
284 };
285
286 return SubscribeOnObservable;
287 }(ObservableBase));
288
289 /**
290 * Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
291 * see the remarks section for more information on the distinction between subscribeOn and observeOn.
292
293 * This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
294 * callbacks on a scheduler, use observeOn.
295
296 * @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
297 * @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
298 */
299 observableProto.subscribeOn = function (scheduler) {
300 return new SubscribeOnObservable(this, scheduler);
301 };
302
303 var GenerateObservable = (function (__super__) {
304 inherits(GenerateObservable, __super__);
305 function GenerateObservable(state, cndFn, itrFn, resFn, s) {
306 this._state = state;
307 this._cndFn = cndFn;
308 this._itrFn = itrFn;
309 this._resFn = resFn;
310 this._s = s;
311 this._first = true;
312 __super__.call(this);
313 }
314
315 function scheduleRecursive(self, recurse) {
316 if (self._first) {
317 self._first = false;
318 } else {
319 self._state = tryCatch(self._itrFn)(self._state);
320 if (self._state === errorObj) { return self._o.onError(self._state.e); }
321 }
322 var hasResult = tryCatch(self._cndFn)(self._state);
323 if (hasResult === errorObj) { return self._o.onError(hasResult.e); }
324 if (hasResult) {
325 var result = tryCatch(self._resFn)(self._state);
326 if (result === errorObj) { return self._o.onError(result.e); }
327 self._o.onNext(result);
328 recurse(self);
329 } else {
330 self._o.onCompleted();
331 }
332 }
333
334 GenerateObservable.prototype.subscribeCore = function (o) {
335 this._o = o;
336 return this._s.scheduleRecursive(this, scheduleRecursive);
337 };
338
339 return GenerateObservable;
340 }(ObservableBase));
341
342 /**
343 * Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
344 *
345 * @example
346 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; });
347 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }, Rx.Scheduler.timeout);
348 * @param {Mixed} initialState Initial state.
349 * @param {Function} condition Condition to terminate generation (upon returning false).
350 * @param {Function} iterate Iteration step function.
351 * @param {Function} resultSelector Selector function for results produced in the sequence.
352 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not provided, defaults to Scheduler.currentThread.
353 * @returns {Observable} The generated sequence.
354 */
355 Observable.generate = function (initialState, condition, iterate, resultSelector, scheduler) {
356 isScheduler(scheduler) || (scheduler = currentThreadScheduler);
357 return new GenerateObservable(initialState, condition, iterate, resultSelector, scheduler);
358 };
359
360 var UsingObservable = (function (__super__) {
361 inherits(UsingObservable, __super__);
362 function UsingObservable(resFn, obsFn) {
363 this._resFn = resFn;
364 this._obsFn = obsFn;
365 __super__.call(this);
366 }
367
368 UsingObservable.prototype.subscribeCore = function (o) {
369 var disposable = disposableEmpty;
370 var resource = tryCatch(this._resFn)();
371 if (resource === errorObj) {
372 return new BinaryDisposable(observableThrow(resource.e).subscribe(o), disposable);
373 }
374 resource && (disposable = resource);
375 var source = tryCatch(this._obsFn)(resource);
376 if (source === errorObj) {
377 return new BinaryDisposable(observableThrow(source.e).subscribe(o), disposable);
378 }
379 return new BinaryDisposable(source.subscribe(o), disposable);
380 };
381
382 return UsingObservable;
383 }(ObservableBase));
384
385 /**
386 * Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
387 * @param {Function} resourceFactory Factory function to obtain a resource object.
388 * @param {Function} observableFactory Factory function to obtain an observable sequence that depends on the obtained resource.
389 * @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object.
390 */
391 Observable.using = function (resourceFactory, observableFactory) {
392 return new UsingObservable(resourceFactory, observableFactory);
393 };
394
395 /**
396 * Propagates the observable sequence or Promise that reacts first.
397 * @param {Observable} rightSource Second observable sequence or Promise.
398 * @returns {Observable} {Observable} An observable sequence that surfaces either of the given sequences, whichever reacted first.
399 */
400 observableProto.amb = function (rightSource) {
401 var leftSource = this;
402 return new AnonymousObservable(function (observer) {
403 var choice,
404 leftChoice = 'L', rightChoice = 'R',
405 leftSubscription = new SingleAssignmentDisposable(),
406 rightSubscription = new SingleAssignmentDisposable();
407
408 isPromise(rightSource) && (rightSource = observableFromPromise(rightSource));
409
410 function choiceL() {
411 if (!choice) {
412 choice = leftChoice;
413 rightSubscription.dispose();
414 }
415 }
416
417 function choiceR() {
418 if (!choice) {
419 choice = rightChoice;
420 leftSubscription.dispose();
421 }
422 }
423
424 var leftSubscribe = observerCreate(
425 function (left) {
426 choiceL();
427 choice === leftChoice && observer.onNext(left);
428 },
429 function (e) {
430 choiceL();
431 choice === leftChoice && observer.onError(e);
432 },
433 function () {
434 choiceL();
435 choice === leftChoice && observer.onCompleted();
436 }
437 );
438 var rightSubscribe = observerCreate(
439 function (right) {
440 choiceR();
441 choice === rightChoice && observer.onNext(right);
442 },
443 function (e) {
444 choiceR();
445 choice === rightChoice && observer.onError(e);
446 },
447 function () {
448 choiceR();
449 choice === rightChoice && observer.onCompleted();
450 }
451 );
452
453 leftSubscription.setDisposable(leftSource.subscribe(leftSubscribe));
454 rightSubscription.setDisposable(rightSource.subscribe(rightSubscribe));
455
456 return new BinaryDisposable(leftSubscription, rightSubscription);
457 });
458 };
459
460 function amb(p, c) { return p.amb(c); }
461
462 /**
463 * Propagates the observable sequence or Promise that reacts first.
464 * @returns {Observable} An observable sequence that surfaces any of the given sequences, whichever reacted first.
465 */
466 Observable.amb = function () {
467 var acc = observableNever(), items;
468 if (Array.isArray(arguments[0])) {
469 items = arguments[0];
470 } else {
471 var len = arguments.length;
472 items = new Array(items);
473 for(var i = 0; i < len; i++) { items[i] = arguments[i]; }
474 }
475 for (var i = 0, len = items.length; i < len; i++) {
476 acc = amb(acc, items[i]);
477 }
478 return acc;
479 };
480
481 /**
482 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
483 * @param {Observable} second Second observable sequence used to produce results after the first sequence terminates.
484 * @returns {Observable} An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally.
485 */
486 observableProto.onErrorResumeNext = function (second) {
487 if (!second) { throw new Error('Second observable is required'); }
488 return onErrorResumeNext([this, second]);
489 };
490
491 var OnErrorResumeNextObservable = (function(__super__) {
492 inherits(OnErrorResumeNextObservable, __super__);
493 function OnErrorResumeNextObservable(sources) {
494 this.sources = sources;
495 __super__.call(this);
496 }
497
498 function scheduleMethod(state, recurse) {
499 if (state.pos < state.sources.length) {
500 var current = state.sources[state.pos++];
501 isPromise(current) && (current = observableFromPromise(current));
502 var d = new SingleAssignmentDisposable();
503 state.subscription.setDisposable(d);
504 d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
505 } else {
506 state.o.onCompleted();
507 }
508 }
509
510 OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
511 var subscription = new SerialDisposable(),
512 state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
513 cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
514
515 return new BinaryDisposable(subscription, cancellable);
516 };
517
518 return OnErrorResumeNextObservable;
519 }(ObservableBase));
520
521 var OnErrorResumeNextObserver = (function(__super__) {
522 inherits(OnErrorResumeNextObserver, __super__);
523 function OnErrorResumeNextObserver(state, recurse) {
524 this._state = state;
525 this._recurse = recurse;
526 __super__.call(this);
527 }
528
529 OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
530 OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
531 OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };
532
533 return OnErrorResumeNextObserver;
534 }(AbstractObserver));
535
536 /**
537 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
538 * @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
539 */
540 var onErrorResumeNext = Observable.onErrorResumeNext = function () {
541 var sources = [];
542 if (Array.isArray(arguments[0])) {
543 sources = arguments[0];
544 } else {
545 var len = arguments.length;
546 sources = new Array(len);
547 for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
548 }
549 return new OnErrorResumeNextObservable(sources);
550 };
551
552 function toArray(x) { return x.toArray(); }
553 function notEmpty(x) { return x.length > 0; }
554
555 /**
556 * Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
557 * @param {Number} count Length of each buffer.
558 * @param {Number} [skip] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count.
559 * @returns {Observable} An observable sequence of buffers.
560 */
561 observableProto.bufferWithCount = function (count, skip) {
562 typeof skip !== 'number' && (skip = count);
563 return this.windowWithCount(count, skip)
564 .flatMap(toArray)
565 .filter(notEmpty);
566 };
567
568 /**
569 * Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
570 * @param {Number} count Length of each window.
571 * @param {Number} [skip] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count.
572 * @returns {Observable} An observable sequence of windows.
573 */
574 observableProto.windowWithCount = function (count, skip) {
575 var source = this;
576 +count || (count = 0);
577 Math.abs(count) === Infinity && (count = 0);
578 if (count <= 0) { throw new ArgumentOutOfRangeError(); }
579 skip == null && (skip = count);
580 +skip || (skip = 0);
581 Math.abs(skip) === Infinity && (skip = 0);
582
583 if (skip <= 0) { throw new ArgumentOutOfRangeError(); }
584 return new AnonymousObservable(function (observer) {
585 var m = new SingleAssignmentDisposable(),
586 refCountDisposable = new RefCountDisposable(m),
587 n = 0,
588 q = [];
589
590 function createWindow () {
591 var s = new Subject();
592 q.push(s);
593 observer.onNext(addRef(s, refCountDisposable));
594 }
595
596 createWindow();
597
598 m.setDisposable(source.subscribe(
599 function (x) {
600 for (var i = 0, len = q.length; i < len; i++) { q[i].onNext(x); }
601 var c = n - count + 1;
602 c >= 0 && c % skip === 0 && q.shift().onCompleted();
603 ++n % skip === 0 && createWindow();
604 },
605 function (e) {
606 while (q.length > 0) { q.shift().onError(e); }
607 observer.onError(e);
608 },
609 function () {
610 while (q.length > 0) { q.shift().onCompleted(); }
611 observer.onCompleted();
612 }
613 ));
614 return refCountDisposable;
615 }, source);
616 };
617
618 var TakeLastBufferObserver = (function (__super__) {
619 inherits(TakeLastBufferObserver, __super__);
620 function TakeLastBufferObserver(o, c) {
621 this._o = o;
622 this._c = c;
623 this._q = [];
624 __super__.call(this);
625 }
626
627 TakeLastBufferObserver.prototype.next = function (x) {
628 this._q.push(x);
629 this._q.length > this._c && this._q.shift();
630 };
631
632 TakeLastBufferObserver.prototype.error = function (e) {
633 this._o.onError(e);
634 };
635
636 TakeLastBufferObserver.prototype.completed = function () {
637 this._o.onNext(this._q);
638 this._o.onCompleted();
639 };
640
641 return TakeLastBufferObserver;
642 }(AbstractObserver));
643
644 /**
645 * Returns an array with the specified number of contiguous elements from the end of an observable sequence.
646 *
647 * @description
648 * This operator accumulates a buffer with a length enough to store count elements. Upon completion of the
649 * source sequence, this buffer is produced on the result sequence.
650 * @param {Number} count Number of elements to take from the end of the source sequence.
651 * @returns {Observable} An observable sequence containing a single array with the specified number of elements from the end of the source sequence.
652 */
653 observableProto.takeLastBuffer = function (count) {
654 if (count < 0) { throw new ArgumentOutOfRangeError(); }
655 var source = this;
656 return new AnonymousObservable(function (o) {
657 return source.subscribe(new TakeLastBufferObserver(o, count));
658 }, source);
659 };
660
661 var DefaultIfEmptyObserver = (function (__super__) {
662 inherits(DefaultIfEmptyObserver, __super__);
663 function DefaultIfEmptyObserver(o, d) {
664 this._o = o;
665 this._d = d;
666 this._f = false;
667 __super__.call(this);
668 }
669
670 DefaultIfEmptyObserver.prototype.next = function (x) {
671 this._f = true;
672 this._o.onNext(x);
673 };
674
675 DefaultIfEmptyObserver.prototype.error = function (e) {
676 this._o.onError(e);
677 };
678
679 DefaultIfEmptyObserver.prototype.completed = function () {
680 !this._f && this._o.onNext(this._d);
681 this._o.onCompleted();
682 };
683
684 return DefaultIfEmptyObserver;
685 }(AbstractObserver));
686
687 /**
688 * Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty.
689 *
690 * var res = obs = xs.defaultIfEmpty();
691 * 2 - obs = xs.defaultIfEmpty(false);
692 *
693 * @memberOf Observable#
694 * @param defaultValue The value to return if the sequence is empty. If not provided, this defaults to null.
695 * @returns {Observable} An observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source itself.
696 */
697 observableProto.defaultIfEmpty = function (defaultValue) {
698 var source = this;
699 defaultValue === undefined && (defaultValue = null);
700 return new AnonymousObservable(function (o) {
701 return source.subscribe(new DefaultIfEmptyObserver(o, defaultValue));
702 }, source);
703 };
704
705 // Swap out for Array.findIndex
706 function arrayIndexOfComparer(array, item, comparer) {
707 for (var i = 0, len = array.length; i < len; i++) {
708 if (comparer(array[i], item)) { return i; }
709 }
710 return -1;
711 }
712
713 function HashSet(comparer) {
714 this.comparer = comparer;
715 this.set = [];
716 }
717 HashSet.prototype.push = function(value) {
718 var retValue = arrayIndexOfComparer(this.set, value, this.comparer) === -1;
719 retValue && this.set.push(value);
720 return retValue;
721 };
722
723 var DistinctObservable = (function (__super__) {
724 inherits(DistinctObservable, __super__);
725 function DistinctObservable(source, keyFn, cmpFn) {
726 this.source = source;
727 this._keyFn = keyFn;
728 this._cmpFn = cmpFn;
729 __super__.call(this);
730 }
731
732 DistinctObservable.prototype.subscribeCore = function (o) {
733 return this.source.subscribe(new DistinctObserver(o, this._keyFn, this._cmpFn));
734 };
735
736 return DistinctObservable;
737 }(ObservableBase));
738
739 var DistinctObserver = (function (__super__) {
740 inherits(DistinctObserver, __super__);
741 function DistinctObserver(o, keyFn, cmpFn) {
742 this._o = o;
743 this._keyFn = keyFn;
744 this._h = new HashSet(cmpFn);
745 __super__.call(this);
746 }
747
748 DistinctObserver.prototype.next = function (x) {
749 var key = x;
750 if (isFunction(this._keyFn)) {
751 key = tryCatch(this._keyFn)(x);
752 if (key === errorObj) { return this._o.onError(key.e); }
753 }
754 this._h.push(key) && this._o.onNext(x);
755 };
756
757 DistinctObserver.prototype.error = function (e) { this._o.onError(e); };
758 DistinctObserver.prototype.completed = function () { this._o.onCompleted(); };
759
760 return DistinctObserver;
761 }(AbstractObserver));
762
763 /**
764 * Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer.
765 * Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.
766 *
767 * @example
768 * var res = obs = xs.distinct();
769 * 2 - obs = xs.distinct(function (x) { return x.id; });
770 * 2 - obs = xs.distinct(function (x) { return x.id; }, function (a,b) { return a === b; });
771 * @param {Function} [keySelector] A function to compute the comparison key for each element.
772 * @param {Function} [comparer] Used to compare items in the collection.
773 * @returns {Observable} An observable sequence only containing the distinct elements, based on a computed key value, from the source sequence.
774 */
775 observableProto.distinct = function (keySelector, comparer) {
776 comparer || (comparer = defaultComparer);
777 return new DistinctObservable(this, keySelector, comparer);
778 };
779
780 /**
781 * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
782 * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
783 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
784 */
785 observableProto.singleInstance = function() {
786 var source = this, hasObservable = false, observable;
787
788 function getObservable() {
789 if (!hasObservable) {
790 hasObservable = true;
791 observable = source['finally'](function() { hasObservable = false; }).publish().refCount();
792 }
793 return observable;
794 }
795
796 return new AnonymousObservable(function(o) {
797 return getObservable().subscribe(o);
798 });
799 };
800
801 return Rx;
802}));