UNPKG

54.2 kBJavaScriptView Raw
1// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
// `typeof` results for which a value can be an object carrying properties.
var objectTypes = {
  'function': true,
  'object': true
};

// Returns `value` when its `Object` property is this realm's `Object`
// (i.e. it looks like a global object); otherwise returns null.
function checkGlobal(value) {
  return (value && value.Object === Object) ? value : null;
}

// CommonJS-style `exports` / `module`, ignored when they look like DOM nodes (have a `nodeType`).
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
// Candidate global objects from the different host environments.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);
// NOTE(review): `moduleExports` is not read anywhere in the visible part of this wrapper.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
// First available global wins; `Function('return this')()` is the last-resort lookup.
var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();

// Because of build optimizers
if (typeof define === 'function' && define.amd) {
  // AMD: only './rx' is listed as a dependency, so the callback's second
  // parameter (`exports`) receives no value from this dependency list.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  // CommonJS: the factory's return value becomes the module export.
  module.exports = factory(root, module.exports, require('./rx'));
} else {
  // Plain script: extend the Rx object found on the global root.
  root.Rx = factory(root, {}, root.Rx);
}
32}.call(this, function (root, exp, Rx, undefined) {
33
// References: local aliases for the Rx members used throughout this file.
var inherits = Rx.internals.inherits;
var AbstractObserver = Rx.internals.AbstractObserver;
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var AnonymousObservable = Rx.AnonymousObservable;
var ObservableBase = Rx.ObservableBase;
var observableDefer = Observable.defer;
var observableEmpty = Observable.empty;
var observableNever = Observable.never;
var observableThrow = Observable['throw'];
var observableFromArray = Observable.fromArray;
var defaultScheduler = Rx.Scheduler['default'];
var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable;
var SerialDisposable = Rx.SerialDisposable;
var CompositeDisposable = Rx.CompositeDisposable;
var BinaryDisposable = Rx.BinaryDisposable;
var RefCountDisposable = Rx.RefCountDisposable;
var Subject = Rx.Subject;
var addRef = Rx.internals.addRef;
var normalizeTime = Rx.Scheduler.normalize;
var helpers = Rx.helpers;
var isPromise = helpers.isPromise;
var isFunction = helpers.isFunction;
var isScheduler = Rx.Scheduler.isScheduler;
var observableFromPromise = Observable.fromPromise;
60
// Shared sentinel: a wrapped call that throws returns this object, with the
// caught value stored in its `e` property.
var errorObj = { e: {} };

/**
 * Builds a wrapper around `tryCatchTarget` that never throws.
 * The wrapper forwards its `this` and arguments to the target; when the target
 * throws, the thrown value is stored in `errorObj.e` and `errorObj` is returned.
 *
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} Wrapper returning either the target's result or `errorObj`.
 */
function tryCatcherGen(tryCatchTarget) {
  var guarded = function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  };
  return guarded;
}
73
/**
 * Validates `fn` and returns its exception-capturing wrapper (see tryCatcherGen).
 * Also published as Rx.internals.tryCatch.
 *
 * @param {Function} fn Function to guard.
 * @returns {Function} Wrapper that returns `errorObj` instead of throwing.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
78
/**
 * Throws the value it is given.
 *
 * @param {*} e Value to throw.
 */
function thrower(e) {
  throw e;
}
82
/**
 * Observable that schedules a single action at `dt` on scheduler `s`;
 * the action emits 0 and then completes.
 */
var TimerObservable = (function(__super__) {
  inherits(TimerObservable, __super__);

  function TimerObservable(dt, s) {
    this._dt = dt;
    this._s = s;
    __super__.call(this);
  }

  // Scheduled action: the observer arrives as the scheduled state (second
  // argument); the first argument is not used.
  function emitZeroAndComplete(_, observer) {
    observer.onNext(0);
    observer.onCompleted();
  }

  TimerObservable.prototype.subscribeCore = function (o) {
    var scheduler = this._s;
    return scheduler.scheduleFuture(o, this._dt, emitZeroAndComplete);
  };

  return TimerObservable;
}(ObservableBase));
102
/**
 * Creates a one-shot timer observable.
 *
 * @param {Date|Number} dueTime Due time handed to the scheduler.
 * @param {Scheduler} scheduler Scheduler the timer runs on.
 * @returns {Observable} A TimerObservable instance.
 */
function _observableTimer(dueTime, scheduler) {
  var oneShot = new TimerObservable(dueTime, scheduler);
  return oneShot;
}
106
/**
 * Recurring timer starting at an absolute time: emits an increasing counter,
 * rescheduling itself one period later after each tick.
 *
 * @param {Date} dueTime Time of the first tick (`getTime()` is called on it).
 * @param {Number} period Period between ticks, normalized via normalizeTime.
 * @param {Scheduler} scheduler Scheduler the timer runs on.
 * @returns {Observable} Sequence of tick counts starting at 0.
 */
function observableTimerDateAndPeriod(dueTime, period, scheduler) {
  return new AnonymousObservable(function (observer) {
    var nextDue = dueTime;
    var step = normalizeTime(period);

    function tick(count, self) {
      if (step > 0) {
        var now = scheduler.now();
        nextDue = new Date(nextDue.getTime() + step);
        // If the next slot is already in the past, restart the cadence from now.
        if (nextDue.getTime() <= now) {
          nextDue = new Date(now + step);
        }
      }
      observer.onNext(count);
      self(count + 1, new Date(nextDue));
    }

    return scheduler.scheduleRecursiveFuture(0, nextDue, tick);
  });
}
121
/**
 * Recurring timer with a relative first due time.
 * When the due time equals the period a periodic schedule is used; otherwise
 * the due time is turned into an absolute Date at subscription time and
 * observableTimerDateAndPeriod takes over.
 *
 * @param {Number} dueTime Relative time of the first tick.
 * @param {Number} period Period between ticks.
 * @param {Scheduler} scheduler Scheduler the timer runs on.
 * @returns {Observable} Sequence of tick counts starting at 0.
 */
function observableTimerTimeSpanAndPeriod(dueTime, period, scheduler) {
  if (dueTime === period) {
    return new AnonymousObservable(function (observer) {
      function tick(count) {
        observer.onNext(count);
        return count + 1;
      }
      return scheduler.schedulePeriodic(0, period, tick);
    });
  }

  return observableDefer(function () {
    var firstTick = new Date(scheduler.now() + dueTime);
    return observableTimerDateAndPeriod(firstTick, period, scheduler);
  });
}
134
/**
 * Returns an observable sequence that produces a value after each period.
 *
 * @example
 *  1 - res = Rx.Observable.interval(1000);
 *  2 - res = Rx.Observable.interval(1000, Rx.Scheduler.timeout);
 *
 * @param {Number} period Period for producing the values in the resulting sequence (specified as an integer denoting milliseconds).
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence that produces a value after each period.
 */
var observableinterval = Observable.interval = function (period, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return observableTimerTimeSpanAndPeriod(period, period, timerScheduler);
};
149
/**
 * Returns an observable sequence that produces a value after dueTime has elapsed and then after each period.
 *
 * Dispatch:
 *  - Date or number due time without a period -> one-shot timer.
 *  - Date due time with a period              -> recurring timer anchored at that absolute time.
 *  - anything else                            -> recurring timer with a relative due time.
 *
 * @param {Date|Number} dueTime Absolute (specified as a Date object) or relative time (specified as an integer denoting milliseconds) at which to produce the first value.
 * @param {Mixed} [periodOrScheduler] Period to produce subsequent values (specified as an integer denoting milliseconds), or the scheduler to run the timer on. If not specified, the resulting timer is not recurring.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence that produces a value after due time has elapsed and then each period.
 */
var observableTimer = Observable.timer = function (dueTime, periodOrScheduler, scheduler) {
  var period;
  isScheduler(scheduler) || (scheduler = defaultScheduler);
  if (periodOrScheduler != null && typeof periodOrScheduler === 'number') {
    period = periodOrScheduler;
  } else if (isScheduler(periodOrScheduler)) {
    scheduler = periodOrScheduler;
  }
  if ((dueTime instanceof Date || typeof dueTime === 'number') && period === undefined) {
    return _observableTimer(dueTime, scheduler);
  }
  if (dueTime instanceof Date && period !== undefined) {
    // Pass the Date itself: observableTimerDateAndPeriod calls getTime() on it
    // and hands it to the scheduler as an absolute due time. Passing
    // dueTime.getTime() (a number) would be scheduled as a relative delay and
    // would throw on the first tick.
    return observableTimerDateAndPeriod(dueTime, period, scheduler);
  }
  return observableTimerTimeSpanAndPeriod(dueTime, period, scheduler);
};
173
/**
 * Delays every notification of `source` by the relative time `dueTime`.
 * Notifications are materialized and timestamped on arrival, queued with
 * `timestamp + dueTime`, and drained by a recursively scheduled action.
 * An error notification clears the queue and is surfaced without the delay.
 *
 * @param {Observable} source Sequence to delay.
 * @param {Number} dueTime Relative delay added to each notification's timestamp.
 * @param {Scheduler} scheduler Scheduler used for timestamps and the drain timer.
 * @returns {Observable} The time-shifted sequence.
 */
function observableDelayRelative(source, dueTime, scheduler) {
  return new AnonymousObservable(function (o) {
    var active = false,                      // a drain action is scheduled or pending
      cancelable = new SerialDisposable(),   // holds the current drain timer
      exception = null,                      // error from the source, once seen
      q = [],                                // pending { value: notification, timestamp: due }
      running = false,                       // the drain action is executing right now
      subscription;
    subscription = source.materialize().timestamp(scheduler).subscribe(function (notification) {
      var d, shouldRun;
      if (notification.value.kind === 'E') {
        // Error: drop everything queued and remember the error.
        q = [];
        q.push(notification);
        exception = notification.value.error;
        // If the drain loop is mid-run it reports the error itself (see below).
        shouldRun = !running;
      } else {
        q.push({ value: notification.value, timestamp: notification.timestamp + dueTime });
        // Only start a drain timer when none is active.
        shouldRun = !active;
        active = true;
      }
      if (shouldRun) {
        if (exception !== null) {
          o.onError(exception);
        } else {
          d = new SingleAssignmentDisposable();
          cancelable.setDisposable(d);
          d.setDisposable(scheduler.scheduleRecursiveFuture(null, dueTime, function (_, self) {
            var e, recurseDueTime, result, shouldRecurse;
            if (exception !== null) {
              return;
            }
            running = true;
            // Deliver every queued notification whose due time has passed.
            do {
              result = null;
              if (q.length > 0 && q[0].timestamp - scheduler.now() <= 0) {
                result = q.shift().value;
              }
              if (result !== null) {
                result.accept(o);
              }
            } while (result !== null);
            shouldRecurse = false;
            recurseDueTime = 0;
            if (q.length > 0) {
              // Wake up again when the next queued item becomes due.
              shouldRecurse = true;
              recurseDueTime = Math.max(0, q[0].timestamp - scheduler.now());
            } else {
              active = false;
            }
            // Snapshot the error before clearing `running`, then report or recurse.
            e = exception;
            running = false;
            if (e !== null) {
              o.onError(e);
            } else if (shouldRecurse) {
              self(null, recurseDueTime);
            }
          }));
        }
      }
    });
    return new BinaryDisposable(subscription, cancelable);
  }, source);
}
237
/**
 * Delays `source` until the absolute time `dueTime` by converting it, at
 * subscription time, into a delay relative to the scheduler's clock.
 *
 * @param {Observable} source Sequence to delay.
 * @param {Date} dueTime Absolute time to shift the sequence to.
 * @param {Scheduler} scheduler Scheduler providing `now()` and running the delay.
 * @returns {Observable} The time-shifted sequence.
 */
function observableDelayAbsolute(source, dueTime, scheduler) {
  function createDelayed() {
    var relativeDue = dueTime - scheduler.now();
    return observableDelayRelative(source, relativeDue, scheduler);
  }
  return observableDefer(createDelayed);
}
243
/**
 * Delays each element of `source` until the observable returned for it by the
 * selector produces a value or completes.
 *
 * Call shapes:
 *  - delayWithSelector(source, selector)
 *  - delayWithSelector(source, subscriptionDelay, selector): the subscription
 *    to `source` starts once `subscriptionDelay` emits or completes.
 *
 * @param {Observable} source Sequence to delay.
 * @param {Observable|Function} subscriptionDelay Subscription delay sequence, or the selector itself.
 * @param {Function} [delayDurationSelector] Maps an element to its delay; may return an observable or a promise.
 * @returns {Observable} The time-shifted sequence.
 */
function delayWithSelector(source, subscriptionDelay, delayDurationSelector) {
  var subDelay, selector;
  if (isFunction(subscriptionDelay)) {
    selector = subscriptionDelay;
  } else {
    subDelay = subscriptionDelay;
    selector = delayDurationSelector;
  }
  return new AnonymousObservable(function (o) {
    var delays = new CompositeDisposable(), atEnd = false, subscription = new SerialDisposable();

    function start() {
      subscription.setDisposable(source.subscribe(
        function (x) {
          var delay = tryCatch(selector)(x);
          if (delay === errorObj) { return o.onError(delay.e); }
          // Accept promises as well, as the other selector-based operators in this file do.
          isPromise(delay) && (delay = observableFromPromise(delay));
          var d = new SingleAssignmentDisposable();
          delays.add(d);
          d.setDisposable(delay.subscribe(
            function () {
              o.onNext(x);
              delays.remove(d);
              done();
            },
            function (e) { o.onError(e); },
            function () {
              o.onNext(x);
              delays.remove(d);
              done();
            }
          ));
        },
        function (e) { o.onError(e); },
        function () {
          atEnd = true;
          subscription.dispose();
          done();
        }
      ));
    }

    // Completes downstream once the source has ended and no delay is pending.
    function done () {
      atEnd && delays.length === 0 && o.onCompleted();
    }

    if (!subDelay) {
      start();
    } else {
      subscription.setDisposable(subDelay.subscribe(start, function (e) { o.onError(e); }, start));
    }

    return new BinaryDisposable(subscription, delays);
  // The parent must be `source`: `this` in this plain function is not the
  // source observable.
  }, source);
}
298
/**
 * Time shifts the observable sequence.
 * The relative time intervals between the values are preserved.
 *
 * Overloads, chosen from the first argument:
 *  - number or Date: shift by a relative or absolute due time; the second argument is an optional scheduler.
 *  - observable or function: selector-based delay; both arguments are forwarded to delayWithSelector.
 *
 * @param {Number|Date|Observable|Function} dueTime Absolute (specified as a Date object) or relative time (specified as an integer denoting milliseconds) by which to shift the observable sequence, or a subscription delay / delay selector.
 * @param {Scheduler|Function} [scheduler] Scheduler to run the delay timers on (default scheduler when omitted), or the delay selector.
 * @returns {Observable} Time-shifted sequence.
 * @throws {Error} 'Invalid arguments' when the first argument matches no overload.
 */
observableProto.delay = function () {
  var firstArg = arguments[0];

  if (typeof firstArg === 'number' || firstArg instanceof Date) {
    var scheduler = arguments[1];
    if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }
    if (firstArg instanceof Date) {
      return observableDelayAbsolute(this, firstArg, scheduler);
    }
    return observableDelayRelative(this, firstArg, scheduler);
  }

  if (Observable.isObservable(firstArg) || isFunction(firstArg)) {
    return delayWithSelector(this, firstArg, arguments[1]);
  }

  throw new Error('Invalid arguments');
};
321
/**
 * Observable wrapper that debounces `source` by a fixed due time `dt`
 * on scheduler `s` (default scheduler when `s` is not a scheduler).
 */
