1 | import { toObserver } from '../util/toobserver';
|
2 | import { observable as symbolObservable } from '../observer';
|
/**
 * Minimal subscription handle: a single boolean flag recording whether
 * `unsubscribe()` has been called.
 */
class BooleanSubscription {
  /** `false` until `unsubscribe()` is invoked, `true` afterwards. */
  isUnsubscribed = false;

  /** Flags this subscription as cancelled. Calling it again has no further effect. */
  unsubscribe() {
    this.isUnsubscribed = true;
  }
}
|
/**
 * Observable-style adapter over an async-iterable source.
 *
 * `subscribe` asks the source for an iterator through `Symbol.asyncIterator`
 * and pulls from it one value at a time, pushing each result to the observer.
 */
class AsyncIterableObservable {
  /**
   * @param {object} source - Object exposing a `[Symbol.asyncIterator]()` method.
   */
  constructor(source) {
    this._source = source;
  }

  /** Observable interop hook: this object acts as its own observable. */
  [symbolObservable]() {
    return this;
  }

  /**
   * Starts pulling from the source and forwards results to the observer.
   *
   * The three arguments are handed to `toObserver` unchanged; the resulting
   * object is used through its `next`, `error` and `complete` methods.
   *
   * @param {*} next - Forwarded to `toObserver`.
   * @param {*} error - Forwarded to `toObserver`.
   * @param {*} complete - Forwarded to `toObserver`.
   * @returns {BooleanSubscription} Handle whose `unsubscribe()` stops delivery.
   *   Cancellation is observed when the in-flight `next()` settles; at that
   *   point the iterator is closed through its `return()` method, if it has one.
   */
  subscribe(next, error, complete) {
    const observer = toObserver(next, error, complete);
    const subscription = new BooleanSubscription();
    const it = this._source[Symbol.asyncIterator]();

    const pull = () => {
      it.next()
        .then(({ value, done }) => {
          if (subscription.isUnsubscribed) {
            // The subscriber is gone: close the iterator so the producer can
            // release resources (e.g. run `finally` blocks of an async
            // generator). The result is chained so that a rejection from
            // `return()` reaches the handler below, which drops it because
            // nobody is listening any more.
            if (!done && typeof it.return === 'function') {
              return it.return();
            }
            return undefined;
          }

          if (done) {
            observer.complete();
          } else {
            observer.next(value);
            // Request the next value only after the current one was delivered.
            pull();
          }
          return undefined;
        })
        .catch(err => {
          // Anything raised after unsubscription is deliberately not reported.
          if (!subscription.isUnsubscribed) {
            observer.error(err);
          }
        });
    };

    pull();
    return subscription;
  }
}
|
/**
 * Wraps `source` in an `AsyncIterableObservable`.
 *
 * @param {*} source - Passed unchanged to the `AsyncIterableObservable` constructor.
 * @returns {AsyncIterableObservable} A new wrapper around `source`.
 */
export function toObservable(source) {
  const observable = new AsyncIterableObservable(source);
  return observable;
}
|
48 |
|
49 |
|