UNPKG

21.3 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
// `typeof` results that may denote a usable host object.
var objectTypes = {
  'object': true,
  'function': true
};

/**
 * Returns `value` when it is truthy and exposes the same `Object`
 * constructor as this scope; otherwise returns null.
 */
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}

// CommonJS `exports` / `module`, rejected when they carry a `nodeType`.
var freeExports = null;
if (objectTypes[typeof exports] && exports && !exports.nodeType) {
  freeExports = exports;
}

var freeModule = null;
if (objectTypes[typeof module] && module && !module.nodeType) {
  freeModule = module;
}

// Candidate global objects; each is null when it fails checkGlobal.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);

var moduleExports = null;
if (freeModule && freeModule.exports === freeExports) {
  moduleExports = freeExports;
}

var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

// First usable candidate wins; `Function('return this')()` is the last resort.
var root = freeGlobal ||
  ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) ||
  freeSelf ||
  thisGlobal ||
  Function('return this')();

// Because of build optimizers
if (typeof define === 'function' && define.amd) {
  // AMD: './rx' is the only declared dependency.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  // CommonJS: load './rx' synchronously and export the factory result.
  module.exports = factory(root, module.exports, require('./rx'));
} else {
  // Otherwise extend the `Rx` already present on the root object.
  root.Rx = factory(root, {}, root.Rx);
}
32}.call(this, function (root, exp, Rx, undefined) {
33
// Aliases -- observable types
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var ObservableBase = Rx.ObservableBase;
var AbstractObserver = Rx.internals.AbstractObserver;
var FlatMapObservable = Rx.FlatMapObservable;

// Aliases -- static observable factories
var observableConcat = Observable.concat;
var observableDefer = Observable.defer;
var observableEmpty = Observable.empty;

// Aliases -- disposables
var disposableEmpty = Rx.Disposable.empty;
var CompositeDisposable = Rx.CompositeDisposable;
var SerialDisposable = Rx.SerialDisposable;
var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable;

// Aliases -- enumerables, schedulers, subjects and observers
var Enumerable = Rx.internals.Enumerable;
var enumerableOf = Enumerable.of;
var currentThreadScheduler = Rx.Scheduler.currentThread;
var AsyncSubject = Rx.AsyncSubject;
var Observer = Rx.Observer;

// Aliases -- internals and helpers
var inherits = Rx.internals.inherits;
var addProperties = Rx.internals.addProperties;
var helpers = Rx.helpers;
var noop = helpers.noop;
var isPromise = helpers.isPromise;
var isFunction = helpers.isFunction;
var isIterable = Rx.helpers.isIterable;
var isArrayLike = Rx.helpers.isArrayLike;
var isScheduler = Rx.Scheduler.isScheduler;
var observableFromPromise = Observable.fromPromise;
62
// Shared sentinel returned by wrapped functions when they throw; the caught
// value is stored on its `e` property.
var errorObj = { e: {} };

/**
 * Builds a wrapper around `tryCatchTarget` that never throws.
 * @param {Function} tryCatchTarget Function to invoke with the wrapper's own `this` and arguments.
 * @returns {Function} Wrapper returning the target's result, or `errorObj` (with `errorObj.e` set to the caught value) when the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (e) {
      errorObj.e = e;
      outcome = errorObj;
    }
    return outcome;
  }

  return tryCatcher;
}
75
/**
 * Validates `fn` and returns its non-throwing wrapper (see tryCatcherGen).
 * Also published as Rx.internals.tryCatch.
 * @param {Function} fn Function to wrap.
 * @returns {Function} The wrapper produced by tryCatcherGen.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
80
/**
 * Throws the value it is given.
 * @param {*} e Value to throw.
 */
function thrower(e) { throw e; }
84
// Shim in iterator support: use the native Symbol.iterator when present,
// otherwise fall back to the '_es6shim_iterator_' key.
var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) ||
  '_es6shim_iterator_';
// Bug for mozilla version: a Set that exposes its iterator under '@@iterator'
if (root.Set && typeof new root.Set()['@@iterator'] === 'function') {
  $iterator$ = '@@iterator';
}

// Iterator result signalling the end of an enumeration.
var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };

