UNPKG

6.79 kBJavaScriptView Raw
1/**
2 * Created by compass341 on 2/28/2018.
3 */
4
5var WebSocket = require('ws');
6var agent = require("./agent-setting");
7var DataMessageHandler = require("./datamessage-handler");
8var util = require('./util');
9var websocket = require('websocket-stream')
10
// Payloads that write() could not send because the socket was not open.
// write() appends to it while it holds at most 500 entries, and
// connectConnListener sends the queued payloads and empties it.
var backlog = [];
// The most recently constructed dataConnHandler. The module-level socket
// listeners read their handler from here; it stays undefined until the
// constructor has run at least once.
var thisInstance;
13
/**
 * Holds the state of one data connection to the NDCollector (NDC).
 *
 * Constructing an instance also stores it in the module-level `thisInstance`,
 * which the socket listeners defined in this file use to find their handler.
 *
 * @constructor
 */
function dataConnHandler() {
    thisInstance = this;
    this.client = null;             // client object created by _connect()
    this.timeout = null;            // handle of the pending reconnect timer, if any
    this.dataMsgHandler = null;     // DataMessageHandler created by connectConnListener
    this.discardedFPLength = 0;     // discard counter write() uses to throttle its warning
    this.istempDataConn = false;    // flag for a new (temporary) data connection
    this.protocol = undefined;      // URL scheme; assigned by createDataConn()
    /*
     * The NDC host and port are kept on the instance. If the control connection
     * closes while a test is running, the agent switches over to the backup NDC,
     * but the data and auto connections keep using the previous NDC until the
     * control connection with the new server has been made.
     */
    this.ndcHost = 0;
    this.ndcPort = 7892;
}
29
/**
 * 'close' listener attached to the data socket by _connect().
 *
 * Logs the close event for the handler stored in `thisInstance`, then tears
 * the connection down when it is a temporary data connection.
 *
 * @param {*} err value delivered with the close event; only used in the log line
 */
var closeConnListener = function (err) {
    var handler = thisInstance;
    var message = agent.currentTestRun +
        " | Data connection, Received socket close event from Host : " + handler.ndcHost +
        " ,Port=" + handler.ndcPort + ", Error : " + err;
    util.logger.warn(message);

    if (!handler.istempDataConn) {
        // The regular data connection is left as it is here: the automatic
        // reconnect that used to follow (connectToServer()) is disabled.
        return;
    }
    handler.closeConnection();
};
44
/**
 * Listener meant to run once the data socket is open.
 *
 * Clears the handler's reconnect-timer handle. For a regular (non-temporary)
 * data connection it also logs the connection, creates the DataMessageHandler
 * and sends everything that write() queued in `backlog`.
 *
 * `this` is expected to be the object the listener was registered on; only its
 * `localPort` is read, for the log line.
 * NOTE(review): nothing in this file registers this listener - confirm where
 * (or whether) it is attached.
 *
 * Any exception is caught and logged as a warning.
 */
var connectConnListener = function () {
    try {
        var handler = thisInstance;
        handler.timeout = undefined;
        if (handler.istempDataConn) {
            return;
        }

        util.logger.info(agent.currentTestRun+" | Data Connection established with NDCollector : Socket[addr="+handler.ndcHost+",port="+handler.ndcPort + ",localport=" +this.localPort );
        handler.dataMsgHandler = new DataMessageHandler(handler);

        if (backlog.length) {
            // Send through the underlying socket, exactly as write() does. The
            // client object itself is not what write() calls send() on, so
            // sending on it would throw and leave the backlog untouched.
            var socket = handler.client.socket;
            for (var i = 0, len = backlog.length; i < len; ++i) {
                socket.send(backlog[i]);
            }
            backlog.length = 0;
        }
    }
    catch (e) {
        util.logger.warn(e);
    }
};
64
/**
 * Points this handler at an NDC endpoint and starts connecting to it.
 *
 * @param {Object}   server           object whose `ndcHost` becomes the target host
 * @param {boolean}  istempDataConn   flag marking this as a new data connection
 * @param {Object}   currProto        supplies the `port` and `protocol` to use
 * @param {Function} [cbForHeapDump]  handed to _connect() unchanged
 *
 * Any exception (including one caused by a missing argument) is caught and
 * logged as a warning; nothing is rethrown.
 */
dataConnHandler.prototype.createDataConn = function (server, istempDataConn, currProto, cbForHeapDump) {
    try {
        // The target is set when the new connection is created.
        this.ndcHost = server.ndcHost;
        this.ndcPort = currProto.port;
        this.istempDataConn = istempDataConn;
        this.protocol = currProto.protocol;

        this._connect(cbForHeapDump);
    }
    catch (err) {
        util.logger.warn(agent.currentTestRun + " | Error in connect in temp data connection ", err);
    }
};
75
76
/**
 * Schedules one reconnect attempt 60 seconds from now.
 *
 * Does nothing (apart from a warning) when no test is running, and does
 * nothing at all when a retry timer is already pending. When the timer fires
 * it clears the stored handle, calls _connect() and logs the attempt.
 * Exceptions are caught and logged as warnings.
 */
