1 |
|
2 |
|
3 |
|
4 |
|
5 | import * as dispose from './disposable/dispose'
|
6 | import defaultScheduler from './scheduler/defaultScheduler'
|
7 |
|
/**
 * Run `source` to completion using the module's default scheduler.
 *
 * Thin convenience wrapper: delegates to `withScheduler`, supplying
 * the imported `defaultScheduler` as the scheduler argument.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @returns {Promise} the promise produced by `withScheduler`
 */
export function withDefaultScheduler (source) {
  var completion = withScheduler(source, defaultScheduler)
  return completion
}
|
11 |
|
/**
 * Run `source` with the given scheduler and expose the outcome as a Promise.
 *
 * The promise's `resolve` and `reject` functions are handed to `runSource`,
 * which is responsible for invoking them.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @param {Object} scheduler - scheduler forwarded unchanged to `runSource`
 * @returns {Promise} settled via the callbacks passed to `runSource`
 */
export function withScheduler (source, scheduler) {
  // Named executor: starts the source as soon as the promise is constructed.
  function start (resolve, reject) {
    runSource(source, scheduler, resolve, reject)
  }

  return new Promise(start)
}
|
17 |
|
/**
 * Start `source`, observing it with a `Drain` wired to `resolve`/`reject`.
 *
 * A settable disposable is created first so the observer can hold a
 * reference to it before `source.run` has returned; whatever `source.run`
 * returns is then installed into that settable disposable.
 *
 * @param {Object} source - object exposing `run(observer, scheduler)`
 * @param {Object} scheduler - passed through to `source.run`
 * @param {Function} resolve - handed to the Drain as its end callback
 * @param {Function} reject - handed to the Drain as its error callback
 */
function runSource (source, scheduler, resolve, reject) {
  var settable = dispose.settable()
  var drain = new Drain(resolve, reject, settable)
  var running = source.run(drain, scheduler)

  settable.setDisposable(running)
}
|
24 |
|
/**
 * Observer that discards every event and only reports termination.
 *
 * @param {Function} end - called with the end value once disposal finishes
 * @param {Function} error - called with the error once disposal finishes,
 *   and also if disposal itself fails
 * @param {Object} disposable - object exposing `dispose()`
 * @constructor
 */
function Drain (end, error, disposable) {
  this.active = true
  this._disposable = disposable
  this._error = error
  this._end = end
}

/**
 * Events are intentionally ignored; this observer exists only to
 * drive the source until it ends or fails.
 */
Drain.prototype.event = function (t, x) {}

/**
 * Handle end-of-stream. Only the first call while still active has any
 * effect: it deactivates the observer, disposes, then reports `x` via
 * the end callback.
 */
Drain.prototype.end = function (t, x) {
  if (this.active) {
    this.active = false
    disposeThen(this._end, this._error, this._disposable, x)
  }
}

/**
 * Handle a stream failure: deactivate, dispose, then report `e` via the
 * error callback. Unlike `end`, this is not guarded by the `active` flag,
 * so it runs on every call.
 */
Drain.prototype.error = function (t, e) {
  var fail = this._error

  this.active = false
  disposeThen(fail, fail, this._disposable, e)
}
|
46 |
|
/**
 * Dispose `disposable`, then report the outcome.
 *
 * `disposable.dispose()` may return a plain value or a promise/thenable.
 * Once disposal has completed successfully, `end(x)` is called. If disposal
 * fails -- either by returning a rejected promise or by throwing
 * synchronously -- `error` is called with the failure instead.
 *
 * Both callbacks are always invoked asynchronously (on a later microtask).
 *
 * @param {Function} end - called with `x` after successful disposal
 * @param {Function} error - called with the failure reason if disposal fails
 * @param {Object} disposable - object exposing `dispose()`
 * @param {*} x - value forwarded to `end`
 */
function disposeThen (end, error, disposable, x) {
  var disposed

  try {
    disposed = Promise.resolve(disposable.dispose())
  } catch (e) {
    // A synchronous throw from dispose() previously escaped to the caller
    // and left the outcome unreported; route it to `error` like any other
    // disposal failure.
    disposed = Promise.reject(e)
  }

  disposed.then(function () {
    end(x)
  }, error)
}
|