UNPKG

4.98 kBJavaScriptView Raw
1import { Observable } from "observable-fns";
/**
 * No-op placeholder used where a callback is optional.
 *
 * @returns {undefined} Always `undefined`.
 */
function doNothing() {
  return undefined;
}

/**
 * Identity function: hands back whatever it was given.
 *
 * @param {*} input Any value.
 * @returns {*} The very same `input`.
 */
function returnInput(input) {
  return input;
}

/**
 * Schedules `fn` to run in a later microtask instead of synchronously.
 *
 * @param {Function} fn Callback to invoke without arguments.
 * @returns {Promise<*>} Promise settling with the outcome of `fn`; an
 *   exception thrown by `fn` becomes a rejection of this promise.
 */
function runDeferred(fn) {
  const alreadyResolved = Promise.resolve();
  return alreadyResolved.then(fn);
}
/**
 * Re-throws the value it receives.
 *
 * Serves as the fallback rejection handler, so that an error keeps
 * propagating when the caller did not supply a handler of their own.
 *
 * @param {*} error The value to throw (not wrapped or altered).
 * @throws {*} Always throws `error` itself.
 */
function fail(error) {
  throw error;
}
/**
 * Tells whether `thing` exposes a callable `then` member, i.e. whether it
 * can be treated like a promise.
 *
 * Always returns a real boolean: `null`, `undefined` and other falsy inputs
 * yield `false` rather than being handed back as-is.
 *
 * @param {*} thing Value to inspect.
 * @returns {boolean} `true` if `thing.then` is a function, otherwise `false`.
 */
function isThenable(thing) {
  // `!= null` rules out both null and undefined, the only values whose
  // property access would throw.
  return thing != null && typeof thing.then === "function";
}
/**
 * Creates a hybrid, combining the APIs of an Observable and a Promise.
 *
 * It is used to proxy async process states when we are initially not sure
 * if that async process will yield values once (-> Promise) or multiple
 * times (-> Observable).
 *
 * Note that the observable promise inherits some of the observable's characteristics:
 * The `init` function will be called *once for every time anyone subscribes to it*.
 *
 * If this is undesired, derive a hot observable from it using `makeHot()` and
 * subscribe to that.
 *
 * Promise-side behavior:
 * - It fulfills with the FIRST value emitted, once the stream completes
 *   (`undefined` if the stream completed without emitting anything).
 * - It rejects with the error passed to `observer.error()` or thrown by `init`.
 * - `then()` (and therefore `catch()`, `finally()` and `await`) subscribes on
 *   its own if `init` has not been run by any subscription yet.
 */
export class ObservablePromise extends Observable {
  /**
   * @param {Function} init Subscriber function. It receives an observer with
   *   `next`, `error` and `complete` methods; whatever it returns is handed
   *   back to the underlying Observable.
   */
  constructor(init) {
    super((originalObserver) => {
      // Wrap the subscriber's observer so that every notification is first
      // forwarded to the subscriber and then recorded for the promise API.
      // Arrow functions keep `this` bound to the ObservablePromise instance.
      const observer = Object.assign({}, originalObserver, {
        complete: () => {
          originalObserver.complete();
          this.onCompletion();
        },
        error: (error) => {
          originalObserver.error(error);
          this.onError(error);
        },
        next: (value) => {
          originalObserver.next(value);
          this.onNext(value);
        }
      });
      try {
        this.initHasRun = true;
        return init(observer);
      } catch (error) {
        // A synchronous throw inside `init` is reported like an emitted error.
        observer.error(error);
      }
    });
    this.initHasRun = false;
    this.fulfillmentCallbacks = [];
    this.rejectionCallbacks = [];
    this.firstValueSet = false;
    this.state = "pending";
  }

  /**
   * Remembers the first emitted value; later values are ignored here.
   *
   * @param {*} value Value that was just emitted.
   */
  onNext(value) {
    if (!this.firstValueSet) {
      this.firstValue = value;
      this.firstValueSet = true;
    }
  }

  /**
   * Marks the promise as rejected and notifies the pending `then()` calls.
   *
   * @param {*} error The emitted error.
   */
  onError(error) {
    this.state = "rejected";
    this.rejection = error;
    for (const onRejected of this.rejectionCallbacks) {
      // Promisifying the call to turn errors into unhandled promise rejections
      // instead of them failing sync and cancelling the iteration
      runDeferred(() => onRejected(error));
    }
  }