var DebounceObservable = (function (__super__) {
  inherits(DebounceObservable, __super__);

  function DebounceObservable(source, dt, s) {
    if (!isScheduler(s)) { s = defaultScheduler; }
    this.source = source;
    this._dt = dt;
    this._s = s;
    __super__.call(this);
  }

  DebounceObservable.prototype.subscribeCore = function (o) {
    // The serial disposable holds whichever debounce timer is currently pending.
    var cancelable = new SerialDisposable();
    var debouncer = new DebounceObserver(o, this.source, this._dt, this._s, cancelable);
    var subscription = this.source.subscribe(debouncer);
    return new BinaryDisposable(subscription, cancelable);
  };

  return DebounceObservable;
}(ObservableBase));
341
/**
 * Observer implementing fixed-time debounce: each value restarts a timer and
 * is forwarded only if no newer value, error or completion arrived first.
 */
var DebounceObserver = (function (__super__) {
  inherits(DebounceObserver, __super__);

  function DebounceObserver(observer, source, dueTime, scheduler, cancelable) {
    this._o = observer;
    this._s = source;
    this._d = dueTime;
    this._scheduler = scheduler;
    this._c = cancelable;
    this._v = null;     // latest value
    this._hv = false;   // whether a value is waiting to be emitted
    this._id = 0;       // bumped on every notification to invalidate older timers
    __super__.call(this);
  }

  var proto = DebounceObserver.prototype;

  proto.next = function (x) {
    this._hv = true;
    this._v = x;
    var currentId = ++this._id;
    var timer = new SingleAssignmentDisposable();
    this._c.setDisposable(timer);
    timer.setDisposable(this._scheduler.scheduleFuture(this, this._d, function (_, self) {
      if (self._hv && self._id === currentId) {
        self._o.onNext(x);
      }
      self._hv = false;
    }));
  };

  proto.error = function (e) {
    this._c.dispose();
    this._o.onError(e);
    this._hv = false;
    this._id++;
  };

  proto.completed = function () {
    this._c.dispose();
    // Flush the pending value, if any, before completing.
    if (this._hv) {
      this._o.onNext(this._v);
    }
    this._o.onCompleted();
    this._hv = false;
    this._id++;
  };

  return DebounceObserver;
}(AbstractObserver));
384
/**
 * Debounces `source` using a per-element duration: a value is forwarded when
 * the observable (or promise) returned for it by `durationSelector` emits or
 * completes, unless a newer notification arrived in the meantime.
 *
 * @param {Observable} source Sequence to debounce.
 * @param {Function} durationSelector Maps an element to its throttle duration sequence.
 * @returns {Observable} The debounced sequence.
 */
