UNPKG

20.5 kBJavaScriptView Raw
1
2/**
3 * Created by Sahil on 7/29/16.
4 */
5var fs = require('fs'),
6 zlib = require('zlib');
7var profiler
8try {
9 profiler = require('cavisson-profiling-tool');
10}catch(e){console.log("Can't find module cavisson-profiling-tool ")}
11var stringBuffer = require('./flowpath/StringBuffer.js').StringBuffer;
12var agentSetting = require('./agent-setting.js');
13var ut = require('./util');
14var timeOutToStartHeartBeat = undefined
15var mkdirp = require('mkdirp')
16
/**
 * State holder for one heap-snapshot / file-download session.
 *
 * Fields:
 *  - stream       : export stream of the snapshot (assigned by TakeSnapShot)
 *  - snapshot1    : the snapshot object returned by the profiler
 *  - compressMode : 1 = gzip the dump, 0 = send/store it as-is (default 1)
 *  - downloadFile : 0 = keep on server only, 1 = keep on server and send,
 *                   2 = send only (default 2)
 *  - fileName     : target path of the dump on the server
 *  - gzip         : gzip transform, created only when compressMode is 1
 *  - dataSocket   : connection wrapper the dump is written to
 *
 * The fields are assigned explicitly (instead of the former no-op
 * `this.stream;` statements) so every instance really owns them.
 */
function v8_profiler () {
    this.stream = undefined;
    this.snapshot1 = undefined;
    this.compressMode = 1;
    this.downloadFile = 2;
    this.fileName = '/tmp/' + new Date() + '.heapsnapshot';
    this.gzip = undefined;
    this.dataSocket = undefined;
}
28
/**
 * Parses the ';'-separated "Key=Value" options of a heap-dump request and
 * prepares this instance for the dump.
 *
 * Recognised keys (matched on the key part of each token):
 *  - CompressMode : integer, stored in this.compressMode
 *  - DownloadFile : integer, stored in this.downloadFile
 *  - FileName     : target path; a path ending in '/' (or an empty value)
 *                   gets a generated "<date>.heapsnapshot" file name
 *
 * Non-numeric CompressMode/DownloadFile values are ignored so the defaults
 * stay in effect. When compressMode is 1 a '.gz' suffix is ensured and the
 * gzip transform is created. For downloadFile 0 or 1 the directory of the
 * final file name is created/validated via mkDirOnServer.
 *
 * Errors are logged, not rethrown.
 *
 * @param {string} clientMsg      raw option string from the request
 * @param {Object} dataSocket     connection wrapper used to send the dump
 * @param {Function} errorCallBack forwarded to mkDirOnServer
 */
v8_profiler.prototype.initProfilerObject = function(clientMsg,dataSocket,errorCallBack){
    var self = this;
    try {
        var tokens = clientMsg.split(';');
        for (var i = 0; i < tokens.length; i++) {
            var token = tokens[i];
            if (token === '') continue;

            // Split on the FIRST '=' only, so values may themselves contain '='.
            var eq = token.indexOf('=');
            var key = eq === -1 ? token : token.substring(0, eq);
            var value = eq === -1 ? undefined : token.substring(eq + 1);

            if (key.indexOf('CompressMode') !== -1) {
                var compressMode = parseInt(value, 10);
                // parseInt yields NaN (never undefined) on bad input.
                if (!isNaN(compressMode)) self.compressMode = compressMode;
            }
            else if (key.indexOf('DownloadFile') !== -1) {
                var downloadFile = parseInt(value, 10);
                if (!isNaN(downloadFile)) self.downloadFile = downloadFile;
            }
            else if (key.indexOf('FileName') !== -1) {
                if (value) {
                    var baseName = value.substring(value.lastIndexOf('/') + 1);
                    // A directory-only path gets a generated file name appended.
                    self.fileName = baseName === ''
                        ? value + new Date() + '.heapsnapshot'
                        : value;
                }
                else {
                    self.fileName = '/tmp/' + new Date() + '.heapsnapshot';
                }
            }
        }

        if (self.compressMode == 1) {
            var ext = self.fileName.substring(self.fileName.lastIndexOf('.') + 1);
            if (ext !== 'gz') {
                self.fileName = self.fileName + '.gz';
            }
            self.gzip = zlib.createGzip({chunkSize:1024*1024*5,level:9,highWaterMark:1024*1024*10});
        }

        self.dataSocket = dataSocket;

        if (self.downloadFile == 1 || self.downloadFile == 0) {
            // Derive the directory from the final file name so the default
            // '/tmp/...' name (no FileName option) is handled as well.
            var slash = self.fileName.lastIndexOf('/');
            var dir = slash > 0
                ? self.fileName.substring(0, slash)
                : (slash === 0 ? '/' : '.');
            self.mkDirOnServer(dir, errorCallBack);
        }
    }
    catch (e) {
        ut.logger.error(agentSetting.currentTestRun, '| Error While Initializing the v8_Object',e );
    }
};
81
/**
 * Makes sure the directory that will hold the dump exists and is usable.
 *
 * A missing directory is created (with parents) through mkdirp.sync; an
 * existing one is checked for read/write/execute access. Any failure is
 * logged and reported through cleanV8Object.
 *
 * mkdirp.sync is synchronous and reports failures by throwing; its second
 * argument is an options value, not a callback, so no callback is passed.
 *
 * @param {string} fpath           directory path
 * @param {Function} errorCallBack forwarded to cleanV8Object on failure
 */