  /**
   * Marks the promise as fulfilled and notifies the pending `then()` calls
   * with the first emitted value.
   */
  onCompletion() {
    this.state = "fulfilled";
    for (const onFulfilled of this.fulfillmentCallbacks) {
      // Promisifying the call to turn errors into unhandled promise rejections
      // instead of them failing sync and cancelling the iteration
      runDeferred(() => onFulfilled(this.firstValue));
    }
  }

  /**
   * Promise-style continuation.
   *
   * For each call, at most one of the two handlers is invoked, and at most
   * once - even if `init` runs again because of a later subscription. If the
   * invoked handler throws, the returned promise rejects with that error; the
   * sibling handler is not called.
   *
   * @param {Function} [onFulfilledRaw] Receives the first emitted value.
   *   Defaults to passing the value through.
   * @param {Function} [onRejectedRaw] Receives the error. Defaults to
   *   re-throwing it.
   * @returns {Promise<*>} Native promise settled with the handler's outcome.
   */
  then(onFulfilledRaw, onRejectedRaw) {
    const onFulfilled = onFulfilledRaw || returnInput;
    const onRejected = onRejectedRaw || fail;
    // Shared by both callbacks: a single then() call settles exactly once.
    let handlerCalled = false;
    return new Promise((resolve, reject) => {
      const rejectionCallback = (error) => {
        if (handlerCalled) {
          return;
        }
        handlerCalled = true;
        try {
          resolve(onRejected(error));
        } catch (anotherError) {
          reject(anotherError);
        }
      };
      const fulfillmentCallback = (value) => {
        if (handlerCalled) {
          return;
        }
        handlerCalled = true;
        try {
          resolve(onFulfilled(value));
        } catch (error) {
          // A throwing fulfillment handler rejects the derived promise
          // directly; it must not be fed into this call's onRejected.
          reject(error);
        }
      };
      if (!this.initHasRun) {
        // Nobody has subscribed yet, so kick off `init` ourselves.
        this.subscribe({ error: rejectionCallback });
      }
      // Already settled (possibly synchronously by the subscription above):
      // go through the guarded callbacks so no handler can run twice.
      if (this.state === "fulfilled") {
        fulfillmentCallback(this.firstValue);
        return;
      }
      if (this.state === "rejected") {
        rejectionCallback(this.rejection);
        return;
      }
      this.fulfillmentCallbacks.push(fulfillmentCallback);
      this.rejectionCallbacks.push(rejectionCallback);
    });
  }

  /**
   * Shorthand for `then(undefined, onRejected)`.
   *
   * @param {Function} [onRejected] Receives the error.
   * @returns {Promise<*>} See `then()`.
   */
  catch(onRejected) {
    return this.then(undefined, onRejected);
  }

  /**
   * Runs `onCompleted` once the promise settles, without changing the
   * outcome: a fulfillment value is passed through and a rejection is
   * re-thrown after the handler ran. If the handler itself throws, the
   * returned promise rejects with the handler's error.
   *
   * @param {Function} [onCompleted] Called without arguments. Its return
   *   value is ignored.
   * @returns {Promise<*>} Promise mirroring this promise's outcome.
   */
  finally(onCompleted) {
    const handler = onCompleted || doNothing;
    return this.then(
      (value) => {
        handler();
        return value;
      },
      (error) => {
        handler();
        // Keep the rejection; swallowing it would turn a failure into success.
        throw error;
      }
    );
  }

  /**
   * Builds an ObservablePromise from a thenable, or defers to the parent
   * class's `from()` for anything else.
   *
   * For a thenable, the resulting stream emits the resolved value and then
   * completes, or errors with the rejection reason.
   *
   * @param {*} thing Thenable, or any input accepted by the parent `from()`.
   * @returns {*} A new ObservablePromise for thenables, otherwise whatever
   *   the parent class's `from()` returns.
   */
  static from(thing) {
    if (!isThenable(thing)) {
      return super.from(thing);
    }
    return new ObservablePromise((observer) => {
      const onFulfilled = (value) => {
        observer.next(value);
        observer.complete();
      };
      const onRejected = (error) => {
        observer.error(error);
      };
      thing.then(onFulfilled, onRejected);
    });
  }
}