function debounceWithSelector(source, durationSelector) {
  return new AnonymousObservable(function (o) {
    var value, hasValue = false, cancelable = new SerialDisposable(), id = 0;

    function onSourceNext(x) {
      var throttle = tryCatch(durationSelector)(x);
      if (throttle === errorObj) { return o.onError(throttle.e); }

      if (isPromise(throttle)) { throttle = observableFromPromise(throttle); }

      hasValue = true;
      value = x;
      id++;
      var currentid = id;
      var d = new SingleAssignmentDisposable();
      cancelable.setDisposable(d);

      // Shared by the duration sequence's onNext and onCompleted handlers.
      function release() {
        if (hasValue && id === currentid) { o.onNext(value); }
        hasValue = false;
        d.dispose();
      }

      d.setDisposable(throttle.subscribe(
        release,
        function (e) { o.onError(e); },
        release
      ));
    }

    function onSourceError(e) {
      cancelable.dispose();
      o.onError(e);
      hasValue = false;
      id++;
    }

    function onSourceCompleted() {
      cancelable.dispose();
      if (hasValue) { o.onNext(value); }
      o.onCompleted();
      hasValue = false;
      id++;
    }

    var subscription = source.subscribe(onSourceNext, onSourceError, onSourceCompleted);
    return new BinaryDisposable(subscription, cancelable);
  }, source);
}
431
/**
 * Debounces the sequence.
 *  - function first argument: per-element duration selector (debounceWithSelector).
 *  - number first argument: fixed due time; the second argument is an optional scheduler.
 *
 * @returns {Observable} The debounced sequence.
 * @throws {Error} 'Invalid arguments' when the first argument is neither a function nor a number.
 */
observableProto.debounce = function () {
  var first = arguments[0];
  if (isFunction(first)) {
    return debounceWithSelector(this, first);
  }
  if (typeof first === 'number') {
    return new DebounceObservable(this, first, arguments[1]);
  }
  throw new Error('Invalid arguments');
};
441
/**
 * Projects each element of an observable sequence into zero or more windows which are produced based on timing information.
 * @param {Number} timeSpan Length of each window (specified as an integer denoting milliseconds).
 * @param {Mixed} [timeShiftOrScheduler] Interval between creation of consecutive windows (specified as an integer denoting milliseconds), or an optional scheduler parameter. If not specified, the time shift corresponds to the timeSpan parameter, resulting in non-overlapping adjacent windows.
 * @param {Scheduler} [scheduler] Scheduler to run windowing timers on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.windowWithTime = function (timeSpan, timeShiftOrScheduler, scheduler) {
  var source = this, timeShift;
  if (timeShiftOrScheduler == null) { timeShift = timeSpan; }
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }
  if (typeof timeShiftOrScheduler === 'number') {
    timeShift = timeShiftOrScheduler;
  } else if (isScheduler(timeShiftOrScheduler)) {
    timeShift = timeSpan;
    scheduler = timeShiftOrScheduler;
  }

  return new AnonymousObservable(function (observer) {
    var nextShift = timeShift,   // time at which the next window opens
      nextSpan = timeSpan,       // time at which the oldest window closes
      q = [],                    // currently open windows, oldest first
      timerD = new SerialDisposable(),
      totalTime = 0,
      groupDisposable = new CompositeDisposable(timerD),
      refCountDisposable = new RefCountDisposable(groupDisposable);

    // Schedules the next boundary event: a window close (span), a window open
    // (shift), or both when they coincide.
    function createTimer() {
      var m = new SingleAssignmentDisposable();
      timerD.setDisposable(m);

      var coincide = nextSpan === nextShift;
      var spanFirst = !coincide && nextSpan < nextShift;
      var isSpan = coincide || spanFirst;
      var isShift = coincide || !spanFirst;

      var newTotalTime = isSpan ? nextSpan : nextShift;
      var ts = newTotalTime - totalTime;
      totalTime = newTotalTime;
      if (isSpan) { nextSpan += timeShift; }
      if (isShift) { nextShift += timeShift; }

      m.setDisposable(scheduler.scheduleFuture(null, ts, function () {
        if (isShift) {
          var opened = new Subject();
          q.push(opened);
          observer.onNext(addRef(opened, refCountDisposable));
        }
        if (isSpan) { q.shift().onCompleted(); }
        createTimer();
      }));
    }

    q.push(new Subject());
    observer.onNext(addRef(q[0], refCountDisposable));
    createTimer();

    groupDisposable.add(source.subscribe(
      function (x) {
        for (var i = 0, len = q.length; i < len; i++) {
          q[i].onNext(x);
        }
      },
      function (e) {
        for (var i = 0, len = q.length; i < len; i++) {
          q[i].onError(e);
        }
        observer.onError(e);
      },
      function () {
        for (var i = 0, len = q.length; i < len; i++) {
          q[i].onCompleted();
        }
        observer.onCompleted();
      }
    ));
    return refCountDisposable;
  }, source);
};
521
/**
 * Projects each element of an observable sequence into a window that is completed when either it's full or a given amount of time has elapsed.
 * @param {Number} timeSpan Maximum time length of a window.
 * @param {Number} count Maximum element count of a window.
 * @param {Scheduler} [scheduler] Scheduler to run windowing timers on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.windowWithTimeOrCount = function (timeSpan, count, scheduler) {
  var source = this;
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }

  return new AnonymousObservable(function (observer) {
    var timerD = new SerialDisposable(),
      groupDisposable = new CompositeDisposable(timerD),
      refCountDisposable = new RefCountDisposable(groupDisposable),
      itemsInWindow = 0,
      windowId = 0,
      current = new Subject();

    // Completes the current window, publishes a fresh one and returns its id.
    function rollWindow() {
      itemsInWindow = 0;
      var newId = ++windowId;
      current.onCompleted();
      current = new Subject();
      observer.onNext(addRef(current, refCountDisposable));
      return newId;
    }

    // Arms the time limit for window `id`; a stale timer (the window was
    // already rolled by count) does nothing.
    function createTimer(id) {
      var m = new SingleAssignmentDisposable();
      timerD.setDisposable(m);
      m.setDisposable(scheduler.scheduleFuture(null, timeSpan, function () {
        if (id !== windowId) { return; }
        createTimer(rollWindow());
      }));
    }

    observer.onNext(addRef(current, refCountDisposable));
    createTimer(0);

    groupDisposable.add(source.subscribe(
      function (x) {
        current.onNext(x);
        if (++itemsInWindow !== count) { return; }
        createTimer(rollWindow());
      },
      function (e) {
        current.onError(e);
        observer.onError(e);
      },
      function () {
        current.onCompleted();
        observer.onCompleted();
      }
    ));
    return refCountDisposable;
  }, source);
};
582
// Projection used with flatMap: collects a window into an array via its own toArray().
function toArray(x) {
  var collected = x.toArray();
  return collected;
}
584
/**
 * Projects each element of an observable sequence into zero or more buffers which are produced based on timing information.
 * Implemented as windowWithTime with each window collected into an array.
 * @param {Number} timeSpan Length of each buffer (specified as an integer denoting milliseconds).
 * @param {Mixed} [timeShiftOrScheduler] Interval between creation of consecutive buffers (specified as an integer denoting milliseconds), or an optional scheduler parameter. If not specified, the time shift corresponds to the timeSpan parameter, resulting in non-overlapping adjacent buffers.
 * @param {Scheduler} [scheduler] Scheduler to run buffer timers on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithTime = function (timeSpan, timeShiftOrScheduler, scheduler) {
  var windows = this.windowWithTime(timeSpan, timeShiftOrScheduler, scheduler);
  return windows.flatMap(toArray);
};
595
/**
 * Projects each element of an observable sequence into a buffer that is completed when either it's full or a given amount of time has elapsed.
 * Implemented as windowWithTimeOrCount with each window collected into an
 * array by the `toArray` projection defined earlier in this file (the
 * redundant second declaration of that helper has been removed here).
 * @param {Number} timeSpan Maximum time length of a buffer.
 * @param {Number} count Maximum element count of a buffer.
 * @param {Scheduler} [scheduler] Scheduler to run buffering timers on. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithTimeOrCount = function (timeSpan, count, scheduler) {
  return this.windowWithTimeOrCount(timeSpan, count, scheduler).flatMap(toArray);
};
608
/**
 * Observable wrapper that annotates each value of `source` with the time
 * elapsed since the previous one, measured on scheduler `s`.
 */