v8_profiler.prototype.mkDirOnServer = function(fpath,errorCallBack){
    var self = this;
    try {
        if (!fs.existsSync(fpath)) {
            mkdirp.sync(fpath);
        }
        else {
            fs.accessSync(fpath, fs.constants.X_OK | fs.constants.W_OK | fs.constants.R_OK);
        }
    }
    catch (e) {
        ut.logger.error(agentSetting.currentTestRun, '| Error: While Creating the Directory : ',e);
        self.cleanV8Object(e, errorCallBack);
    }
};
99
/**
 * Takes a heap snapshot with the profiler, stores it in this.snapshot1 and
 * stores its export stream in this.stream. The heap usage (whole MB) is
 * logged before and after. The internal high-water marks of the export
 * stream are raised to 10 MB. Failures are logged, not rethrown.
 */
v8_profiler.prototype.TakeSnapShot = function() {
    var self = this;
    var BUFFER_LIMIT = 1024 * 1024 * 10;

    // Current heap usage, truncated to whole megabytes.
    var heapUsedMb = function () {
        return parseInt((process.memoryUsage().heapUsed / 1048576).toFixed(3));
    };

    try {
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler.takeHeapSnapShot , Taking heapsnapshot On New Connection (mb)", heapUsedMb());
        self.snapshot1 = profiler.takeSnapshot();
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler.takeHeapSnapShot , Heapdump taken successfully (mb)", heapUsedMb());

        var exported = self.snapshot1.export();
        self.stream = exported;
        exported._readableState.highWaterMark = BUFFER_LIMIT;
        exported._writableState.highWaterMark = BUFFER_LIMIT;
    }
    catch (e) {
        ut.logger.error(agentSetting.currentTestRun, '| Error: While Taking Heap Snapshot : ',e );
    }
};
115
/**
 * Entry point for a heap-dump request served over a dedicated data
 * connection: parses the request, announces the transfer on the data
 * socket, takes the snapshot and starts the transfer that matches the
 * requested DownloadFile/CompressMode pair.
 *
 *   downloadFile 0 : file on server only
 *   downloadFile 1 : file on server, then sent over the data socket
 *   downloadFile 2 : streamed over the data socket only
 *   compressMode 0 / 1 : plain / gzip
 *
 * Any failure (profiler missing, snapshot not available, unsupported
 * mode pair, exception) is logged and reported through cleanV8Object so
 * the data connection is always released and errorCallBack is invoked.
 *
 * @param {Object} clientSocket   control connection (not used here)
 * @param {Object} dataSocket     connection wrapper used for the dump
 * @param {string} clientMsg      raw option string of the request
 * @param {*} asyncId             request id echoed in the response header
 * @param {string} command        command name echoed in the response header
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.takeHeapSnapShotOnNewConn = function(clientSocket,dataSocket,clientMsg,asyncId,command,errorCallBack){
    var self = this;
    try {
        if (!profiler) {
            ut.logger.error(agentSetting.currentTestRun, '| Cannot load cavisson-profiling-tool ,cavisson-profiling-tool value is :', profiler);
            throw new Error('! profiler');
        }

        self.initProfilerObject(clientMsg,dataSocket,errorCallBack);

        self.dataSocket.write("run_async_command_data_req:Command=" + command + ";Id=" + asyncId + ";Tier=" + agentSetting.tier + ";Server=" + agentSetting.server + ";Instance=" + agentSetting.instance + ";Size=-1;CompressMode="+self.compressMode+"\n");
        self.dataSocket.write("Complete\n");

        self.TakeSnapShot();
        // TakeSnapShot only logs its failures; stop here with a clear error.
        if (!self.stream) {
            throw new Error('Heap snapshot stream is not available');
        }

        // "<downloadFile>:<compressMode>" -> transfer routine
        var handlers = {
            '0:0': self.CreateUnCompressHeapFile,
            '0:1': self.CreateCompressHeapFile,
            '1:0': self.CreateAndDownloadUncompressHeap,
            '1:1': self.CreateAndDownloadCompressHeap,
            '2:0': self.DownloadUnCompressHeap,
            '2:1': self.DownloadCompressHeap
        };
        var handler = handlers[self.downloadFile + ':' + self.compressMode];
        if (typeof handler !== 'function') {
            // Previously this case silently did nothing and left the socket open.
            throw new Error('Unsupported DownloadFile/CompressMode combination: ' + self.downloadFile + '/' + self.compressMode);
        }
        handler.call(self, errorCallBack);
    }
    catch (e) {
        ut.logger.error(agentSetting.currentTestRun + "| Error occured during Taking HeapDump main (Catch): "+e);
        self.cleanV8Object(e,errorCallBack);
    }
};
162
/* * *
 * case : UnCompress HeapDump on Server Only    : [function] CreateUnCompressHeapFile
 * case : Compress HeapDump on Server Only      : [function] CreateCompressHeapFile
 * case : UnCompress HeapDump on Server and NDC : [function] CreateAndDownloadUncompressHeap
 * case : Compress HeapDump on Server and NDC   : [function] CreateAndDownloadCompressHeap
 * case : UnCompress HeapDump on NDC Only       : [function] DownloadUnCompressHeap
 * case : Compress HeapDump on NDC Only         : [function] DownloadCompressHeap
 * * */
