UNPKG

4.51 kBJavaScriptView Raw
1var _ = require("./_");
2var genIterator = require("./genIterator");
3var Promise = _.Promise;
4
/**
 * Create a composable observable object.
 * A promise can only be settled once; an observable can emit any number of
 * values, so that you can easily map, filter and even back pressure events
 * in a promise way.
 * For live example: [Double Click Demo](https://jsbin.com/niwuti/edit?html,js,output).
 * @version_added v0.7.2
 * @param {Function} executor `(next, error) ->` It's optional. When given,
 * it is invoked synchronously by the constructor with this observable's
 * `next` and `error` emitters.
 * @return {Observable}
 * @example
 * ```js
 * var Observable = require("yaku/lib/Observable");
 * var sleep = require("yaku/lib/sleep");
 * var linear = new Observable();
 *
 * var x = 0;
 * // Emit 0, 1, 2, ... once per second.
 * setInterval(() => linear.next(x++), 1000);
 *
 * // Wait for 2 sec then emit the next value.
 * var quad = linear.subscribe(async x => {
 *     await sleep(2000);
 *     return x * x;
 * });
 *
 * var another = linear.subscribe(x => -x);
 *
 * quad.subscribe(
 *     value => { console.log(value); },
 *     reason => { console.error(reason); }
 * );
 *
 * // Emit error
 * linear.error(new Error("reason"));
 *
 * // Unsubscribe an observable.
 * quad.unsubscribe();
 *
 * // Unsubscribe all subscribers.
 * linear.subscribers = [];
 * ```
 * @example
 * Use it with DOM.
 * ```js
 * var filter = fn => v => fn(v) ? v : new Promise(() => {});
 *
 * var keyup = new Observable((next) => {
 *     document.querySelector('input').onkeyup = next;
 * });
 *
 * var keyupText = keyup.subscribe(e => e.target.value);
 *
 * // Now we only get the input when the text length is greater than 3.
 * var keyupTextGT3 = keyupText.subscribe(filter(text => text.length > 3));
 *
 * keyupTextGT3.subscribe(v => console.log(v));
 * ```
 */
var Observable = module.exports = function Observable (executor) {
    var self = this;

    // Install the per-instance `next` / `error` emitters. They close over
    // `self`, which is why they can be handed out detached (e.g. as a DOM
    // event handler or a timer callback).
    genHandler(self);

    self.subscribers = [];

    if (executor) {
        executor(self.next, self.error);
    }
};
69
_.extendPrototype(Observable, {

    /**
     * Emit a value to every current subscriber.
     * Pass a rejected promise as the value (or call `error`)
     * so that the event will go to `onError` callback.
     * Placeholder only: the real function is assigned per instance by the
     * constructor.
     * @param {Any} value
     */
    next: null,

    /**
     * Emit an error.
     * Placeholder only: the real function is assigned per instance by the
     * constructor.
     * @param {Any} value
     */
    error: null,

    /**
     * The publisher observable of this.
     * Set by `subscribe` on the observable it returns.
     * @type {Observable}
     */
    publisher: null,

    /**
     * All the subscribers subscribed this observable.
     * @type {Array}
     */
    subscribers: null,

    /**
     * It will create a new Observable, like promise.
     * The returned observable is registered as a subscriber of this one and
     * re-emits whatever the handlers produce.
     * @param {Function} onNext
     * @param {Function} onError
     * @return {Observable}
     */
    subscribe: function (onNext, onError) {
        var self = this, subscriber = new Observable();
        subscriber._onNext = onNext;
        subscriber._onError = onError;
        subscriber._nextErr = genNextErr(subscriber.next);

        subscriber.publisher = self;
        self.subscribers.push(subscriber);

        return subscriber;
    },

    /**
     * Unsubscribe this.
     * Does nothing when this observable has no publisher or is no longer in
     * the publisher's subscriber list.
     */
    unsubscribe: function () {
        var publisher = this.publisher;
        if (!publisher) {
            return;
        }

        var index = publisher.subscribers.indexOf(this);

        // `indexOf` yields -1 when this was already removed (or the list was
        // replaced); `splice(-1, 1)` would then drop the LAST subscriber,
        // silently unsubscribing an unrelated observable.
        if (index !== -1) {
            publisher.subscribers.splice(index, 1);
        }
    }

});
124
/**
 * Attach the `next` and `error` emitter functions to an observable.
 * Both close over `self` instead of relying on `this`, so they keep working
 * when passed around detached.
 * @param {Observable} self Object whose `subscribers` array is read on
 * every emit.
 */
function genHandler (self) {
    self.next = function (val) {
        // The length is read once up front, as everything scheduled below
        // runs asynchronously through promise callbacks.
        var len = self.subscribers.length, subscriber;

        for (var i = 0; i < len; i++) {
            subscriber = self.subscribers[i];

            // First stage runs the subscriber's own handlers; second stage
            // forwards the outcome (value or rejection) to the subscriber's
            // own subscribers.
            Promise.resolve(val).then(
                subscriber._onNext,
                subscriber._onError
            ).then(
                subscriber.next,
                subscriber._nextErr
            );
        }
    };

    self.error = function (err) {
        // An error is just a rejected promise travelling down the `next` path.
        self.next(Promise.reject(err));
    };
}
144
/**
 * Build a rejection handler that forwards a failure downstream.
 * The returned function wraps the reason in a rejected promise and passes it
 * to `next`.
 * @param {Function} next Emitter to call with the rejected promise.
 * @return {Function} `(reason) ->`
 */
function genNextErr (next) {
    return function (reason) {
        next(Promise.reject(reason));
    };
}
150
/**
 * Merge multiple observables into one.
 * The returned observable emits every value and every error emitted by any
 * of the given observables.
 * @version_added 0.9.6
 * @param {Iterable} iterable Observables to merge.
 * @return {Observable}
 * @example
 * ```js
 * var Observable = require("yaku/lib/Observable");
 * var sleep = require("yaku/lib/sleep");
 *
 * var src = new Observable(next => setInterval(next, 1000, 0));
 *
 * var a = src.subscribe(v => v + 1);
 * var b = src.subscribe((v) => sleep(10, v + 2));
 *
 * var out = Observable.merge([a, b]);
 *
 * out.subscribe((v) => {
 *     console.log(v);
 * })
 * ```
 */
Observable.merge = function merge (iterable) {
    var iter = genIterator(iterable);

    return new Observable(function (next) {
        // Errors are forwarded as rejected promises through `next`.
        function onError (e) {
            next(Promise.reject(e));
        }

        var item = iter.next();
        while (!item.done) {
            item.value.subscribe(next, onError);
            item = iter.next();
        }
    });
};
187