var TimeIntervalObservable = (function (__super__) {
  inherits(TimeIntervalObservable, __super__);

  function TimeIntervalObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  TimeIntervalObservable.prototype.subscribeCore = function (o) {
    var intervalObserver = new TimeIntervalObserver(o, this._s);
    return this.source.subscribe(intervalObserver);
  };

  return TimeIntervalObservable;
}(ObservableBase));
623
/**
 * Observer that forwards `{ value, interval }` pairs, where `interval` is the
 * scheduler time elapsed since the previous value (or since construction for
 * the first value). Errors and completion pass through unchanged.
 */
var TimeIntervalObserver = (function (__super__) {
  inherits(TimeIntervalObserver, __super__);

  function TimeIntervalObserver(o, s) {
    this._o = o;
    this._s = s;
    this._l = s.now();   // time of the last observed value
    __super__.call(this);
  }

  var proto = TimeIntervalObserver.prototype;

  proto.next = function (x) {
    var current = this._s.now();
    var elapsed = current - this._l;
    this._l = current;
    this._o.onNext({ value: x, interval: elapsed });
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    this._o.onCompleted();
  };

  return TimeIntervalObserver;
}(AbstractObserver));
644
/**
 * Records the time interval between consecutive values in an observable sequence.
 *
 * @example
 *  1 - res = source.timeInterval();
 *  2 - res = source.timeInterval(Rx.Scheduler.timeout);
 *
 * @param {Scheduler} [scheduler] Scheduler used to compute time intervals. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence with time interval information on values.
 */
observableProto.timeInterval = function (scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TimeIntervalObservable(this, clock);
};
659
/**
 * Observable wrapper that annotates each value of `source` with the
 * current time of scheduler `s`.
 */
var TimestampObservable = (function (__super__) {
  inherits(TimestampObservable, __super__);

  function TimestampObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  TimestampObservable.prototype.subscribeCore = function (o) {
    var stamper = new TimestampObserver(o, this._s);
    return this.source.subscribe(stamper);
  };

  return TimestampObservable;
}(ObservableBase));
674
/**
 * Observer that forwards `{ value, timestamp }` pairs, reading the timestamp
 * from the scheduler's `now()` when each value arrives. Errors and completion
 * pass through unchanged.
 */
var TimestampObserver = (function (__super__) {
  inherits(TimestampObserver, __super__);

  function TimestampObserver(o, s) {
    this._o = o;
    this._s = s;
    __super__.call(this);
  }

  var proto = TimestampObserver.prototype;

  proto.next = function (x) {
    var stamped = { value: x, timestamp: this._s.now() };
    this._o.onNext(stamped);
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    this._o.onCompleted();
  };

  return TimestampObserver;
}(AbstractObserver));
697
/**
 * Records the timestamp for each value in an observable sequence.
 *
 * @example
 *  1 - res = source.timestamp(); // produces { value: x, timestamp: ts }
 *  2 - res = source.timestamp(Rx.Scheduler.default);
 *
 * @param {Scheduler} [scheduler] Scheduler used to compute timestamps. If not specified, the default scheduler is used.
 * @returns {Observable} An observable sequence with timestamp information on values.
 */
observableProto.timestamp = function (scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TimestampObservable(this, clock);
};
712
/**
 * Emits the most recent value of `source` each time `sampler` produces a
 * notification (value or completion), if a new value arrived since the last
 * sample. Completes on the first sampler notification after `source` has
 * completed.
 *
 * @param {Observable} source Sequence to sample.
 * @param {Observable} sampler Sequence whose notifications trigger sampling.
 * @returns {Observable} The sampled sequence.
 */
function sampleObservable(source, sampler) {
  return new AnonymousObservable(function (o) {
    var atEnd = false, value, hasValue = false;

    function sampleSubscribe() {
      if (hasValue) {
        hasValue = false;
        o.onNext(value);
      }
      if (atEnd) { o.onCompleted(); }
    }

    function forwardError(e) {
      o.onError(e);
    }

    var sourceSubscription = new SingleAssignmentDisposable();
    sourceSubscription.setDisposable(source.subscribe(
      function (newValue) {
        hasValue = true;
        value = newValue;
      },
      forwardError,
      function () {
        atEnd = true;
        sourceSubscription.dispose();
      }
    ));

    var samplerSubscription = sampler.subscribe(sampleSubscribe, forwardError, sampleSubscribe);
    return new BinaryDisposable(sourceSubscription, samplerSubscription);
  }, source);
}
744
/**
 * Samples the observable sequence at each interval.
 * Also available as `throttleLatest`.
 *
 * @example
 *  1 - res = source.sample(sampleObservable); // Sampler tick sequence
 *  2 - res = source.sample(5000); // 5 seconds
 *  3 - res = source.sample(5000, Rx.Scheduler.timeout); // 5 seconds
 *
 * @param {Mixed} intervalOrSampler Interval at which to sample (specified as an integer denoting milliseconds) or Sampler Observable.
 * @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the default scheduler is used.
 * @returns {Observable} Sampled observable sequence.
 */
observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) {
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }
  var sampler = intervalOrSampler;
  if (typeof intervalOrSampler === 'number') {
    sampler = observableinterval(intervalOrSampler, scheduler);
  }
  return sampleObservable(this, sampler);
};
763
/**
 * Error raised when a timeout elapses. Also published as Rx.TimeoutError.
 *
 * `Error.call(this)` does not initialise `this`, so the stack is captured
 * explicitly; `constructor` is restored on the prototype so it no longer
 * resolves to `Error`.
 *
 * @param {String} [message] Error message; defaults to 'Timeout has occurred'.
 */
var TimeoutError = Rx.TimeoutError = function(message) {
  this.message = message || 'Timeout has occurred';
  this.name = 'TimeoutError';
  if (typeof Error.captureStackTrace === 'function') {
    // V8: records the stack on `this`, omitting this constructor's own frame.
    Error.captureStackTrace(this, TimeoutError);
  } else {
    this.stack = (new Error(this.message)).stack;
  }
};
TimeoutError.prototype = Object.create(Error.prototype, {
  constructor: { value: TimeoutError, writable: true, configurable: true }
});
770
/**
 * Selector-based timeout: after each element of `source`, a timeout sequence
 * produced by `timeoutDurationSelector` is started; if it emits or completes
 * before the next source notification, the subscription switches to `other`.
 *
 * Call shapes:
 *  - (source, firstTimeout, timeoutDurationSelector, other)
 *  - (source, timeoutDurationSelector, other): no timeout before the first
 *    element (observableNever() is used as the first timeout).
 *
 * @param {Observable} source Sequence to guard.
 * @param {Observable|Function} firstTimeout Timeout for the first element, or the selector itself.
 * @param {Function|Observable} [timeoutDurationSelector] Maps an element to its timeout sequence (or promise).
 * @param {Observable} [other] Fallback sequence; defaults to one that throws a TimeoutError.
 * @returns {Observable} The source sequence, switching to `other` on timeout.
 */
function timeoutWithSelector(source, firstTimeout, timeoutDurationSelector, other) {
  if (isFunction(firstTimeout)) {
    // Shift the arguments: the first-timeout sequence was omitted.
    other = timeoutDurationSelector;
    timeoutDurationSelector = firstTimeout;
    firstTimeout = observableNever();
  }
  Observable.isObservable(other) || (other = observableThrow(new TimeoutError()));
  return new AnonymousObservable(function (o) {
    var subscription = new SerialDisposable(),
      timer = new SerialDisposable(),
      original = new SingleAssignmentDisposable();

    subscription.setDisposable(original);

    // `id` is bumped on every source notification so that older timers can
    // detect they lost the race; `switched` is set once a timer has won.
    var id = 0, switched = false;

    function setTimer(timeout) {
      var myId = id, d = new SingleAssignmentDisposable();

      // True only when no source notification arrived since this timer was set.
      function timerWins() {
        switched = (myId === id);
        return switched;
      }

      timer.setDisposable(d);
      d.setDisposable(timeout.subscribe(function () {
        timerWins() && subscription.setDisposable(other.subscribe(o));
        d.dispose();
      }, function (e) {
        timerWins() && o.onError(e);
      }, function () {
        timerWins() && subscription.setDisposable(other.subscribe(o));
      }));
    };

    setTimer(firstTimeout);

    // True while no timer has won; invalidates the pending timer by bumping `id`.
    function oWins() {
      var res = !switched;
      if (res) { id++; }
      return res;
    }

    original.setDisposable(source.subscribe(function (x) {
      if (oWins()) {
        o.onNext(x);
        var timeout = tryCatch(timeoutDurationSelector)(x);
        if (timeout === errorObj) { return o.onError(timeout.e); }
        setTimer(isPromise(timeout) ? observableFromPromise(timeout) : timeout);
      }
    }, function (e) {
      oWins() && o.onError(e);
    }, function () {
      oWins() && o.onCompleted();
    }));
    return new BinaryDisposable(subscription, timer);
  }, source);
}
829
/**
 * Fixed-time timeout: if `dueTime` passes without a notification from
 * `source` (the timer restarts after each value), the subscription switches
 * to `other`.
 *
 * @param {Observable} source Sequence to guard.
 * @param {Date|Number} dueTime Due time handed to the scheduler for each timer.
 * @param {Observable|Promise|Error|Scheduler} [other] Fallback sequence or promise, an Error to
 *   fail with, or the scheduler (in which case the fallback throws a TimeoutError).
 * @param {Scheduler} [scheduler] Scheduler to run the timers on; default scheduler when omitted.
 * @returns {Observable} The source sequence, switching to `other` on timeout.
 */
function timeout(source, dueTime, other, scheduler) {
  if (isScheduler(other)) {
    scheduler = other;
    other = observableThrow(new TimeoutError());
  }
  if (other instanceof Error) { other = observableThrow(other); }
  isScheduler(scheduler) || (scheduler = defaultScheduler);
  // Keep a promise fallback as-is: it is converted when the timer fires.
  // Without the isPromise check it would be replaced by the TimeoutError
  // fallback here and the conversion below could never run.
  Observable.isObservable(other) || isPromise(other) || (other = observableThrow(new TimeoutError()));
  return new AnonymousObservable(function (o) {
    var id = 0,                                     // bumped on each source notification
      original = new SingleAssignmentDisposable(),
      subscription = new SerialDisposable(),
      switched = false,                             // set once the timer has won
      timer = new SerialDisposable();

    subscription.setDisposable(original);

    function createTimer() {
      var myId = id;
      timer.setDisposable(scheduler.scheduleFuture(null, dueTime, function () {
        // The timer wins only if no source notification arrived since it was armed.
        switched = id === myId;
        if (switched) {
          isPromise(other) && (other = observableFromPromise(other));
          subscription.setDisposable(other.subscribe(o));
        }
      }));
    }

    createTimer();

    original.setDisposable(source.subscribe(function (x) {
      if (!switched) {
        id++;
        o.onNext(x);
        createTimer();
      }
    }, function (e) {
      if (!switched) {
        id++;
        o.onError(e);
      }
    }, function () {
      if (!switched) {
        id++;
        o.onCompleted();
      }
    }));
    return new BinaryDisposable(subscription, timer);
  }, source);
}
880
/**
 * Applies a timeout to the sequence.
 *  - Date or number first argument: fixed due time; arguments 2 and 3 are forwarded to `timeout`.
 *  - observable or function first argument: selector-based timeout; arguments 2 and 3 are
 *    forwarded to `timeoutWithSelector`.
 *
 * @returns {Observable} The source sequence with the timeout applied.
 * @throws {Error} 'Invalid arguments' when the first argument matches no overload.
 */
observableProto.timeout = function () {
  var firstArg = arguments[0];
  var second = arguments[1];
  var third = arguments[2];

  if (firstArg instanceof Date || typeof firstArg === 'number') {
    return timeout(this, firstArg, second, third);
  }
  if (Observable.isObservable(firstArg) || isFunction(firstArg)) {
    return timeoutWithSelector(this, firstArg, second, third);
  }
  throw new Error('Invalid arguments');
};
891
/**
 * Observable that generates values by iterating a state, scheduling each
 * emission at the time returned by the time selector.
 *
 * Each subscription gets its own iteration record, so subscribing more than
 * once restarts from the initial state instead of sharing (and clobbering)
 * fields on the observable. The value emitted is the result selector's
 * output, not the raw state.
 */
