UNPKG

2.58 kBJavaScriptView Raw
// NOTE(review): the digit fused to the start of each code line in this file looks like
// line-number residue from the web viewer it was copied from, not part of the program.
// CommonJS preamble: opt the whole module into strict mode.
1"use strict";
// Define `__esModule: true` on the exports object. Because it is created through
// defineProperty with only `value` given, the property is non-enumerable and read-only.
// Presumably this is the "transpiled ES module" marker that interop helpers look for;
// confirm against the build tooling.
2Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder for the named export; the real function is assigned at the bottom of the
// file, after it has been defined.
3exports.observableToAsyncIterable = void 0;
/**
 * Adapts a push-based observable into a pull-based async iterator.
 *
 * Emissions that arrive before the consumer asks for them are buffered in order;
 * `next()` calls made before anything is available are parked until the observable
 * emits, completes, or the iterator is closed.
 *
 * Observable errors are not turned into rejections: each one is delivered as an
 * ordinary result of the form `{ value: { errors: [error] }, done: false }`.
 *
 * Once the observable completes and the buffer has been drained, every `next()` call
 * (including ones that were already parked) resolves to `{ done: true }`.
 *
 * @param {{ subscribe: Function }} observable - Source to adapt. `subscribe` is called
 *   once, immediately, with an observer exposing `next`, `error` and `complete`; it
 *   must return an object with an `unsubscribe()` method.
 * @returns {object} An object with `next`, `return`, `throw` and
 *   `[Symbol.asyncIterator]` methods. `return()` and `throw()` unsubscribe from the
 *   observable and release all parked `next()` calls with `done: true`.
 */
function observableToAsyncIterable(observable) {
    // Resolvers of `next()` promises that are waiting for an emission.
    const pullQueue = [];
    // Iterator results produced while no `next()` call was waiting.
    const pushQueue = [];
    // False once the consumer has closed the iterator via return()/throw().
    let listening = true;
    // True once the observable has signalled completion.
    let completed = false;

    // Hands a result to the oldest parked consumer, or buffers it when nobody is waiting.
    const enqueue = (result) => {
        // After close or completion nothing will ever read the buffer again, so late
        // emissions are dropped instead of accumulating.
        if (!listening || completed) {
            return;
        }
        if (pullQueue.length !== 0) {
            pullQueue.shift()(result);
        }
        else {
            pushQueue.push(result);
        }
    };
    const pushValue = (value) => enqueue({ value, done: false });
    // Errors are surfaced as a regular value wrapping the error, not as a rejection.
    const pushError = (error) => enqueue({ value: { errors: [error] }, done: false });
    const pushDone = () => {
        if (!listening || completed) {
            return;
        }
        completed = true;
        // Consumers are only parked while the buffer is empty, so none of them can
        // receive a value any more: release every one of them, not just the first.
        for (const resolve of pullQueue) {
            resolve({ done: true });
        }
        pullQueue.length = 0;
    };
    const pullValue = () => new Promise((resolve) => {
        if (pushQueue.length !== 0) {
            // Either {value: {errors: [...]}} or {value: ...}; buffered results are
            // always drained before completion is reported.
            resolve(pushQueue.shift());
        }
        else if (completed) {
            // The source is finished and the buffer is drained: keep answering "done"
            // instead of parking a promise that could never settle.
            resolve({ done: true });
        }
        else {
            pullQueue.push(resolve);
        }
    });
    const subscription = observable.subscribe({
        next(value) {
            pushValue(value);
        },
        error(err) {
            pushError(err);
        },
        complete() {
            pushDone();
        },
    });
    // Closes the iterator exactly once: unsubscribes, releases parked consumers and
    // discards anything still buffered.
    const emptyQueue = () => {
        if (!listening) {
            return;
        }
        listening = false;
        subscription.unsubscribe();
        for (const resolve of pullQueue) {
            resolve({ value: undefined, done: true });
        }
        pullQueue.length = 0;
        pushQueue.length = 0;
    };
    return {
        next() {
            // return is a defined method, so it is safe to call it.
            return listening ? pullValue() : this.return();
        },
        return() {
            emptyQueue();
            return Promise.resolve({ value: undefined, done: true });
        },
        throw(error) {
            emptyQueue();
            return Promise.reject(error);
        },
        [Symbol.asyncIterator]() {
            return this;
        },
    };
}
// Expose the function as the module's `observableToAsyncIterable` export; this
// overwrites the `void 0` placeholder assigned near the top of the file.
85exports.observableToAsyncIterable = observableToAsyncIterable;