/**
 * Tests whether a value exposes the iterator key selected above.
 * Only null/undefined are rejected up front, so the empty string (which is
 * iterable) is reported correctly and the result is always a boolean.
 * @param {*} o Value to test.
 * @returns {Boolean} true when `o` has a defined iterator property.
 */
var isIterable = Rx.helpers.isIterable = function (o) {
  return o != null && o[$iterator$] !== undefined;
};

/**
 * Tests whether a value has a defined `length` property.
 * Only null/undefined are rejected up front, so the empty string (length 0)
 * is reported correctly and the result is always a boolean.
 * @param {*} o Value to test.
 * @returns {Boolean} true when `o.length` is defined.
 */
var isArrayLike = Rx.helpers.isArrayLike = function (o) {
  return o != null && o.length !== undefined;
};

// Publish the iterator key so other code can look it up.
Rx.helpers.iterator = $iterator$;
104
/**
 * Enumerable whose iterator keeps yielding the same source `s` for as long
 * as the condition function `c` returns a truthy value.
 */
var WhileEnumerable = (function (__super__) {
  function WhileEnumerable(c, s) {
    this.c = c;
    this.s = s;
  }

  inherits(WhileEnumerable, __super__);

  WhileEnumerable.prototype[$iterator$] = function () {
    var enumerable = this;

    // The condition is re-evaluated on every call to next().
    function next() {
      if (enumerable.c()) {
        return { done: false, value: enumerable.s };
      }
      return { done: true, value: void 0 };
    }

    return { next: next };
  };

  return WhileEnumerable;
}(Enumerable));
123
/**
 * Creates a WhileEnumerable for the given condition and source.
 * @param {Function} condition Function consulted before each iteration.
 * @param {Observable} source Value yielded while the condition holds.
 * @returns {WhileEnumerable} The new enumerable.
 */
function enumerableWhile(condition, source) {
  var repeated = new WhileEnumerable(condition, source);
  return repeated;
}
127
/**
 * Invokes `func` with this observable sequence and returns whatever it returns.
 * This allows a fluent style of writing queries that use the same sequence multiple times.
 * Available both as `letBind` and as `let`.
 *
 * @param {Function} func Function which receives the source sequence and can use it as many times as needed, without sharing subscriptions to it.
 * @returns {Observable} The value returned by `func`.
 */
observableProto.letBind = observableProto['let'] = function (func) {
  var source = this;
  return func(source);
};
138
/**
 * Chooses between two sequences based on a condition function.
 * All work happens inside the factory handed to Observable.defer.
 *
 * @example
 *  1 - res = Rx.Observable.if(condition, obs1);
 *  2 - res = Rx.Observable.if(condition, obs1, obs2);
 *  3 - res = Rx.Observable.if(condition, obs1, scheduler);
 * @param {Function} condition The condition which determines if the thenSource or the else source will be run.
 * @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true.
 * @param {Observable|Scheduler} [elseSourceOrScheduler] The observable sequence or Promise that will be run if the condition function returns false. When omitted it defaults to Rx.Observable.empty(); when it has a `now` function it is treated as a scheduler for Rx.Observable.empty.
 * @returns {Observable} An observable sequence which is either the thenSource or the else source.
 */
Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) {
  return observableDefer(function () {
    if (!elseSourceOrScheduler) {
      elseSourceOrScheduler = observableEmpty();
    }

    if (isPromise(thenSource)) {
      thenSource = observableFromPromise(thenSource);
    }
    if (isPromise(elseSourceOrScheduler)) {
      elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler);
    }

    // Assume a scheduler for empty only
    if (typeof elseSourceOrScheduler.now === 'function') {
      elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler);
    }

    if (condition()) {
      return thenSource;
    }
    return elseSourceOrScheduler;
  });
};
163
/**
 * Concatenates the observable sequences obtained by running the result selector for each element in sources.
 * Also available as 'forIn' for browsers <IE9.
 * @param {Array} sources An array of values to turn into an observable sequence.
 * @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence.
 * @param {Any} [thisArg] Passed through to Enumerable.of together with the selector.
 * @returns {Observable} An observable sequence from the concatenated observable sequences.
 */
Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) {
  var projected = enumerableOf(sources, resultSelector, thisArg);
  return projected.concat();
};
174
/**
 * Repeats source as long as condition holds, emulating a while loop.
 * Also available as 'whileDo' for browsers <IE9.
 *
 * @param {Function} condition The condition which determines if the source will be repeated.
 * @param {Observable} source The observable sequence (or Promise, which is converted first) that will be run while the condition function returns true.
 * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
 */