var GenerateAbsoluteObservable = (function (__super__) {
  inherits(GenerateAbsoluteObservable, __super__);
  function GenerateAbsoluteObservable(state, cndFn, itrFn, resFn, timeFn, s) {
    this._state = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._timeFn = timeFn;
    this._s = s;
    __super__.call(this);
  }

  // One step of the generator loop. `run` is the per-subscription record
  // created in subscribeCore; `recurse(run, time)` schedules the next step.
  function scheduleRecursive(run, recurse) {
    var parent = run.parent, o = run.observer;

    // Emit the result computed on the previous step, now that its time has come.
    run.hasResult && o.onNext(run.result);

    if (run.first) {
      run.first = false;
    } else {
      run.state = tryCatch(parent._itrFn)(run.state);
      if (run.state === errorObj) { return o.onError(run.state.e); }
    }
    run.hasResult = tryCatch(parent._cndFn)(run.state);
    if (run.hasResult === errorObj) { return o.onError(run.hasResult.e); }
    if (run.hasResult) {
      run.result = tryCatch(parent._resFn)(run.state);
      if (run.result === errorObj) { return o.onError(run.result.e); }
      var time = tryCatch(parent._timeFn)(run.state);
      if (time === errorObj) { return o.onError(time.e); }
      recurse(run, time);
    } else {
      o.onCompleted();
    }
  }

  GenerateAbsoluteObservable.prototype.subscribeCore = function (o) {
    var run = {
      parent: this,
      observer: o,
      state: this._state,
      first: true,
      hasResult: false,
      result: undefined
    };
    return this._s.scheduleRecursiveFuture(run, new Date(this._s.now()), scheduleRecursive);
  };

  return GenerateAbsoluteObservable;
}(ObservableBase));
935
/**
 * Generates an observable sequence by iterating a state from an initial state until the condition fails,
 * scheduling each value at the absolute time returned by the time selector.
 *
 * @example
 *  res = Rx.Observable.generateWithAbsoluteTime(0,
 *      function (x) { return true; },
 *      function (x) { return x + 1; },
 *      function (x) { return x; },
 *      function (x) { return new Date(); }
 *  );
 *
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Function} timeSelector Time selector function to control the speed of values being produced each iteration, returning Date values.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not specified, the default scheduler is used.
 * @returns {Observable} The generated sequence.
 */
Observable.generateWithAbsoluteTime = function (initialState, condition, iterate, resultSelector, timeSelector, scheduler) {
  var loopScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new GenerateAbsoluteObservable(initialState, condition, iterate, resultSelector, timeSelector, loopScheduler);
};
959
/**
 * Observable behind Observable.generateWithRelativeTime.
 *
 * Iterates a state value, emitting resultSelector(state) for every state that
 * satisfies the condition, and schedules each step after the relative delay
 * returned by the time selector. The first step is scheduled with a due time of 0.
 *
 * All progress (current state, pending result, flags, observer) is kept in a
 * record created per subscription, so the observable can be subscribed to more
 * than once and each subscription starts again from the initial state.
 *
 * @param {Mixed} state Initial state.
 * @param {Function} cndFn Condition; generation stops when it returns a falsy value.
 * @param {Function} itrFn Iteration step producing the next state.
 * @param {Function} resFn Selector mapping a state to the emitted value.
 * @param {Function} timeFn Selector mapping a state to the delay before the next step.
 * @param {Scheduler} s Scheduler used to run the generator loop.
 */
var GenerateRelativeObservable = (function (__super__) {
  inherits(GenerateRelativeObservable, __super__);
  function GenerateRelativeObservable(state, cndFn, itrFn, resFn, timeFn, s) {
    this._state = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._timeFn = timeFn;
    this._s = s;
    __super__.call(this);
  }

  // One step of the generator loop; `run` is the per-subscription record.
  function scheduleRecursive(run, recurse) {
    var p = run.parent, o = run.observer;

    // Emit the value selected on the previous step now that its delay has elapsed.
    run.hasResult && o.onNext(run.result);

    if (run.first) {
      run.first = false;
    } else {
      run.state = tryCatch(p._itrFn)(run.state);
      if (run.state === errorObj) { return o.onError(run.state.e); }
    }
    run.hasResult = tryCatch(p._cndFn)(run.state);
    if (run.hasResult === errorObj) { return o.onError(run.hasResult.e); }
    if (run.hasResult) {
      // Keep the selected result for the next step instead of discarding it.
      run.result = tryCatch(p._resFn)(run.state);
      if (run.result === errorObj) { return o.onError(run.result.e); }
      var time = tryCatch(p._timeFn)(run.state);
      if (time === errorObj) { return o.onError(time.e); }
      recurse(run, time);
    } else {
      o.onCompleted();
    }
  }

  GenerateRelativeObservable.prototype.subscribeCore = function (o) {
    var run = {
      parent: this,
      observer: o,
      state: this._state,
      result: null,
      first: true,
      hasResult: false
    };
    return this._s.scheduleRecursiveFuture(run, 0, scheduleRecursive);
  };

  return GenerateRelativeObservable;
}(ObservableBase));
1003
/**
 * Generates an observable sequence by iterating a state from an initial state until the condition fails,
 * waiting a relative amount of time between iterations.
 *
 * @example
 *  res = Rx.Observable.generateWithRelativeTime(0,
 *      function (x) { return true; },
 *      function (x) { return x + 1; },
 *      function (x) { return x; },
 *      function (x) { return 500; }
 *  );
 *
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Function} timeSelector Time selector function to control the speed of values being produced each iteration, returning integer values denoting milliseconds.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} The generated sequence.
 */
Observable.generateWithRelativeTime = function (initialState, condition, iterate, resultSelector, timeSelector, scheduler) {
  var runOn = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new GenerateRelativeObservable(initialState, condition, iterate, resultSelector, timeSelector, runOn);
};
1027
/**
 * Observable that postpones subscribing to `source` until the scheduler fires
 * the action scheduled with due time `dt`.
 *
 * @param {Observable} source Sequence to subscribe to once the timer fires.
 * @param {Number|Date} dt Due time handed to the scheduler's scheduleFuture.
 * @param {Scheduler} s Scheduler running the timer.
 */
var DelaySubscription = (function (__super__) {
  inherits(DelaySubscription, __super__);
  function DelaySubscription(source, dt, s) {
    this.source = source;
    this._dt = dt;
    this._s = s;
    __super__.call(this);
  }

  // Timer callback: `args` is [source, observer, serialDisposable].
  function subscribeToSource(_, args) {
    var upstream = args[0], observer = args[1], serial = args[2];
    // Swap the timer's disposable for the live source subscription.
    serial.setDisposable(upstream.subscribe(observer));
  }

  DelaySubscription.prototype.subscribeCore = function (o) {
    var serial = new SerialDisposable();
    var timer = this._s.scheduleFuture([this.source, o, serial], this._dt, subscribeToSource);
    serial.setDisposable(timer);
    return serial;
  };

  return DelaySubscription;
}(ObservableBase));
1052
/**
 * Time shifts the observable sequence by delaying the subscription by the given due time, using the specified scheduler to run the timer.
 *
 * @example
 *  1 - res = source.delaySubscription(5000); // 5s
 *  2 - res = source.delaySubscription(5000, Rx.Scheduler.default); // 5 seconds
 *
 * @param {Number} dueTime Relative or absolute time shift of the subscription.
 * @param {Scheduler} [scheduler] Scheduler to run the subscription delay timer on. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} Time-shifted sequence.
 */
observableProto.delaySubscription = function (dueTime, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new DelaySubscription(this, dueTime, timerScheduler);
};
1068
/**
 * Observable behind skipLastWithTime: subscribes a SkipLastWithTimeObserver to
 * the source, handing it this instance for its duration and scheduler.
 *
 * @param {Observable} source Source sequence.
 * @param {Number} d Duration compared against scheduler timestamps.
 * @param {Scheduler} s Scheduler whose now() supplies the timestamps.
 */
