UNPKG

4.54 kBJavaScriptView Raw
1'use strict';
2
3var _ = require('lodash');
4var diameterCodec = require('./diameter-codec');
5var diameterUtil = require('./diameter-util');
6var Q = require('bluebird');
7
8
// Number of buffered bytes the connection waits for before it asks the codec
// to decode a message header.
var DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES = 20;
10
/**
 * Looks up the Session-Id AVP of a decoded message.
 *
 * @param {Object} message - Decoded message; `message.body` is searched for
 *     an entry whose first element is the string 'Session-Id'.
 * @returns {*} The second element of the first matching entry, or undefined
 *     when no entry matches.
 */
var getSessionId = function(message) {
    var match = _.find(message.body, function(entry) {
        return entry[0] === 'Session-Id';
    });
    return match === undefined ? undefined : match[1];
};
18
/**
 * Binds Diameter message framing and request/answer bookkeeping to a socket.
 *
 * Bytes received on the socket are buffered and split into messages using the
 * length reported by the codec's header decoder. Every complete message that
 * is buffered is processed, so several messages arriving in one chunk are all
 * handled. Received requests are re-emitted on the socket as a
 * 'diameterMessage' event; received answers resolve the matching promise
 * returned by `sendRequest`.
 *
 * May be called with or without `new`.
 *
 * @param {Object} [options]
 * @param {Function} [options.beforeAnyMessage] - Called with every received
 *     request before it is emitted, and with every outgoing request before it
 *     is encoded.
 * @param {Function} [options.afterAnyMessage] - Called with every outgoing
 *     response before it is encoded, and with every received answer that
 *     matches a pending request.
 * @param {number} [options.timeout] - Default request timeout in
 *     milliseconds; 3000 is used when neither this nor the per-call timeout
 *     is given.
 * @param {Object} socket - Object the connection calls `on`, `emit`, `write`
 *     and `end` on.
 */
function DiameterConnection(options, socket) {
    if (!(this instanceof DiameterConnection)) {
        return new DiameterConnection(options, socket);
    }
    options = options || {};
    var self = this;
    self.socket = socket;
    self.options = options;
    self.pendingRequests = {};
    self.hopByHopIdCounter = diameterUtil.random32BitNumber();

    // Bytes received so far that do not yet form a complete message.
    var buffer = Buffer.alloc(0);

    // Routes one decoded message: requests are handed to 'diameterMessage'
    // listeners, answers settle the request waiting on the same hop-by-hop id.
    var handleMessage = function(message) {
        if (message.header.flags.request) {
            var response = diameterCodec.constructResponse(message);

            if (_.isFunction(self.options.beforeAnyMessage)) {
                self.options.beforeAnyMessage(message);
            }

            self.socket.emit('diameterMessage', {
                sessionId: getSessionId(message),
                message: message,
                response: response,
                callback: function(response) {
                    if (_.isFunction(self.options.afterAnyMessage)) {
                        self.options.afterAnyMessage(response);
                    }
                    var responseBuffer = diameterCodec.encodeMessage(response);
                    setImmediate(function() {
                        self.socket.write(responseBuffer);
                    });
                }
            });
            return;
        }

        var hopByHopId = message.header.hopByHopId;
        var pendingRequest = self.pendingRequests[hopByHopId];
        if (pendingRequest == null) {
            // No request is waiting for this answer (for example it already
            // timed out); the answer is dropped.
            return;
        }
        if (_.isFunction(self.options.afterAnyMessage)) {
            self.options.afterAnyMessage(message);
        }
        delete self.pendingRequests[hopByHopId];
        pendingRequest.deferred.resolve(message);
    };

    self.socket.on('data', function(data) {
        try {
            buffer = Buffer.concat([buffer, Buffer.isBuffer(data) ? data : Buffer.from(data)]);
        } catch (err) {
            self.socket.emit('error', err);
            return;
        }

        // Drain every complete message, not just the first one.
        while (buffer.length >= DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES) {
            var messageLength;
            try {
                messageLength = diameterCodec.decodeMessageHeader(buffer).header.length;
                // The declared length covers the header itself, so anything
                // shorter is malformed and would also stall this loop.
                if (!(messageLength >= DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES)) {
                    throw new Error('Invalid Diameter message length: ' + messageLength);
                }
            } catch (err) {
                // Message boundaries can no longer be located; drop what is
                // buffered instead of failing on the same bytes forever.
                buffer = Buffer.alloc(0);
                self.socket.emit('error', err);
                return;
            }

            if (buffer.length < messageLength) {
                // Wait for the rest of the message.
                return;
            }

            // Consume the message before handling it so that a failure in the
            // codec or in a listener cannot cause it to be processed again.
            var messageBuffer = buffer.slice(0, messageLength);
            buffer = buffer.slice(messageLength);

            try {
                handleMessage(diameterCodec.decodeMessage(messageBuffer));
            } catch (err) {
                self.socket.emit('error', err);
            }
        }
    });

    /**
     * Builds a request through the codec.
     *
     * @param {*} application - Passed through to the codec.
     * @param {*} command - Passed through to the codec.
     * @param {*} [sessionId] - Session id; a random 32-bit number is used
     *     when it is undefined.
     * @returns {Object} The request constructed by the codec.
     */
    self.createRequest = function(application, command, sessionId) {
        if (sessionId === undefined) {
            sessionId = diameterUtil.random32BitNumber();
        }
        return diameterCodec.constructRequest(application, command, sessionId);
    };

    /**
     * Encodes and writes a request, then waits for the matching answer.
     *
     * The request's `header.hopByHopId` is overwritten with the next value of
     * the connection's counter.
     *
     * @param {Object} request - Request to send.
     * @param {number} [timeout] - Milliseconds to wait; falls back to
     *     `options.timeout`, then to 3000.
     * @returns {Promise} Resolves with the answer message. Rejects when no
     *     answer arrives within the timeout, or when no socket is bound.
     */
    self.sendRequest = function(request, timeout) {
        var deferred = Q.defer();
        if (self.socket === undefined) {
            deferred.reject('Socket not bound to session.');
            return deferred.promise;
        }
        timeout = timeout || self.options.timeout || 3000;

        // Keep the identifier inside the unsigned 32-bit range of the header
        // field by wrapping the counter around.
        var hopByHopId = self.hopByHopIdCounter;
        self.hopByHopIdCounter = (hopByHopId + 1) >>> 0;
        request.header.hopByHopId = hopByHopId;

        if (_.isFunction(self.options.beforeAnyMessage)) {
            self.options.beforeAnyMessage(request);
        }
        var requestBuffer = diameterCodec.encodeMessage(request);
        self.socket.write(requestBuffer);
        self.pendingRequests[hopByHopId] = {
            'request': request,
            'deferred': deferred
        };

        return deferred.promise
            .timeout(timeout, 'Request timed out, no response was received in ' + timeout + 'ms')
            .finally(function() {
                // Forget the request once it settles, so timed-out requests
                // do not accumulate. Only remove our own entry.
                var entry = self.pendingRequests[hopByHopId];
                if (entry !== undefined && entry.deferred === deferred) {
                    delete self.pendingRequests[hopByHopId];
                }
            });
    };

    /**
     * Ends the socket this connection was created with.
     */
    self.end = function() {
        socket.end();
    };
}
117
// Expose the connection constructor to modules that require this file.
exports.DiameterConnection = DiameterConnection;