1 | 'use strict';
|
2 |
|
3 | var _ = require('lodash');
|
4 | var diameterCodec = require('./diameter-codec');
|
5 | var diameterUtil = require('./diameter-util');
|
6 | var Q = require('bluebird');
|
7 |
|
8 |
|
// Minimum number of buffered bytes required before the message header is
// decoded from the incoming stream (see the socket 'data' handler below).
var DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES = 20;
|
10 |
|
/**
 * Looks up the value of the 'Session-Id' entry in a decoded message body.
 *
 * @param {Object} message - Decoded message; `message.body` is searched with
 *     lodash `find`, and each entry is read as `[name, value]`.
 * @returns {*} The second element of the first entry whose first element is
 *     'Session-Id', or `undefined` when no such entry exists.
 */
var getSessionId = function(message) {
    var isSessionIdEntry = function(entry) {
        return entry[0] === 'Session-Id';
    };
    var match = _.find(message.body, isSessionIdEntry);
    return match === undefined ? undefined : match[1];
};
|
18 |
|
/**
 * Wraps a stream socket with Diameter message framing and request/answer
 * correlation. May be called with or without `new`.
 *
 * Incoming bytes are buffered and split into complete messages using the
 * length field of the decoded header. Messages flagged as requests are emitted
 * on the socket as a 'diameterMessage' event carrying
 * `{ sessionId, message, response, callback }`; calling `callback(response)`
 * encodes and writes the response. Other messages are matched by hop-by-hop id
 * against requests sent through `sendRequest` and resolve their promises.
 * Messages that match no pending request are ignored.
 *
 * Errors raised while decoding or dispatching are emitted on the socket as an
 * 'error' event.
 *
 * @param {Object} [options] - Optional settings read by this connection:
 *     `timeout` (default request timeout in ms), `beforeAnyMessage(message)`
 *     and `afterAnyMessage(message)` hooks.
 * @param {Object} socket - Socket-like object providing `on`, `emit`, `write`
 *     and `end`.
 */