dataConnHandler.prototype.connectToServer = function () {
    try {
        if (!agent.isTestRunning) {
            util.logger.warn(agent.currentTestRun + " | Test run is not running .")
            return;
        }

        var handler = this;
        if (handler.timeout) {
            // A retry is already scheduled.
            return;
        }

        var onRetryTimer = function () {
            try {
                handler.timeout = undefined;
                handler._connect();
                util.logger.warn(agent.currentTestRun + " | Timer for retrying Data connectoion expired. trying to connect with Host : " + handler.ndcHost + " ,Port=" + handler.ndcPort);
            }
            catch (e) {
                util.logger.warn(e);
            }
        };
        handler.timeout = setTimeout(onRetryTimer, 60000);
    }
    catch (e) {
        util.logger.warn(e);
    }
};
101
102
/**
 * Closes the data connection and puts the handler back into its initial state.
 *
 * Teardown of an existing client, in order:
 *   1. the 'close' listener is removed first, so this deliberate close does
 *      not run closeConnListener;
 *   2. the socket is closed, then the client is destroyed and ended. The
 *      remaining socket listeners stay attached so that events arriving
 *      during the closing handshake still have a handler.
 *
 * The state reset (client reference, host/port defaults, retry timer, flags,
 * message handler) runs in `finally`, so it also happens when one of the
 * teardown calls throws. Previously such a failure left the stale client, a
 * pending retry timer and the old flags in place.
 *
 * Exceptions are caught and logged as warnings; nothing is rethrown.
 */
dataConnHandler.prototype.closeConnection = function () {
    try {
        util.logger.info(agent.currentTestRun + " | Closing the New Data connection .");
        if (this.client) {
            this.client.socket.removeListener('close', closeConnListener);
            this.client.socket.close();
            this.client.destroy();
            this.client.end();
        }
    }
    catch (e) {
        util.logger.warn(e);
    }
    finally {
        this.client = undefined;
        this.ndcHost = 0;
        this.ndcPort = 7892;
        clearTimeout(this.timeout);
        this.timeout = null;
        this.istempDataConn = false;
        this.discardedFPLength = 0;
        delete this.dataMsgHandler;
    }
};
131
/**
 * Opens the connection to `protocol://ndcHost:ndcPort/` and wires up the
 * socket listeners.
 *
 * Refuses to connect (with a warning) when no test is running, unless this is
 * a temporary data connection.
 *
 * @param {Function} [cbForHeapDump]  when given, registered as the socket's
 *                                    'open' listener
 *
 * Exceptions raised while connecting are caught and logged as warnings.
 */
dataConnHandler.prototype._connect = function (cbForHeapDump) {
    var self = this;
    if (!agent.isTestRunning && !self.istempDataConn) {
        util.logger.warn(agent.currentTestRun+" | Test is not running ,error in making data connection")
        return;
    }
    try {
        var url = self.protocol + '://' + self.ndcHost + ':' + self.ndcPort + '/';
        var options = {};
        if (self.protocol.toLowerCase() === 'wss') {
            // NOTE(review): this turns off TLS certificate verification for
            // secure connections. Kept as in the original; confirm it is intended.
            options.rejectUnauthorized = false;
        }
        self.client = websocket(url, options);

        var socket = self.client.socket;
        // An 'error' listener must stay attached so socket errors are handled
        // here; they are now logged instead of being silently dropped.
        socket.on('error', function (err) {
            util.logger.warn(agent.currentTestRun + " | Error on data connection socket, Host : " + self.ndcHost + " ,Port=" + self.ndcPort, err);
        });
        socket.on('end', function () {});
        socket.on('close', closeConnListener);
        if (cbForHeapDump) {
            socket.on('open', cbForHeapDump);
        }
    }
    catch (err) {
        util.logger.warn(agent.currentTestRun+" | Error in making data connection",err);
    }
};
154
/**
 * Sends `data` over the data connection.
 *
 * - No client, or empty/missing data: nothing happens.
 * - Socket not open: the data is queued in `backlog` (only while the backlog
 *   holds at most 500 entries) and a warning is logged.
 * - Socket open, regular connection, and the socket's `bufferedAmount` has
 *   reached `agent.ndDataBufferSize`: the data is discarded. The warning is
 *   logged on the first discard and then once per 1000 discards.
 * - Otherwise the data is sent on the socket. A temporary data connection is
 *   never subject to the buffer limit.
 *
 * @param {string|Buffer|Array} data  payload; must have a non-zero `length`
 * @returns {boolean|undefined} `false` when the data was discarded because of
 *                              the buffer limit, otherwise `undefined`
 *
 * Exceptions are caught and logged as warnings.
 */
dataConnHandler.prototype.write = function (data) {
    try {
        if (!this.client || !data || !data.length) {
            return;
        }

        var socket = this.client.socket;
        if (socket.readyState !== WebSocket.OPEN) {
            if (backlog.length <= 500) {
                backlog.push(data);
            }
            util.logger.warn(agent.currentTestRun+"| client not connected ...........")
            return;
        }

        if (!this.istempDataConn && socket.bufferedAmount >= agent.ndDataBufferSize) {
            if (this.discardedFPLength % 1000 === 0) {
                // Report the value that was actually compared against the limit
                // (the socket's bufferedAmount), not an unrelated client property.
                util.logger.warn(agent.currentTestRun + " | Discarding Data conn data, Buffer size : ", socket.bufferedAmount, " is greater then ndDataBufferSize");
                this.discardedFPLength = 0;
            }
            ++this.discardedFPLength;
            return false;
        }

        socket.send(data);
    }
    catch (e) {
        util.logger.warn(agent.currentTestRun+" | Error in sending data on socket in dataConnHandler ",e);
    }
};
183
/**
 * Returns the socket of the current client.
 *
 * @returns {Object|undefined} `client.socket`, or `undefined` when there is no
 *          client (before a connection is made, or after closeConnection()),
 *          instead of throwing a TypeError
 */
dataConnHandler.prototype.getSocket = function () {
    return this.client ? this.client.socket : undefined;
};
187module.exports = dataConnHandler;