UNPKG

11.5 kBJavaScriptView Raw
1'use strict';
2
3var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
4
5var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
6
7var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
8
9var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);
10
11var _inherits2 = require('babel-runtime/helpers/inherits');
12
13var _inherits3 = _interopRequireDefault(_inherits2);
14
15var _AsyncSubject = require('rxjs/AsyncSubject');
16
17var _BehaviorSubject = require('rxjs/BehaviorSubject');
18
19var _Subject2 = require('rxjs/Subject');
20
21var _Observable = require('rxjs/Observable');
22
23var _merge = require('rxjs/observable/merge');
24
25var _filter = require('rxjs/operator/filter');
26
27var _share = require('rxjs/operator/share');
28
29var _shim = require('./shim.js');
30
31var _serialization = require('./serialization.js');
32
33var _logging = require('./logging.js');
34
/**
 * Normalizes the result of a `require` call so callers can always read
 * `.default`.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {*} `obj` itself when it is truthy and flagged `__esModule`,
 *   otherwise a new wrapper object of the form `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }
  return { default: obj };
}
36
// Sub-protocol name handed to the WebSocket constructor.
var PROTOCOL_VERSION = 'rethinkdb-horizon-v0';

// Connection status values published on the socket's status subject.
// Each one is a plain object tagged with a `type` string.

// Initial value: no connection attempt has been made yet.
var STATUS_UNCONNECTED = { type: 'unconnected' };

// The websocket has opened, but the handshake has not finished.
var STATUS_CONNECTED = { type: 'connected' };

// The websocket has opened and the handshake has completed.
var STATUS_READY = { type: 'ready' };

// Any socket-level error. Can occur after `unconnected`, either before
// or after `connected`.
var STATUS_ERROR = { type: 'error' };

// The socket has closed.
var STATUS_DISCONNECTED = { type: 'disconnected' };
49
/**
 * Error describing a failure reported by the server for a request.
 *
 * Carries the server-supplied `errorCode` next to the usual `message`.
 *
 * @param {string} msg - Human readable error message.
 * @param {*} errorCode - Error code reported alongside the message.
 */
var ProtocolError = function (_Error) {
  (0, _inherits3.default)(ProtocolError, _Error);

  function ProtocolError(msg, errorCode) {
    (0, _classCallCheck3.default)(this, ProtocolError);

    var _this = (0, _possibleConstructorReturn3.default)(this, _Error.call(this, msg));

    // `Error` invoked as a plain function ignores `this` and returns a
    // brand new Error object. If that object is what we ended up with, it
    // is not linked to ProtocolError.prototype: `instanceof ProtocolError`
    // would be false and the `toString` override below would never run.
    // Re-point its prototype at the prototype of the instance that `new`
    // created for us.
    var expectedProto = Object.getPrototypeOf(this);
    if (_this !== this && Object.getPrototypeOf(_this) !== expectedProto) {
      if (typeof Object.setPrototypeOf === 'function') {
        Object.setPrototypeOf(_this, expectedProto);
      } else {
        _this.__proto__ = expectedProto; // fallback for engines without setPrototypeOf
      }
    }

    _this.errorCode = errorCode;
    return _this;
  }

  /**
   * @returns {string} The message followed by the error code, e.g.
   *   `"something failed (Code: 42)"`.
   */
  ProtocolError.prototype.toString = function toString() {
    return this.message + ' (Code: ' + this.errorCode + ')';
  };

  return ProtocolError;
}(Error);
68
// Wraps native websockets with a Subject, which is both a Subscriber
// and an Observable (it is bi-directional after all!). This
// implementation is adapted from Rx.DOM.fromWebSocket and
// RxSocketSubject by Ben Lesh, but it also deals with some simple
// protocol level things like serializing from/to JSON, routing
// request_ids, and looking at the `state` field to decide when an
// observable is closed.
76
77
/**
 * Subject wrapping one websocket connection.
 *
 * As an Observable it emits every deserialized message received from the
 * websocket; as a Subscriber it serializes and sends whatever is pushed
 * into it. The websocket itself is only created once something subscribes
 * to the observable side.
 *
 * Instance attributes set by the constructor:
 *  - `handshake`: AsyncSubject that emits the handshake response.
 *  - `status`: BehaviorSubject emitting the `STATUS_*` objects.
 *  - `makeRequest(rawRequest)`: returns an Observable of the responses
 *    whose `request_id` matches the request.
 *
 * @param {string} host - Host (and port) part of the websocket URL.
 * @param {boolean} secure - Truthy to use `wss://`, otherwise `ws://`.
 * @param {string} path - Path appended after `host + '/'`.
 * @param {Function} handshaker - Called with no arguments once the socket
 *   opens; its return value is sent as the handshake request.
 */
