// UNPKG
//
// 2.41 kB · JavaScript · View Raw
/**
 * Adapts an Observable-like source into an async iterator that is also an
 * async iterable.
 *
 * The source is subscribed to immediately. Notifications that arrive before a
 * consumer asks for them are buffered; `next()` calls made before a
 * notification arrives are parked and settled in order as notifications come
 * in.
 *
 * Notification mapping:
 * - `next(value)`  -> `{ value, done: false }`
 * - `error(err)`   -> `{ value: { errors: [err] }, done: false }`, after which
 *                     the iterator is finished
 * - `complete()`   -> the iterator is finished
 *
 * Once finished, buffered results are still drained in order and every later
 * `next()` settles with `{ value: undefined, done: true }` instead of waiting
 * forever.
 *
 * @param {{ subscribe: Function }} observable - Object whose `subscribe`
 *   accepts an observer with `next`, `error` and `complete` callbacks and
 *   returns an object exposing `unsubscribe()`.
 * @returns {object} Object with `next`, `return`, `throw` and
 *   `[Symbol.asyncIterator]` methods. `return()` and `throw()` unsubscribe
 *   from the source and discard anything buffered; `throw(error)` returns a
 *   promise rejected with `error`.
 */
export function observableToAsyncIterable(observable) {
  // Resolvers of `next()` calls that are waiting for a result.
  const pullQueue = [];
  // Results produced by the source before any consumer asked for them.
  // Invariant: at most one of the two queues is non-empty at any time.
  const pushQueue = [];
  // True until the consumer closes the iterator via `return()` / `throw()`.
  let listening = true;
  // True once the source has signalled `complete` or `error`.
  let finished = false;

  const doneResult = () => ({ value: undefined, done: true });

  // Hands a result to the oldest waiting consumer, or buffers it.
  const deliver = (result) => {
    if (finished) {
      // A terminated source must not produce further results; dropping them
      // keeps "done" sticky for consumers.
      return;
    }
    if (pullQueue.length !== 0) {
      pullQueue.shift()(result);
    } else {
      pushQueue.push(result);
    }
  };

  // Marks the source as terminated and releases every waiting consumer.
  // Waiting consumers imply an empty buffer, so nothing is skipped here.
  const finish = () => {
    if (finished) {
      return;
    }
    finished = true;
    for (const resolve of pullQueue.splice(0)) {
      resolve(doneResult());
    }
  };

  const pushValue = (value) => {
    deliver({ value, done: false });
  };

  const pushError = (error) => {
    // Errors are surfaced as a regular result carrying an `errors` array.
    deliver({ value: { errors: [error] }, done: false });
    finish();
  };

  const pushDone = () => {
    finish();
  };

  const pullValue = () =>
    new Promise((resolve) => {
      if (pushQueue.length !== 0) {
        // Either { value: { errors: [...] } } or { value: ... }.
        resolve(pushQueue.shift());
      } else if (finished) {
        resolve(doneResult());
      } else {
        pullQueue.push(resolve);
      }
    });

  const subscription = observable.subscribe({
    next(value) {
      pushValue(value);
    },
    error(err) {
      pushError(err);
    },
    complete() {
      pushDone();
    },
  });

  // Closes the iterator from the consumer side: unsubscribes once, releases
  // waiting consumers and drops buffered results.
  const emptyQueue = () => {
    if (!listening) {
      return;
    }
    listening = false;
    subscription.unsubscribe();
    for (const resolve of pullQueue) {
      resolve(doneResult());
    }
    pullQueue.length = 0;
    pushQueue.length = 0;
  };

  // Shared by `return()` and by `next()` on a closed iterator, so `next` does
  // not depend on its `this` binding.
  const close = () => {
    emptyQueue();
    return Promise.resolve(doneResult());
  };

  return {
    next() {
      return listening ? pullValue() : close();
    },
    return() {
      return close();
    },
    throw(error) {
      emptyQueue();
      return Promise.reject(error);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}