1 | 'use strict';
|
2 |
|
3 | var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
|
4 |
|
5 | var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
|
6 |
|
7 | var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
|
8 |
|
9 | var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);
|
10 |
|
11 | var _inherits2 = require('babel-runtime/helpers/inherits');
|
12 |
|
13 | var _inherits3 = _interopRequireDefault(_inherits2);
|
14 |
|
15 | var _AsyncSubject = require('rxjs/AsyncSubject');
|
16 |
|
17 | var _BehaviorSubject = require('rxjs/BehaviorSubject');
|
18 |
|
19 | var _Subject2 = require('rxjs/Subject');
|
20 |
|
21 | var _Observable = require('rxjs/Observable');
|
22 |
|
23 | var _merge = require('rxjs/observable/merge');
|
24 |
|
25 | var _filter = require('rxjs/operator/filter');
|
26 |
|
27 | var _share = require('rxjs/operator/share');
|
28 |
|
29 | var _shim = require('./shim.js');
|
30 |
|
31 | var _serialization = require('./serialization.js');
|
32 |
|
33 | var _logging = require('./logging.js');
|
34 |
|
/**
 * Normalises the result of a `require` call so that callers can always read
 * the module through `.default`.
 *
 * @param {*} obj - value returned by `require`
 * @returns {*} `obj` itself when it is truthy and flagged with `__esModule`,
 *   otherwise a new wrapper object `{ default: obj }`
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }
  return { default: obj };
}
|
36 |
|
// Protocol identifier passed as the second argument when the websocket is
// constructed.
var PROTOCOL_VERSION = 'rethinkdb-horizon-v0';

// Connection status values. Each one is a distinct object identified by its
// `type` field.
var STATUS_UNCONNECTED = { type: 'unconnected' },
    STATUS_CONNECTED = { type: 'connected' },
    STATUS_READY = { type: 'ready' },
    STATUS_ERROR = { type: 'error' },
    STATUS_DISCONNECTED = { type: 'disconnected' };
|
49 |
|
/**
 * Error carrying a message together with an error code.
 *
 * The instance is initialised by hand instead of relying on the return value
 * of `Error.call(this, msg)`: invoking the built-in Error constructor as a
 * plain function ignores `this` and returns a brand-new Error. Returning that
 * object from the constructor produced a plain Error that was not an instance
 * of ProtocolError and never used the toString() override defined below.
 *
 * @param {string} msg - error message, exposed as `message`
 * @param {*} errorCode - error code, exposed as `errorCode`
 * @throws {TypeError} when called without `new`
 */
var ProtocolError = function (_Error) {
  function ProtocolError(msg, errorCode) {
    if (!(this instanceof ProtocolError)) {
      throw new TypeError('Cannot call a class as a function');
    }

    // Let the native constructor normalise the message, then copy the result
    // onto this instance. Non-enumerable, like a native Error's message.
    var base = _Error.call(this, msg);
    Object.defineProperty(this, 'message', {
      value: base.message,
      enumerable: false,
      writable: true,
      configurable: true
    });

    if (typeof _Error.captureStackTrace === 'function') {
      // Where available, record a stack that starts at the caller rather than
      // inside this constructor.
      _Error.captureStackTrace(this, ProtocolError);
    } else {
      Object.defineProperty(this, 'stack', {
        value: base.stack,
        enumerable: false,
        writable: true,
        configurable: true
      });
    }

    this.errorCode = errorCode;
  }

  // Prototype chain: ProtocolError.prototype -> Error.prototype.
  ProtocolError.prototype = Object.create(_Error.prototype, {
    constructor: {
      value: ProtocolError,
      enumerable: false,
      writable: true,
      configurable: true
    }
  });

  // Inherit static members of the parent constructor as well.
  if (Object.setPrototypeOf) {
    Object.setPrototypeOf(ProtocolError, _Error);
  } else {
    ProtocolError.__proto__ = _Error;
  }

  /**
   * @returns {string} the message followed by the error code,
   *   e.g. `"some message (Code: 3)"`
   */
  ProtocolError.prototype.toString = function toString() {
    return this.message + ' (Code: ' + this.errorCode + ')';
  };

  return ProtocolError;
}(Error);
|
68 |
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
/**
 * Subject wrapping a websocket connection.
 *
 * The instance is built from two halves that are handed to the Subject base
 * constructor:
 *  - `socketSubscriber` receives outgoing messages and either sends them over
 *    the websocket or buffers them until the socket is open;
 *  - `socketObservable` creates the websocket on subscription and emits every
 *    deserialized incoming message. It is shared between subscribers.
 *
 * Public members set on the instance:
 *  - `handshake`: AsyncSubject receiving the response to the handshake request;
 *  - `status`: BehaviorSubject of connection status objects, starting with
 *    STATUS_UNCONNECTED;
 *  - `makeRequest(rawRequest)`: see below.
 */