var HorizonSocket = function (_Subject) {
  (0, _inherits3.default)(HorizonSocket, _Subject);

  function HorizonSocket(host, secure, path, handshaker) {
    var _context;

    (0, _classCallCheck3.default)(this, HorizonSocket);

    var hostString = 'ws' + (secure ? 's' : '') + '://' + host + '/' + path;
    // Messages pushed while the socket is not open; flushed in `onopen`
    var msgBuffer = [];
    var ws = void 0,
        handshakeDisp = void 0;
    // Handshake is an asyncsubject because we want it to always cache
    // the last value it received, like a promise
    var handshake = new _AsyncSubject.AsyncSubject();
    var statusSubject = new _BehaviorSubject.BehaviorSubject(STATUS_UNCONNECTED);

    var isOpen = function isOpen() {
      return Boolean(ws) && ws.readyState === _shim.WebSocket.OPEN;
    };

    // Serializes to a string before sending
    function wsSend(msg) {
      var stringMsg = JSON.stringify((0, _serialization.serialize)(msg));
      ws.send(stringMsg);
    }

    // This is the observable part of the Subject. It forwards events
    // from the underlying websocket
    var socketObservable = (_context = _Observable.Observable.create(function (subscriber) {
      ws = new _shim.WebSocket(hostString, PROTOCOL_VERSION);
      ws.onerror = function () {
        // If the websocket experiences the error, we forward it through
        // to the observable. Unfortunately, the event we receive in
        // this callback doesn't tell us much of anything, so there's no
        // reason to forward it on and we just send a generic error.
        statusSubject.next(STATUS_ERROR);
        var errMsg = 'Websocket ' + hostString + ' experienced an error';
        subscriber.error(new Error(errMsg));
      };
      ws.onopen = function () {
        // Send the handshake
        statusSubject.next(STATUS_CONNECTED);
        handshakeDisp = _this2.makeRequest(handshaker()).subscribe(function (x) {
          handshake.next(x);
          handshake.complete();

          // The ready status belongs on the status subject. Pushing it
          // into `handshake` after `complete()` has no effect, and status
          // listeners would never learn that the handshake finished.
          statusSubject.next(STATUS_READY);
        }, function (err) {
          return handshake.error(err);
        }, function () {
          return handshake.complete();
        });
        // Send any messages that have been buffered
        while (msgBuffer.length > 0) {
          var msg = msgBuffer.shift();
          (0, _logging.log)('Sending buffered:', msg);
          wsSend(msg);
        }
      };
      ws.onmessage = function (event) {
        var deserialized = (0, _serialization.deserialize)(JSON.parse(event.data));
        (0, _logging.log)('Received', deserialized);
        subscriber.next(deserialized);
      };
      ws.onclose = function (e) {
        // This will happen if the socket is closed by the server. If
        // .close is called from the client (see closeSocket), this
        // listener will be removed
        statusSubject.next(STATUS_DISCONNECTED);
        if (e.code !== 1000 || !e.wasClean) {
          subscriber.error(new Error('Socket closed unexpectedly with code: ' + e.code));
        } else {
          subscriber.complete();
        }
      };
      return function () {
        if (handshakeDisp) {
          handshakeDisp.unsubscribe();
        }
        // This is the "unsubscribe" method on the final Subject
        closeSocket(1000, '');
      };
    }), _share.share).call(_context); // This makes it a "hot" observable, and refCounts it
    // Note possible edge cases: the `share` operator is equivalent to
    // .multicast(() => new Subject()).refCount() // RxJS 5
    // .multicast(new Subject()).refCount() // RxJS 4

    // This is the Subscriber part of the Subject. How we can send stuff
    // over the websocket
    var socketSubscriber = {
      next: function next(messageToSend) {
        // When next is called on this subscriber.
        // Note: if the socket isn't open yet, the message is buffered
        // and sent from `onopen` once the connection is established.
        if (isOpen()) {
          (0, _logging.log)('Sending', messageToSend);
          wsSend(messageToSend); // wsSend serializes to a string
        } else {
          (0, _logging.log)('Buffering', messageToSend);
          msgBuffer.push(messageToSend);
        }
      },
      error: function error(_error) {
        // The subscriber is receiving an error. Better close the
        // websocket with an error
        if (!_error.code) {
          throw new Error('no code specified. Be sure to pass ' + '{ code: ###, reason: "" } to error()');
        }
        closeSocket(_error.code, _error.reason);
      },
      complete: function complete() {
        // complete for the subscriber here is equivalent to "close
        // this socket successfully (which is what code 1000 is)"
        closeSocket(1000, '');
      }
    };

    // Closes the websocket (if one was ever created) and detaches the
    // listeners so that a client-initiated close is not reported through
    // `onclose`.
    function closeSocket(code, reason) {
      statusSubject.next(STATUS_DISCONNECTED);
      if (!ws) {
        // The websocket is only created on first subscription; there is
        // nothing to close before that.
        return;
      }
      if (!code) {
        ws.close(); // successful close
      } else {
        ws.close(code, reason);
      }
      ws.onopen = undefined;
      ws.onclose = undefined;
      ws.onmessage = undefined;
    }

    // Subscriptions will be the observable containing all
    // queries/writes/changefeed requests. Specifically, the documents
    // that initiate them, each one with a different request_id

    var _this2 = (0, _possibleConstructorReturn3.default)(this, _Subject.call(this, socketSubscriber, socketObservable));

    var subscriptions = new _Subject2.Subject();
    // Unsubscriptions is similar, only it holds only requests to
    // close a particular request_id on the server. Currently we only
    // need these for changefeeds.
    var unsubscriptions = new _Subject2.Subject();
    var outgoing = _merge.merge.call(_Observable.Observable, subscriptions, unsubscriptions);
    // How many requests are outstanding
    var activeRequests = 0;
    // Monotonically increasing counter for request_ids
    var requestCounter = 0;
    // Unsubscriber for subscriptions/unsubscriptions
    var subDisp = null;
    // Now that super has been called, we can add attributes to this
    _this2.handshake = handshake;
    // Lets external users keep track of the current websocket status
    // without causing it to connect
    _this2.status = statusSubject;

    var incrementActive = function incrementActive() {
      if (++activeRequests === 1) {
        // We subscribe the socket itself to the subscription and
        // unsubscription requests. Since the socket is both an
        // observable and a subscriber. Here it's acting as a subscriber,
        // watching our requests.
        subDisp = outgoing.subscribe(_this2);
      }
    };

    // Decrement the number of active requests on the socket, and
    // stop forwarding outgoing requests if we're the last request
    var decrementActive = function decrementActive() {
      if (--activeRequests === 0) {
        subDisp.unsubscribe();
      }
    };

    // This is used externally to send requests to the server.
    // NOTE: `rawRequest` is mutated: a fresh `request_id` is written onto
    // it every time the returned observable is subscribed to.
    _this2.makeRequest = function (rawRequest) {
      return _Observable.Observable.create(function (reqSubscriber) {
        // Get a new request id
        var request_id = requestCounter++;
        // Add the request id to the request and the unsubscribe request
        // if there is one
        rawRequest.request_id = request_id;
        var unsubscribeRequest = void 0;
        if (rawRequest.type === 'subscribe') {
          unsubscribeRequest = { request_id: request_id, type: 'end_subscription' };
        }
        // First, increment activeRequests and decide if we need to
        // connect to the socket
        incrementActive();

        // Now send the request to the server
        subscriptions.next(rawRequest);

        // Create an observable from the socket that filters by request_id
        var unsubscribeFilter = _filter.filter.call(_this2, function (x) {
          return x.request_id === request_id;
        }).subscribe(function (resp) {
          // Need to faithfully end the stream if there is an error
          if (resp.error !== undefined) {
            reqSubscriber.error(new ProtocolError(resp.error, resp.error_code));
          } else if (resp.data !== undefined || resp.token !== undefined) {
            try {
              reqSubscriber.next(resp);
            } catch (e) {
              // Best effort: an exception thrown by the consumer must not
              // break delivery for this socket, but leave a trace of it.
              (0, _logging.log)('Error thrown while delivering response:', e);
            }
          }
          if (resp.state === 'synced') {
            // Create a little dummy object for sync notifications
            reqSubscriber.next({
              type: 'state',
              state: 'synced'
            });
          } else if (resp.state === 'complete') {
            reqSubscriber.complete();
          }
        }, function (err) {
          return reqSubscriber.error(err);
        }, function () {
          return reqSubscriber.complete();
        });
        return function () {
          // Unsubscribe if necessary
          if (unsubscribeRequest) {
            unsubscriptions.next(unsubscribeRequest);
          }
          decrementActive();
          unsubscribeFilter.unsubscribe();
        };
      });
    };
    return _this2;
  }

  return HorizonSocket;
}(_Subject2.Subject);
309
310module.exports = HorizonSocket;
311//# sourceMappingURL=socket.js.map
\No newline at end of file