// UNPKG
//
// 11.5 kB · JavaScript · View Raw
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
2
3;(function (factory) {
// Lookup table of the `typeof` results accepted for host objects below.
// It is indexed as `objectTypes[typeof x]`, so any other `typeof` result
// yields `undefined` (falsy).
var objectTypes = {
  'function': true,
  'object': true
};
8
/**
 * Accepts `value` only when its `Object` property is the very `Object`
 * constructor visible to this code; anything else is rejected.
 *
 * @param {*} value Candidate global object; may be any falsy value.
 * @returns {*} `value` itself when the check passes, otherwise `null`.
 */
function checkGlobal(value) {
  return (value && value.Object === Object) ? value : null;
}
12
// Probe the environment for CommonJS bindings and for a global object.
// Every candidate is guarded by `typeof`, so a missing binding never throws.
// `nodeType` is checked so that a DOM node named `exports`/`module` is not
// mistaken for the CommonJS binding.
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
// `global` is only considered when both CommonJS bindings were found.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);
// Retained from the original; nothing in this wrapper reads it.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
// Standard global accessor. Consulted before the `Function` constructor so
// that environments which forbid string evaluation (for example a Content
// Security Policy without 'unsafe-eval') still resolve a root object.
var freeGlobalThis = checkGlobal(typeof globalThis === 'object' && globalThis);
// First usable candidate wins. `window` is skipped when it is merely the
// `window` property of `this`.
var root = freeGlobal ||
  ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) ||
  freeSelf ||
  thisGlobal ||
  freeGlobalThis ||
  Function('return this')();
21
// Hand the factory to whichever module system is present: AMD first, then
// CommonJS, otherwise extend the `Rx` object found on the root object.
// `define` is referenced literally because of build optimizers.
if (typeof define === 'function' && define.amd) {
  // NOTE(review): only './rx' is listed as a dependency, so `exports` is
  // presumably undefined here; the factory below never reads its second
  // argument.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  module.exports = factory(root, module.exports, require('./rx'));
} else {
  root.Rx = factory(root, {}, root.Rx);
}
32}.call(this, function (root, exp, Rx, undefined) {
33
// Aliases: local shortcuts for the pieces of the core `Rx` object that the
// rest of this module uses.
var Observable = Rx.Observable,
  observableProto = Observable.prototype,
  AnonymousObservable = Rx.AnonymousObservable,
  observableThrow = Observable.throwError,
  observerCreate = Rx.Observer.create,
  SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
  CompositeDisposable = Rx.CompositeDisposable,
  AbstractObserver = Rx.internals.AbstractObserver,
  noop = Rx.helpers.noop,
  inherits = Rx.internals.inherits,
  isFunction = Rx.helpers.isFunction;
46
// Shared sentinel: a wrapped call that throws returns this very object, with
// the thrown value stored in `e`. Callers detect failure by identity
// (`result === errorObj`).
var errorObj = {e: {}};

/**
 * Wraps `tryCatchTarget` so that it never throws.
 *
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} Wrapper that forwards `this` and all arguments to the
 *   target and returns its result, or returns `errorObj` (after recording the
 *   thrown value in `errorObj.e`) when the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  return function tryCatcher() {
    try {
      return tryCatchTarget.apply(this, arguments);
    } catch (e) {
      errorObj.e = e;
      return errorObj;
    }
  };
}
59
/**
 * Validating front end for `tryCatcherGen`; also published as
 * `Rx.internals.tryCatch`.
 *
 * @param {Function} fn Function to guard.
 * @returns {Function} The guarded wrapper produced by `tryCatcherGen(fn)`.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
  return tryCatcherGen(fn);
};
64
/**
 * Rethrows the given value unchanged.
 *
 * @param {*} e Value to throw.
 * @throws {*} Always throws `e`.
 */
function thrower(e) {
  throw e;
}
68
// Use the host's `Map` when the root object provides one; otherwise fall back
// to a minimal array-backed stand-in offering only the members this module
// needs: `size`, `get`, `set`, `delete` and `forEach`.
var Map = root.Map || (function () {
  function Map() {
    this.size = 0;
    this._values = [];
    this._keys = [];
  }

  // Position of `key` in `keys`, or -1. `indexOf` compares with `===`, which
  // never matches NaN, so NaN is searched for explicitly; this keeps the
  // fallback in line with the key equality of a native Map.
  function indexOfKey(keys, key) {
    if (key === key) { return keys.indexOf(key); }
    for (var i = 0, len = keys.length; i < len; i++) {
      if (keys[i] !== keys[i]) { return i; }
    }
    return -1;
  }

  /**
   * Removes the entry stored under `key`.
   * @returns {boolean} `true` when an entry was removed, `false` otherwise.
   */
  Map.prototype['delete'] = function (key) {
    var i = indexOfKey(this._keys, key);
    if (i === -1) { return false; }
    this._values.splice(i, 1);
    this._keys.splice(i, 1);
    this.size--;
    return true;
  };

  /**
   * @returns {*} The value stored under `key`, or `undefined` when absent.
   */
  Map.prototype.get = function (key) {
    var i = indexOfKey(this._keys, key);
    return i === -1 ? undefined : this._values[i];
  };

  /**
   * Adds a new entry or overwrites the value of an existing key.
   * @returns {Map} This map, so calls can be chained.
   */
  Map.prototype.set = function (key, value) {
    var i = indexOfKey(this._keys, key);
    if (i === -1) {
      this._keys.push(key);
      this._values.push(value);
      this.size++;
    } else {
      this._values[i] = value;
    }
    return this;
  };

  /**
   * Calls `cb(value, key, map)` for each entry in insertion order, with
   * `thisArg` as `this`. `size` is re-read on every step, so entries added
   * by the callback are visited as well.
   */
  Map.prototype.forEach = function (cb, thisArg) {
    for (var i = 0; i < this.size; i++) {
      cb.call(thisArg, this._values[i], this._keys[i], this);
    }
  };

  return Map;
}());
110
/**
 * Represents a join pattern over observable sequences.
 * @constructor
 * @param {Array} patterns The observable sequences taking part in the pattern, in order.
 */
function Pattern(patterns) {
  this.patterns = patterns;
}

/**
 * Creates a pattern that matches when the current pattern matches and the specified observable sequence has an available value.
 * The current pattern is left untouched; a new Pattern is returned.
 * @param other Observable sequence to match in addition to the current pattern.
 * @return {Pattern} Pattern object that matches when all observable sequences in the pattern have an available value.
 */
Pattern.prototype.and = function (other) {
  return new Pattern(this.patterns.concat(other));
};

/**
 * Matches when all observable sequences in the pattern (specified using a chain of and operators) have an available value and projects the values.
 * @param {Function} selector Selector that will be invoked with available values from the source sequences, in the same order as the sequences in the pattern.
 * @return {Plan} Plan that produces the projected values, to be fed (with other plans) to the when operator.
 */
Pattern.prototype.thenDo = function (selector) {
  return new Plan(this, selector);
};
136
/**
 * Pairs a join pattern with the selector that projects its matched values.
 * @constructor
 * @param {Pattern} expression Pattern whose `patterns` list names the sources.
 * @param {Function} selector Projection invoked with one value per source.
 */
function Plan(expression, selector) {
  this.expression = expression;
  this.selector = selector;
}

// Builds an error callback that forwards to `o.onError`.
function handleOnError(o) { return function (e) { o.onError(e); }; }

// Builds the match callback for a plan: runs the plan's selector (with the
// plan as `this`) on the matched values and forwards the result to
// `observer.onNext`, or a thrown value to `observer.onError`.
function handleOnNext(self, observer) {
  return function onNext () {
    var result = tryCatch(self.selector).apply(self, arguments);
    if (result === errorObj) { return observer.onError(result.e); }
    observer.onNext(result);
  };
}

/**
 * Turns this plan into a running ActivePlan.
 *
 * @param externalSubscriptions Map-like registry handed to
 *   `planCreateObserver` to obtain one join observer per source.
 * @param observer Receives projected values and errors.
 * @param {Function} deactivate Called with the ActivePlan once it completes.
 * @returns {ActivePlan} The plan, already registered with each of its join observers.
 */
Plan.prototype.activate = function (externalSubscriptions, observer, deactivate) {
  var joinObservers = [], errHandler = handleOnError(observer);
  for (var i = 0, len = this.expression.patterns.length; i < len; i++) {
    joinObservers.push(planCreateObserver(externalSubscriptions, this.expression.patterns[i], errHandler));
  }
  // On completion the plan unregisters itself from every join observer
  // before reporting back through `deactivate`.
  var activePlan = new ActivePlan(joinObservers, handleOnNext(this, observer), function () {
    for (var j = 0, jlen = joinObservers.length; j < jlen; j++) {
      joinObservers[j].removeActivePlan(activePlan);
    }
    deactivate(activePlan);
  });
  for (i = 0, len = joinObservers.length; i < len; i++) {
    joinObservers[i].addActivePlan(activePlan);
  }
  return activePlan;
};
167
/**
 * Returns the join observer registered for `observable`, creating and
 * registering a new one on first use.
 *
 * @param externalSubscriptions Map-like registry keyed by observable.
 * @param observable Source sequence to look up.
 * @param {Function} onError Error callback given to a newly created observer;
 *   ignored when an observer is already registered.
 * @returns {JoinObserver} The registered observer for `observable`.
 */
function planCreateObserver(externalSubscriptions, observable, onError) {
  var entry = externalSubscriptions.get(observable);
  if (!entry) {
    var observer = new JoinObserver(observable, onError);
    externalSubscriptions.set(observable, observer);
    return observer;
  }
  return entry;
}
177
/**
 * A running plan: fires `onNext` once every participating join observer has
 * a queued notification, and `onCompleted` once one of those is a completion.
 * @constructor
 * @param {Array} joinObserverArray Join observers in pattern order; the same
 *   observer may appear more than once.
 * @param {Function} onNext Called with one value per array position.
 * @param {Function} onCompleted Called when a head notification has kind 'C'.
 */
function ActivePlan(joinObserverArray, onNext, onCompleted) {
  this.joinObserverArray = joinObserverArray;
  this.onNext = onNext;
  this.onCompleted = onCompleted;
  // Keyed by the observer itself, so each distinct observer is stored once
  // however often it appears in the array.
  this.joinObservers = new Map();
  for (var i = 0, len = this.joinObserverArray.length; i < len; i++) {
    var joinObserver = this.joinObserverArray[i];
    this.joinObservers.set(joinObserver, joinObserver);
  }
}

/**
 * Drops the head notification of every distinct join observer.
 */
ActivePlan.prototype.dequeue = function () {
  this.joinObservers.forEach(function (v) { v.queue.shift(); });
};

/**
 * Attempts a match. Nothing happens while any observer's queue is empty.
 * Otherwise, if any head notification has kind 'C', `onCompleted` is called
 * and the queues are left as they are; if none has, the heads are dequeued
 * and their values are passed to `onNext` (with this plan as `this`).
 */
ActivePlan.prototype.match = function () {
  var observers = this.joinObserverArray, count = observers.length, i;
  for (i = 0; i < count; i++) {
    if (observers[i].queue.length === 0) { return; }
  }

  var heads = [], isCompleted = false;
  for (i = 0; i < count; i++) {
    var head = observers[i].queue[0];
    heads.push(head);
    if (head.kind === 'C') { isCompleted = true; }
  }
  if (isCompleted) {
    this.onCompleted();
    return;
  }

  // Heads were captured above, so dequeuing first does not lose them.
  this.dequeue();
  var values = [];
  for (i = 0; i < heads.length; i++) {
    values.push(heads[i].value);
  }
  this.onNext.apply(this, values);
};
220
/**
 * Observer that queues the materialized notifications of one source and asks
 * every active plan it belongs to to attempt a match whenever a notification
 * arrives.
 */
var JoinObserver = (function (__super__) {
  inherits(JoinObserver, __super__);

  /**
   * @constructor
   * @param source Observable whose notifications are queued.
   * @param {Function} onError Called with the error of an 'E' notification.
   */
  function JoinObserver(source, onError) {
    __super__.call(this);
    this.source = source;
    this.onError = onError;
    this.queue = [];
    this.activePlans = [];
    this.subscription = new SingleAssignmentDisposable();
    this.isDisposed = false;
  }

  var JoinObserverPrototype = JoinObserver.prototype;

  /**
   * Handles one materialized notification. Ignored once disposed. An 'E'
   * notification is routed to `onError` without being queued; anything else
   * is queued and every active plan is asked to match.
   */
  JoinObserverPrototype.next = function (notification) {
    if (!this.isDisposed) {
      if (notification.kind === 'E') {
        return this.onError(notification.error);
      }
      this.queue.push(notification);
      // Iterate over a copy: a plan that completes removes itself from
      // `activePlans` while this loop is running.
      var activePlans = this.activePlans.slice(0);
      for (var i = 0, len = activePlans.length; i < len; i++) {
        activePlans[i].match();
      }
    }
  };

  JoinObserverPrototype.error = noop;
  JoinObserverPrototype.completed = noop;

  /** Registers a plan to be asked for a match on every notification. */
  JoinObserverPrototype.addActivePlan = function (activePlan) {
    this.activePlans.push(activePlan);
  };

  /** Subscribes this observer to the materialized form of its source. */
  JoinObserverPrototype.subscribe = function () {
    this.subscription.setDisposable(this.source.materialize().subscribe(this));
  };

  /**
   * Unregisters one registration of `activePlan` and disposes this observer
   * once no plan is left. A plan that is not registered is ignored: passing
   * `indexOf`'s -1 straight to `splice` would remove the last registered
   * plan instead.
   */
  JoinObserverPrototype.removeActivePlan = function (activePlan) {
    var idx = this.activePlans.indexOf(activePlan);
    if (idx !== -1) { this.activePlans.splice(idx, 1); }
    if (this.activePlans.length === 0) { this.dispose(); }
  };

  /**
   * Runs the base class's dispose, then releases the source subscription
   * the first time it is called.
   */
  JoinObserverPrototype.dispose = function () {
    __super__.prototype.dispose.call(this);
    if (!this.isDisposed) {
      this.isDisposed = true;
      this.subscription.dispose();
    }
  };

  return JoinObserver;
} (AbstractObserver));
275
/**
 * Creates a pattern that matches when both observable sequences have an available value.
 *
 * @param right Observable sequence to match with the current sequence.
 * @return {Pattern} Pattern object holding the current sequence followed by `right`.
 */
observableProto.and = function (right) {
  return new Pattern([this, right]);
};
285
/**
 * Matches when the observable sequence has an available value and projects the value.
 * Shorthand for a single-source pattern followed by `thenDo`.
 *
 * @param {Function} selector Selector that will be invoked for values in the source sequence.
 * @returns {Plan} Plan that produces the projected values, to be fed (with other plans) to the when operator.
 */
observableProto.thenDo = function (selector) {
  return new Pattern([this]).thenDo(selector);
};
295
/**
 * Joins together the results from several patterns.
 *
 * @param plans A series of plans (specified as an Array or as a series of arguments) created by use of the thenDo operator on patterns.
 * @returns {Observable} Observable sequence with the results from matching several patterns.
 */
Observable.when = function () {
  var len = arguments.length, plans;
  if (Array.isArray(arguments[0])) {
    plans = arguments[0];
  } else {
    plans = new Array(len);
    for(var i = 0; i < len; i++) { plans[i] = arguments[i]; }
  }
  return new AnonymousObservable(function (o) {
    var activePlans = [],
      externalSubscriptions = new Map();
    var outObserver = observerCreate(
      function (x) { o.onNext(x); },
      function (err) {
        externalSubscriptions.forEach(function (v) { v.onError(err); });
        o.onError(err);
      },
      function () { o.onCompleted(); }
    );

    // Called by a plan when it completes; the output completes once the
    // last plan is gone. A plan that is not (or no longer) tracked is
    // ignored: passing `indexOf`'s -1 to `splice` would drop the last
    // tracked plan and could complete the output early.
    function deactivate(activePlan) {
      var idx = activePlans.indexOf(activePlan);
      if (idx === -1) { return; }
      activePlans.splice(idx, 1);
      if (activePlans.length === 0) { o.onCompleted(); }
    }

    try {
      for (var i = 0, len = plans.length; i < len; i++) {
        activePlans.push(plans[i].activate(externalSubscriptions, outObserver, deactivate));
      }
    } catch (e) {
      observableThrow(e).subscribe(o);
    }
    // Subscribe every join observer that activation registered and collect
    // them in one disposable for the caller.
    var group = new CompositeDisposable();
    externalSubscriptions.forEach(function (joinObserver) {
      joinObserver.subscribe();
      group.add(joinObserver);
    });

    return group;
  });
};
341
342 return Rx;
343}));