var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) {
  if (isPromise(source)) {
    source = observableFromPromise(source);
  }
  var repetitions = enumerableWhile(condition, source);
  return repetitions.concat();
};
187
/**
 * Repeats this sequence as long as condition holds, emulating a do-while loop:
 * the source is concatenated with a `while` repetition of itself.
 *
 * @param {Function} condition The condition which determines if the source will be repeated.
 * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
 */
observableProto.doWhile = function (condition) {
  var source = this;
  var repeated = observableWhileDo(condition, source);
  return observableConcat([source, repeated]);
};
198
/**
 * Uses selector to determine which source in sources to use.
 * All work happens inside the factory handed to Observable.defer.
 * @param {Function} selector The function which produces the key to look up in sources.
 * @param {Object} sources An object whose keys correspond to the case statement labels.
 * @param {Observable|Scheduler} [defaultSourceOrScheduler] The observable sequence or Promise that will be run if no source matches. When omitted it defaults to Rx.Observable.empty(); a scheduler is used for Rx.Observable.empty.
 *
 * @returns {Observable} An observable sequence which is determined by a case statement.
 */
Observable['case'] = function (selector, sources, defaultSourceOrScheduler) {
  return observableDefer(function () {
    if (isPromise(defaultSourceOrScheduler)) {
      defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler);
    }
    if (!defaultSourceOrScheduler) {
      defaultSourceOrScheduler = observableEmpty();
    }
    if (isScheduler(defaultSourceOrScheduler)) {
      defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler);
    }

    var key = selector();
    var result = sources[key];
    if (isPromise(result)) {
      result = observableFromPromise(result);
    }

    // A missing (falsy) match falls back to the default source.
    return result || defaultSourceOrScheduler;
  });
};
220
/**
 * Observable behind `expand`. Sequences waiting to be subscribed are kept in
 * a queue (`state.q`) that is drained by a recursive scheduler action.
 */
var ExpandObservable = (function (__super__) {
  function ExpandObservable(source, fn, scheduler) {
    this.source = source;
    this._fn = fn;
    this._scheduler = scheduler;
    __super__.call(this);
  }

  inherits(ExpandObservable, __super__);

  // Scheduler action: subscribes to one queued sequence per invocation and
  // reschedules itself; releases the `isAcquired` flag once the queue is empty.
  function scheduleRecursive(args, recurse) {
    var state = args[0];
    var parent = args[1];

    if (state.q.length === 0) {
      state.isAcquired = false;
      return;
    }

    var work = state.q.shift();
    var m1 = new SingleAssignmentDisposable();
    state.d.add(m1);
    m1.setDisposable(work.subscribe(new ExpandObserver(state, parent, m1)));
    recurse([state, parent]);
  }

  // Starts the drain loop when there is queued work and no loop owns it yet.
  ExpandObservable.prototype._ensureActive = function (state) {
    if (state.q.length === 0) {
      return;
    }

    var wasAcquired = state.isAcquired;
    state.isAcquired = true;

    if (!wasAcquired) {
      state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive));
    }
  };

  ExpandObservable.prototype.subscribeCore = function (o) {
    var m = new SerialDisposable();
    var d = new CompositeDisposable(m);

    // The source itself is the first queued sequence and the first active one.
    var state = {
      q: [this.source],
      m: m,
      d: d,
      activeCount: 1,
      isAcquired: false,
      o: o
    };

    this._ensureActive(state);
    return d;
  };

  return ExpandObservable;
}(ObservableBase));
274
/**
 * Observer attached to every sequence subscribed by ExpandObservable.
 * `state` is the shared subscription state, `parent` the ExpandObservable and
 * `m1` the disposable holding this observer's own subscription.
 */