function DiameterConnection(options, socket) {
    if (!(this instanceof DiameterConnection)) {
        return new DiameterConnection(options, socket);
    }
    options = options || {};
    var self = this;
    self.socket = socket;
    self.options = options;
    self.pendingRequests = {};
    self.hopByHopIdCounter = diameterUtil.random32BitNumber();

    // Largest value that fits the unsigned 32-bit hop-by-hop identifier.
    var MAX_HOP_BY_HOP_ID = 0xFFFFFFFF;

    // Bytes received so far that do not yet form a complete message.
    var buffer = Buffer.alloc(0);

    // Calls the named hook from options, if it is a function.
    var invokeHook = function(name, message) {
        if (_.isFunction(self.options[name])) {
            self.options[name](message);
        }
    };

    // Hands an incoming request to listeners together with a prepared response.
    var handleRequest = function(message) {
        var response = diameterCodec.constructResponse(message);

        invokeHook('beforeAnyMessage', message);

        self.socket.emit('diameterMessage', {
            sessionId: getSessionId(message),
            message: message,
            response: response,
            callback: function(response) {
                invokeHook('afterAnyMessage', response);
                var responseBuffer = diameterCodec.encodeMessage(response);
                setImmediate(function() {
                    self.socket.write(responseBuffer);
                });
            }
        });
    };

    // Resolves the pending request matching an incoming answer, if any.
    var handleAnswer = function(message) {
        var hopByHopId = message.header.hopByHopId;
        var pendingRequest = self.pendingRequests[hopByHopId];
        if (pendingRequest == null) {
            // No one is waiting for this answer (unknown id or already
            // timed out); drop it.
            return;
        }
        invokeHook('afterAnyMessage', message);
        delete self.pendingRequests[hopByHopId];
        pendingRequest.deferred.resolve(message);
    };

    self.socket.on('data', function(data) {
        try {
            buffer = Buffer.concat([buffer, Buffer.isBuffer(data) ? data : Buffer.from(data)]);

            // A single chunk may carry several messages, so keep extracting
            // frames until only a partial message (or nothing) is left.
            while (buffer.length >= DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES) {
                var messageLength = diameterCodec.decodeMessageHeader(buffer).header.length;

                if (!(messageLength >= DIAMETER_MESSAGE_HEADER_LENGTH_IN_BYTES)) {
                    // The stream cannot be re-synchronised after a bogus
                    // length, and a zero length would never make progress.
                    buffer = Buffer.alloc(0);
                    throw new Error('Invalid Diameter message length: ' + messageLength);
                }

                if (buffer.length < messageLength) {
                    break;
                }

                // Consume the frame before decoding/dispatching so a failure
                // below does not cause the same bytes to be processed again.
                var frame = buffer.slice(0, messageLength);
                buffer = buffer.slice(messageLength);

                var message = diameterCodec.decodeMessage(frame);

                if (message.header.flags.request) {
                    handleRequest(message);
                } else {
                    handleAnswer(message);
                }
            }
        } catch (err) {
            self.socket.emit('error', err);
        }
    });

    /**
     * Builds a new request via the codec.
     *
     * @param {*} application - Passed through to `diameterCodec.constructRequest`.
     * @param {*} command - Passed through to `diameterCodec.constructRequest`.
     * @param {*} [sessionId] - Session id; a random 32-bit number is generated
     *     when omitted.
     * @returns {*} Whatever `diameterCodec.constructRequest` returns.
     */
    self.createRequest = function(application, command, sessionId) {
        if (sessionId === undefined) {
            sessionId = diameterUtil.random32BitNumber();
        }
        return diameterCodec.constructRequest(application, command, sessionId);
    };

    /**
     * Assigns a hop-by-hop id to the request, writes it to the socket and
     * waits for the matching answer.
     *
     * @param {Object} request - Request message; `request.header.hopByHopId`
     *     is overwritten.
     * @param {number} [timeout] - Timeout in ms; falls back to
     *     `options.timeout`, then 3000.
     * @returns {Promise} Resolves with the answer message; rejects on timeout
     *     or when no socket is bound.
     */
    self.sendRequest = function(request, timeout) {
        var deferred = Q.defer();
        if (self.socket === undefined) {
            deferred.reject('Socket not bound to session.');
            return deferred.promise;
        }
        timeout = timeout || self.options.timeout || 3000;

        // Keep the identifier inside the unsigned 32-bit range.
        if (self.hopByHopIdCounter > MAX_HOP_BY_HOP_ID) {
            self.hopByHopIdCounter = 0;
        }
        var hopByHopId = self.hopByHopIdCounter++;
        request.header.hopByHopId = hopByHopId;

        invokeHook('beforeAnyMessage', request);
        var requestBuffer = diameterCodec.encodeMessage(request);

        var pendingRequest = {
            'request': request,
            'deferred': deferred
        };
        // Drops the bookkeeping entry unless it has already been replaced.
        var forgetPendingRequest = function() {
            if (self.pendingRequests[hopByHopId] === pendingRequest) {
                delete self.pendingRequests[hopByHopId];
            }
        };

        // Register before writing so an answer can never arrive unmatched.
        self.pendingRequests[hopByHopId] = pendingRequest;
        try {
            self.socket.write(requestBuffer);
        } catch (err) {
            forgetPendingRequest();
            throw err;
        }

        // Remove the entry on timeout as well, otherwise unanswered requests
        // would accumulate in pendingRequests forever.
        return deferred.promise
            .timeout(timeout, 'Request timed out, no response was received in ' + timeout + 'ms')
            .finally(forgetPendingRequest);
    };

    /**
     * Ends the underlying socket.
     */
    self.end = function() {
        socket.end();
    };
}
|
117 |
|
// Expose the connection constructor as a named export of this module.
exports.DiameterConnection = DiameterConnection;
|