1 | var _ = require('./_');
|
2 | var genIterator = require('./genIterator');
|
3 | var Promise = _.Promise;
|
4 |
|
/**
 * Create a composable observable object.
 * A promise can only be settled once; this class lifts that restriction so
 * that you can map, filter and even back-pressure a stream of events in a
 * promise-like way.
 * For live example: [Double Click Demo](https://jsbin.com/niwuti/edit?html,js,output).
 * @version_added v0.7.2
 * @param {Function} executor `(next, error) ->` It's optional. When given, it
 * is called synchronously with this observable's `next` and `error` emitters.
 * @return {Observable}
 * @example
 * ```js
 * var Observable = require("yaku/lib/Observable");
 * var sleep = require("yaku/lib/sleep");
 * var linear = new Observable();
 *
 * var x = 0;
 * setInterval(() => linear.next(x++), 1000);
 *
 * // Wait for 2 sec then emit the next value.
 * var quad = linear.subscribe(async x => {
 *     await sleep(2000);
 *     return x * x;
 * });
 *
 * var another = linear.subscribe(x => -x);
 *
 * quad.subscribe(
 *     value => { console.log(value); },
 *     reason => { console.error(reason); }
 * );
 *
 * // Emit error
 * linear.error(new Error("reason"));
 *
 * // Unsubscribe an observable.
 * quad.unsubscribe();
 *
 * // Unsubscribe all subscribers.
 * linear.subscribers = [];
 * ```
 * @example
 * Use it with DOM.
 * ```js
 * var filter = fn => v => fn(v) ? v : new Promise(() => {});
 *
 * var keyup = new Observable((next) => {
 *     document.querySelector('input').onkeyup = next;
 * });
 *
 * var keyupText = keyup.subscribe(e => e.target.value);
 *
 * // Now we only get the input when the text length is greater than 3.
 * var keyupTextGT3 = keyupText.subscribe(filter(text => text.length > 3));
 *
 * keyupTextGT3.subscribe(v => console.log(v));
 * ```
 */
var Observable = module.exports = function Observable (executor) {
    var observable = this;

    // Install the per-instance `next` / `error` emitters; they close over
    // this instance, so they can be handed around as bare callbacks.
    genHandler(observable);

    observable.subscribers = [];

    // The executor is optional; it receives the emitters right away.
    if (executor) {
        executor(observable.next, observable.error);
    }
};
|
69 |
|
_.extendPrototype(Observable, {

    /**
     * Emit a value.
     * This is only a placeholder; the real function is installed on each
     * instance by the constructor.
     * @param {Any} value The value to emit. It may also be a promise; if
     * that promise rejects, the event will go to the subscribers' `onError`
     * callback instead of `onNext`.
     */
    next: null,

    /**
     * Emit an error.
     * This is only a placeholder; the real function is installed on each
     * instance by the constructor.
     * @param {Any} value The reason handed to the subscribers' `onError`.
     */
    error: null,

    /**
     * The publisher observable of this.
     * It is set by `subscribe` and stays `null` for a root observable.
     * @type {Observable}
     */
    publisher: null,

    /**
     * All the subscribers subscribed this observable.
     * @type {Array}
     */
    subscribers: null,

    /**
     * It will create a new Observable, like promise.
     * The returned observable is registered as a subscriber of this one and
     * re-emits whatever `onNext` / `onError` produce.
     * @param {Function} onNext Called with each emitted value.
     * @param {Function} onError Called with each emitted error.
     * @return {Observable}
     */
    subscribe: function (onNext, onError) {
        var self = this, subscriber = new Observable();
        subscriber._onNext = onNext;
        subscriber._onError = onError;
        subscriber._nextErr = genNextErr(subscriber.next);

        subscriber.publisher = self;
        self.subscribers.push(subscriber);

        return subscriber;
    },

    /**
     * Unsubscribe this.
     * It does nothing when this observable has no publisher or is no longer
     * in the publisher's subscriber list.
     */
    unsubscribe: function () {
        var publisher = this.publisher, index;

        if (!publisher) {
            return;
        }

        // `indexOf` yields -1 when this subscriber is already gone (for
        // example after a previous `unsubscribe` or after the list was
        // reset). `splice(-1, 1)` would then drop the last, unrelated
        // subscriber, so only splice on a real match.
        index = publisher.subscribers.indexOf(this);
        if (index !== -1) {
            publisher.subscribers.splice(index, 1);
        }
    }

});
|
124 |
|
/**
 * Install the `next` and `error` emitters on an observable.
 * Both functions close over `self`, so they keep working when detached
 * from the object and passed around as plain callbacks.
 * @param {Observable} self The observable to receive the emitters.
 */
function genHandler (self) {
    self.next = function (val) {
        // Snapshot the count so that only subscribers present at emit time
        // are notified.
        var total = self.subscribers.length;
        var idx, target, handled;

        for (idx = 0; idx < total; idx++) {
            target = self.subscribers[idx];

            // First stage: run the subscriber's own handlers on the value
            // (`val` may be a plain value or a promise).
            handled = Promise.resolve(val).then(
                target._onNext,
                target._onError
            );

            // Second stage: forward the outcome to the subscriber's own
            // subscribers, keeping errors on the error path.
            handled.then(target.next, target._nextErr);
        }
    };

    self.error = function (err) {
        // An error is emitted as a rejected promise through `next`.
        self.next(Promise.reject(err));
    };
}
|
144 |
|
/**
 * Build an error forwarder for the given `next` emitter.
 * @param {Function} next The emitter to forward errors through.
 * @return {Function} `(reason) ->` Emits `reason` via `next` as a rejected
 * promise.
 */
function genNextErr (next) {
    return function (reason) {
        var rejected = Promise.reject(reason);
        next(rejected);
    };
}
|
150 |
|
/**
 * Merge multiple observables into one.
 * Every value or error emitted by any of the sources is re-emitted by the
 * returned observable.
 * @version_added 0.9.6
 * @param {Iterable} iterable The observables to merge.
 * @return {Observable}
 * @example
 * ```js
 * var Observable = require("yaku/lib/Observable");
 * var sleep = require("yaku/lib/sleep");
 *
 * var src = new Observable(next => setInterval(next, 1000, 0));
 *
 * var a = src.subscribe(v => v + 1);
 * var b = src.subscribe((v) => sleep(10, v + 2));
 *
 * var out = Observable.merge([a, b]);
 *
 * out.subscribe((v) => {
 *     console.log(v);
 * })
 * ```
 */
Observable.merge = function merge (iterable) {
    var iter = genIterator(iterable);

    return new Observable(function (next) {
        // Errors from a source are pushed downstream as a rejected promise
        // so they reach the merged observable's `onError` handlers.
        var forwardError = function (e) {
            next(Promise.reject(e));
        };
        var step;

        // Subscribe to every source with the merged observable's emitter.
        for (step = iter.next(); !step.done; step = iter.next()) {
            step.value.subscribe(next, forwardError);
        }
    });
};
|
187 |
|