170
/**
 * Streams the snapshot through gzip straight to the data socket (nothing
 * is written to disk). When the compressed stream ends, handleEnd takes
 * over; the data socket itself is not ended by the pipe.
 *
 * Errors of the snapshot stream or of gzip are reported once through
 * cleanV8Object; without these listeners such an error would be an
 * uncaught exception.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.DownloadCompressHeap = function(errorCallBack){
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var socket = self.dataSocket.getSocket();
        var source = self.stream;
        var gzip = self.gzip;

        source.on('error', fail);
        gzip.on('error', fail);

        var zstream = source.pipe(gzip);
        zstream.pipe(self.dataSocket.client, {end: false}).on('error', function (err) {
            if (err)
                zstream.pause();
        });
        socket.on('drain', function () { zstream.resume(); });
        zstream.on('end', function () {
            if (settled) return;
            settled = true;
            self.handleEnd(errorCallBack);
        });
        // NOTE: logged when the pipeline is set up, not when the transfer completes.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Compressed File Transfered");
    }
    catch (e) {
        fail(e);
    }
};
189
/**
 * Streams the uncompressed snapshot straight to the data socket (nothing
 * is written to disk). When the snapshot stream ends, handleEnd takes
 * over; the data socket itself is not ended by the pipe.
 *
 * The stream is captured in a local variable because this.stream is reset
 * by cleanV8Object; a late 'drain' or socket 'error' event must not
 * dereference the cleared field. Errors of the snapshot stream are
 * reported once through cleanV8Object.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.DownloadUnCompressHeap = function(errorCallBack){
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var socket = self.dataSocket.getSocket();
        var source = self.stream;

        source.on('error', fail);
        source.pipe(self.dataSocket.client, {end: false}).on('error', function (err) {
            if (err)
                source.pause();
        });
        socket.on('drain', function () { source.resume(); });
        source.on('end', function () {
            if (settled) return;
            settled = true;
            self.handleEnd(errorCallBack);
        });
        // NOTE: logged when the pipeline is set up, not when the transfer completes.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : UnCompressed File Transfered");
    }
    catch (e) {
        fail(e);
    }
};
207
/**
 * Writes the gzip-compressed snapshot to this.fileName on the server.
 *
 * handleEnd is invoked on the write stream's 'finish' event, i.e. once all
 * data has been handed to the file, rather than on gzip 'end'. Errors of
 * the snapshot stream, gzip or the file stream are reported once through
 * cleanV8Object instead of surfacing as uncaught exceptions.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.CreateCompressHeapFile = function(errorCallBack) {
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var source = self.stream;
        var gzip = self.gzip;
        var wStream = fs.createWriteStream(self.fileName, {highWaterMark: 1024 * 1024 * 10});

        source.on('error', fail);
        gzip.on('error', fail);
        wStream.on('error', fail);
        wStream.on('finish', function () {
            if (settled) return;
            settled = true;
            self.handleEnd(errorCallBack);
        });

        source.pipe(gzip).pipe(wStream);
        // NOTE: logged when writing starts, not when the file is complete.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Created Compressed file on server, Path :", self.fileName);
    }
    catch (e) {
        fail(e);
    }
};
220
/**
 * Writes the uncompressed snapshot to this.fileName on the server.
 *
 * handleEnd is invoked on the write stream's 'finish' event, i.e. once all
 * data has been handed to the file, rather than on the source 'end'.
 * Errors of the snapshot stream or the file stream (e.g. the file cannot
 * be opened) are reported once through cleanV8Object instead of surfacing
 * as uncaught exceptions.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.CreateUnCompressHeapFile = function(errorCallBack) {
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var source = self.stream;
        var wStream = fs.createWriteStream(self.fileName, {highWaterMark: 1024 * 1024 * 10});

        source.on('error', fail);
        wStream.on('error', fail);
        wStream.on('finish', function () {
            if (settled) return;
            settled = true;
            self.handleEnd(errorCallBack);
        });

        source.pipe(wStream);
        // NOTE: logged when writing starts, not when the file is complete.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Created Uncompressed file on server, Path :", self.fileName);
    }
    catch (e) {
        fail(e);
    }
};
234
/**
 * Writes the gzip-compressed snapshot to this.fileName and afterwards
 * sends that file over the data socket via sendFileToNDC.
 *
 * sendFileToNDC is started on the write stream's 'finish' event so the
 * file is complete before it is read back; starting on gzip 'end' could
 * read a file that is still being written. Errors of the snapshot stream,
 * gzip or the file stream are reported once through cleanV8Object.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.CreateAndDownloadCompressHeap = function(errorCallBack) {
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var source = self.stream;
        var gzip = self.gzip;
        var wStream = fs.createWriteStream(self.fileName,{highWaterMark:1024 * 1024 * 10});

        source.on('error', fail);
        gzip.on('error', fail);
        wStream.on('error', fail);
        wStream.on('finish', function () {
            if (settled) return;
            settled = true;
            self.sendFileToNDC(errorCallBack);
        });

        source.pipe(gzip).pipe(wStream);
        // NOTE: logged when writing starts, not when the file is complete.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Created Compressed file on server, Path :",self.fileName);
    }
    catch (e) {
        fail(e);
    }
};
248
/**
 * Writes the uncompressed snapshot to this.fileName and afterwards sends
 * that file over the data socket via sendFileToNDC.
 *
 * sendFileToNDC is started on the write stream's 'finish' event so the
 * file is complete before it is read back; starting on the source 'end'
 * could read a file that is still being written. Errors of the snapshot
 * stream or the file stream are reported once through cleanV8Object.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.CreateAndDownloadUncompressHeap = function(errorCallBack){
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var source = self.stream;
        var wStream = fs.createWriteStream(self.fileName,{highWaterMark:1024 * 1024 * 10});

        source.on('error', fail);
        wStream.on('error', fail);
        wStream.on('finish', function () {
            if (settled) return;
            settled = true;
            self.sendFileToNDC(errorCallBack);
        });

        source.pipe(wStream);
        // NOTE: logged when writing starts, not when the file is complete.
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Created UnCompressed file on server, Path :",self.fileName);
    }
    catch (e) {
        fail(e);
    }
};
263
/**
 * Streams the dump file this.fileName over the data socket. When the file
 * has been read completely, handleEnd takes over; the data socket itself
 * is not ended by the pipe.
 *
 * Read errors (missing or unreadable file) are reported once through
 * cleanV8Object; without the listener they would be uncaught exceptions.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.sendFileToNDC = function(errorCallBack) {
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var socket = self.dataSocket.getSocket();
        var rStream = fs.createReadStream(self.fileName, {highWaterMark: 1024 * 1024 * 10});

        rStream.on('error', fail);
        socket.on('drain', function () {
            rStream.resume();
        });
        rStream.pipe(self.dataSocket.client,{end: false}).on('error', function (err) {
            if (err) {
                rStream.pause();
            }
        });
        rStream.on('end', function () {
            if (settled) return;
            settled = true;
            self.handleEnd(errorCallBack);
        });
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Sending File to NDC");
    }
    catch (e) {
        fail(e);
    }
};
286
/**
 * Streams an arbitrary file over the data socket and, when deleteFile is
 * 1, removes it afterwards. When the file has been read completely,
 * handleEndDownloadFile takes over; the data socket itself is not ended
 * by the pipe.
 *
 * Read errors are reported once through cleanV8Object; a failing delete is
 * logged and does not abort the completion handling (both would otherwise
 * be uncaught exceptions inside stream event handlers).
 *
 * @param {string} fileName        path of the file to send
 * @param {number} deleteFile      1 to delete the file after sending
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.downloadFileToNDC = function(fileName,deleteFile,errorCallBack) {
    var self = this;
    var settled = false;

    // Reports the first failure only.
    var fail = function (err) {
        if (settled) return;
        settled = true;
        self.cleanV8Object(err, errorCallBack);
    };

    try {
        var rStream = fs.createReadStream(fileName, {highWaterMark: 1024 * 1024 * 10});

        rStream.on('error', fail);
        self.dataSocket.client.on('drain', function () {
            rStream.resume();
        });
        rStream.pipe(self.dataSocket.client, {end: false}).on('error', function (err) {
            if (err)
                rStream.pause();
        });

        rStream.on('end', function () {
            if (settled) return;
            settled = true;
            ut.logger.info(agentSetting.currentTestRun + " | End of Download Process Cycle");
            if (deleteFile == 1) {
                try {
                    fs.unlinkSync(fileName);
                }
                catch (unlinkErr) {
                    ut.logger.error(agentSetting.currentTestRun, '| Error: While Deleting the Downloaded File : ', unlinkErr);
                }
            }
            self.handleEndDownloadFile(errorCallBack);
        });
        ut.logger.info(agentSetting.currentTestRun + " | v8_profiler : Downloading File to NDC");
    }
    catch (e) {
        fail(e);
    }
};
313
314
/**
 * Finishes a heap-dump transfer once the data socket has flushed.
 *
 *  - write buffer empty  : send 'Heapdump:Result=OK;' and clean up
 *  - still writable      : check again in 2 seconds
 *  - otherwise           : clean up with a 'Connection is not there' error
 *
 * If the instance was already cleaned up (this.dataSocket cleared, e.g. by
 * an error while a retry was pending) the call is a no-op, so the callback
 * is not invoked a second time.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.handleEnd = function(errorCallBack){
    var self = this;
    try {
        var dataSocket = self.dataSocket;
        if (!dataSocket) {
            return;
        }

        var client = dataSocket.client;
        if (client && client._writableState && client._writableState.length == 0) {
            dataSocket.write('Heapdump:Result=OK;');
            ut.logger.info(agentSetting.currentTestRun + " | End of Taking Heapdump Process Cycle");
            self.cleanV8Object(undefined,errorCallBack);
        }
        else if (client && client.writable) {
            setTimeout(function () { self.handleEnd(errorCallBack); }, 2000);
        }
        else {
            self.cleanV8Object(new Error('Connection is not there'),errorCallBack);
        }
    }
    catch (e) {
        self.cleanV8Object(e,errorCallBack);
    }
};
335
/**
 * Releases everything held for the current session and reports the
 * outcome to the caller.
 *
 * Steps (each one runs even if an earlier one fails):
 *  1. close the data connection
 *  2. release the snapshot through its delete() method (as the other
 *     snapshot/profile users in this file do) and drop the reference
 *  3. clear stream, gzip and dataSocket
 *  4. invoke errorCallBack(e) when it is a function
 *
 * @param {Error|undefined} e      error to report, undefined on success
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.cleanV8Object = function(e,errorCallBack){
    var self = this;
    try {
        if (self.dataSocket) {
            try {
                self.dataSocket.closeConnection();
            }
            catch (closeErr) {
                ut.logger.error(agentSetting.currentTestRun, '| Error: While Closing the Data Connection : ', closeErr);
            }
        }
        if (self.snapshot1) {
            try {
                if (typeof self.snapshot1.delete === 'function') {
                    self.snapshot1.delete();
                }
            }
            catch (deleteErr) {
                ut.logger.error(agentSetting.currentTestRun, '| Error: While Deleting the Heap Snapshot : ', deleteErr);
            }
            delete self.snapshot1;
        }
        self.stream = undefined;
        self.gzip = undefined;
        self.dataSocket = undefined;
        if (typeof errorCallBack === 'function') {
            errorCallBack(e);
        }
        ut.logger.info(agentSetting.currentTestRun + " | Cleaned the V8Object Instance . ");
    }
    catch (err) {
        ut.logger.error(agentSetting.currentTestRun, '| Error: While Cleaning the V8 Object : ',err );
    }
};
357
/**
 * Finishes a file download once the data socket has flushed.
 *
 *  - write buffer empty : clean up (success)
 *  - still writable     : check again in 2 seconds
 *  - otherwise          : clean up without an error
 *
 * If the instance was already cleaned up (this.dataSocket cleared while a
 * retry was pending) the call is a no-op, so the callback is not invoked a
 * second time.
 *
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.handleEndDownloadFile = function(errorCallBack){
    var self = this;
    try {
        var dataSocket = self.dataSocket;
        if (!dataSocket) {
            return;
        }

        var client = dataSocket.client;
        if (client && client._writableState && client._writableState.length == 0) {
            ut.logger.info(agentSetting.currentTestRun + " | End of downloading file Process Cycle");
            self.cleanV8Object(undefined,errorCallBack);
        }
        else if (client && client.writable) {
            setTimeout(function () { self.handleEndDownloadFile(errorCallBack); }, 2000);
        }
        else {
            self.cleanV8Object(undefined,errorCallBack);
        }
    }
    catch (e) {
        self.cleanV8Object(e,errorCallBack);
    }
};
379
/**
 * Takes a heap snapshot and streams it, gzip-compressed, over the control
 * connection.
 *
 * While the dump is in progress the reconnect timer is cleared and the
 * heartbeat is postponed for 10 minutes:
 *  1. A busy agent must not send heartbeats, because they would go
 *     unanswered and the agent would eventually close the connection and
 *     switch over.
 *  2. The receiving side uses a 10 minute timeout per request, so the
 *     heartbeat is paused for the same period to let a long transfer
 *     complete.
 *
 * Back-pressure: when clientSocket.write() returns false the gzip stream
 * is paused until the socket emits 'drain' (or, if the socket object has
 * no once(), for 0.5 second). The former code combined the write call and
 * a callback with the comma operator, so the check never triggered.
 *
 * The snapshot is released exactly once: on 'end', on a stream error, or
 * when setting up the transfer throws.
 *
 * @param {Object} clientSocket control connection; must provide write()
 */
