// UNPKG

// 7.08 kB · JavaScript · View Raw
'use strict';
// https://github.com/tc39/proposal-observable
var $ = require('../internals/export');
var DESCRIPTORS = require('../internals/descriptors');
var setSpecies = require('../internals/set-species');
var aFunction = require('../internals/a-function');
var anObject = require('../internals/an-object');
var isObject = require('../internals/is-object');
var anInstance = require('../internals/an-instance');
var defineProperty = require('../internals/object-define-property').f;
var createNonEnumerableProperty = require('../internals/create-non-enumerable-property');
var redefineAll = require('../internals/redefine-all');
var getIterator = require('../internals/get-iterator');
var iterate = require('../internals/iterate');
var hostReportErrors = require('../internals/host-report-errors');
var wellKnownSymbol = require('../internals/well-known-symbol');
var InternalStateModule = require('../internals/internal-state');

// Property key read from the argument of `Observable.from` and installed on
// `Observable.prototype` further down in this file.
var OBSERVABLE = wellKnownSymbol('observable');
// Accessors for the per-object state records used by every constructor below
// (presumably kept out of band by the internal-state module - confirm there).
var getInternalState = InternalStateModule.get;
var setInternalState = InternalStateModule.set;
22
// Normalises an optional method value: `null`/`undefined` become `undefined`,
// anything else is passed through `aFunction`, whose result is returned
// (so a non-callable value is rejected by `aFunction`, not silently ignored).
var getMethod = function (fn) {
  return fn == null ? undefined : aFunction(fn);
};
26
// Runs the cleanup callback stored on a subscription state record, at most once.
// The slot is cleared BEFORE the callback is invoked so a re-entrant call finds
// nothing left to run; an exception thrown by the callback is handed to
// `hostReportErrors` instead of propagating to the caller.
var cleanupSubscription = function (subscriptionState) {
  var cleanup = subscriptionState.cleanup;
  if (cleanup) {
    subscriptionState.cleanup = undefined;
    try {
      cleanup();
    } catch (error) {
      hostReportErrors(error);
    }
  }
};
38
// A subscription counts as closed once its state record no longer holds an
// observer (the `observer` field is exactly `undefined`).
var subscriptionClosed = function (subscriptionState) {
  return subscriptionState.observer === undefined;
};
42
// Marks a subscription as closed by dropping the observer from its state record.
// When DESCRIPTORS is falsy, `closed` is a plain data property on both the
// subscription and its subscription observer (set to `false` by their
// constructors), so those properties are flipped to `true` here as well.
var close = function (subscription, subscriptionState) {
  if (!DESCRIPTORS) {
    subscription.closed = true;
    var subscriptionObserver = subscriptionState.subscriptionObserver;
    // The subscription observer does not exist yet if closing happens early.
    if (subscriptionObserver) subscriptionObserver.closed = true;
  }
  subscriptionState.observer = undefined;
};
50
// Subscription constructor: wires `observer` to `subscriber` and runs the
// subscriber immediately.
//   observer   - object receiving notifications; validated with `anObject`.
//   subscriber - function called with a new SubscriptionObserver; it may return
//                nothing, a cleanup function, or an object with `unsubscribe`.
// Sequence:
//   1. `observer.start(subscription)` is called if present; an exception from
//      it is reported through `hostReportErrors`.
//   2. If `start` already closed the subscription, nothing else happens.
//   3. The subscriber runs; its return value becomes the stored cleanup.
//      Anything it throws (including a non-callable return value rejected by
//      `aFunction`) is forwarded to `subscriptionObserver.error`.
//   4. If the subscriber closed the subscription synchronously, the cleanup is
//      run right away.
var Subscription = function (observer, subscriber) {
  var subscriptionState = setInternalState(this, {
    cleanup: undefined,
    observer: anObject(observer),
    subscriptionObserver: undefined
  });
  var start;
  // Without descriptor support `closed` is a plain property, not a getter.
  if (!DESCRIPTORS) this.closed = false;
  try {
    if (start = getMethod(observer.start)) start.call(observer, this);
  } catch (error) {
    hostReportErrors(error);
  }
  if (subscriptionClosed(subscriptionState)) return;
  var subscriptionObserver = subscriptionState.subscriptionObserver = new SubscriptionObserver(this);
  try {
    var cleanup = subscriber(subscriptionObserver);
    if (cleanup != null) subscriptionState.cleanup = typeof cleanup.unsubscribe === 'function'
      // Subscription-like result: `unsubscribe` is looked up again at cleanup time.
      ? function () { cleanup.unsubscribe(); }
      : aFunction(cleanup);
  } catch (error) {
    subscriptionObserver.error(error);
    return;
  }
  if (subscriptionClosed(subscriptionState)) cleanupSubscription(subscriptionState);
};
77
// Public API of a subscription object.
Subscription.prototype = redefineAll({}, {
  // Closes the subscription and then runs its cleanup. Calling it again on an
  // already-closed subscription does nothing.
  unsubscribe: function unsubscribe() {
    var subscriptionState = getInternalState(this);
    if (!subscriptionClosed(subscriptionState)) {
      close(this, subscriptionState);
      cleanupSubscription(subscriptionState);
    }
  }
});
87
// With descriptor support, `closed` is a configurable read-only accessor that
// is computed from the subscription's internal state on every read. (Without
// it, the constructor and `close` maintain a plain `closed` property instead.)
if (DESCRIPTORS) defineProperty(Subscription.prototype, 'closed', {
  configurable: true,
  get: function () {
    return subscriptionClosed(getInternalState(this));
  }
});
94
// SubscriptionObserver constructor: the object handed to the subscriber
// function. Its only state is a back-reference to the owning subscription.
var SubscriptionObserver = function (subscription) {
  setInternalState(this, { subscription: subscription });
  // Without descriptor support `closed` is a plain property, not a getter.
  if (!DESCRIPTORS) this.closed = false;
};
99
// Notification methods exposed to the subscriber function. Each one is a no-op
// once the owning subscription is closed, and none of them lets an exception
// from the user's observer escape: such errors go to `hostReportErrors`.
SubscriptionObserver.prototype = redefineAll({}, {
  // Delivers `value` to `observer.next`, if the observer has one.
  next: function next(value) {
    var subscriptionState = getInternalState(getInternalState(this).subscription);
    if (!subscriptionClosed(subscriptionState)) {
      var observer = subscriptionState.observer;
      try {
        var nextMethod = getMethod(observer.next);
        if (nextMethod) nextMethod.call(observer, value);
      } catch (error) {
        hostReportErrors(error);
      }
    }
  },
  // Terminates the subscription with an error: closes first, then notifies
  // `observer.error` (or reports `value` itself when the observer has no
  // error handler), and finally runs the cleanup.
  error: function error(value) {
    var subscription = getInternalState(this).subscription;
    var subscriptionState = getInternalState(subscription);
    if (!subscriptionClosed(subscriptionState)) {
      // Captured before `close` clears it from the state record.
      var observer = subscriptionState.observer;
      close(subscription, subscriptionState);
      try {
        var errorMethod = getMethod(observer.error);
        if (errorMethod) errorMethod.call(observer, value);
        else hostReportErrors(value);
      } catch (err) {
        hostReportErrors(err);
      }
      cleanupSubscription(subscriptionState);
    }
  },
  // Terminates the subscription normally: closes first, then notifies
  // `observer.complete` if present, and finally runs the cleanup.
  complete: function complete() {
    var subscription = getInternalState(this).subscription;
    var subscriptionState = getInternalState(subscription);
    if (!subscriptionClosed(subscriptionState)) {
      // Captured before `close` clears it from the state record.
      var observer = subscriptionState.observer;
      close(subscription, subscriptionState);
      try {
        var completeMethod = getMethod(observer.complete);
        if (completeMethod) completeMethod.call(observer);
      } catch (error) {
        hostReportErrors(error);
      }
      cleanupSubscription(subscriptionState);
    }
  }
});
143
// With descriptor support, `closed` on a subscription observer is a
// configurable read-only accessor that reflects the state of the subscription
// it belongs to, computed on every read.
if (DESCRIPTORS) defineProperty(SubscriptionObserver.prototype, 'closed', {
  configurable: true,
  get: function () {
    return subscriptionClosed(getInternalState(getInternalState(this).subscription));
  }
});
150
// `Observable` constructor. `anInstance` guards the receiver, and `subscriber`
// is validated with `aFunction` and stored for later `subscribe` calls; it is
// not invoked here.
var $Observable = function Observable(subscriber) {
  anInstance(this, $Observable, 'Observable');
  setInternalState(this, { subscriber: aFunction(subscriber) });
};
155
redefineAll($Observable.prototype, {
  // Starts a new subscription and returns it. Accepted call shapes:
  //   subscribe(observerObject)
  //   subscribe(nextFn[, errorFn[, completeFn]]) - wrapped into an observer
  // Any other first argument is treated as an empty observer.
  // `arguments` is used (rather than extra named parameters) so the
  // function's `length` stays 1.
  subscribe: function subscribe(observer) {
    var length = arguments.length;
    return new Subscription(typeof observer === 'function' ? {
      next: observer,
      error: length > 1 ? arguments[1] : undefined,
      complete: length > 2 ? arguments[2] : undefined
    } : isObject(observer) ? observer : {}, getInternalState(this).subscriber);
  }
});
166
// Static factory methods. Both construct through `this` when it is a function
// and fall back to `$Observable` otherwise (e.g. when called detached).
redefineAll($Observable, {
  // Converts `x` into an observable.
  //   - If `x` has a method under the OBSERVABLE key, its result is returned
  //     as-is when its `constructor` is the target constructor, and otherwise
  //     wrapped in a new instance that delegates to its `subscribe`.
  //   - Otherwise `x` is treated as an iterable whose values are delivered
  //     synchronously on subscription, followed by `complete`.
  // Throws (via `anObject` / `getIterator`) at call time when `x` is neither.
  from: function from(x) {
    var C = typeof this === 'function' ? this : $Observable;
    var observableMethod = getMethod(anObject(x)[OBSERVABLE]);
    if (observableMethod) {
      var observable = anObject(observableMethod.call(x));
      return observable.constructor === C ? observable : new C(function (observer) {
        return observable.subscribe(observer);
      });
    }
    // Obtained eagerly so that a non-iterable `x` is rejected by `from` itself
    // rather than on first subscription.
    var pendingIterator = getIterator(x);
    return new C(function (observer) {
      // Every subscription needs its own iterator: sharing one would leave
      // all subscriptions after the first with an already exhausted iterator
      // and no values. The eagerly created iterator serves the first
      // subscription; later ones request a fresh iterator from `x`.
      var iterator = pendingIterator || getIterator(x);
      pendingIterator = undefined;
      iterate(iterator, function (it, stop) {
        observer.next(it);
        // Stop pulling values as soon as the consumer has unsubscribed.
        if (observer.closed) return stop();
      }, { IS_ITERATOR: true, INTERRUPTED: true });
      observer.complete();
    });
  },
  // Creates an observable that synchronously delivers each argument in order
  // and then completes; delivery stops early, without `complete`, once the
  // observer reports `closed`.
  of: function of() {
    var C = typeof this === 'function' ? this : $Observable;
    var length = arguments.length;
    // Snapshot the arguments so the subscriber closure does not retain `arguments`.
    var items = new Array(length);
    var index = 0;
    while (index < length) items[index] = arguments[index++];
    return new C(function (observer) {
      for (var i = 0; i < length; i++) {
        observer.next(items[i]);
        if (observer.closed) return;
      }
      observer.complete();
    });
  }
});
200
// An observable is its own result for the OBSERVABLE-keyed conversion method
// that `Observable.from` looks up.
createNonEnumerableProperty($Observable.prototype, OBSERVABLE, function () { return this; });

// Publish the constructor through the export helper with the `global` option.
$({ global: true }, {
  Observable: $Observable
});

// NOTE(review): presumably installs the species accessor on the exported
// constructor - confirm against the set-species helper.
setSpecies('Observable');