var HorizonSocket = function (_Subject) {
  (0, _inherits3.default)(HorizonSocket, _Subject);

  /**
   * @param {string} host - host part of the websocket URL
   * @param {boolean} secure - when truthy the URL scheme is `wss`, otherwise `ws`
   * @param {string} path - path appended to the host after a `/`
   * @param {Function} handshaker - called with no arguments once the socket
   *   opens; its return value is sent as the handshake request
   */
  function HorizonSocket(host, secure, path, handshaker) {
    var _context;

    (0, _classCallCheck3.default)(this, HorizonSocket);

    var hostString = 'ws' + (secure ? 's' : '') + '://' + host + '/' + path;
    // Messages submitted while the socket is not open; flushed in ws.onopen.
    var msgBuffer = [];
    var ws = void 0,
        handshakeDisp = void 0;

    // AsyncSubject: subscribers get the last value pushed before completion.
    var handshake = new _AsyncSubject.AsyncSubject();
    var statusSubject = new _BehaviorSubject.BehaviorSubject(STATUS_UNCONNECTED);

    var isOpen = function isOpen() {
      return Boolean(ws) && ws.readyState === _shim.WebSocket.OPEN;
    };

    // Serializes a message and writes it to the websocket.
    function wsSend(msg) {
      var stringMsg = JSON.stringify((0, _serialization.serialize)(msg));
      ws.send(stringMsg);
    }

    // Creates the websocket when subscribed to and wires its event handlers
    // to the subscriber and to the status subject.
    var socketObservable = (_context = _Observable.Observable.create(function (subscriber) {
      ws = new _shim.WebSocket(hostString, PROTOCOL_VERSION);
      ws.onerror = function () {
        statusSubject.next(STATUS_ERROR);
        var errMsg = 'Websocket ' + hostString + ' experienced an error';
        subscriber.error(new Error(errMsg));
      };
      ws.onopen = function () {
        statusSubject.next(STATUS_CONNECTED);
        // Send the handshake request first, then flush anything buffered.
        // `_this2` is assigned further down, before this handler can run.
        handshakeDisp = _this2.makeRequest(handshaker()).subscribe(function (x) {
          handshake.next(x);
          handshake.complete();
          // Publish readiness on the status subject. Pushing it into
          // `handshake` after complete() is a no-op, so `status` would never
          // report the ready state.
          statusSubject.next(STATUS_READY);
        }, function (err) {
          return handshake.error(err);
        }, function () {
          return handshake.complete();
        });

        while (msgBuffer.length > 0) {
          var msg = msgBuffer.shift();
          (0, _logging.log)('Sending buffered:', msg);
          wsSend(msg);
        }
      };
      ws.onmessage = function (event) {
        var deserialized = (0, _serialization.deserialize)(JSON.parse(event.data));
        (0, _logging.log)('Received', deserialized);
        subscriber.next(deserialized);
      };
      ws.onclose = function (e) {
        statusSubject.next(STATUS_DISCONNECTED);
        // Anything other than a clean close with code 1000 is reported as an
        // error to the subscriber.
        if (e.code !== 1000 || !e.wasClean) {
          subscriber.error(new Error('Socket closed unexpectedly with code: ' + e.code));
        } else {
          subscriber.complete();
        }
      };
      // Teardown: drop the handshake subscription and close the socket.
      return function () {
        if (handshakeDisp) {
          handshakeDisp.unsubscribe();
        }

        closeSocket(1000, '');
      };
    }), _share.share).call(_context);

    // Observer half of the subject: receives outgoing messages.
    var socketSubscriber = {
      next: function next(messageToSend) {
        // Send immediately when the socket is open, otherwise queue the
        // message until ws.onopen flushes the buffer.
        if (isOpen()) {
          (0, _logging.log)('Sending', messageToSend);
          wsSend(messageToSend);
        } else {
          (0, _logging.log)('Buffering', messageToSend);
          msgBuffer.push(messageToSend);
        }
      },
      error: function error(_error) {
        // The error object must carry the close code to use for the socket.
        if (!_error.code) {
          throw new Error('no code specified. Be sure to pass ' + '{ code: ###, reason: "" } to error()');
        }
        closeSocket(_error.code, _error.reason);
      },
      complete: function complete() {
        closeSocket(1000, '');
      }
    };

    // Publishes STATUS_DISCONNECTED, closes the websocket and detaches the
    // open/close/message handlers.
    function closeSocket(code, reason) {
      statusSubject.next(STATUS_DISCONNECTED);
      if (!ws) {
        // No websocket has been created yet (nothing subscribed to the socket
        // observable), so there is nothing to close.
        return;
      }
      if (!code) {
        ws.close();
      } else {
        ws.close(code, reason);
      }
      ws.onopen = undefined;
      ws.onclose = undefined;
      ws.onmessage = undefined;
    }

    var _this2 = (0, _possibleConstructorReturn3.default)(this, _Subject.call(this, socketSubscriber, socketObservable));

    // Requests issued through makeRequest.
    var subscriptions = new _Subject2.Subject();

    // `end_subscription` messages issued when a subscribe request is torn down.
    var unsubscriptions = new _Subject2.Subject();
    var outgoing = _merge.merge.call(_Observable.Observable, subscriptions, unsubscriptions);

    // Number of makeRequest observables currently subscribed.
    var activeRequests = 0;

    // Source of request ids; incremented for every makeRequest subscription.
    var requestCounter = 0;

    // Subscription of this socket to `outgoing` while requests are active.
    var subDisp = null;

    _this2.handshake = handshake;

    _this2.status = statusSubject;

    var incrementActive = function incrementActive() {
      if (++activeRequests === 1) {
        // First active request: attach this socket as the observer of the
        // outgoing stream so requests get passed on to it.
        subDisp = outgoing.subscribe(_this2);
      }
    };

    var decrementActive = function decrementActive() {
      if (--activeRequests === 0) {
        // Last active request went away: detach from the outgoing stream.
        subDisp.unsubscribe();
      }
    };

    /**
     * Builds an observable for a single request.
     *
     * On subscription the request gets a fresh `request_id`, is pushed to the
     * outgoing stream, and the socket's messages are filtered down to those
     * with the same `request_id`:
     *  - a response with `error` fails the observable with a ProtocolError;
     *  - a response with `data` or `token` is emitted as-is;
     *  - `state === 'synced'` emits `{ type: 'state', state: 'synced' }`;
     *  - `state === 'complete'` completes the observable.
     * On unsubscription an `end_subscription` message is sent when the request
     * type was `'subscribe'`.
     *
     * @param {Object} rawRequest - request to send; NOTE: mutated in place, its
     *   `request_id` property is overwritten on every subscription
     * @returns {Observable} observable of the responses to the request
     */
    _this2.makeRequest = function (rawRequest) {
      return _Observable.Observable.create(function (reqSubscriber) {
        var request_id = requestCounter++;

        rawRequest.request_id = request_id;
        var unsubscribeRequest = void 0;
        if (rawRequest.type === 'subscribe') {
          unsubscribeRequest = { request_id: request_id, type: 'end_subscription' };
        }

        // Must happen before the request is pushed so that something is
        // subscribed to the outgoing stream.
        incrementActive();

        subscriptions.next(rawRequest);

        var unsubscribeFilter = _filter.filter.call(_this2, function (x) {
          return x.request_id === request_id;
        }).subscribe(function (resp) {
          if (resp.error !== undefined) {
            reqSubscriber.error(new ProtocolError(resp.error, resp.error_code));
          } else if (resp.data !== undefined || resp.token !== undefined) {
            try {
              reqSubscriber.next(resp);
            } catch (e) {
              // Deliberately not rethrown so that a failing consumer does not
              // propagate into the shared socket stream, but no longer
              // swallowed silently either.
              (0, _logging.log)('Error thrown while delivering response:', e);
            }
          }
          if (resp.state === 'synced') {
            reqSubscriber.next({
              type: 'state',
              state: 'synced'
            });
          } else if (resp.state === 'complete') {
            reqSubscriber.complete();
          }
        }, function (err) {
          return reqSubscriber.error(err);
        }, function () {
          return reqSubscriber.complete();
        });
        return function () {
          // Tell the other side to end the subscription before this request
          // stops counting as active.
          if (unsubscribeRequest) {
            unsubscriptions.next(unsubscribeRequest);
          }
          decrementActive();
          unsubscribeFilter.unsubscribe();
        };
      });
    };
    return _this2;
  }

  return HorizonSocket;
}(_Subject2.Subject);
|
309 |
|
310 | module.exports = HorizonSocket;
|
311 |
|
\ | No newline at end of file |