var ExpandObserver = (function (__super__) {
  function ExpandObserver(state, parent, m1) {
    this._s = state;
    this._p = parent;
    this._m1 = m1;
    __super__.call(this);
  }

  inherits(ExpandObserver, __super__);

  // Forward the element, then queue the sequence the selector produces for it.
  ExpandObserver.prototype.next = function (x) {
    var state = this._s;
    state.o.onNext(x);

    var expandOne = tryCatch(this._p._fn);
    var result = expandOne(x);
    if (result === errorObj) { return state.o.onError(result.e); }

    state.q.push(result);
    state.activeCount++;
    this._p._ensureActive(state);
  };

  ExpandObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  // Drop this subscription; complete downstream once no sequence is active.
  ExpandObserver.prototype.completed = function () {
    var state = this._s;
    state.d.remove(this._m1);
    state.activeCount--;
    if (state.activeCount === 0) {
      state.o.onCompleted();
    }
  };

  return ExpandObserver;
}(AbstractObserver));
305
/**
 * Expands an observable sequence by recursively invoking selector.
 *
 * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
 * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. Anything that is not a scheduler is replaced by the current thread scheduler.
 * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
 */
observableProto.expand = function (selector, scheduler) {
  if (!isScheduler(scheduler)) {
    scheduler = currentThreadScheduler;
  }
  return new ExpandObservable(this, selector, scheduler);
};
317
/**
 * Collects every argument it receives into a new array.
 * @returns {Array} The arguments, in call order.
 */
function argumentsToArray() {
  return Array.prototype.slice.call(arguments);
}
323
/**
 * Observable behind `forkJoin`: subscribes to every source and lets the
 * ForkJoinObserver instances combine the results through the callback `cb`.
 */
var ForkJoinObservable = (function (__super__) {
  function ForkJoinObservable(sources, cb) {
    this._sources = sources;
    this._cb = cb;
    __super__.call(this);
  }

  inherits(ForkJoinObservable, __super__);

  ForkJoinObservable.prototype.subscribeCore = function (o) {
    var sources = this._sources;
    var count = sources.length;

    // Nothing to join: complete straight away.
    if (count === 0) {
      o.onCompleted();
      return disposableEmpty;
    }

    // One slot per source, shared by all observers.
    var state = {
      finished: false,
      hasResults: new Array(count),
      hasCompleted: new Array(count),
      results: new Array(count)
    };

    var subscriptions = new CompositeDisposable();
    for (var i = 0; i < count; i++) {
      var source = sources[i];
      if (isPromise(source)) {
        source = observableFromPromise(source);
      }
      var observer = new ForkJoinObserver(o, state, i, this._cb, subscriptions);
      subscriptions.add(source.subscribe(observer));
    }

    return subscriptions;
  };

  return ForkJoinObservable;
}(ObservableBase));
358
/**
 * Observer for the source at index `i` of a forkJoin. `s` is the state shared
 * by all sources, `cb` the result callback and `subs` the group of all
 * source subscriptions.
 */
var ForkJoinObserver = (function (__super__) {
  function ForkJoinObserver(o, s, i, cb, subs) {
    this._o = o;
    this._s = s;
    this._i = i;
    this._cb = cb;
    this._subs = subs;
    __super__.call(this);
  }

  inherits(ForkJoinObserver, __super__);

  // Remember only the latest value of this source.
  ForkJoinObserver.prototype.next = function (x) {
    var state = this._s;
    if (state.finished) { return; }
    state.hasResults[this._i] = true;
    state.results[this._i] = x;
  };

  // Any error ends the whole join and disposes every source subscription.
  ForkJoinObserver.prototype.error = function (e) {
    this._s.finished = true;
    this._o.onError(e);
    this._subs.dispose();
  };

  ForkJoinObserver.prototype.completed = function () {
    var state = this._s;
    if (state.finished) { return; }

    // A source that completed without a value completes the result as well.
    if (!state.hasResults[this._i]) {
      return this._o.onCompleted();
    }

    state.hasCompleted[this._i] = true;
    for (var i = 0; i < state.results.length; i++) {
      // Still waiting for at least one other source.
      if (!state.hasCompleted[i]) { return; }
    }
    state.finished = true;

    var res = tryCatch(this._cb).apply(null, state.results);
    if (res === errorObj) { return this._o.onError(res.e); }

    this._o.onNext(res);
    this._o.onCompleted();
  };

  return ForkJoinObserver;
}(AbstractObserver));
404
/**
 * Runs all observable sequences in parallel and collects their last elements.
 * A trailing function argument is used as the result selector; without one
 * the last elements are collected into an array.
 *
 * @example
 *  1 - res = Rx.Observable.forkJoin([obs1, obs2]);
 *  2 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
 * @returns {Observable} An observable sequence with the combined last elements of all the input sequences.
 */