v8_profiler.takeHeapSnapShot = function(clientSocket) {
    var snapshot1;
    var released = false;

    // Frees the snapshot once, whatever path ends the transfer.
    var releaseSnapshot = function () {
        if (released || !snapshot1) return;
        released = true;
        try {
            snapshot1.delete();
        }
        catch (deleteErr) {
            ut.logger.error(agentSetting.currentTestRun, '| Error: While Deleting the Heap Snapshot : ', deleteErr);
        }
    };

    try {
        if (!profiler) {
            ut.logger.error(agentSetting.currentTestRun, '| Cannot load cavisson-profiling-tool ,cavisson-profiling-tool value is :', profiler);
            agentSetting.isHeapDumpInProgress = false;
            startTimer(clientSocket);
            return;
        }
        ut.logger.info(agentSetting.currentTestRun, "| profiler.takeHeapSnapShot , Taking heapsnapshot");

        clearInterval(agentSetting.reconnectTimer);     // Clearing reconnect timer interval
        agentSetting.reconnectTimer = undefined;
        clearTimeout(timeOutToStartHeartBeat);
        timeOutToStartHeartBeat = setTimeout(function () {
            startTimer(clientSocket);
        }, 600000);

        snapshot1 = profiler.takeSnapshot();

        var size = 0, chunked = 0;
        var gzip = zlib.createGzip();

        var onStreamError = function (err) {
            ut.logger.error(agentSetting.currentTestRun, '| Error: While Streaming the Heap Dump : ', err);
            releaseSnapshot();
        };

        clientSocket.write("nd_meta_data_rep:action=get_heap_dump;Size=" + 0 + ";CompressMode=1\n");

        var source = snapshot1.export();
        source.on('error', onStreamError);

        // 'data' events are used (instead of a second pipe) so the transferred
        // size and chunk count can be tracked.
        var stream = source.pipe(gzip);
        stream.on('error', onStreamError);
        stream.on('data', function (chunk) {
            size += chunk.length;
            ++chunked;
            // Only an explicit false signals a full socket buffer.
            if (clientSocket.write(chunk) === false) {
                stream.pause();
                ut.logger.info(agentSetting.currentTestRun + "Socket buffer is full, pausing heap dump data.");
                var resume = function () {
                    ut.logger.info(agentSetting.currentTestRun + 'Now data will start flowing again.');
                    stream.resume();
                };
                if (typeof clientSocket.once === 'function') {
                    clientSocket.once('drain', resume);
                }
                else {
                    setTimeout(resume, 500);
                }
            }
        });
        stream.on('end', function () {
            releaseSnapshot();
            clientSocket.write("nd_meta_data_rep:action=get_heap_dump;result=OK\n");
            ut.logger.info(agentSetting.currentTestRun + "nd_meta_data_rep:action=get_heap_dump;result=OK");
            ut.logger.info(agentSetting.currentTestRun + "Total dumped data : ", size,"chunked : ",chunked);
        });
    }
    catch (e) {
        releaseSnapshot();
        ut.logger.info(agentSetting.currentTestRun + "| Error occured during Taking HeapDump (error): "+e);
    }
};
439
/**
 * Resets the given string buffer and fills it with `data` followed by a
 * newline.
 *
 * @param {Object} sb   buffer offering clear() and add()
 * @param {*} data      payload to store
 * @returns {Object} the same buffer instance
 */
