1 | import { observable as symbolObservable } from '../observer';
|
/**
 * Placeholder callback: accepts one (ignored) argument and returns nothing.
 * Used as the fallback when a handler is not supplied.
 */
const noop = (_) => undefined;
|
/**
 * Normalizes the arguments of `subscribe` into an object that always has
 * `next`, `error` and `complete` functions.
 *
 * Two call shapes are accepted:
 *  - an observer object as the first argument: each of its `next` / `error` /
 *    `complete` members is bound to that object, and a missing (falsy) member
 *    is replaced by `noop`;
 *  - up to three separate callbacks: any argument that is not a function is
 *    replaced by `noop`.
 *
 * @param {object|Function} [next] Observer object, or the `next` callback.
 * @param {Function} [error] The `error` callback (callback form only).
 * @param {Function} [complete] The `complete` callback (callback form only).
 * @returns {{next: Function, error: Function, complete: Function}}
 */
function toObserver(next, error, complete) {
  // Callback form: anything that is not a non-null object lands here.
  if (typeof next !== 'object' || next === null) {
    const callbackOrNoop = (candidate) => (typeof candidate === 'function' ? candidate : noop);
    return {
      next: callbackOrNoop(next),
      error: callbackOrNoop(error),
      complete: callbackOrNoop(complete),
    };
  }

  // Observer-object form: bind so the handlers keep their `this` when they
  // are later invoked detached from the original object.
  const target = next;
  const boundTo = (handler) => (handler || noop).bind(target);
  return {
    next: boundTo(target.next),
    error: boundTo(target.error),
    complete: boundTo(target.complete),
  };
}
|
/**
 * Minimal subscription handle: a single boolean flag that records whether
 * `unsubscribe()` has been called.
 */
class BooleanSubscription {
  /** Starts in the "still subscribed" state. */
  constructor() {
    this.isUnsubscribed = false;
  }

  /**
   * Marks the subscription as cancelled. Calling it again leaves the flag
   * set to `true`.
   */
  unsubscribe() {
    this.isUnsubscribed = true;
  }
}
|
/**
 * Exposes an async-iterable through a `subscribe(next, error, complete)`
 * interface. Each call to `subscribe` obtains a fresh iterator from the
 * source via `Symbol.asyncIterator` and pulls items from it one at a time.
 */
class AsyncIterableObservable {
  /**
   * @param {object} source Object whose `Symbol.asyncIterator` method is
   *   called on every subscription.
   */
  constructor(source) {
    this._source = source;
  }

  /**
   * Returns this instance under the key imported as `symbolObservable`
   * (presumably the interop "observable" symbol; defined in '../observer').
   */
  [symbolObservable]() {
    return this;
  }

  /**
   * Starts pulling from the source and forwards results to the observer.
   *
   * - every produced value is passed to `next`;
   * - exhaustion of the iterator triggers `complete`;
   * - a rejected `it.next()` or an exception thrown by the `next` /
   *   `complete` handler is passed to `error`.
   *
   * Nothing is delivered once the returned subscription is unsubscribed.
   * When cancellation is observed, the iterator's optional `return()` method
   * is invoked so the source can release its resources (for an async
   * generator this runs pending `finally` blocks) — the same thing
   * `for await` does on early exit.
   *
   * @param {object|Function} [next] Observer object, or the `next` callback.
   * @param {Function} [error] The `error` callback.
   * @param {Function} [complete] The `complete` callback.
   * @returns {BooleanSubscription} Handle whose `unsubscribe()` stops delivery.
   */
  subscribe(next, error, complete) {
    const observer = toObserver(next, error, complete);
    const subscription = new BooleanSubscription();
    const it = this._source[Symbol.asyncIterator]();

    // Best-effort close. Failures are deliberately ignored: the subscriber has
    // already unsubscribed, so there is no channel left to report them on, and
    // an unhandled rejection would be worse than silence here.
    const closeIterator = () => {
      if (typeof it.return !== 'function') {
        return;
      }
      try {
        // Promise.resolve tolerates a return() that does not yield a promise.
        Promise.resolve(it.return()).catch(() => {});
      } catch (ignored) {
        // return() threw synchronously; nothing more can be done.
      }
    };

    const pump = () => {
      it.next()
        .then(({ value, done }) => {
          if (subscription.isUnsubscribed) {
            // Cancelled while this pull was in flight: drop the result and
            // close the iterator unless it has already finished on its own.
            if (!done) {
              closeIterator();
            }
            return;
          }
          if (done) {
            observer.complete();
            return;
          }
          observer.next(value);
          if (subscription.isUnsubscribed) {
            // The handler unsubscribed synchronously: close now instead of
            // pulling (and discarding) one more item.
            closeIterator();
          } else {
            pump();
          }
        })
        .catch((err) => {
          if (!subscription.isUnsubscribed) {
            observer.error(err);
          }
        });
    };
    pump();

    return subscription;
  }
}
|
/**
 * Wraps an async-iterable in an `AsyncIterableObservable` so it can be
 * consumed through `subscribe`.
 *
 * @param {object} source The async-iterable to wrap; it is stored as-is and
 *   not touched until something subscribes.
 * @returns {AsyncIterableObservable} A new wrapper around `source`.
 */
export function toObservable(source) {
  const observable = new AsyncIterableObservable(source);
  return observable;
}
|
67 |
|
68 |
|