var SkipLastWithTimeObservable = (function (__super__) {
  inherits(SkipLastWithTimeObservable, __super__);
  function SkipLastWithTimeObservable(source, d, s) {
    this.source = source;
    this._d = d;
    this._s = s;
    __super__.call(this);
  }

  SkipLastWithTimeObservable.prototype.subscribeCore = function (o) {
    var trailingFilter = new SkipLastWithTimeObserver(o, this);
    return this.source.subscribe(trailingFilter);
  };

  return SkipLastWithTimeObservable;
}(ObservableBase));
1084
/**
 * Observer for skipLastWithTime. Each element is queued with the scheduler
 * timestamp at which it arrived; queued elements are forwarded once they are at
 * least `_d` old. Elements still younger than `_d` at completion are dropped.
 *
 * @param {Observer} o Downstream observer.
 * @param {Object} p Parent supplying `_s` (scheduler) and `_d` (duration).
 */
var SkipLastWithTimeObserver = (function (__super__) {
  inherits(SkipLastWithTimeObserver, __super__);

  function SkipLastWithTimeObserver(o, p) {
    this._o = o;
    this._s = p._s;
    this._d = p._d;
    this._q = [];
    __super__.call(this);
  }

  // Forwards, oldest first, every queued element that is at least `_d` old at `now`.
  function forwardAged(observer, now) {
    var queue = observer._q;
    while (queue.length > 0 && now - queue[0].interval >= observer._d) {
      observer._o.onNext(queue.shift().value);
    }
  }

  SkipLastWithTimeObserver.prototype.next = function (x) {
    var arrivedAt = this._s.now();
    this._q.push({ interval: arrivedAt, value: x });
    forwardAged(this, arrivedAt);
  };

  SkipLastWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipLastWithTimeObserver.prototype.completed = function () {
    forwardAged(this, this._s.now());
    this._o.onCompleted();
  };

  return SkipLastWithTimeObserver;
}(AbstractObserver));
1114
/**
 * Skips elements for the specified duration from the end of the observable source sequence, using the specified scheduler for timestamps.
 * @description
 *  This operator accumulates a queue with a length enough to store elements received during the initial duration window.
 *  As more elements are received, elements older than the specified duration are taken from the queue and produced on the
 *  result sequence. This causes elements to be delayed with duration.
 * @param {Number} duration Duration for skipping elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler to read the time from. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements skipped during the specified duration from the end of the source sequence.
 */
observableProto.skipLastWithTime = function (duration, scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipLastWithTimeObservable(this, duration, clock);
};
1129
/**
 * Observable behind takeLastWithTime: subscribes a TakeLastWithTimeObserver,
 * built from this instance's duration and scheduler, to the source.
 *
 * @param {Observable} source Source sequence.
 * @param {Number} d Duration compared against scheduler timestamps.
 * @param {Scheduler} s Scheduler whose now() supplies the timestamps.
 */
var TakeLastWithTimeObservable = (function (__super__) {
  inherits(TakeLastWithTimeObservable, __super__);
  function TakeLastWithTimeObservable(source, d, s) {
    this.source = source;
    this._d = d;
    this._s = s;
    __super__.call(this);
  }

  TakeLastWithTimeObservable.prototype.subscribeCore = function (o) {
    var trailingWindow = new TakeLastWithTimeObserver(o, this._d, this._s);
    return this.source.subscribe(trailingWindow);
  };

  return TakeLastWithTimeObservable;
}(ObservableBase));
1145
/**
 * Observer for takeLastWithTime. Elements are queued with their arrival
 * timestamp and entries at least `_d` old are evicted as new elements arrive.
 * On completion the queue is drained, forwarding the entries that are at most
 * `_d` old at that moment, and then completion is signalled downstream.
 *
 * @param {Observer} o Downstream observer.
 * @param {Number} d Duration of the trailing window.
 * @param {Scheduler} s Scheduler whose now() supplies the timestamps.
 */
var TakeLastWithTimeObserver = (function (__super__) {
  inherits(TakeLastWithTimeObserver, __super__);

  function TakeLastWithTimeObserver(o, d, s) {
    this._o = o;
    this._d = d;
    this._s = s;
    this._q = [];
    __super__.call(this);
  }

  TakeLastWithTimeObserver.prototype.next = function (x) {
    var arrivedAt = this._s.now();
    this._q.push({ interval: arrivedAt, value: x });
    // Evict entries that have fallen out of the trailing window.
    while (this._q.length > 0) {
      if (arrivedAt - this._q[0].interval < this._d) { break; }
      this._q.shift();
    }
  };

  TakeLastWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  TakeLastWithTimeObserver.prototype.completed = function () {
    var finishedAt = this._s.now();
    while (this._q.length > 0) {
      var entry = this._q.shift();
      if (finishedAt - entry.interval <= this._d) {
        this._o.onNext(entry.value);
      }
    }
    this._o.onCompleted();
  };

  return TakeLastWithTimeObserver;
}(AbstractObserver));
1176
/**
 * Returns elements within the specified duration from the end of the observable source sequence, using the specified scheduler for timestamps.
 * @description
 *  This operator keeps a queue of the elements received within the trailing duration window, discarding older
 *  elements as new ones arrive. The queued elements are produced on the result sequence only when the source completes.
 * @param {Number} duration Duration for taking elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler to read the time from. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements taken during the specified duration from the end of the source sequence.
 */
observableProto.takeLastWithTime = function (duration, scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TakeLastWithTimeObservable(this, duration, clock);
};
1191
/**
 * Returns an array with the elements within the specified duration from the end of the observable source sequence, using the specified scheduler for timestamps.
 * @description
 *  This operator keeps a queue of the elements received within the trailing duration window, discarding older
 *  elements as new ones arrive. When the source completes, the remaining elements are produced as a single array.
 * @param {Number} duration Duration for taking elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler to read the time from. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence containing a single array with the elements taken during the specified duration from the end of the source sequence.
 */
observableProto.takeLastBufferWithTime = function (duration, scheduler) {
  var source = this;
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }

  return new AnonymousObservable(function (o) {
    var window = [];

    function onNext(x) {
      var arrivedAt = scheduler.now();
      window.push({ interval: arrivedAt, value: x });
      // Evict entries that have fallen out of the trailing window.
      while (window.length > 0) {
        if (arrivedAt - window[0].interval < duration) { break; }
        window.shift();
      }
    }

    function onError(e) {
      o.onError(e);
    }

    function onCompleted() {
      var finishedAt = scheduler.now();
      var buffer = [];
      while (window.length > 0) {
        var entry = window.shift();
        if (finishedAt - entry.interval <= duration) { buffer.push(entry.value); }
      }
      o.onNext(buffer);
      o.onCompleted();
    }

    return source.subscribe(onNext, onError, onCompleted);
  }, source);
};
1224
/**
 * Observable behind takeWithTime: forwards the source to the observer and
 * schedules a call to the observer's onCompleted after the duration.
 *
 * @param {Observable} source Source sequence.
 * @param {Number} d Due time handed to the scheduler for the completion timer.
 * @param {Scheduler} s Scheduler running the timer.
 */
var TakeWithTimeObservable = (function (__super__) {
  inherits(TakeWithTimeObservable, __super__);
  function TakeWithTimeObservable(source, d, s) {
    this.source = source;
    this._d = d;
    this._s = s;
    __super__.call(this);
  }

  // Timer callback: ends the result sequence.
  function completeObserver(_, observer) {
    observer.onCompleted();
  }

  TakeWithTimeObservable.prototype.subscribeCore = function (o) {
    // The timer is scheduled before the source is subscribed.
    var timer = this._s.scheduleFuture(o, this._d, completeObserver);
    var subscription = this.source.subscribe(o);
    return new BinaryDisposable(timer, subscription);
  };

  return TakeWithTimeObservable;
}(ObservableBase));
1247
/**
 * Takes elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers.
 *
 * @example
 *  1 - res = source.takeWithTime(5000, [optional scheduler]);
 * @description
 *  Elements from the source are forwarded as they arrive; a timer scheduled for the given duration completes the
 *  result sequence when it fires.
 * @param {Number} duration Duration for taking elements from the start of the sequence.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements taken during the specified duration from the start of the source sequence.
 */
