UNPKG

25.5 kBJavaScriptView Raw
1const Emitter = require('events');
2const debug = require('debug')('drachtio:agent');
3const debugSocket = require('debug')('drachtio:socket');
4const WireProtocol = require('./wire-protocol') ;
5const SipMessage = require('drachtio-sip').SipMessage ;
6const Request = require('./request') ;
7const Response = require('./response') ;
8const DigestClient = require('./digest-client') ;
9const noop = require('node-noop').noop;
10const assert = require('assert');
11const net = require('net');
12const delegate = require('delegates') ;
13const tls = require('tls');
14const CR = '\r' ;
15const CRLF = '\r\n' ;
16
17const defer = typeof setImmediate === 'function' ?
18 setImmediate : function(fn) { process.nextTick(fn.bind.apply(fn, arguments)); } ;
19
20function typeSocket(socket) {
21 return socket instanceof net.Socket || socket instanceof tls.TLSSocket;
22}
23function sockPort(socket) {
24 assert(typeSocket(socket));
25 return '' + socket.remotePort + ':' + socket.localPort;
26}
27
28function serverVersionAtLeast(serverVersion, minSupportedVersion) {
29 if (serverVersion) {
30 try {
31 const regex = /^v(\d+)\.(\d+)\.(\d+)/;
32 const actual = regex.exec(serverVersion);
33 if (actual) {
34 const desired = regex.exec(minSupportedVersion);
35 if (desired) {
36 debug(`parsed serverVersion: ${JSON.stringify(actual)}, desired is ${JSON.stringify(desired)}`);
37 if (parseInt(actual[1]) > parseInt(desired[1])) return true;
38 if (parseInt(actual[1]) < parseInt(desired[1])) return false;
39 if (parseInt(actual[2]) > parseInt(desired[2])) return true;
40 if (parseInt(actual[2]) < parseInt(desired[2])) return false;
41 if (parseInt(actual[3]) >= parseInt(desired[3])) return true;
42 }
43 else assert.ok(false, `failed parsing desired ${minSupportedVersion}`);
44 }
45 else assert.ok(false, `failed parsing actual ${serverVersion}, please fix`);
46 } catch (err) {
47 console.log(`Error parsing server version: ${serverVersion}: ${err}, please fix`);
48 }
49 }
50 return false;
51}
52class DrachtioAgent extends Emitter {
53
54 constructor(callback) {
55 super();
56
57 this.puntUpTheMiddleware = callback ;
58 this.params = new Map() ;
59
60 this.mapServer = new Map() ;
61 this.verbs = new Map() ;
62 this.cdrHandlers = new Map() ;
63
64 //map of stack transaction ids => pending requests, where txn id for request has been challenged
65 this.pendingSipAuthTxnIdUpdate = new Map(),
66
67 this._listen = false;
68 }
69
70 get isListening() {
71 return this._listen;
72 }
73 get idle() {
74
75 let pendingCount = 0 ;
76 let pendingSipCount = 0 ;
77 let pendingAckOrPrack = 0 ;
78
79 this.mapServer.forEach((obj, socket) => {
80 pendingCount += obj.pendingRequests.size ;
81 pendingSipCount += obj.pendingSipRequests.size ;
82 pendingAckOrPrack += obj.pendingAckOrPrack.size ;
83
84 if (pendingCount > 0) {
85 debug(`count of pending requests: ${pendingCount}`) ;
86 for (const key of obj.pendingRequests.keys()) {
87 debug(key);
88 }
89 }
90 if (pendingSipCount > 0) {
91 debug(`count of pending sip requests: ${pendingSipCount}`) ;
92 for (const key of obj.pendingSipRequests.keys()) {
93 debug(key);
94 }
95 }
96 if (pendingAckOrPrack > 0) {
97 debug(`count of pending ack/prack: ${pendingAckOrPrack}`) ;
98 for (const key of obj.pendingAckOrPrack.keys()) {
99 debug(key);
100 }
101 }
102
103 });
104
105 debug(`idle check: ${pendingCount + pendingSipCount + pendingAckOrPrack}`);
106 return (pendingCount + pendingSipCount + pendingAckOrPrack) === 0 ;
107 }
108
109 connect(opts, callback) {
110 this.secret = opts.secret ;
111 this.tags = opts.tags || [];
112
113 this.wp = new WireProtocol(opts) ;
114 this.wp.connect(opts);
115
116 // pass on some of the socket events
117 ['reconnecting', 'close', 'error'].forEach((evt) => {
118 this.wp.on(evt, (...args) => {
119 this.emit(evt, ...args);
120 }) ;
121 }) ;
122
123 this.wp.on('connect', this._onConnect.bind(this)) ;
124 this.wp.on('close', this._onClose.bind(this));
125 this.wp.on('msg', this._onMsg.bind(this)) ;
126
127 if (callback) {
128 Emitter.prototype.on.call(this, 'connect', callback);
129 }
130 }
131
132 listen(opts, callback) {
133 this.secret = opts.secret ;
134 this.tags = opts.tags || [];
135
136 this._listen = true;
137 this.wp = new WireProtocol(opts) ;
138 const server = this.wp.listen(opts);
139
140 delegate(this, 'wp')
141 .method('close') ;
142
143
144 // pass on some of the socket events
145 ['reconnecting', 'close', 'error', 'listening'].forEach((evt) => {
146 this.wp.on(evt, (...args) => {
147 this.emit(evt, ...args);
148 }) ;
149 }) ;
150
151 this.wp.on('close', this._onClose.bind(this));
152 this.wp.on('connection', this._onConnect.bind(this)) ;
153 this.wp.on('msg', this._onMsg.bind(this)) ;
154
155 if (callback) {
156 Emitter.prototype.on.call(this, 'listening', callback);
157 }
158
159 return server ;
160 }
161
162 on(event, fn) {
163
164 //cdr events are handled through a different mechanism - we register with the server
165 if (0 === event.indexOf('cdr:')) {
166 this.cdrHandlers.set(event.slice(4), fn) ;
167 this.route(event) ;
168 }
169 else if (event === 'ping') {
170 const {msgId, socket} = fn;
171 const obj = this.mapServer.get(socket);
172 if (obj) {
173 debug(`sent ping request with msgId ${msgId}`);
174 obj.pendingPingRequests.add(msgId);
175 }
176 }
177 else {
178 //delegate to EventEmitter
179 Emitter.prototype.on.apply(this, arguments);
180 }
181 return this ;
182 }
183
184 sendMessage(socket, msg, opts) {
185 if (!typeSocket(socket)) {
186 opts = msg;
187 msg = socket ;
188 socket = this._getDefaultSocket() ;
189 }
190
191 debug(`sendMessage: ${msg}`);
192 let m = msg ;
193 opts = opts || {} ;
194
195 debug(`opts: ${JSON.stringify(opts)}`);
196
197 if (opts && (opts.headers || opts.body)) {
198 m = new SipMessage(msg) ;
199 for (const hdr in (opts.headers || {})) {
200 m.set(hdr, opts.headers[hdr]) ;
201 }
202 if (opts.body) { m.body = opts.body ; }
203 }
204
205 const s = `sip|${opts.stackTxnId || ''}|${opts.stackDialogId || ''}${CRLF}${m.toString()}`;
206
207 return this.wp.send(socket, s) ;
208 }
209
210 _normalizeParams(socket, uri, options, callback) {
211 if (!typeSocket(socket)) {
212 callback = options ;
213 options = uri ;
214 uri = socket ;
215 socket = null ;
216 }
217
218 if (typeof uri === 'undefined') {
219 const err = new Error('undefined is not a valid request_uri or options object.') ;
220 console.error(err.stack) ;
221 throw err ;
222 }
223
224 // request( request_uri, options, callback, ..)
225 if (options && typeof options === 'object') {
226 options.uri = uri ;
227 }
228 // request( request_uri, callback, ..)
229 else if (typeof uri === 'string') {
230 options = {uri:uri } ;
231 }
232 // request( option, callback, ..)
233 else {
234 callback = options ;
235 options = uri ;
236 uri = options.uri;
237 }
238 callback = callback || noop ;
239
240 if (options._socket) {
241 debugSocket(`_normalizeParams: using socket provided in options._socket: ${sockPort(options._socket)}`);
242 socket = options._socket ;
243 delete options._socket ;
244 }
245 else {
246 socket = this._getDefaultSocket() ;
247 debugSocket(
248 `_normalizeParams: using default socket provided in options._socket: ${sockPort(socket)}`);
249 }
250
251 debug(`options: ${JSON.stringify(options)}`);
252 options.method = options.method.toUpperCase() ;
253
254 return { socket, uri, options, callback } ;
255 }
256
257 _makeRequest(params) {
258 debugSocket(`_makeRequest: there are ${this.mapServer.size} entries in mapServer`);
259 const obj = this.mapServer.get(params.socket) ;
260
261 //allow for requests within a dialog, where caller does not need to supply a uri
262 if (!params.options.uri && !!params.options.stackDialogId) {
263 params.options.uri = 'sip:placeholder' ;
264 }
265
266 const m = new SipMessage(params.options) ;
267
268 //new outgoing request
269 let msg = `sip|${params.options.stackTxnId || ''}|${params.options.stackDialogId || ''}`;
270 if (params.options.proxy) {
271 msg += `|${params.options.proxy}`;
272 }
273 msg += `${CRLF}${m.toString()}` ;
274
275 debugSocket(`_makeRequest: calling wp.send using socket ${sockPort(params.socket)}`);
276 assert.ok(typeSocket(params.socket), 'provided socket is not a net.Socket or tls.TLSSocket');
277 assert.ok(params.socket.destroyed !== true, 'provided socket has been destroyed');
278
279 const msgId = this.wp.send(params.socket, msg) ;
280
281 obj.pendingRequests.set(msgId, (token, msg) => {
282 if (token[0] === 'OK') {
283 const transactionId = token[7] ;
284 const meta = {
285 source: token[1],
286 address: token[4],
287 port: token[5],
288 protocol: token[3],
289 time: token[6],
290 transactionId: transactionId
291 } ;
292
293 const req = new Request(new SipMessage(msg), meta) ;
294 req.agent = this ;
295 req.socket = obj.socket ;
296 if (params.options.auth) {
297 req.auth = params.options.auth ;
298 req._originalParams = params ;
299 }
300
301 //Note: unfortunately, sofia (the nta layer) does not pass up the 200 OK response to a CANCEL
302 //so we are unable to route it up to the application.
303 //Therefore, we can't allocate this callback since it would never be called or freed
304 if (params.options.method !== 'CANCEL') {
305 obj.pendingSipRequests.set(transactionId, {
306 req: req
307 }) ;
308 }
309
310 params.callback(null, req) ;
311
312 }
313 else {
314 params.callback(token[1] || 'request failed') ;
315 }
316 });
317 }
318
319 request(socket, request_uri, options, callback) {
320 const params = this._normalizeParams(socket, request_uri, options, callback) ;
321
322 // check for race condition where we are canceling an INVITE that just got challenged
323 // (so the stackTxnId needs to be upgraded to the new INVITE w credentials we just sent)
324 if (params.options && params.options.stackTxnId) {
325 if (this.pendingSipAuthTxnIdUpdate.has(params.options.stackTxnId)) {
326 debug(`uac-auth: holding ${params.options.method} for ${params.options.stackTxnId} that is being replaced`);
327 this.pendingSipAuthTxnIdUpdate.set(params.options.stackTxnId, params);
328 return;
329 }
330 }
331 return this._makeRequest(params) ;
332 }
333
334 sendResponse(res, opts, callback, fnAck) {
335 const obj = this.mapServer.get(res.socket) ;
336 debug(`agent#sendResponse: ${JSON.stringify(res.msg)}`);
337 const msgId = this.sendMessage(res.socket, res.msg, Object.assign({stackTxnId: res.req.stackTxnId}, opts)) ;
338 if (callback || fnAck) {
339
340 obj.pendingRequests.set(msgId, (token, msg, meta) => {
341 obj.pendingRequests.delete(msgId) ;
342 if ('OK' !== token[0]) { return callback(token[1]) ; }
343 const responseMsg = new SipMessage(msg) ;
344 res.meta = meta ;
345 if (callback) {
346 callback(null, responseMsg) ;
347 }
348
349 // for reliable provisional responses or does caller want to be notified on receipt of prack / ack ?
350 if (fnAck && typeof fnAck === 'function' &&
351 (responseMsg.has('RSeq') || res.status === 200)) {
352 obj.pendingAckOrPrack.set(meta.dialogId, fnAck) ;
353 }
354 }) ;
355 }
356 if (res.statusCode >= 200) {
357 defer(() => {
358 res.finished = true ;
359 res.emit('finish');
360 });
361
362 // clear out pending incoming INVITEs when we send a final response
363 if (res.req.method === 'INVITE') {
364 const callId = res.get('call-id') ;
365 obj.pendingNetworkInvites.delete(callId) ;
366 debug(`Agent#sendResponse: deleted pending invite for call-id ${callId}, ` +
367 `there are now ${obj.pendingNetworkInvites.size} pending invites`);
368 }
369 }
370 }
371
372 sendAck(method, dialogId, req, res, opts, callback) {
373 assert(this.mapServer.has(res.socket));
374 const obj = this.mapServer.get(res.socket) ;
375 const m = new SipMessage() ;
376 m.method = method ;
377 m.uri = req.uri ;
378 opts = opts || {} ;
379
380 Object.assign(opts, {stackDialogId: dialogId}) ;
381
382 const msgId = this.sendMessage(res.socket, m, opts) ;
383 if (callback) {
384 obj.pendingRequests.set(msgId, (token, msg, meta) => {
385 if ('OK' !== token[0]) {
386 return callback(token[1]) ;
387 }
388 callback(null, new SipMessage(msg)) ;
389 }) ;
390 }
391 }
392
393 proxy(req, opts, callback) {
394 const obj = this.mapServer.get(req.socket) ;
395
396 const m = new SipMessage({
397 uri: opts.destination[0],
398 method: req.method
399 }) ;
400
401 if (opts.headers) {
402 for (const hdr in (opts.headers || {})) {
403 m.set(hdr, opts.headers[hdr]) ;
404 }
405 }
406
407 const msg = `proxy|${opts.stackTxnId}|${(opts.remainInDialog ? 'remainInDialog' : '')}` +
408 `|${(opts.fullResponse ? 'fullResponse' : '')}|${(opts.followRedirects ? 'followRedirects' : '')}` +
409 `|${(opts.simultaneous ? 'simultaneous' : 'serial')}|${opts.provisionalTimeout}|${opts.finalTimeout}` +
410 `|${opts.destination.join('|')}${CRLF}${m.toString()}` ;
411
412 const msgId = this.wp.send(req.socket, msg) ;
413 obj.pendingRequests.set(msgId, callback) ;
414 }
415
416 set(prop, val) {
417
418 switch (prop) {
419 case 'handler':
420 this.puntUpTheMiddleware = val ;
421 break ;
422
423 default:
424 this.params.set(prop, val) ;
425 break ;
426 }
427 }
428
429 get(prop) {
430 return this.params.get(prop) ;
431 }
432
433 route(verb) {
434 if (this.verbs.has(verb)) { throw new Error('duplicate route request for ' + verb) ; }
435 this.verbs.set(verb, {sent: false }) ;
436
437 this.mapServer.forEach((obj, socket) => {
438 if (obj.authenticated) {
439 this.routeVerbs(socket) ;
440 }
441 });
442 }
443
444 routeVerbs(socket) {
445 this.verbs.forEach((obj, verb) => {
446 if (obj.sent === true) {
447 return ;
448 }
449
450 obj = {
451 sent: true,
452 acknowledged: false,
453 rid: this.wp.send(socket, 'route|' + verb)
454 } ;
455 });
456 }
457
458 disconnect(socket) {
459 const sock = socket || this._getDefaultSocket();
460 debugSocket(`disconnect: removing socket ${sockPort(sock)}`);
461 this.wp.disconnect(sock) ;
462 if (socket) {
463 this.mapServer.delete(socket);
464 debugSocket(`disconnect: after delete there are ${this.mapServer.size} entries in mapServer`);
465 }
466 }
467 close() {
468 this.wp.close() ;
469 }
470
471 _getDefaultSocket() {
472 debugSocket(`_getDefaultSocket: there are ${this.mapServer.size} entries in mapServer`);
473 const socket = this.mapServer.keys().next().value ;
474 debugSocket(`_getDefaultSocket: returning socket ${sockPort(socket)}`);
475 return socket;
476 }
477 _initServer(socket) {
478 assert(!this.mapServer.has(socket));
479 this.mapServer.set(socket, {
480 //any ping request awaiting a response from a drachtio server
481 pendingPingRequests: new Set(),
482 //any request message awaiting a response from a drachtio server
483 pendingRequests: new Map(),
484 //any sip request generated by us awaiting a final response from a drachtio server
485 pendingSipRequests: new Map(),
486 //any sip request generated by us that we are resending with Authorization header; key=call-id
487 pendingSipAuthRequests: new Map(),
488 //any sip INVITE we've received that we've not yet generated a final response for
489 pendingNetworkInvites: new Map(),
490 // a reliable provisional response or 200 OK to INVITE that is waiting on a PRACK/ACK
491 pendingAckOrPrack: new Map(),
492 authenticated: false,
493 ready: false,
494 hostport: null
495 });
496 debugSocket(`_initServer: added socket: ${sockPort(socket)}, count now: ${this.mapServer.size}`);
497 return this.mapServer.get(socket);
498 }
499
500 _onConnect(socket) {
501 const obj = this._initServer(socket) ;
502 const msgId = this.wp.send(socket, `authenticate|${this.secret}|${this.tags.join(',')}`) ;
503 obj.pendingRequests.set(msgId, (response) => {
504 if (obj.authenticated = ('OK' === response[0])) {
505 obj.ready = true ;
506 obj.hostport = response[1] ;
507 obj.serverVersion = response.length > 2 ? response[2] : null;
508 debug('sucessfully authenticated, hostport is ', obj.hostport) ;
509
510 if (this.wp.isClient) {
511 this.routeVerbs(socket, obj) ;
512 setImmediate(() => {
513 this.emit('connect', null, obj.hostport, obj.serverVersion);
514 });
515 }
516 else {
517 this.emit('connect', null, obj.hostport, obj.serverVersion);
518 }
519 if (serverVersionAtLeast(obj.serverVersion, 'v0.8.2')) {
520 debug(`server version ${obj.serverVersion} supports pinging`);
521 this.wp.startPinging(socket);
522 }
523 }
524 else {
525 this.emit('connect', new Error('failed to authenticate to server')) ;
526 }
527 }) ;
528 }
529 _onClose(socket) {
530 this.mapServer.delete(socket);
531 debugSocket(`_initServer: removed socket: ${sockPort(socket)}, count now: ${this.mapServer.size}`);
532 }
533
534 _onMsg(socket, msg) {
535 const obj = this.mapServer.get(socket) ;
536 const pos = msg.indexOf(CR) ;
537 const leader = -1 === pos ? msg : msg.slice(0, pos) ;
538 const token = leader.split('|') ;
539 let res, sr, rawMsg ;
540
541 switch (token[1]) {
542 case 'sip':
543 if (!obj) {
544 debug('socket not found, message discarding');
545 return ;
546 }
547 rawMsg = msg.slice(pos + 2) ;
548 const sipMsg = new SipMessage(rawMsg) ;
549 const source = token[2] ;
550 const protocol = token[4] ;
551 const address = token[5] ;
552 const port = token[6] ;
553 const time = token[7] ;
554 const transactionId = token[8] ;
555 const dialogId = token[9] ;
556 const server = {
557 address: socket.remoteAddress,
558 hostport: obj.hostport
559 };
560 let receivedOn;
561 if (token.length > 11) {
562 receivedOn = token[10] + ':' + token[11];
563 }
564 const meta = { source, address, port, protocol, time, transactionId, dialogId, server, receivedOn } ;
565 debug(`tokens: ${JSON.stringify(token)}`);
566
567 if (token.length > 9) {
568
569 if ('network' === source && sipMsg.type === 'request') {
570
571 //handle CANCELS by locating the associated INVITE and emitting a 'cancel' event
572 const callId = sipMsg.get('call-id') ;
573 if ('CANCEL' === sipMsg.method) {
574
575 // hopefully, this pertains to an INVITE we have received earlier
576 if (obj.pendingNetworkInvites.has(callId)) {
577 obj.pendingNetworkInvites.get(callId).req.emit('cancel') ;
578 obj.pendingNetworkInvites.delete(callId) ;
579 debug(`Agent#handle - emitted cancel event for INVITE with call-id ${callId}` +
580 `, remaining count of invites in progress: ${obj.pendingNetworkInvites.size}`);
581 }
582 else {
583 // if not, don't punt up the middleware because the drachtio server will have already
584 // responded to the CANCEL and we dont want to send another 404 which is what would happen
585 debug(`Agent#handle - got CANCEL for call-id ${callId} that was not found`);
586 }
587 return;
588 }
589
590 debug(`DrachtioAgent#_onMsg: meta: ${JSON.stringify(meta)}`);
591
592 const req = new Request(sipMsg, meta) ;
593 res = new Response() ;
594 req.res = res ;
595 res.req = req ;
596 req.agent = res.agent = this ;
597 req.socket = res.socket = socket ;
598
599 if ('INVITE' === req.method) {
600 obj.pendingNetworkInvites.set(callId, { req, res }) ;
601 debug(`Agent#handle: tracking an incoming invite with call-id ${callId}, ` +
602 `currently tracking ${obj.pendingNetworkInvites.size} invites in progress`);
603 }
604 else if (('PRACK' === req.method || 'ACK' === req.method) && obj.pendingAckOrPrack.has(dialogId)) {
605 const fnAck = obj.pendingAckOrPrack.get(dialogId);
606 obj.pendingAckOrPrack.delete(dialogId);
607 fnAck(req) ;
608 }
609
610 this.puntUpTheMiddleware(req, res) ;
611 }
612 else if ('network' === source) {
613 debug('received sip response');
614 if (obj.pendingSipRequests.has(transactionId)) {
615 sr = obj.pendingSipRequests.get(transactionId) ;
616 res = new Response(this) ;
617 res.msg = sipMsg ;
618 res.meta = meta ;
619 res.req = sr.req ;
620 res.socket = res.req.socket = socket ;
621
622 debug('Agent#handle: got a response with status: %d', res.status) ;
623
624 if (res.status >= 200) {
625 obj.pendingSipRequests.delete(transactionId) ;
626 }
627
628 //prepare a function to be called for prack or ack, if appropriate
629 let ack = noop ;
630 if (res.status >= 200 && res.req.method === 'INVITE') {
631 ack = Response.prototype.sendAck.bind(res, token[9]) ;
632 }
633 else if (res.status > 100 && res.status < 200) {
634 const prackNeeded = res.get('RSeq');
635 if (prackNeeded) {
636 ack = Response.prototype.sendPrack.bind(res, token[9]) ;
637 }
638 }
639 // If its a challenge and the user supplied username and password, automatically handle it
640 const cid = res.msg.headers['call-id'];
641 if (obj.pendingSipAuthRequests.has(cid)) {
642 obj.pendingSipAuthRequests.delete(cid) ;
643 this.pendingSipAuthTxnIdUpdate.delete(res.req.stackTxnId);
644 }
645 else if ((401 === res.status || 407 === res.status) && (!!res.req.auth)) {
646 obj.pendingSipAuthRequests.set(cid, true) ;
647 this.pendingSipAuthTxnIdUpdate.set(res.req.stackTxnId, {});
648 const client = new DigestClient(res) ;
649 client.authenticate((err, req) => {
650 // move all listeners from the old request to the new one we just generated
651 res.req.listeners('response').forEach((l) => { req.on('response', l) ; }) ;
652 res.req.emit('authenticate', req) ;
653
654 //if we got a quick CANCEL before we got the new txn id, it was held and can now be sent
655 const params = this.pendingSipAuthTxnIdUpdate.get(res.req.stackTxnId);
656 if (params && params.options && params.options.stackTxnId) {
657 debug(
658 `uac-auth: sending out delayed ${params.options.method} originally for ${res.req.stackTxnId}`);
659 params.options.stackTxnId = req.stackTxnId;
660 this._makeRequest(params);
661 }
662 this.pendingSipAuthTxnIdUpdate.delete(res.req.stackTxnId);
663
664 // the app may still call req.cancel() on the old request, so make that work
665 debug(`uac-auth: new transaction ${req.stackTxnId} overwrites ${res.req.stackTxnId}`);
666 res.req.stackTxnId = req.stackTxnId;
667 }) ;
668 return ;
669 }
670 sr.req.emit('response', res, ack) ;
671 }
672 }
673 else if ('application' === source && sipMsg.type === 'request' && transactionId === 'unsolicited') {
674 debug('received unsolicited request sent from application; probably BYE due to ACK timeout or the like');
675 const req = new Request(sipMsg, meta) ;
676 res = new Response() ;
677 req.res = res ;
678 res.req = req ;
679 req.agent = res.agent = this ;
680 req.socket = res.socket = socket ;
681
682 //stub out send
683 res.send = noop;
684
685 this.puntUpTheMiddleware(req, res);
686 }
687 }
688
689 break ;
690
691 case 'response':
692 if (!obj) {
693 debug('socket not found, message discarding');
694 return ;
695 }
696 const rId = token[2] ;
697
698 if (obj.pendingPingRequests.has(rId)) {
699 obj.pendingPingRequests.delete(rId);
700 debug(`got pong response with msgId ${rId}, count outstanding: ${obj.pendingPingRequests.length}`);
701 }
702 else if (obj.pendingRequests.has(rId)) {
703 if (-1 !== pos) { rawMsg = msg.slice(pos + 2) ; }
704 const meta2 = {
705 source: token[4],
706 address: token[7],
707 port: token[8],
708 protocol: token[6],
709 time: token[9],
710 transactionId: token[10],
711 dialogId: token[11]
712 } ;
713 const fn = obj.pendingRequests.get(rId).bind(this, token.slice(3), rawMsg, meta2) ;
714 if ('continue' !== token[12]) {
715 obj.pendingRequests.delete(rId) ;
716 }
717 fn() ;
718 }
719 break ;
720
721 case 'cdr:attempt':
722 case 'cdr:start':
723 case 'cdr:stop':
724 const cdrEvent = token[1].slice(4) ;
725 const msgSource = token[2] ;
726 const msgTime = token[3] ;
727 rawMsg = msg.slice(pos + 2) ;
728 const cdrSipMsg = new SipMessage(rawMsg) ;
729 const args = [msgSource, msgTime] ;
730 if (cdrEvent !== 'attempt') { args.push(token[4]) ; }
731 args.push(cdrSipMsg) ;
732
733 if (this.cdrHandlers.has(cdrEvent)) {
734 this.cdrHandlers.get(cdrEvent).apply(this, args) ;
735 }
736 break ;
737
738 default:
739 throw new Error('unexpected message with type: ' + token[1]) ;
740 }
741 }
742}
743
744DrachtioAgent.prototype.uac = DrachtioAgent.prototype.request ; // alias
745
746module.exports = DrachtioAgent ;