1 | const Emitter = require('events');
|
2 | const net = require('net');
|
3 | const tls = require('tls');
|
4 | const uuidV4 = require('uuid/v4') ;
|
5 | const debug = require('debug')('drachtio:agent');
|
6 | const noop = require('node-noop').noop;
|
7 | const CRLF = '\r\n' ;
|
8 | const assert = require('assert');
|
// Default and allowed bounds for the keep-alive ping period, in milliseconds
// (the chosen value is handed to setInterval by WireProtocol#startPinging).
const DEFAULT_PING_INTERVAL = 15 * 1000;
const MIN_PING_INTERVAL = 5 * 1000;
const MAX_PING_INTERVAL = 5 * 60 * 1000;
|
12 |
|
/**
 * Count the Unicode code points in a string.
 *
 * Iterating a string walks it code point by code point, so a character outside
 * the Basic Multilingual Plane (e.g. an emoji) counts once here even though it
 * contributes two to `text.length`.
 *
 * @param {string} text - string to measure
 * @returns {number} number of code points in `text`
 */
function countSymbols(text) {
  let total = 0;
  // eslint-disable-next-line no-unused-vars
  for (const _symbol of text) {
    total += 1;
  }
  return total;
}
|
16 |
|
17 | module.exports = class WireProtocol extends Emitter {
|
18 |
|
19 | constructor(opts) {
|
20 | super() ;
|
21 |
|
22 | this._logger = opts.logger || noop ;
|
23 | this.mapIncomingMsg = new Map() ;
|
24 |
|
25 | this.enablePing = false;
|
26 | this.pingInterval = DEFAULT_PING_INTERVAL;
|
27 | this.mapTimerPing = new Map();
|
28 | }
|
29 |
|
30 | connect(opts) {
|
31 |
|
32 | let socket;
|
33 | assert.ok(typeof this.server === 'undefined', 'WireProtocol#connect: cannot be both client and server');
|
34 | this.host = opts.host ;
|
35 | this.port = opts.port ;
|
36 | this.reconnectOpts = opts.reconnect || {} ;
|
37 | this.reconnectVars = {} ;
|
38 | this._evalPingOpts(opts);
|
39 | if (opts.tls) {
|
40 | debug(`wp connecting (tls) to ${this.host}:${this.port}`);
|
41 | socket = tls.connect(opts.port, opts.host, opts.tls, () => {
|
42 | debug(`tls socket connected: ${socket.authorized}`);
|
43 | });
|
44 | }
|
45 | else {
|
46 | debug(`wp connecting (tcp) to ${this.host}:${this.port}`);
|
47 | socket = net.connect({
|
48 | port: opts.port,
|
49 | host: opts.host
|
50 | }) ;
|
51 | }
|
52 |
|
53 | socket.setKeepAlive(true);
|
54 | this.installListeners(socket) ;
|
55 | }
|
56 |
|
57 | _evalPingOpts(opts) {
|
58 | if (opts.enablePing === true) {
|
59 | this.enablePing = true;
|
60 | if (opts.pingInterval) {
|
61 | const interval = parseInt(opts.pingInterval);
|
62 | assert.ok(interval >= MIN_PING_INTERVAL,
|
63 | `Srf#connect: opts.pingInterval must be greater than or equal to ${MIN_PING_INTERVAL}`);
|
64 | assert.ok(interval <= MAX_PING_INTERVAL,
|
65 | `Srf#connect: opts.pingInterval must be less than or equal to ${MAX_PING_INTERVAL}`);
|
66 | this.pingInterval = interval;
|
67 | }
|
68 | }
|
69 | }
|
70 |
|
71 | startPinging(socket) {
|
72 | if (!this.enablePing) return;
|
73 | assert.ok(!this.mapTimerPing.has(socket), 'duplicate call to startPinging for this socket');
|
74 | const timerPing = setInterval(() => {
|
75 | if (socket && !socket.destroyed) {
|
76 | const msgId = this.send(socket, 'ping');
|
77 | this.emit('ping', {msgId, socket});
|
78 | }
|
79 | }, this.pingInterval);
|
80 | this.mapTimerPing.set(socket, timerPing);
|
81 | }
|
82 |
|
83 | _stopPinging(socket) {
|
84 | const timerPing = this.mapTimerPing.get(socket);
|
85 | if (timerPing) {
|
86 | clearInterval(timerPing);
|
87 | this.mapTimerPing.delete(socket);
|
88 | }
|
89 | }
|
90 |
|
91 | listen(opts) {
|
92 | assert.ok(typeof this.reconnectOpts === 'undefined', 'WireProtocol#listen: cannot be both server and client');
|
93 | this._evalPingOpts(opts);
|
94 |
|
95 | let useTls = false;
|
96 | if (opts.server instanceof net.Server) {
|
97 | this.server = opts.server ;
|
98 | }
|
99 | else if (opts.tls) {
|
100 | useTls = true;
|
101 | this.server = tls.createServer(opts.tls);
|
102 | this.server.listen(opts.port, opts.host);
|
103 | }
|
104 | else {
|
105 | this.server = net.createServer() ;
|
106 | this.server.listen(opts.port, opts.host) ;
|
107 | }
|
108 | this.server.on('listening', () => {
|
109 | debug(`wp listening on ${JSON.stringify(this.server.address())} for ${useTls ? 'tls' : 'tcp'} connections`);
|
110 | this.emit('listening');
|
111 | });
|
112 |
|
113 | if (useTls) {
|
114 | this.server.on('secureConnection', (socket) => {
|
115 | debug('wp tls handshake succeeded');
|
116 | socket.setKeepAlive(true);
|
117 | this.installListeners(socket);
|
118 | this.emit('connection', socket);
|
119 | });
|
120 | }
|
121 | else {
|
122 | this.server.on('connection', (socket) => {
|
123 | debug(`wp received connection from ${socket.remoteAddress}:${socket.remotePort}`);
|
124 | socket.setKeepAlive(true);
|
125 | this.installListeners(socket);
|
126 | this.emit('connection', socket);
|
127 | });
|
128 | }
|
129 | return this.server ;
|
130 | }
|
131 |
|
132 | get isServer() {
|
133 | return this.server ;
|
134 | }
|
135 |
|
136 | get isClient() {
|
137 | return !this.isServer ;
|
138 | }
|
139 |
|
140 | setLogger(logger) {
|
141 | this._logger = logger ;
|
142 | }
|
143 | removeLogger() {
|
144 | this._logger = function() {} ;
|
145 | }
|
146 |
|
147 | installListeners(socket) {
|
148 | socket.setEncoding('utf8') ;
|
149 |
|
150 | socket.on('error', (err) => {
|
151 | debug(`wp#on error - ${err} ${this.host}:${this.port}`);
|
152 | if (this.enablePing) this._stopPinging(socket);
|
153 |
|
154 | if (this.isServer || this.closing) {
|
155 | return;
|
156 | }
|
157 |
|
158 | this.emit('error', err, socket);
|
159 |
|
160 |
|
161 |
|
162 | this._onConnectionGone();
|
163 | });
|
164 |
|
165 | socket.on('connect', () => {
|
166 | debug(`wp#on connect ${this.host}:${this.port}`);
|
167 | if (this.isClient) {
|
168 | this.initializeRetryVars() ;
|
169 | }
|
170 | this.emit('connect', socket);
|
171 | }) ;
|
172 |
|
173 | socket.on('close', () => {
|
174 | debug(`wp#on close ${this.host}:${this.port}`);
|
175 | if (this.enablePing) this._stopPinging(socket);
|
176 | if (this.isClient) {
|
177 | this._onConnectionGone();
|
178 | }
|
179 | this.mapIncomingMsg.delete(socket) ;
|
180 | this.emit('close', socket) ;
|
181 | }) ;
|
182 |
|
183 | socket.on('data', this._onData.bind(this, socket)) ;
|
184 | }
|
185 |
|
186 | initializeRetryVars() {
|
187 | assert(this.isClient);
|
188 |
|
189 | this.reconnectVars.retryTimer = null;
|
190 | this.reconnectVars.retryTotaltime = 0;
|
191 | this.reconnectVars.retryDelay = 150;
|
192 | this.reconnectVars.retryBackoff = 1.7;
|
193 | this.reconnectVars.attempts = 1;
|
194 | }
|
195 |
|
196 | _onConnectionGone() {
|
197 | assert(this.isClient);
|
198 |
|
199 |
|
200 | if (this.reconnectVars.retryTimer) {
|
201 | debug('WireProtocol#connection_gone: retry is already in progress') ;
|
202 | return;
|
203 | }
|
204 |
|
205 |
|
206 | if (this.closing) {
|
207 | this.reconnectVars.retryTimer = null;
|
208 | return;
|
209 | }
|
210 |
|
211 | const nextDelay = Math.floor(this.reconnectVars.retryDelay * this.reconnectVars.retryBackoff);
|
212 | if (this.reconnectOpts.retryMaxDelay !== null && nextDelay > this.reconnectOpts.retryMaxDelay) {
|
213 | this.reconnectVars.retryDelay = this.reconnectOpts.retryMaxDelay;
|
214 | } else {
|
215 | this.reconnectVars.retryDelay = nextDelay;
|
216 | }
|
217 |
|
218 | if (this.reconnectOpts.maxAttempts && this.reconnectVars.attempts >= this.reconnectOpts.maxAttempts) {
|
219 | this.reconnectVars.retryTimer = null;
|
220 | return;
|
221 | }
|
222 |
|
223 | this.reconnectVars.attempts += 1;
|
224 | this.emit('reconnecting', {
|
225 | delay: this.reconnectVars.retryDelay,
|
226 | attempt: this.reconnectVars.attempts
|
227 | });
|
228 | this.reconnectVars.retryTimer = setTimeout(() => {
|
229 | this.reconnectVars.retryTotaltime += this.reconnectVars.retryDelay;
|
230 |
|
231 | if (this.reconnectOpts.connectTimeout && this.reconnectVars.retryTotaltime >= this.reconnectOpts.connectTimeout) {
|
232 | this.reconnectVars.retryTimer = null;
|
233 | console.error('WireProtocol#connection_gone: ' +
|
234 | `Couldn't get drachtio connection after ${this.reconnectVars.retryTotaltime} ms`);
|
235 | return;
|
236 | }
|
237 | this.socket = net.connect({
|
238 | port: this.port,
|
239 | host: this.host
|
240 | }) ;
|
241 | this.socket.setKeepAlive(true) ;
|
242 | this.installListeners(this.socket) ;
|
243 |
|
244 | this.reconnectVars.retryTimer = null;
|
245 | }, this.reconnectVars.retryDelay);
|
246 | }
|
247 |
|
248 | send(socket, msg) {
|
249 | const msgId = uuidV4() ;
|
250 | const s = msgId + '|' + msg ;
|
251 | socket.write(Buffer.byteLength(s, 'utf8') + '#' + s, () => {
|
252 | debug(`wp#send ${this.host}:${this.port} - ${s.length}#${s}`);
|
253 | }) ;
|
254 | this._logger('===>' + CRLF + Buffer.byteLength(s, 'utf8') + '#' + s + CRLF) ;
|
255 | return msgId ;
|
256 | }
|
257 |
|
258 | |
259 |
|
260 |
|
261 |
|
262 |
|
263 |
|
264 |
|
265 | processMessageBuffer(socket, obj, length, start) {
|
266 | let haveMsg = false;
|
267 |
|
268 | do {
|
269 | const msg = [...obj.incomingMsg].slice(start, start + length).join('');
|
270 | this.emit('msg', socket, msg);
|
271 | obj.incomingMsg = [...obj.incomingMsg].slice(start + length).join('');
|
272 |
|
273 |
|
274 | const arr = /^(\d{1,5})#/.exec(obj.incomingMsg);
|
275 | if (arr) {
|
276 | length = parseInt(arr[1]);
|
277 | start = arr[1].length + 1;
|
278 | haveMsg = countSymbols(obj.incomingMsg) >= length + start;
|
279 | }
|
280 | else haveMsg = false;
|
281 | } while (haveMsg);
|
282 | }
|
283 |
|
284 | _onData(socket, msg) {
|
285 | this._logger(`<===${CRLF}${msg}${CRLF}`) ;
|
286 | debug(`<===${msg}`) ;
|
287 |
|
288 | if (!this.mapIncomingMsg.has(socket)) {
|
289 | this.mapIncomingMsg.set(socket, {
|
290 | incomingMsg: '',
|
291 | length: -1
|
292 | });
|
293 | }
|
294 | const obj = this.mapIncomingMsg.get(socket) ;
|
295 |
|
296 |
|
297 | obj.incomingMsg += msg;
|
298 |
|
299 |
|
300 | const arr = /^(\d{1,5})#/.exec(obj.incomingMsg);
|
301 | if (arr) {
|
302 | const msgLength = parseInt(arr[1]);
|
303 | if (countSymbols(obj.incomingMsg) >= msgLength + arr[1].length + 1) {
|
304 | this.processMessageBuffer(socket, obj, msgLength, arr[1].length + 1);
|
305 | }
|
306 | return;
|
307 | }
|
308 | else if (obj.incomingMsg.match(/^\d{1,5}/)) {
|
309 |
|
310 | return;
|
311 | }
|
312 | const err = new Error(`invalid message from server, did not start with length specifier: ${obj.incomingMsg}`);
|
313 | if (this.isServer) {
|
314 | console.error(`invalid client message, closing socket: ${err}`);
|
315 | this.disconnect(socket);
|
316 | }
|
317 | else throw err;
|
318 | }
|
319 |
|
320 | disconnect(socket) {
|
321 | this.closing = true ;
|
322 | this.mapIncomingMsg.delete(socket);
|
323 | if (!socket) { throw new Error('socket is not connected or was not provided') ; }
|
324 | this._stopPinging(socket);
|
325 | socket.end() ;
|
326 | }
|
327 |
|
328 | close(callback) {
|
329 | assert.ok(this.isServer, 'WireProtocol#close only valid in outbound connection (server) mode');
|
330 | this.server.close(callback) ;
|
331 | }
|
332 |
|
333 | } ;
|