v8_profiler.createData = function(sb,data) {
    sb.clear();
    [data, '\n'].forEach(function (part) {
        sb.add(part);
    });
    return sb;
};
448
/**
 * Profiles the CPU for agentSetting.nodejsCpuProfilingTime milliseconds
 * and sends the exported profile over the control connection.
 *
 * The reconnect timer is cleared while profiling; the health-check timer
 * is restarted (startTimer) on every exit path. The profile is released
 * with delete() whether or not sending succeeds.
 *
 * The payload is sent uncompressed, so the header and the result line
 * carry "CompressMode=0". (The former code had the mode expression inside
 * the string literal, so the expression text itself was transmitted.)
 * When the export yields no data, the error reply is sent instead.
 *
 * @param {Object} clientSocket control connection; must provide write()
 */
v8_profiler.startCpuProfiling = function(clientSocket) {
    try {
        if (!profiler) {
            ut.logger.error(agentSetting.currentTestRun, '| Cannot load cavisson-profilling-tool ,cavisson-profilling-tool value is :', profiler);
            startTimer(clientSocket);
            return;
        }
        ut.logger.info(agentSetting.currentTestRun, "| Starting cpuProfiling ");
        clearInterval(agentSetting.reconnectTimer);      // Clearing reconnect timer interval
        agentSetting.reconnectTimer = undefined;
        profiler.startProfiling('', true);

        setTimeout(function () {
            var profile1;

            // Releases the profile and resumes the health check; runs once per dump.
            var finish = function () {
                try {
                    if (profile1) profile1.delete();
                }
                catch (deleteErr) {
                    ut.logger.warn(agentSetting.currentTestRun + "| Error in deleting cpu profile :" + deleteErr);
                }
                startTimer(clientSocket);
            };

            try {
                profile1 = profiler.stopProfiling();
                var sb = new stringBuffer();
                profile1.export(function (err, data) {
                    ut.logger.info(agentSetting.currentTestRun, '| Going to export CPU profiling data');
                    if (err)
                        ut.logger.info(agentSetting.currentTestRun + "| Error in cpu_profiling : " + err);
                    try {
                        var hasData = data !== undefined && data !== null && String(data).length > 0;
                        if (hasData) {
                            var profilingData = v8_profiler.createData(sb, data).toString() + "\n";

                            clientSocket.write("nd_meta_data_rep:action=get_thread_dump;Size=" + profilingData.length + ";CompressMode=0" + "\n");
                            clientSocket.write(profilingData + "\n");
                            clientSocket.write("nd_meta_data_rep:action=get_thread_dump;result=Ok;" + "Size=" + profilingData.length + ";CompressMode=0" + "\n");
                            ut.logger.info(agentSetting.currentTestRun + " | Dumping cpu profiling data : \n" + profilingData.length);
                        }
                        else {
                            clientSocket.write("nd_meta_data_rep:action=get_thread_dump;result=Error:'<'Unable to take cpu_profiling please check in bci error log.>\n");
                            ut.logger.info(agentSetting.currentTestRun + " | Size of cpu profiling data is 0");
                        }
                    }
                    catch (sendErr) {
                        ut.logger.warn(agentSetting.currentTestRun + "| Error in Dumping cpu profiling data :" + sendErr);
                    }
                    finally {
                        finish();
                    }
                });
            }
            catch (timerErr) {
                // stopProfiling()/export() threw inside the timer callback.
                ut.logger.error("Error in CPU profiling", timerErr);
                finish();
            }
        }, agentSetting.nodejsCpuProfilingTime);     // Profiling CPU for particular time
    }
    catch (e) {
        startTimer(clientSocket);
        ut.logger.error("Error in CPU profiling",e);
    }
};
497
/**
 * Restarts the health-check timer for the given control connection.
 * The handler module is required at call time rather than at load time
 * (presumably to avoid a circular require - TODO confirm).
 *
 * @param {Object} clientSocket control connection passed to the handler
 */
