UNPKG

10 kBJavaScriptView Raw
1const Emitter = require('events');
2const net = require('net');
3const tls = require('tls');
4const uuidV4 = require('uuid/v4') ;
5const debug = require('debug')('drachtio:agent');
6const noop = require('node-noop').noop;
7const CRLF = '\r\n' ;
8const assert = require('assert');
9const DEFAULT_PING_INTERVAL = 15000;
10const MIN_PING_INTERVAL = 5000;
11const MAX_PING_INTERVAL = 300000;
12
13function countSymbols(text) {
14 return [...text].length;
15}
16
17module.exports = class WireProtocol extends Emitter {
18
19 constructor(opts) {
20 super() ;
21
22 this._logger = opts.logger || noop ;
23 this.mapIncomingMsg = new Map() ;
24
25 this.enablePing = false;
26 this.pingInterval = DEFAULT_PING_INTERVAL;
27 this.mapTimerPing = new Map();
28 }
29
30 connect(opts) {
31 // inbound connection to drachtio server
32 let socket;
33 assert.ok(typeof this.server === 'undefined', 'WireProtocol#connect: cannot be both client and server');
34 this.host = opts.host ;
35 this.port = opts.port ;
36 this.reconnectOpts = opts.reconnect || {} ;
37 this.reconnectVars = {} ;
38 this._evalPingOpts(opts);
39 if (opts.tls) {
40 debug(`wp connecting (tls) to ${this.host}:${this.port}`);
41 socket = tls.connect(opts.port, opts.host, opts.tls, () => {
42 debug(`tls socket connected: ${socket.authorized}`);
43 });
44 }
45 else {
46 debug(`wp connecting (tcp) to ${this.host}:${this.port}`);
47 socket = net.connect({
48 port: opts.port,
49 host: opts.host
50 }) ;
51 }
52
53 socket.setKeepAlive(true);
54 this.installListeners(socket) ;
55 }
56
57 _evalPingOpts(opts) {
58 if (opts.enablePing === true) {
59 this.enablePing = true;
60 if (opts.pingInterval) {
61 const interval = parseInt(opts.pingInterval);
62 assert.ok(interval >= MIN_PING_INTERVAL,
63 `Srf#connect: opts.pingInterval must be greater than or equal to ${MIN_PING_INTERVAL}`);
64 assert.ok(interval <= MAX_PING_INTERVAL,
65 `Srf#connect: opts.pingInterval must be less than or equal to ${MAX_PING_INTERVAL}`);
66 this.pingInterval = interval;
67 }
68 }
69 }
70
71 startPinging(socket) {
72 if (!this.enablePing) return;
73 assert.ok(!this.mapTimerPing.has(socket), 'duplicate call to startPinging for this socket');
74 const timerPing = setInterval(() => {
75 if (socket && !socket.destroyed) {
76 const msgId = this.send(socket, 'ping');
77 this.emit('ping', {msgId, socket});
78 }
79 }, this.pingInterval);
80 this.mapTimerPing.set(socket, timerPing);
81 }
82
83 _stopPinging(socket) {
84 const timerPing = this.mapTimerPing.get(socket);
85 if (timerPing) {
86 clearInterval(timerPing);
87 this.mapTimerPing.delete(socket);
88 }
89 }
90
91 listen(opts) {
92 assert.ok(typeof this.reconnectOpts === 'undefined', 'WireProtocol#listen: cannot be both server and client');
93 this._evalPingOpts(opts);
94
95 let useTls = false;
96 if (opts.server instanceof net.Server) {
97 this.server = opts.server ;
98 }
99 else if (opts.tls) {
100 useTls = true;
101 this.server = tls.createServer(opts.tls);
102 this.server.listen(opts.port, opts.host);
103 }
104 else {
105 this.server = net.createServer() ;
106 this.server.listen(opts.port, opts.host) ;
107 }
108 this.server.on('listening', () => {
109 debug(`wp listening on ${JSON.stringify(this.server.address())} for ${useTls ? 'tls' : 'tcp'} connections`);
110 this.emit('listening');
111 });
112
113 if (useTls) {
114 this.server.on('secureConnection', (socket) => {
115 debug('wp tls handshake succeeded');
116 socket.setKeepAlive(true);
117 this.installListeners(socket);
118 this.emit('connection', socket);
119 });
120 }
121 else {
122 this.server.on('connection', (socket) => {
123 debug(`wp received connection from ${socket.remoteAddress}:${socket.remotePort}`);
124 socket.setKeepAlive(true);
125 this.installListeners(socket);
126 this.emit('connection', socket);
127 });
128 }
129 return this.server ;
130 }
131
132 get isServer() {
133 return this.server ;
134 }
135
136 get isClient() {
137 return !this.isServer ;
138 }
139
140 setLogger(logger) {
141 this._logger = logger ;
142 }
143 removeLogger() {
144 this._logger = function() {} ;
145 }
146
147 installListeners(socket) {
148 socket.setEncoding('utf8') ;
149
150 socket.on('error', (err) => {
151 debug(`wp#on error - ${err} ${this.host}:${this.port}`);
152 if (this.enablePing) this._stopPinging(socket);
153
154 if (this.isServer || this.closing) {
155 return;
156 }
157
158 this.emit('error', err, socket);
159
160 // "error" events get turned into exceptions if they aren't listened for. If the user handled this error
161 // then we should try to reconnect.
162 this._onConnectionGone();
163 });
164
165 socket.on('connect', () => {
166 debug(`wp#on connect ${this.host}:${this.port}`);
167 if (this.isClient) {
168 this.initializeRetryVars() ;
169 }
170 this.emit('connect', socket);
171 }) ;
172
173 socket.on('close', () => {
174 debug(`wp#on close ${this.host}:${this.port}`);
175 if (this.enablePing) this._stopPinging(socket);
176 if (this.isClient) {
177 this._onConnectionGone();
178 }
179 this.mapIncomingMsg.delete(socket) ;
180 this.emit('close', socket) ;
181 }) ;
182
183 socket.on('data', this._onData.bind(this, socket)) ;
184 }
185
186 initializeRetryVars() {
187 assert(this.isClient);
188
189 this.reconnectVars.retryTimer = null;
190 this.reconnectVars.retryTotaltime = 0;
191 this.reconnectVars.retryDelay = 150;
192 this.reconnectVars.retryBackoff = 1.7;
193 this.reconnectVars.attempts = 1;
194 }
195
196 _onConnectionGone() {
197 assert(this.isClient);
198
199 // If a retry is already in progress, just let that happen
200 if (this.reconnectVars.retryTimer) {
201 debug('WireProtocol#connection_gone: retry is already in progress') ;
202 return;
203 }
204
205 // If this is a requested shutdown, then don't retry
206 if (this.closing) {
207 this.reconnectVars.retryTimer = null;
208 return;
209 }
210
211 const nextDelay = Math.floor(this.reconnectVars.retryDelay * this.reconnectVars.retryBackoff);
212 if (this.reconnectOpts.retryMaxDelay !== null && nextDelay > this.reconnectOpts.retryMaxDelay) {
213 this.reconnectVars.retryDelay = this.reconnectOpts.retryMaxDelay;
214 } else {
215 this.reconnectVars.retryDelay = nextDelay;
216 }
217
218 if (this.reconnectOpts.maxAttempts && this.reconnectVars.attempts >= this.reconnectOpts.maxAttempts) {
219 this.reconnectVars.retryTimer = null;
220 return;
221 }
222
223 this.reconnectVars.attempts += 1;
224 this.emit('reconnecting', {
225 delay: this.reconnectVars.retryDelay,
226 attempt: this.reconnectVars.attempts
227 });
228 this.reconnectVars.retryTimer = setTimeout(() => {
229 this.reconnectVars.retryTotaltime += this.reconnectVars.retryDelay;
230
231 if (this.reconnectOpts.connectTimeout && this.reconnectVars.retryTotaltime >= this.reconnectOpts.connectTimeout) {
232 this.reconnectVars.retryTimer = null;
233 console.error('WireProtocol#connection_gone: ' +
234 `Couldn't get drachtio connection after ${this.reconnectVars.retryTotaltime} ms`);
235 return;
236 }
237 this.socket = net.connect({
238 port: this.port,
239 host: this.host
240 }) ;
241 this.socket.setKeepAlive(true) ;
242 this.installListeners(this.socket) ;
243
244 this.reconnectVars.retryTimer = null;
245 }, this.reconnectVars.retryDelay);
246 }
247
248 send(socket, msg) {
249 const msgId = uuidV4() ;
250 const s = msgId + '|' + msg ;
251 socket.write(Buffer.byteLength(s, 'utf8') + '#' + s, () => {
252 debug(`wp#send ${this.host}:${this.port} - ${s.length}#${s}`);
253 }) ;
254 this._logger('===>' + CRLF + Buffer.byteLength(s, 'utf8') + '#' + s + CRLF) ;
255 return msgId ;
256 }
257
258 /*
259 * Note: if you are wondering about the use of the spread operator,
260 * it is because we can get SIP messages with things like emojis in them;
261 * i.e. UTF8-encoded strings.
262 * See https://mathiasbynens.be/notes/javascript-unicode#other-grapheme-clusters for background
263 */
264
265 processMessageBuffer(socket, obj, length, start) {
266 let haveMsg = false;
267
268 do {
269 const msg = [...obj.incomingMsg].slice(start, start + length).join('');
270 this.emit('msg', socket, msg);
271 obj.incomingMsg = [...obj.incomingMsg].slice(start + length).join('');
272
273 // check for another message
274 const arr = /^(\d{1,5})#/.exec(obj.incomingMsg);
275 if (arr) {
276 length = parseInt(arr[1]);
277 start = arr[1].length + 1;
278 haveMsg = countSymbols(obj.incomingMsg) >= length + start;
279 }
280 else haveMsg = false;
281 } while (haveMsg);
282 }
283
284 _onData(socket, msg) {
285 this._logger(`<===${CRLF}${msg}${CRLF}`) ;
286 debug(`<===${msg}`) ;
287
288 if (!this.mapIncomingMsg.has(socket)) {
289 this.mapIncomingMsg.set(socket, {
290 incomingMsg: '',
291 length: -1
292 });
293 }
294 const obj = this.mapIncomingMsg.get(socket) ;
295
296 // append new text to cached
297 obj.incomingMsg += msg;
298
299 // check if we have a full message to process
300 const arr = /^(\d{1,5})#/.exec(obj.incomingMsg);
301 if (arr) {
302 const msgLength = parseInt(arr[1]);
303 if (countSymbols(obj.incomingMsg) >= msgLength + arr[1].length + 1) {
304 this.processMessageBuffer(socket, obj, msgLength, arr[1].length + 1);
305 }
306 return;
307 }
308 else if (obj.incomingMsg.match(/^\d{1,5}/)) {
309 // split in the middle of length specifier
310 return;
311 }
312 const err = new Error(`invalid message from server, did not start with length specifier: ${obj.incomingMsg}`);
313 if (this.isServer) {
314 console.error(`invalid client message, closing socket: ${err}`);
315 this.disconnect(socket);
316 }
317 else throw err;
318 }
319
320 disconnect(socket) {
321 this.closing = true ;
322 this.mapIncomingMsg.delete(socket);
323 if (!socket) { throw new Error('socket is not connected or was not provided') ; }
324 this._stopPinging(socket);
325 socket.end() ;
326 }
327
328 close(callback) {
329 assert.ok(this.isServer, 'WireProtocol#close only valid in outbound connection (server) mode');
330 this.server.close(callback) ;
331 }
332
333} ;