Observable.forkJoin = function () {
  var args = Array.prototype.slice.call(arguments);

  var resultSelector = argumentsToArray;
  if (isFunction(args[args.length - 1])) {
    resultSelector = args.pop();
  }

  // A leading array supplies the sources itself.
  if (Array.isArray(args[0])) {
    args = args[0];
  }

  return new ForkJoinObservable(args, resultSelector);
};
420
/**
 * Runs this sequence in parallel with the given sequences and combines their last elements.
 * Accepts the same argument shapes as Observable.forkJoin (a list of sources or a
 * single array of sources, optionally followed by a result selector); this sequence
 * is placed first.
 * @returns {Observable} An observable sequence produced by Observable.forkJoin for this sequence followed by the supplied ones.
 */
observableProto.forkJoin = function () {
  var len = arguments.length, args = new Array(len);
  for (var i = 0; i < len; i++) { args[i] = arguments[i]; }

  if (Array.isArray(args[0])) {
    // Build a new array instead of unshifting into args[0]: that array belongs
    // to the caller and must not grow each time forkJoin is called with it.
    args[0] = [this].concat(args[0]);
  } else {
    args.unshift(this);
  }

  return Observable.forkJoin.apply(null, args);
};
437
/**
 * Comonadic bind operator. Each source element is turned into a ChainObservable
 * (that element followed by the rest of the sequence), which is then handed to
 * the selector.
 * @param {Function} selector A transform function to apply to each ChainObservable.
 * @param {Object} scheduler Scheduler used to execute the operation. If not a scheduler, defaults to Rx.Scheduler.immediate.
 * @returns {Observable} An observable sequence which results from the comonadic bind operation.
 */
observableProto.manySelect = observableProto.extend = function (selector, scheduler) {
  isScheduler(scheduler) || (scheduler = Rx.Scheduler.immediate);
  var source = this;
  return observableDefer(function () {
    // Most recently created link; its tail is still open.
    var chain;

    return source
      .map(function (x) {
        var curr = new ChainObservable(x);

        // The tail of the previous link is flattened with mergeAll(), so it
        // must receive an observable: the new link, not the raw element.
        chain && chain.onNext(curr);
        chain = curr;

        return curr;
      })
      .tap(
        noop,
        function (e) { chain && chain.onError(e); },
        function () { chain && chain.onCompleted(); }
      )
      .observeOn(scheduler)
      .map(selector);
  });
};
468
/**
 * Link used by manySelect: emits `head`, then whatever the single observable
 * published on `tail` (an AsyncSubject) emits.
 */
var ChainObservable = (function (__super__) {
  function ChainObservable(head) {
    __super__.call(this);
    this.head = head;
    this.tail = new AsyncSubject();
  }

  inherits(ChainObservable, __super__);

  addProperties(ChainObservable.prototype, Observer, {
    _subscribe: function (o) {
      var g = new CompositeDisposable();

      // Emit the head from a current-thread scheduler action, then follow the tail.
      var scheduled = currentThreadScheduler.schedule(this, function (_, self) {
        o.onNext(self.head);
        var rest = self.tail.mergeAll();
        g.add(rest.subscribe(o));
      });
      g.add(scheduled);

      return g;
    },
    // Completion and errors are published as observables on the tail.
    onCompleted: function () {
      this.onNext(Observable.empty());
    },
    onError: function (e) {
      this.onNext(Observable['throw'](e));
    },
    // The tail carries exactly one value and is then closed.
    onNext: function (v) {
      var tail = this.tail;
      tail.onNext(v);
      tail.onCompleted();
    }
  });

  return ChainObservable;

}(Observable));
502
/**
 * Observable behind `switchFirst`: subscribes to the outer source with a
 * SwitchFirstObserver that shares the state created here.
 */
