1 | import { Observable } from "./Observable.js";
|
/**
 * Maps the values of `observable` through a possibly-asynchronous function,
 * emitting the mapped results on a new Observable in their original order.
 *
 * Each incoming value is chained onto a single promise queue, so the mapping
 * functions run one at a time and the results are delivered in the same order
 * in which the source values arrived. The first mapping call happens
 * synchronously; every later one waits for the previous result to settle.
 *
 * @param {Observable} observable - Source to subscribe to.
 * @param {Function} [mapFn] - Called with each source value; may return a
 *   value or a promise. Its (resolved) result is passed to the observer's
 *   `next`. When falsy, source values are forwarded to `next` unchanged.
 * @param {Function} [catchFn] - Called with a source error; may return a
 *   value or a promise. Its (resolved) result is passed to the observer's
 *   `next`. When falsy, source errors are forwarded to `error` unchanged.
 * @returns {Observable} An Observable of the mapped results. An exception or
 *   rejection from `mapFn`/`catchFn` is delivered to the observer's `error`.
 *   Completion is postponed until every pending mapping call has finished.
 */
export function asyncMap(observable, mapFn, catchFn) {
  return new Observable((observer) => {
    const { next, error, complete } = observer;

    // Number of mapping calls that have been queued but have not settled yet.
    let activeCallbackCount = 0;
    // Set once the source has completed; the downstream `complete` is held
    // back until activeCallbackCount drops to zero.
    let completed = false;

    // A minimal thenable rather than Promise.resolve(), so that the very
    // first callback is invoked synchronously. Every later link in the chain
    // is a real promise.
    let promiseQueue = {
      then(callback) {
        return new Promise((resolve) => resolve(callback()));
      },
    };

    function makeCallback(examiner, delegate) {
      if (!examiner) {
        // No mapping function supplied: hand the argument straight through.
        return (arg) => delegate && delegate.call(observer, arg);
      }

      return (arg) => {
        ++activeCallbackCount;

        // Run regardless of how the previous link settled. Once the observer
        // is closed (unsubscribed or already errored) the result could not be
        // delivered anyway, so skip the examiner call and its side effects.
        const both = () => (observer.closed ? undefined : examiner(arg));

        promiseQueue = promiseQueue
          .then(both, both)
          .then(
            (result) => {
              --activeCallbackCount;
              next && next.call(observer, result);
              if (completed) {
                // The source finished while work was pending; re-check
                // whether this was the last outstanding callback.
                handler.complete();
              }
            },
            (reason) => {
              --activeCallbackCount;
              throw reason;
            }
          )
          .catch((caught) => {
            // Covers both examiner failures and exceptions thrown while
            // delivering a result; also keeps the queue itself unrejected.
            error && error.call(observer, caught);
          });
      };
    }

    const handler = {
      next: makeCallback(mapFn, next),
      error: makeCallback(catchFn, error),
      complete() {
        completed = true;
        if (!activeCallbackCount) {
          complete && complete.call(observer);
        }
      },
    };

    const sub = observable.subscribe(handler);
    return () => sub.unsubscribe();
  });
}
|
49 |
|
\ | No newline at end of file |