UNPKG

24 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
4 var objectTypes = {
5 'function': true,
6 'object': true
7 };
8
9 function checkGlobal(value) {
10 return (value && value.Object === Object) ? value : null;
11 }
12
13 var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
14 var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
15 var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
16 var freeSelf = checkGlobal(objectTypes[typeof self] && self);
17 var freeWindow = checkGlobal(objectTypes[typeof window] && window);
18 var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
19 var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
20 var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
21
22 // Because of build optimizers
23 if (typeof define === 'function' && define.amd) {
24 define(['./rx'], function (Rx, exports) {
25 return factory(root, exports, Rx);
26 });
27 } else if (typeof module === 'object' && module && module.exports === freeExports) {
28 module.exports = factory(root, module.exports, require('./rx'));
29 } else {
30 root.Rx = factory(root, {}, root.Rx);
31 }
32}.call(this, function (root, exp, Rx, undefined) {
33
34 var Observable = Rx.Observable,
35 ObservableBase = Rx.ObservableBase,
36 AbstractObserver = Rx.internals.AbstractObserver,
37 CompositeDisposable = Rx.CompositeDisposable,
38 BinaryDisposable = Rx.BinaryDisposable,
39 RefCountDisposable = Rx.RefCountDisposable,
40 SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
41 SerialDisposable = Rx.SerialDisposable,
42 Subject = Rx.Subject,
43 observableProto = Observable.prototype,
44 observableEmpty = Observable.empty,
45 observableNever = Observable.never,
46 AnonymousObservable = Rx.AnonymousObservable,
47 addRef = Rx.internals.addRef,
48 inherits = Rx.internals.inherits,
49 bindCallback = Rx.internals.bindCallback,
50 noop = Rx.helpers.noop,
51 isPromise = Rx.helpers.isPromise,
52 isFunction = Rx.helpers.isFunction,
53 observableFromPromise = Observable.fromPromise;
54
55 var errorObj = {e: {}};
56
57 function tryCatcherGen(tryCatchTarget) {
58 return function tryCatcher() {
59 try {
60 return tryCatchTarget.apply(this, arguments);
61 } catch (e) {
62 errorObj.e = e;
63 return errorObj;
64 }
65 };
66 }
67
68 var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
69 if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
70 return tryCatcherGen(fn);
71 };
72
73 function thrower(e) {
74 throw e;
75 }
76
77 var Map = root.Map || (function () {
78 function Map() {
79 this.size = 0;
80 this._values = [];
81 this._keys = [];
82 }
83
84 Map.prototype['delete'] = function (key) {
85 var i = this._keys.indexOf(key);
86 if (i === -1) { return false; }
87 this._values.splice(i, 1);
88 this._keys.splice(i, 1);
89 this.size--;
90 return true;
91 };
92
93 Map.prototype.get = function (key) {
94 var i = this._keys.indexOf(key);
95 return i === -1 ? undefined : this._values[i];
96 };
97
98 Map.prototype.set = function (key, value) {
99 var i = this._keys.indexOf(key);
100 if (i === -1) {
101 this._keys.push(key);
102 this._values.push(value);
103 this.size++;
104 } else {
105 this._values[i] = value;
106 }
107 return this;
108 };
109
110 Map.prototype.forEach = function (cb, thisArg) {
111 for (var i = 0; i < this.size; i++) {
112 cb.call(thisArg, this._values[i], this._keys[i]);
113 }
114 };
115
116 return Map;
117 }());
118
119 /**
120 * Correlates the elements of two sequences based on overlapping durations.
121 *
122 * @param {Observable} right The right observable sequence to join elements for.
123 * @param {Function} leftDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.
124 * @param {Function} rightDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.
125 * @param {Function} resultSelector A function invoked to compute a result element for any two overlapping elements of the left and right observable sequences. The parameters passed to the function correspond with the elements from the left and right source sequences for which overlap occurs.
126 * @returns {Observable} An observable sequence that contains result elements computed from source elements that have an overlapping duration.
127 */
128 observableProto.join = function (right, leftDurationSelector, rightDurationSelector, resultSelector) {
129 var left = this;
130 return new AnonymousObservable(function (o) {
131 var group = new CompositeDisposable();
132 var leftDone = false, rightDone = false;
133 var leftId = 0, rightId = 0;
134 var leftMap = new Map(), rightMap = new Map();
135 var handleError = function (e) { o.onError(e); };
136
137 group.add(left.subscribe(
138 function (value) {
139 var id = leftId++, md = new SingleAssignmentDisposable();
140
141 leftMap.set(id, value);
142 group.add(md);
143
144 var duration = tryCatch(leftDurationSelector)(value);
145 if (duration === errorObj) { return o.onError(duration.e); }
146
147 md.setDisposable(duration.take(1).subscribe(
148 noop,
149 handleError,
150 function () {
151 leftMap['delete'](id) && leftMap.size === 0 && leftDone && o.onCompleted();
152 group.remove(md);
153 }));
154
155 rightMap.forEach(function (v) {
156 var result = tryCatch(resultSelector)(value, v);
157 if (result === errorObj) { return o.onError(result.e); }
158 o.onNext(result);
159 });
160 },
161 handleError,
162 function () {
163 leftDone = true;
164 (rightDone || leftMap.size === 0) && o.onCompleted();
165 })
166 );
167
168 group.add(right.subscribe(
169 function (value) {
170 var id = rightId++, md = new SingleAssignmentDisposable();
171
172 rightMap.set(id, value);
173 group.add(md);
174
175 var duration = tryCatch(rightDurationSelector)(value);
176 if (duration === errorObj) { return o.onError(duration.e); }
177
178 md.setDisposable(duration.take(1).subscribe(
179 noop,
180 handleError,
181 function () {
182 rightMap['delete'](id) && rightMap.size === 0 && rightDone && o.onCompleted();
183 group.remove(md);
184 }));
185
186 leftMap.forEach(function (v) {
187 var result = tryCatch(resultSelector)(v, value);
188 if (result === errorObj) { return o.onError(result.e); }
189 o.onNext(result);
190 });
191 },
192 handleError,
193 function () {
194 rightDone = true;
195 (leftDone || rightMap.size === 0) && o.onCompleted();
196 })
197 );
198 return group;
199 }, left);
200 };
201
202 /**
203 * Correlates the elements of two sequences based on overlapping durations, and groups the results.
204 *
205 * @param {Observable} right The right observable sequence to join elements for.
206 * @param {Function} leftDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.
207 * @param {Function} rightDurationSelector A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.
208 * @param {Function} resultSelector A function invoked to compute a result element for any element of the left sequence with overlapping elements from the right observable sequence. The first parameter passed to the function is an element of the left sequence. The second parameter passed to the function is an observable sequence with elements from the right sequence that overlap with the left sequence's element.
209 * @returns {Observable} An observable sequence that contains result elements computed from source elements that have an overlapping duration.
210 */
211 observableProto.groupJoin = function (right, leftDurationSelector, rightDurationSelector, resultSelector) {
212 var left = this;
213 return new AnonymousObservable(function (o) {
214 var group = new CompositeDisposable();
215 var r = new RefCountDisposable(group);
216 var leftMap = new Map(), rightMap = new Map();
217 var leftId = 0, rightId = 0;
218 var handleError = function (e) { return function (v) { v.onError(e); }; };
219
220 function handleError(e) { };
221
222 group.add(left.subscribe(
223 function (value) {
224 var s = new Subject();
225 var id = leftId++;
226 leftMap.set(id, s);
227
228 var result = tryCatch(resultSelector)(value, addRef(s, r));
229 if (result === errorObj) {
230 leftMap.forEach(handleError(result.e));
231 return o.onError(result.e);
232 }
233 o.onNext(result);
234
235 rightMap.forEach(function (v) { s.onNext(v); });
236
237 var md = new SingleAssignmentDisposable();
238 group.add(md);
239
240 var duration = tryCatch(leftDurationSelector)(value);
241 if (duration === errorObj) {
242 leftMap.forEach(handleError(duration.e));
243 return o.onError(duration.e);
244 }
245
246 md.setDisposable(duration.take(1).subscribe(
247 noop,
248 function (e) {
249 leftMap.forEach(handleError(e));
250 o.onError(e);
251 },
252 function () {
253 leftMap['delete'](id) && s.onCompleted();
254 group.remove(md);
255 }));
256 },
257 function (e) {
258 leftMap.forEach(handleError(e));
259 o.onError(e);
260 },
261 function () { o.onCompleted(); })
262 );
263
264 group.add(right.subscribe(
265 function (value) {
266 var id = rightId++;
267 rightMap.set(id, value);
268
269 var md = new SingleAssignmentDisposable();
270 group.add(md);
271
272 var duration = tryCatch(rightDurationSelector)(value);
273 if (duration === errorObj) {
274 leftMap.forEach(handleError(duration.e));
275 return o.onError(duration.e);
276 }
277
278 md.setDisposable(duration.take(1).subscribe(
279 noop,
280 function (e) {
281 leftMap.forEach(handleError(e));
282 o.onError(e);
283 },
284 function () {
285 rightMap['delete'](id);
286 group.remove(md);
287 }));
288
289 leftMap.forEach(function (v) { v.onNext(value); });
290 },
291 function (e) {
292 leftMap.forEach(handleError(e));
293 o.onError(e);
294 })
295 );
296
297 return r;
298 }, left);
299 };
300
301 function toArray(x) { return x.toArray(); }
302
303 /**
304 * Projects each element of an observable sequence into zero or more buffers.
305 * @param {Mixed} bufferOpeningsOrClosingSelector Observable sequence whose elements denote the creation of new windows, or, a function invoked to define the boundaries of the produced windows (a new window is started when the previous one is closed, resulting in non-overlapping windows).
306 * @param {Function} [bufferClosingSelector] A function invoked to define the closing of each produced window. If a closing selector function is specified for the first parameter, this parameter is ignored.
307 * @returns {Observable} An observable sequence of windows.
308 */
309 observableProto.buffer = function () {
310 return this.window.apply(this, arguments)
311 .flatMap(toArray);
312 };
313
314 /**
315 * Projects each element of an observable sequence into zero or more windows.
316 *
317 * @param {Mixed} windowOpeningsOrClosingSelector Observable sequence whose elements denote the creation of new windows, or, a function invoked to define the boundaries of the produced windows (a new window is started when the previous one is closed, resulting in non-overlapping windows).
318 * @param {Function} [windowClosingSelector] A function invoked to define the closing of each produced window. If a closing selector function is specified for the first parameter, this parameter is ignored.
319 * @returns {Observable} An observable sequence of windows.
320 */
321 observableProto.window = function (windowOpeningsOrClosingSelector, windowClosingSelector) {
322 if (arguments.length === 1 && typeof arguments[0] !== 'function') {
323 return observableWindowWithBoundaries.call(this, windowOpeningsOrClosingSelector);
324 }
325 return typeof windowOpeningsOrClosingSelector === 'function' ?
326 observableWindowWithClosingSelector.call(this, windowOpeningsOrClosingSelector) :
327 observableWindowWithOpenings.call(this, windowOpeningsOrClosingSelector, windowClosingSelector);
328 };
329
330 function observableWindowWithOpenings(windowOpenings, windowClosingSelector) {
331 return windowOpenings.groupJoin(this, windowClosingSelector, observableEmpty, function (_, win) {
332 return win;
333 });
334 }
335
336 function observableWindowWithBoundaries(windowBoundaries) {
337 var source = this;
338 return new AnonymousObservable(function (observer) {
339 var win = new Subject(),
340 d = new CompositeDisposable(),
341 r = new RefCountDisposable(d);
342
343 observer.onNext(addRef(win, r));
344
345 d.add(source.subscribe(function (x) {
346 win.onNext(x);
347 }, function (err) {
348 win.onError(err);
349 observer.onError(err);
350 }, function () {
351 win.onCompleted();
352 observer.onCompleted();
353 }));
354
355 isPromise(windowBoundaries) && (windowBoundaries = observableFromPromise(windowBoundaries));
356
357 d.add(windowBoundaries.subscribe(function (w) {
358 win.onCompleted();
359 win = new Subject();
360 observer.onNext(addRef(win, r));
361 }, function (err) {
362 win.onError(err);
363 observer.onError(err);
364 }, function () {
365 win.onCompleted();
366 observer.onCompleted();
367 }));
368
369 return r;
370 }, source);
371 }
372
373 function observableWindowWithClosingSelector(windowClosingSelector) {
374 var source = this;
375 return new AnonymousObservable(function (observer) {
376 var m = new SerialDisposable(),
377 d = new CompositeDisposable(m),
378 r = new RefCountDisposable(d),
379 win = new Subject();
380 observer.onNext(addRef(win, r));
381 d.add(source.subscribe(function (x) {
382 win.onNext(x);
383 }, function (err) {
384 win.onError(err);
385 observer.onError(err);
386 }, function () {
387 win.onCompleted();
388 observer.onCompleted();
389 }));
390
391 function createWindowClose () {
392 var windowClose;
393 try {
394 windowClose = windowClosingSelector();
395 } catch (e) {
396 observer.onError(e);
397 return;
398 }
399
400 isPromise(windowClose) && (windowClose = observableFromPromise(windowClose));
401
402 var m1 = new SingleAssignmentDisposable();
403 m.setDisposable(m1);
404 m1.setDisposable(windowClose.take(1).subscribe(noop, function (err) {
405 win.onError(err);
406 observer.onError(err);
407 }, function () {
408 win.onCompleted();
409 win = new Subject();
410 observer.onNext(addRef(win, r));
411 createWindowClose();
412 }));
413 }
414
415 createWindowClose();
416 return r;
417 }, source);
418 }
419
420 var PairwiseObservable = (function (__super__) {
421 inherits(PairwiseObservable, __super__);
422 function PairwiseObservable(source) {
423 this.source = source;
424 __super__.call(this);
425 }
426
427 PairwiseObservable.prototype.subscribeCore = function (o) {
428 return this.source.subscribe(new PairwiseObserver(o));
429 };
430
431 return PairwiseObservable;
432 }(ObservableBase));
433
434 var PairwiseObserver = (function(__super__) {
435 inherits(PairwiseObserver, __super__);
436 function PairwiseObserver(o) {
437 this._o = o;
438 this._p = null;
439 this._hp = false;
440 }
441
442 PairwiseObserver.prototype.next = function (x) {
443 if (this._hp) {
444 this._o.onNext([this._p, x]);
445 } else {
446 this._hp = true;
447 }
448 this._p = x;
449 };
450 PairwiseObserver.prototype.error = function (err) { this._o.onError(err); };
451 PairwiseObserver.prototype.completed = function () { this._o.onCompleted(); };
452
453 return PairwiseObserver;
454 }(AbstractObserver));
455
456 /**
457 * Returns a new observable that triggers on the second and subsequent triggerings of the input observable.
458 * The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as a pair.
459 * The argument passed to the N-1th triggering is held in hidden internal state until the Nth triggering occurs.
460 * @returns {Observable} An observable that triggers on successive pairs of observations from the input observable as an array.
461 */
462 observableProto.pairwise = function () {
463 return new PairwiseObservable(this);
464 };
465
466 /**
467 * Returns two observables which partition the observations of the source by the given function.
468 * The first will trigger observations for those values for which the predicate returns true.
469 * The second will trigger observations for those values where the predicate returns false.
470 * The predicate is executed once for each subscribed observer.
471 * Both also propagate all error observations arising from the source and each completes
472 * when the source completes.
473 * @param {Function} predicate
474 * The function to determine which output Observable will trigger a particular observation.
475 * @returns {Array}
476 * An array of observables. The first triggers when the predicate returns true,
477 * and the second triggers when the predicate returns false.
478 */
479 observableProto.partition = function(predicate, thisArg) {
480 var fn = bindCallback(predicate, thisArg, 3);
481 return [
482 this.filter(predicate, thisArg),
483 this.filter(function (x, i, o) { return !fn(x, i, o); })
484 ];
485 };
486
487 /**
488 * Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
489 *
490 * @example
491 * var res = observable.groupBy(function (x) { return x.id; });
492 * 2 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; });
493 * 3 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; }, function (x) { return x.toString(); });
494 * @param {Function} keySelector A function to extract the key for each element.
495 * @param {Function} [elementSelector] A function to map each source element to an element in an observable group.
496 * @returns {Observable} A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
497 */
498 observableProto.groupBy = function (keySelector, elementSelector) {
499 return this.groupByUntil(keySelector, elementSelector, observableNever);
500 };
501
502 /**
503 * Groups the elements of an observable sequence according to a specified key selector function.
504 * A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
505 * key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
506 *
507 * @example
508 * var res = observable.groupByUntil(function (x) { return x.id; }, null, function () { return Rx.Observable.never(); });
509 * 2 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; }, function () { return Rx.Observable.never(); });
510 * 3 - observable.groupBy(function (x) { return x.id; }), function (x) { return x.name; }, function () { return Rx.Observable.never(); }, function (x) { return x.toString(); });
511 * @param {Function} keySelector A function to extract the key for each element.
512 * @param {Function} durationSelector A function to signal the expiration of a group.
513 * @returns {Observable}
514 * A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
515 * If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
516 *
517 */
518 observableProto.groupByUntil = function (keySelector, elementSelector, durationSelector) {
519 var source = this;
520 return new AnonymousObservable(function (o) {
521 var map = new Map(),
522 groupDisposable = new CompositeDisposable(),
523 refCountDisposable = new RefCountDisposable(groupDisposable),
524 handleError = function (e) { return function (item) { item.onError(e); }; };
525
526 groupDisposable.add(
527 source.subscribe(function (x) {
528 var key = tryCatch(keySelector)(x);
529 if (key === errorObj) {
530 map.forEach(handleError(key.e));
531 return o.onError(key.e);
532 }
533
534 var fireNewMapEntry = false, writer = map.get(key);
535 if (writer === undefined) {
536 writer = new Subject();
537 map.set(key, writer);
538 fireNewMapEntry = true;
539 }
540
541 if (fireNewMapEntry) {
542 var group = new GroupedObservable(key, writer, refCountDisposable),
543 durationGroup = new GroupedObservable(key, writer);
544 var duration = tryCatch(durationSelector)(durationGroup);
545 if (duration === errorObj) {
546 map.forEach(handleError(duration.e));
547 return o.onError(duration.e);
548 }
549
550 o.onNext(group);
551
552 var md = new SingleAssignmentDisposable();
553 groupDisposable.add(md);
554
555 md.setDisposable(duration.take(1).subscribe(
556 noop,
557 function (e) {
558 map.forEach(handleError(e));
559 o.onError(e);
560 },
561 function () {
562 if (map['delete'](key)) { writer.onCompleted(); }
563 groupDisposable.remove(md);
564 }));
565 }
566
567 var element = x;
568 if (isFunction(elementSelector)) {
569 element = tryCatch(elementSelector)(x);
570 if (element === errorObj) {
571 map.forEach(handleError(element.e));
572 return o.onError(element.e);
573 }
574 }
575
576 writer.onNext(element);
577 }, function (e) {
578 map.forEach(handleError(e));
579 o.onError(e);
580 }, function () {
581 map.forEach(function (item) { item.onCompleted(); });
582 o.onCompleted();
583 }));
584
585 return refCountDisposable;
586 }, source);
587 };
588
589 var UnderlyingObservable = (function (__super__) {
590 inherits(UnderlyingObservable, __super__);
591 function UnderlyingObservable(m, u) {
592 this._m = m;
593 this._u = u;
594 __super__.call(this);
595 }
596
597 UnderlyingObservable.prototype.subscribeCore = function (o) {
598 return new BinaryDisposable(this._m.getDisposable(), this._u.subscribe(o));
599 };
600
601 return UnderlyingObservable;
602 }(ObservableBase));
603
604 var GroupedObservable = (function (__super__) {
605 inherits(GroupedObservable, __super__);
606 function GroupedObservable(key, underlyingObservable, mergedDisposable) {
607 __super__.call(this);
608 this.key = key;
609 this.underlyingObservable = !mergedDisposable ?
610 underlyingObservable :
611 new UnderlyingObservable(mergedDisposable, underlyingObservable);
612 }
613
614 GroupedObservable.prototype._subscribe = function (o) {
615 return this.underlyingObservable.subscribe(o);
616 };
617
618 return GroupedObservable;
619 }(Observable));
620
621 return Rx;
622}));