observableProto.takeWithTime = function (duration, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TakeWithTimeObservable(this, duration, timerScheduler);
};
1265
/**
 * Observable behind skipWithTime: drops source elements until a timer scheduled
 * for the duration fires, then lets them through.
 *
 * The open/closed flag lives in a gate object created per subscription. Keeping
 * it on the observable would leave it open for every subscription made after
 * the first timer has fired. The gate exposes `_open`, the property that
 * SkipWithTimeObserver reads from its parent argument.
 *
 * @param {Observable} source Source sequence.
 * @param {Number} d Due time handed to the scheduler for the opening timer.
 * @param {Scheduler} s Scheduler running the timer.
 */
var SkipWithTimeObservable = (function (__super__) {
  inherits(SkipWithTimeObservable, __super__);
  function SkipWithTimeObservable(source, d, s) {
    this.source = source;
    this._d = d;
    this._s = s;
    __super__.call(this);
  }

  // Timer callback: opens the gate of a single subscription.
  function scheduleMethod(s, gate) {
    gate._open = true;
  }

  SkipWithTimeObservable.prototype.subscribeCore = function (o) {
    var gate = { _open: false };
    return new BinaryDisposable(
      this._s.scheduleFuture(gate, this._d, scheduleMethod),
      this.source.subscribe(new SkipWithTimeObserver(o, gate))
    );
  };

  return SkipWithTimeObservable;
}(ObservableBase));
1289
/**
 * Observer for skipWithTime: forwards elements only while the parent's `_open`
 * flag is truthy; errors and completion are always forwarded.
 *
 * @param {Observer} o Downstream observer.
 * @param {Object} p Parent object whose `_open` flag gates elements.
 */
var SkipWithTimeObserver = (function (__super__) {
  inherits(SkipWithTimeObserver, __super__);

  function SkipWithTimeObserver(o, p) {
    this._o = o;
    this._p = p;
    __super__.call(this);
  }

  SkipWithTimeObserver.prototype.next = function (x) {
    if (this._p._open) { this._o.onNext(x); }
  };

  SkipWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipWithTimeObserver.prototype.completed = function () {
    this._o.onCompleted();
  };

  return SkipWithTimeObserver;
}(AbstractObserver));
1305
/**
 * Skips elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers.
 * @description
 *  Specifying a zero value for duration doesn't guarantee no elements will be dropped from the start of the source sequence.
 *  This is a side-effect of the asynchrony introduced by the scheduler, where the action that causes callbacks from the source sequence to be forwarded
 *  may not execute immediately, despite the zero due time.
 *
 *  Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the duration.
 * @param {Number} duration Duration for skipping elements from the start of the sequence.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements skipped during the specified duration from the start of the source sequence.
 */
observableProto.skipWithTime = function (duration, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipWithTimeObservable(this, duration, timerScheduler);
};
1322
/**
 * Observable behind skipUntilWithTime: drops source elements until a timer
 * scheduled for the start time fires, then lets them through.
 *
 * The open/closed flag lives in a gate object created per subscription. Keeping
 * it on the observable and resetting it on every subscribe would close the gate
 * again for subscribers whose timer has already fired. The gate exposes `_open`,
 * the property that SkipUntilWithTimeObserver reads from its parent argument.
 *
 * @param {Observable} source Source sequence.
 * @param {Date|Number} startTime Due time handed to the scheduler for the opening timer.
 * @param {Scheduler} scheduler Scheduler running the timer.
 */
var SkipUntilWithTimeObservable = (function (__super__) {
  inherits(SkipUntilWithTimeObservable, __super__);
  function SkipUntilWithTimeObservable(source, startTime, scheduler) {
    this.source = source;
    this._st = startTime;
    this._s = scheduler;
    __super__.call(this);
  }

  // Timer callback: opens the gate of a single subscription.
  function scheduleMethod(s, gate) {
    gate._open = true;
  }

  SkipUntilWithTimeObservable.prototype.subscribeCore = function (o) {
    var gate = { _open: false };
    return new BinaryDisposable(
      this._s.scheduleFuture(gate, this._st, scheduleMethod),
      this.source.subscribe(new SkipUntilWithTimeObserver(o, gate))
    );
  };

  return SkipUntilWithTimeObservable;
}(ObservableBase));
1346
/**
 * Observer for skipUntilWithTime: forwards elements only while the parent's
 * `_open` flag is truthy; errors and completion are always forwarded.
 *
 * @param {Observer} o Downstream observer.
 * @param {Object} p Parent object whose `_open` flag gates elements.
 */
var SkipUntilWithTimeObserver = (function (__super__) {
  inherits(SkipUntilWithTimeObserver, __super__);

  function SkipUntilWithTimeObserver(o, p) {
    this._o = o;
    this._p = p;
    __super__.call(this);
  }

  SkipUntilWithTimeObserver.prototype.next = function (x) {
    if (this._p._open) { this._o.onNext(x); }
  };

  SkipUntilWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipUntilWithTimeObserver.prototype.completed = function () {
    this._o.onCompleted();
  };

  return SkipUntilWithTimeObserver;
}(AbstractObserver));
1362
1363
/**
 * Skips elements from the observable source sequence until the specified start time, using the specified scheduler to run timers.
 * Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the start time.
 *
 * @example
 *  1 - res = source.skipUntilWithTime(new Date(), [scheduler]);
 *  2 - res = source.skipUntilWithTime(5000, [scheduler]);
 * @param {Date|Number} startTime Time to start taking elements from the source sequence. If this value is less than or equal to Date(), no elements will be skipped.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements skipped until the specified start time.
 */
observableProto.skipUntilWithTime = function (startTime, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipUntilWithTimeObservable(this, startTime, timerScheduler);
};
1379
/**
 * Takes elements from the source sequence until the specified end time, using the specified scheduler to run timers.
 * @param {Number | Date} endTime Time to stop taking elements from the source sequence. If this value is less than or equal to new Date(), the result stream will complete immediately.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An observable sequence with the elements taken until the specified end time.
 */
observableProto.takeUntilWithTime = function (endTime, scheduler) {
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }
  var source = this;

  // Timer callback: ends the result sequence.
  function completeObserver(_, observer) {
    observer.onCompleted();
  }

  return new AnonymousObservable(function (o) {
    // The timer is scheduled before the source is subscribed.
    var timer = scheduler.scheduleFuture(o, endTime, completeObserver);
    var subscription = source.subscribe(o);
    return new BinaryDisposable(timer, subscription);
  }, source);
};
1395
/**
 * Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.
 *
 * Whether anything has been emitted yet is tracked with its own flag rather than by comparing the last emission time
 * with 0, so throttling also works when the scheduler's clock reads 0 at the first emission (e.g. virtual time).
 *
 * @param {Number} windowDuration time to wait before emitting another item after emitting the last item
 * @param {Scheduler} [scheduler] the Scheduler whose clock is used to measure the windows. If not a scheduler, `defaultScheduler` is used.
 * @returns {Observable} An Observable that performs the throttle operation.
 * @throws {RangeError} When windowDuration does not coerce to a number greater than zero.
 */
observableProto.throttle = function (windowDuration, scheduler) {
  isScheduler(scheduler) || (scheduler = defaultScheduler);
  var duration = +windowDuration || 0;
  if (duration <= 0) { throw new RangeError('windowDuration cannot be less or equal zero.'); }
  var source = this;
  return new AnonymousObservable(function (o) {
    var hasEmitted = false, lastOnNext = 0;
    return source.subscribe(
      function (x) {
        var now = scheduler.now();
        if (!hasEmitted || now - lastOnNext >= duration) {
          hasEmitted = true;
          lastOnNext = now;
          o.onNext(x);
        }
      },
      function (e) { o.onError(e); },
      function () { o.onCompleted(); }
    );
  }, source);
};
1420
1421 return Rx;
1422}));