function startTimer(clientSocket){
    require('./controlmessage-handler').startHealthCheckTimer(clientSocket);
}
502
/**
 * Serves a file-download request over a dedicated data connection: sends
 * the response header (including the file size), the "File Downloaded"
 * marker and then the file itself via downloadFileToNDC. Failures are
 * reported through cleanV8Object and logged.
 *
 * @param {Object} clientSocket   control connection (not used here)
 * @param {Object} dataSocket     connection wrapper used for the transfer
 * @param {string} clientMsg      raw request (not used here)
 * @param {*} id                  request id echoed in the header
 * @param {string} fileName       path of the file to send
 * @param {number} deleteFile     forwarded to downloadFileToNDC
 * @param {Function} errorCallBack completion/error callback
 */
v8_profiler.prototype.downloadFileOnNewConn = function(clientSocket,dataSocket,clientMsg,id,fileName,deleteFile,errorCallBack) {
    var self = this;
    try {
        self.dataSocket = dataSocket;

        var fileSizeInBytes = fs.statSync(fileName)["size"];
        var header = [
            "download_file_data_req:Id=" + id,
            "FileName=" + fileName,
            "Tier=" + agentSetting.tier,
            "Server=" + agentSetting.server,
            "Instance=" + agentSetting.instance,
            "Size=" + fileSizeInBytes
        ].join(";") + ";\n";

        ut.logger.info(header);
        self.dataSocket.write(header);
        self.dataSocket.write("File Downloaded\n");
        self.downloadFileToNDC(fileName, deleteFile, errorCallBack);
    }
    catch (e) {
        self.cleanV8Object(e,errorCallBack);
        ut.logger.error(" Error in Downloading File - : ", e);
    }
};
520
521module.exports = v8_profiler;