UNPKG

1.33 kBJavaScriptView Raw
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
4
5import * as dispose from './disposable/dispose'
6import defaultScheduler from './scheduler/defaultScheduler'
7
/**
 * Run `source` using the module's imported `defaultScheduler`.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @returns {Promise} whatever `withScheduler` returns for this source
 */
export function withDefaultScheduler (source) {
  const scheduler = defaultScheduler
  return withScheduler(source, scheduler)
}
11
/**
 * Run `source` on the given `scheduler` and expose the outcome as a Promise.
 *
 * The promise's `resolve` and `reject` functions are handed to `runSource`,
 * which decides when (and with what) the promise settles.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @param {Object} scheduler - scheduler passed through to `source.run`
 * @returns {Promise} settles when the run started by `runSource` finishes
 */
export function withScheduler (source, scheduler) {
  return new Promise((resolve, reject) => {
    runSource(source, scheduler, resolve, reject)
  })
}
17
/**
 * Start `source` with a Drain observer wired to the promise callbacks.
 *
 * A settable disposable is created first so the observer already holds a
 * handle it can dispose, even though the disposable returned by `source.run`
 * is only known after `run` has been called.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @param {Object} scheduler - scheduler passed through to `source.run`
 * @param {Function} resolve - called (via the Drain) when the source ends
 * @param {Function} reject - called (via the Drain) when the source errors
 */
function runSource (source, scheduler, resolve, reject) {
  const settable = dispose.settable()
  const sink = new Drain(resolve, reject, settable)
  const running = source.run(sink, scheduler)

  settable.setDisposable(running)
}
24
/**
 * Observer that discards every event and only reports how a run finished.
 *
 * @param {Function} end - callback invoked with the end value
 * @param {Function} error - callback invoked with the error
 * @param {Object} disposable - object with a `dispose()` method, disposed
 *   before either callback is invoked
 */
function Drain (end, error, disposable) {
  this._end = end
  this._error = error
  this._disposable = disposable
  // Cleared on the first end/error so that a later `end` is ignored.
  this.active = true
}

/** Events are intentionally ignored; only termination matters here. */
Drain.prototype.event = function (t, x) {}

/**
 * Handle normal termination: dispose, then pass `x` to the end callback.
 * Does nothing when the drain has already ended or errored.
 */
Drain.prototype.end = function (t, x) {
  if (this.active) {
    this.active = false
    disposeThen(this._end, this._error, this._disposable, x)
  }
}

/**
 * Handle failure: dispose, then pass `e` to the error callback.
 * Unlike `end`, this does not check `active`, so it always runs; the error
 * callback is also used if disposing itself fails.
 */
Drain.prototype.error = function (t, e) {
  var onError = this._error
  this.active = false
  disposeThen(onError, onError, this._disposable, e)
}
46
/**
 * Dispose `disposable`, then report the outcome.
 *
 * `dispose()` may return a plain value or a promise. Once disposal has
 * completed, `end(x)` is called. If disposal fails, whether by returning a
 * rejected promise or by throwing synchronously, `error` is called with the
 * failure instead.
 *
 * @param {Function} end - called with `x` after successful disposal
 * @param {Function} error - called with the reason if disposal fails
 * @param {Object} disposable - object with a `dispose()` method
 * @param {*} x - value forwarded to `end`
 */
function disposeThen (end, error, disposable, x) {
  var disposed
  try {
    disposed = disposable.dispose()
  } catch (e) {
    // A synchronous throw would otherwise escape to the caller and never
    // reach `error`; turn it into a rejection so both failure modes agree.
    disposed = Promise.reject(e)
  }

  Promise.resolve(disposed).then(function () {
    end(x)
  }, error)
}