UNPKG

19 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
4 var objectTypes = {
5 'function': true,
6 'object': true
7 };
8
9 function checkGlobal(value) {
10 return (value && value.Object === Object) ? value : null;
11 }
12
13 var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
14 var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
15 var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
16 var freeSelf = checkGlobal(objectTypes[typeof self] && self);
17 var freeWindow = checkGlobal(objectTypes[typeof window] && window);
18 var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
19 var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
20 var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
21
22 // Because of build optimizers
23 if (typeof define === 'function' && define.amd) {
24 define(['./rx'], function (Rx, exports) {
25 return factory(root, exports, Rx);
26 });
27 } else if (typeof module === 'object' && module && module.exports === freeExports) {
28 module.exports = factory(root, module.exports, require('./rx'));
29 } else {
30 root.Rx = factory(root, {}, root.Rx);
31 }
32}.call(this, function (root, exp, Rx, undefined) {
33
34 // References
35 var Observable = Rx.Observable,
36 observableProto = Observable.prototype,
37 AnonymousObservable = Rx.AnonymousObservable,
38 AbstractObserver = Rx.internals.AbstractObserver,
39 CompositeDisposable = Rx.CompositeDisposable,
40 BinaryDisposable = Rx.BinaryDisposable,
41 NAryDisposable = Rx.NAryDisposable,
42 Notification = Rx.Notification,
43 Subject = Rx.Subject,
44 Observer = Rx.Observer,
45 disposableEmpty = Rx.Disposable.empty,
46 disposableCreate = Rx.Disposable.create,
47 inherits = Rx.internals.inherits,
48 addProperties = Rx.internals.addProperties,
49 defaultScheduler = Rx.Scheduler['default'],
50 currentThreadScheduler = Rx.Scheduler.currentThread,
51 identity = Rx.helpers.identity,
52 isScheduler = Rx.Scheduler.isScheduler,
53 isFunction = Rx.helpers.isFunction,
54 checkDisposed = Rx.Disposable.checkDisposed;
55
56 var errorObj = {e: {}};
57
58 function tryCatcherGen(tryCatchTarget) {
59 return function tryCatcher() {
60 try {
61 return tryCatchTarget.apply(this, arguments);
62 } catch (e) {
63 errorObj.e = e;
64 return errorObj;
65 }
66 };
67 }
68
69 var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
70 if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
71 return tryCatcherGen(fn);
72 };
73
74 function thrower(e) {
75 throw e;
76 }
77
78 /**
79 * Used to pause and resume streams.
80 */
81 Rx.Pauser = (function (__super__) {
82 inherits(Pauser, __super__);
83 function Pauser() {
84 __super__.call(this);
85 }
86
87 /**
88 * Pauses the underlying sequence.
89 */
90 Pauser.prototype.pause = function () { this.onNext(false); };
91
92 /**
93 * Resumes the underlying sequence.
94 */
95 Pauser.prototype.resume = function () { this.onNext(true); };
96
97 return Pauser;
98 }(Subject));
99
100 var PausableObservable = (function (__super__) {
101 inherits(PausableObservable, __super__);
102 function PausableObservable(source, pauser) {
103 this.source = source;
104 this.controller = new Subject();
105
106 if (pauser && pauser.subscribe) {
107 this.pauser = this.controller.merge(pauser);
108 } else {
109 this.pauser = this.controller;
110 }
111
112 __super__.call(this);
113 }
114
115 PausableObservable.prototype._subscribe = function (o) {
116 var conn = this.source.publish(),
117 subscription = conn.subscribe(o),
118 connection = disposableEmpty;
119
120 var pausable = this.pauser.distinctUntilChanged().subscribe(function (b) {
121 if (b) {
122 connection = conn.connect();
123 } else {
124 connection.dispose();
125 connection = disposableEmpty;
126 }
127 });
128
129 return new NAryDisposable([subscription, connection, pausable]);
130 };
131
132 PausableObservable.prototype.pause = function () {
133 this.controller.onNext(false);
134 };
135
136 PausableObservable.prototype.resume = function () {
137 this.controller.onNext(true);
138 };
139
140 return PausableObservable;
141
142 }(Observable));
143
144 /**
145 * Pauses the underlying observable sequence based upon the observable sequence which yields true/false.
146 * @example
147 * var pauser = new Rx.Subject();
148 * var source = Rx.Observable.interval(100).pausable(pauser);
149 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
150 * @returns {Observable} The observable sequence which is paused based upon the pauser.
151 */
152 observableProto.pausable = function (pauser) {
153 return new PausableObservable(this, pauser);
154 };
155
156 function combineLatestSource(source, subject, resultSelector) {
157 return new AnonymousObservable(function (o) {
158 var hasValue = [false, false],
159 hasValueAll = false,
160 isDone = false,
161 values = new Array(2),
162 err;
163
164 function next(x, i) {
165 values[i] = x;
166 hasValue[i] = true;
167 if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
168 if (err) { return o.onError(err); }
169 var res = tryCatch(resultSelector).apply(null, values);
170 if (res === errorObj) { return o.onError(res.e); }
171 o.onNext(res);
172 }
173 isDone && values[1] && o.onCompleted();
174 }
175
176 return new BinaryDisposable(
177 source.subscribe(
178 function (x) {
179 next(x, 0);
180 },
181 function (e) {
182 if (values[1]) {
183 o.onError(e);
184 } else {
185 err = e;
186 }
187 },
188 function () {
189 isDone = true;
190 values[1] && o.onCompleted();
191 }),
192 subject.subscribe(
193 function (x) {
194 next(x, 1);
195 },
196 function (e) { o.onError(e); },
197 function () {
198 isDone = true;
199 next(true, 1);
200 })
201 );
202 }, source);
203 }
204
205 var PausableBufferedObservable = (function (__super__) {
206 inherits(PausableBufferedObservable, __super__);
207 function PausableBufferedObservable(source, pauser) {
208 this.source = source;
209 this.controller = new Subject();
210
211 if (pauser && pauser.subscribe) {
212 this.pauser = this.controller.merge(pauser);
213 } else {
214 this.pauser = this.controller;
215 }
216
217 __super__.call(this);
218 }
219
220 PausableBufferedObservable.prototype._subscribe = function (o) {
221 var q = [], previousShouldFire;
222
223 function drainQueue() { while (q.length > 0) { o.onNext(q.shift()); } }
224
225 var subscription =
226 combineLatestSource(
227 this.source,
228 this.pauser.startWith(false).distinctUntilChanged(),
229 function (data, shouldFire) {
230 return { data: data, shouldFire: shouldFire };
231 })
232 .subscribe(
233 function (results) {
234 if (previousShouldFire !== undefined && results.shouldFire !== previousShouldFire) {
235 previousShouldFire = results.shouldFire;
236 // change in shouldFire
237 if (results.shouldFire) { drainQueue(); }
238 } else {
239 previousShouldFire = results.shouldFire;
240 // new data
241 if (results.shouldFire) {
242 o.onNext(results.data);
243 } else {
244 q.push(results.data);
245 }
246 }
247 },
248 function (err) {
249 drainQueue();
250 o.onError(err);
251 },
252 function () {
253 drainQueue();
254 o.onCompleted();
255 }
256 );
257 return subscription;
258 };
259
260 PausableBufferedObservable.prototype.pause = function () {
261 this.controller.onNext(false);
262 };
263
264 PausableBufferedObservable.prototype.resume = function () {
265 this.controller.onNext(true);
266 };
267
268 return PausableBufferedObservable;
269
270 }(Observable));
271
272 /**
273 * Pauses the underlying observable sequence based upon the observable sequence which yields true/false,
274 * and yields the values that were buffered while paused.
275 * @example
276 * var pauser = new Rx.Subject();
277 * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
278 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
279 * @returns {Observable} The observable sequence which is paused based upon the pauser.
280 */
281 observableProto.pausableBuffered = function (pauser) {
282 return new PausableBufferedObservable(this, pauser);
283 };
284
285 var ControlledObservable = (function (__super__) {
286 inherits(ControlledObservable, __super__);
287 function ControlledObservable (source, enableQueue, scheduler) {
288 __super__.call(this);
289 this.subject = new ControlledSubject(enableQueue, scheduler);
290 this.source = source.multicast(this.subject).refCount();
291 }
292
293 ControlledObservable.prototype._subscribe = function (o) {
294 return this.source.subscribe(o);
295 };
296
297 ControlledObservable.prototype.request = function (numberOfItems) {
298 return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
299 };
300
301 return ControlledObservable;
302
303 }(Observable));
304
305 var ControlledSubject = (function (__super__) {
306 inherits(ControlledSubject, __super__);
307 function ControlledSubject(enableQueue, scheduler) {
308 enableQueue == null && (enableQueue = true);
309
310 __super__.call(this);
311 this.subject = new Subject();
312 this.enableQueue = enableQueue;
313 this.queue = enableQueue ? [] : null;
314 this.requestedCount = 0;
315 this.requestedDisposable = null;
316 this.error = null;
317 this.hasFailed = false;
318 this.hasCompleted = false;
319 this.scheduler = scheduler || currentThreadScheduler;
320 }
321
322 addProperties(ControlledSubject.prototype, Observer, {
323 _subscribe: function (o) {
324 return this.subject.subscribe(o);
325 },
326 onCompleted: function () {
327 this.hasCompleted = true;
328 if (!this.enableQueue || this.queue.length === 0) {
329 this.subject.onCompleted();
330 this.disposeCurrentRequest();
331 } else {
332 this.queue.push(Notification.createOnCompleted());
333 }
334 },
335 onError: function (error) {
336 this.hasFailed = true;
337 this.error = error;
338 if (!this.enableQueue || this.queue.length === 0) {
339 this.subject.onError(error);
340 this.disposeCurrentRequest();
341 } else {
342 this.queue.push(Notification.createOnError(error));
343 }
344 },
345 onNext: function (value) {
346 if (this.requestedCount <= 0) {
347 this.enableQueue && this.queue.push(Notification.createOnNext(value));
348 } else {
349 (this.requestedCount-- === 0) && this.disposeCurrentRequest();
350 this.subject.onNext(value);
351 }
352 },
353 _processRequest: function (numberOfItems) {
354 if (this.enableQueue) {
355 while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
356 var first = this.queue.shift();
357 first.accept(this.subject);
358 if (first.kind === 'N') {
359 numberOfItems--;
360 } else {
361 this.disposeCurrentRequest();
362 this.queue = [];
363 }
364 }
365 }
366
367 return numberOfItems;
368 },
369 request: function (number) {
370 this.disposeCurrentRequest();
371 var self = this;
372
373 this.requestedDisposable = this.scheduler.schedule(number,
374 function(s, i) {
375 var remaining = self._processRequest(i);
376 var stopped = self.hasCompleted || self.hasFailed;
377 if (!stopped && remaining > 0) {
378 self.requestedCount = remaining;
379
380 return disposableCreate(function () {
381 self.requestedCount = 0;
382 });
383 // Scheduled item is still in progress. Return a new
384 // disposable to allow the request to be interrupted
385 // via dispose.
386 }
387 });
388
389 return this.requestedDisposable;
390 },
391 disposeCurrentRequest: function () {
392 if (this.requestedDisposable) {
393 this.requestedDisposable.dispose();
394 this.requestedDisposable = null;
395 }
396 }
397 });
398
399 return ControlledSubject;
400 }(Observable));
401
402 /**
403 * Attaches a controller to the observable sequence with the ability to queue.
404 * @example
405 * var source = Rx.Observable.interval(100).controlled();
406 * source.request(3); // Reads 3 values
407 * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
408 * @param {Scheduler} scheduler determines how the requests will be scheduled
409 * @returns {Observable} The observable sequence which only propagates values on request.
410 */
411 observableProto.controlled = function (enableQueue, scheduler) {
412
413 if (enableQueue && isScheduler(enableQueue)) {
414 scheduler = enableQueue;
415 enableQueue = true;
416 }
417
418 if (enableQueue == null) { enableQueue = true; }
419 return new ControlledObservable(this, enableQueue, scheduler);
420 };
421
422 var StopAndWaitObservable = (function (__super__) {
423 inherits(StopAndWaitObservable, __super__);
424 function StopAndWaitObservable (source) {
425 __super__.call(this);
426 this.source = source;
427 }
428
429 function scheduleMethod(s, self) {
430 self.source.request(1);
431 }
432
433 StopAndWaitObservable.prototype._subscribe = function (o) {
434 this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
435 return new BinaryDisposable(
436 this.subscription,
437 defaultScheduler.schedule(this, scheduleMethod)
438 );
439 };
440
441 var StopAndWaitObserver = (function (__sub__) {
442 inherits(StopAndWaitObserver, __sub__);
443 function StopAndWaitObserver (observer, observable, cancel) {
444 __sub__.call(this);
445 this.observer = observer;
446 this.observable = observable;
447 this.cancel = cancel;
448 this.scheduleDisposable = null;
449 }
450
451 StopAndWaitObserver.prototype.completed = function () {
452 this.observer.onCompleted();
453 this.dispose();
454 };
455
456 StopAndWaitObserver.prototype.error = function (error) {
457 this.observer.onError(error);
458 this.dispose();
459 };
460
461 function innerScheduleMethod(s, self) {
462 self.observable.source.request(1);
463 }
464
465 StopAndWaitObserver.prototype.next = function (value) {
466 this.observer.onNext(value);
467 this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
468 };
469
470 StopAndWaitObservable.dispose = function () {
471 this.observer = null;
472 if (this.cancel) {
473 this.cancel.dispose();
474 this.cancel = null;
475 }
476 if (this.scheduleDisposable) {
477 this.scheduleDisposable.dispose();
478 this.scheduleDisposable = null;
479 }
480 __sub__.prototype.dispose.call(this);
481 };
482
483 return StopAndWaitObserver;
484 }(AbstractObserver));
485
486 return StopAndWaitObservable;
487 }(Observable));
488
489
490 /**
491 * Attaches a stop and wait observable to the current observable.
492 * @returns {Observable} A stop and wait observable.
493 */
494 ControlledObservable.prototype.stopAndWait = function () {
495 return new StopAndWaitObservable(this);
496 };
497
498 var WindowedObservable = (function (__super__) {
499 inherits(WindowedObservable, __super__);
500 function WindowedObservable(source, windowSize) {
501 __super__.call(this);
502 this.source = source;
503 this.windowSize = windowSize;
504 }
505
506 function scheduleMethod(s, self) {
507 self.source.request(self.windowSize);
508 }
509
510 WindowedObservable.prototype._subscribe = function (o) {
511 this.subscription = this.source.subscribe(new WindowedObserver(o, this, this.subscription));
512 return new BinaryDisposable(
513 this.subscription,
514 defaultScheduler.schedule(this, scheduleMethod)
515 );
516 };
517
518 var WindowedObserver = (function (__sub__) {
519 inherits(WindowedObserver, __sub__);
520 function WindowedObserver(observer, observable, cancel) {
521 this.observer = observer;
522 this.observable = observable;
523 this.cancel = cancel;
524 this.received = 0;
525 this.scheduleDisposable = null;
526 __sub__.call(this);
527 }
528
529 WindowedObserver.prototype.completed = function () {
530 this.observer.onCompleted();
531 this.dispose();
532 };
533
534 WindowedObserver.prototype.error = function (error) {
535 this.observer.onError(error);
536 this.dispose();
537 };
538
539 function innerScheduleMethod(s, self) {
540 self.observable.source.request(self.observable.windowSize);
541 }
542
543 WindowedObserver.prototype.next = function (value) {
544 this.observer.onNext(value);
545 this.received = ++this.received % this.observable.windowSize;
546 this.received === 0 && (this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod));
547 };
548
549 WindowedObserver.prototype.dispose = function () {
550 this.observer = null;
551 if (this.cancel) {
552 this.cancel.dispose();
553 this.cancel = null;
554 }
555 if (this.scheduleDisposable) {
556 this.scheduleDisposable.dispose();
557 this.scheduleDisposable = null;
558 }
559 __sub__.prototype.dispose.call(this);
560 };
561
562 return WindowedObserver;
563 }(AbstractObserver));
564
565 return WindowedObservable;
566 }(Observable));
567
568 /**
569 * Creates a sliding windowed observable based upon the window size.
570 * @param {Number} windowSize The number of items in the window
571 * @returns {Observable} A windowed observable based upon the window size.
572 */
573 ControlledObservable.prototype.windowed = function (windowSize) {
574 return new WindowedObservable(this, windowSize);
575 };
576
577 /**
578 * Pipes the existing Observable sequence into a Node.js Stream.
579 * @param {Stream} dest The destination Node.js stream.
580 * @returns {Stream} The destination stream.
581 */
582 observableProto.pipe = function (dest) {
583 var source = this.pausableBuffered();
584
585 function onDrain() {
586 source.resume();
587 }
588
589 dest.addListener('drain', onDrain);
590
591 source.subscribe(
592 function (x) {
593 !dest.write(String(x)) && source.pause();
594 },
595 function (err) {
596 dest.emit('error', err);
597 },
598 function () {
599 // Hack check because STDIO is not closable
600 !dest._isStdio && dest.end();
601 dest.removeListener('drain', onDrain);
602 });
603
604 source.resume();
605
606 return dest;
607 };
608
609 return Rx;
610}));