var SwitchFirstObservable = (function (__super__) {
  function SwitchFirstObservable(source) {
    this.source = source;
    __super__.call(this);
  }

  inherits(SwitchFirstObservable, __super__);

  SwitchFirstObservable.prototype.subscribeCore = function (o) {
    var g = new CompositeDisposable();
    var m = new SingleAssignmentDisposable();

    // Shared between the outer observer and the inner observers.
    var state = {
      hasCurrent: false,
      isStopped: false,
      o: o,
      g: g
    };

    g.add(m);
    var outerObserver = new SwitchFirstObserver(state);
    m.setDisposable(this.source.subscribe(outerObserver));
    return g;
  };

  return SwitchFirstObservable;
}(ObservableBase));
527
/**
 * Outer observer for switchFirst, together with the InnerObserver it attaches
 * to each accepted inner sequence. Both work on the shared state object.
 */
var SwitchFirstObserver = (function (__super__) {
  function SwitchFirstObserver(state) {
    this._s = state;
    __super__.call(this);
  }

  inherits(SwitchFirstObserver, __super__);

  // Accept a new inner sequence only while none is current; others are ignored.
  SwitchFirstObserver.prototype.next = function (x) {
    var state = this._s;
    if (state.hasCurrent) { return; }

    state.hasCurrent = true;
    if (isPromise(x)) {
      x = observableFromPromise(x);
    }

    var inner = new SingleAssignmentDisposable();
    state.g.add(inner);
    inner.setDisposable(x.subscribe(new InnerObserver(state, inner)));
  };

  SwitchFirstObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  // Complete downstream only if no inner sequence is current and the group
  // holds a single disposable.
  SwitchFirstObserver.prototype.completed = function () {
    var state = this._s;
    state.isStopped = true;
    if (!state.hasCurrent && state.g.length === 1) {
      state.o.onCompleted();
    }
  };

  function InnerObserver(state, inner) {
    this._s = state;
    this._i = inner;
    __super__.call(this);
  }

  inherits(InnerObserver, __super__);

  InnerObserver.prototype.next = function (x) {
    this._s.o.onNext(x);
  };

  InnerObserver.prototype.error = function (e) {
    this._s.o.onError(e);
  };

  // Release the current slot; complete downstream if the outer source already
  // stopped and the group holds a single disposable.
  InnerObserver.prototype.completed = function () {
    var state = this._s;
    state.g.remove(this._i);
    state.hasCurrent = false;
    if (state.isStopped && state.g.length === 1) {
      state.o.onCompleted();
    }
  };

  return SwitchFirstObserver;
}(AbstractObserver));
571
/**
 * Performs an exclusive wait for the first inner observable to finish before subscribing to another one.
 * Observables that arrive while an inner subscription is active are dropped.
 * @returns {Observable} An exclusive observable with only the results that happen when subscribed.
 */
observableProto.switchFirst = function () {
  var source = this;
  return new SwitchFirstObservable(source);
};
580
/**
 * Builds a FlatMapObservable from this sequence and the given arguments, then
 * applies switchFirst() to it. Also available as 'selectManyFirst'.
 * @param {Function} selector Passed through to FlatMapObservable.
 * @param {Function} [resultSelector] Passed through to FlatMapObservable.
 * @param {Any} [thisArg] Passed through to FlatMapObservable.
 * @returns {Observable} The result of switchFirst() on the projected sequence.
 */
observableProto.flatMapFirst = observableProto.selectManyFirst = function(selector, resultSelector, thisArg) {
  var projected = new FlatMapObservable(this, selector, resultSelector, thisArg);
  return projected.switchFirst();
};
584
/**
 * Builds a FlatMapObservable from this sequence and the given arguments, then
 * merges it with `limit` passed to merge().
 * @param {Number} limit Passed through to merge().
 * @param {Function} selector Passed through to FlatMapObservable.
 * @param {Function} [resultSelector] Passed through to FlatMapObservable.
 * @param {Any} [thisArg] Passed through to FlatMapObservable.
 * @returns {Observable} The result of merge(limit) on the projected sequence.
 */
Rx.Observable.prototype.flatMapWithMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
  var projected = new FlatMapObservable(this, selector, resultSelector, thisArg);
  return projected.merge(limit);
};
588 return Rx;
589}));