UNPKG

13.6 kBJavaScriptView Raw
1'use strict'
2
3const fs = require('fs')
4const rpc = require('pm2-axon-rpc')
5const axon = require('pm2-axon')
6const log = require('debug')('interactor:daemon')
7const os = require('os')
8const cst = require('../constants.js')
9const ReverseInteractor = require('./reverse/ReverseInteractor.js')
10const PushInteractor = require('./push/PushInteractor.js')
11const Utility = require('./Utility.js')
12const PM2Client = require('./PM2Client.js')
13const TransporterInterface = require('./TransporterInterface.js')
14const domain = require('domain') // eslint-disable-line
15const WatchDog = require('./WatchDog')
16const InteractorClient = require('./InteractorClient')
17const semver = require('semver')
18const path = require('path')
19const pkg = require('../package.json')
20
// Initialise the process-wide `_logs` flag to false when this module is loaded.
// NOTE(review): nothing in the visible part of this file reads it; presumably
// consumed by other agent modules — confirm before removing.
21global._logs = false
22
const InteractorDaemon = module.exports = class InteractorDaemon {
  /**
   * Build the daemon: read the configuration from the environment, create the
   * transport layer (axon + websocket bindings), create the HTTP client and
   * install the profiling signal hooks.
   */
  constructor () {
    this.opts = this.retrieveConf()

    log(`MACHINE_NAME=${this.opts.MACHINE_NAME}`)
    log(`PUBLIC_KEY=${this.opts.PUBLIC_KEY}`)
    log(`WEBSOCKET_ENABLED=${process.env.AGENT_TRANSPORT_WEBSOCKET}`)
    log(`AXON_ENABLED=${process.env.AGENT_TRANSPORT_AXON}`)
    log(`ROOT_URL=${cst.KEYMETRICS_ROOT_URL}`)

    this.DAEMON_ACTIVE = false
    this.transport = new TransporterInterface(this.opts, this)
      .bind('axon')
      .bind('websocket')
    this.transport.on('error', (err) => {
      // The fallback must be parenthesised: `+` binds tighter than `||`, so
      // without the parentheses the `|| err` part could never be reached.
      return console.error('[NETWORK] Error : ' + ((err && err.message) || err))
    })
    this.httpClient = new Utility.HTTPClient()
    this._online = true

    this._internalDebugger()
  }

  /**
   * Use process.send() if connected
   * @param {Object} data payload forwarded to the parent process over IPC
   */
  sendToParent (data) {
    if (!process.connected || !process.send) return console.log('Could not send data to parent')

    try {
      process.send(data)
    } catch (e) {
      console.trace('Parent process disconnected')
    }
  }

  /**
   * Get an interface for communicating with PM2 daemon.
   * The client is created on first use and reused afterwards.
   * @private
   * @return {PM2Client}
   */
  getPM2Client () {
    if (!this._ipm2) {
      this._ipm2 = new PM2Client()
    }
    return this._ipm2
  }

  /**
   * Terminate all connections and exit the process.
   *
   * When no RPC server/socket exists the process exits immediately with
   * `cst.ERROR_EXIT` and `cb` is not invoked.
   *
   * @param {Error|null} err when falsy the RPC socket file and PID file are
   *   removed and the exit code is `cst.SUCCESS_EXIT`; otherwise the files are
   *   kept and the exit code is `cst.ERROR_EXIT`
   * @param {Function} [cb] called just before the RPC socket is closed
   */
  exit (err, cb) {
    log('Exiting Interactor')
    // clear workers
    if (this._workerEndpoint) clearInterval(this._workerEndpoint)

    // stop interactors
    if (this.reverse) this.reverse.stop()
    if (this.push) this.push.stop()

    if (this._ipm2) this._ipm2.disconnect()
    if (this.watchDog) this.watchDog.stop()
    // stop transport
    if (this.transport) this.transport.disconnect()

    if (!err) {
      // Remove each file on its own: a missing socket file must not prevent
      // the PID file from being cleaned up. Failures stay non-fatal.
      const runtimeFiles = [cst.INTERACTOR_RPC_PORT, cst.INTERACTOR_PID_PATH]
      for (const file of runtimeFiles) {
        try {
          fs.unlinkSync(file)
        } catch (unlinkErr) {
          log('Could not remove %s: %s', file, unlinkErr.message)
        }
      }
    }

    if (!this._rpc || !this._rpc.sock) {
      return process.exit(cst.ERROR_EXIT)
    }

    if (typeof cb === 'function') {
      cb()
    }

    // Small delay before closing the socket, after `cb` has been invoked.
    setTimeout(() => {
      this._rpc.sock.close(() => {
        log('RPC server closed')
        process.exit(err ? cst.ERROR_EXIT : cst.SUCCESS_EXIT)
      })
    }, 10)
  }

  /**
   * Start a RPC server and expose it through a socket file
   * (`cst.INTERACTOR_RPC_PORT`). Two methods are exposed: `kill` and
   * `getInfos`.
   * @param {Object} [opts] currently unused
   * @return {Object} the rpc server instance
   */
  startRPC (opts) {
    log('Launching Interactor RPC server (bind to %s)', cst.INTERACTOR_RPC_PORT)
    const rep = axon.socket('rep')
    const rpcServer = new rpc.Server(rep)
    const self = this
    rep.bind(cst.INTERACTOR_RPC_PORT)

    rpcServer.expose({
      kill: function (cb) {
        log('Shutdown request received via RPC')
        return self.exit(null, cb)
      },
      getInfos: function (cb) {
        if (self.opts && self.DAEMON_ACTIVE === true) {
          // km_data is overwritten on every periodic endpoint verification,
          // before it is validated, so `endpoints` may be missing here.
          const endpoints = self.km_data && self.km_data.endpoints
          return cb(null, {
            machine_name: self.opts.MACHINE_NAME,
            public_key: self.opts.PUBLIC_KEY,
            secret_key: self.opts.SECRET_KEY,
            remote_host: endpoints ? endpoints.web : undefined,
            connected: self.transport.isConnected(),
            transporters: self.transport.getActiveTransporters(),
            socket_path: cst.INTERACTOR_RPC_PORT,
            pm2_home_monitored: cst.PM2_HOME
          })
        } else {
          return cb(null)
        }
      }
    })
    return rpcServer
  }

  /**
   * Handle specific signals to launch memory / cpu profiling
   * if available in node.
   *
   * SIGUSR1 toggles CPU profiling, SIGUSR2 toggles heap sampling. The first
   * signal starts the profiler, the next one stops it and writes the profile
   * as JSON into the OS temporary directory.
   */
  _internalDebugger () {
    // skip on node versions below 8
    if (semver.satisfies(process.version, '<8')) return

    const inspector = require('inspector')
    const state = {
      heap: false,
      cpu: false,
      session: null
    }
    const commands = {
      heap: {
        start: 'HeapProfiler.startSampling',
        stop: 'HeapProfiler.stopSampling'
      },
      cpu: {
        start: 'Profiler.start',
        stop: 'Profiler.stop'
      }
    }

    const handleSignal = type => {
      return _ => {
        // the inspector session is created on the first signal and then reused
        if (state.session === null) {
          state.session = new inspector.Session()
          state.session.connect()
        }

        const isAlreadyEnabled = state[type]
        const debuggerCommands = commands[type]
        const profilerDomain = type === 'cpu' ? 'Profiler' : 'HeapProfiler'
        const fileExt = type === 'heap' ? '.heapprofile' : '.cpuprofile'

        if (isAlreadyEnabled) {
          // stopping the profiling and writing it to disk if it is running
          console.log(`[DEBUG] Stopping ${type.toUpperCase()} Profiling`)
          state.session.post(debuggerCommands.stop, (err, data) => {
            // check the error first: `data` may be missing when the command failed
            if (err) return console.error(err)
            const profile = data.profile
            const randomId = Math.random().toString(36)
            const profilePath = path.resolve(os.tmpdir(), `${type}-${randomId}${fileExt}`)

            fs.writeFileSync(profilePath, JSON.stringify(profile))
            console.log(`[DEBUG] Writing file in ${profilePath}`)
            state[type] = false
            state.session.post(`${profilerDomain}.disable`)
          })
        } else {
          // start the profiling otherwise
          console.log(`[DEBUG] Starting ${type.toUpperCase()} Profiling`)
          state.session.post(`${profilerDomain}.enable`, _ => {
            state.session.post(debuggerCommands.start)
            state[type] = true
          })
        }
      }
    }

    // use hook
    process.on('SIGUSR1', handleSignal('cpu'))
    process.on('SIGUSR2', handleSignal('heap'))
  }

  /**
   * Retrieve metadata about the system
   * @return {Object} machine name, public key, recycle flag, PM2 version,
   *   total memory (bytes divided by 1000 twice), hostname and CPU list
   */
  getSystemMetadata () {
    return {
      MACHINE_NAME: this.opts.MACHINE_NAME,
      PUBLIC_KEY: this.opts.PUBLIC_KEY,
      RECYCLE: this.opts.RECYCLE || false,
      PM2_VERSION: process.env.PM2_VERSION,
      MEMORY: os.totalmem() / 1000 / 1000,
      HOSTNAME: os.hostname(),
      CPUS: os.cpus()
    }
  }

  /**
   * Ping root url to retrieve node info: POST the public/secret keys and the
   * system metadata to `<ROOT_URL>/api/node/verifyPM2`.
   * @private
   * @param {Function} cb invoked with <Error, Object> where Object is the response sent by the server
   */
  _pingRoot (cb) {
    const data = this.getSystemMetadata()

    this.httpClient.open({
      url: this.opts.ROOT_URL + '/api/node/verifyPM2',
      method: 'POST',
      data: {
        public_id: this.opts.PUBLIC_KEY,
        private_id: this.opts.SECRET_KEY,
        data: data
      },
      headers: {
        'User-Agent': `PM2 Agent v${pkg.version}`
      }
    }, cb)
  }

  /**
   * Ping root to retrieve the km endpoints, validate the answer and connect
   * the transport to them.
   * @private
   * @param {Function} [cb] invoked with <Error> on failure, with <null, data>
   *   when the server answers `active === false`, otherwise handed over to
   *   `transport.connect`
   */
  _verifyEndpoint (cb) {
    if (typeof cb !== 'function') cb = function () {}

    this._pingRoot((err, data) => {
      if (err) {
        log('Got an a error on ping root', err)
        return cb(err)
      }

      // An empty or non-object answer cannot be inspected below; report it as
      // an error and keep the previously stored km_data untouched.
      if (!data || typeof data !== 'object') {
        return cb(new Error(`Invalid response received from root url (${JSON.stringify(data)})`))
      }

      this.km_data = data

      // Verify data integrity
      if (data.disabled === true || data.pending === true) {
        log('Interactor is disabled by admins')
        return cb(new Error('Connection refused, you might have hit the limit of agents you can connect (send email at contact@keymetrics.io for more infos)'))
      }
      if (data.active === false) {
        log('Interactor not active: %s', data.msg || 'no message')
        return cb(null, data)
      }
      if (!data.endpoints) {
        return cb(new Error(`Endpoints field not present (${JSON.stringify(data)})`))
      }

      this.DAEMON_ACTIVE = true
      this.transport.connect(data.endpoints, cb)
    })
  }

  /**
   * Retrieve configuration from environment.
   * Exits the process with `cst.ERROR_EXIT` when PM2_MACHINE_NAME,
   * PM2_PUBLIC_KEY or PM2_SECRET_KEY is missing.
   * @return {Object} the options used by the daemon
   */
  retrieveConf () {
    const opts = {}

    opts.MACHINE_NAME = process.env.PM2_MACHINE_NAME
    opts.PUBLIC_KEY = process.env.PM2_PUBLIC_KEY
    opts.SECRET_KEY = process.env.PM2_SECRET_KEY
    opts.RECYCLE = false
    if (process.env.KM_RECYCLE) {
      // KM_RECYCLE is parsed as JSON; a malformed value must not crash the
      // daemon while it is being constructed.
      try {
        opts.RECYCLE = JSON.parse(process.env.KM_RECYCLE)
      } catch (parseErr) {
        console.error(`Invalid KM_RECYCLE value (${process.env.KM_RECYCLE}), recycling disabled`)
      }
    }
    opts.PM2_VERSION = process.env.PM2_VERSION || '0.0.0'
    opts.AGENT_TRANSPORT_AXON = process.env.AGENT_TRANSPORT_AXON
    opts.AGENT_TRANSPORT_WEBSOCKET = process.env.AGENT_TRANSPORT_WEBSOCKET
    opts.internal_ip = Utility.network.v4

    opts.PM2_REMOTE_METHOD_ALLOWED = [
      'restart',
      'reload',
      'reset',
      'scale',
      'startLogging',
      'stopLogging',
      'ping'
    ]

    if (!opts.MACHINE_NAME) {
      console.error('You must provide a PM2_MACHINE_NAME environment variable')
      process.exit(cst.ERROR_EXIT)
    } else if (!opts.PUBLIC_KEY) {
      console.error('You must provide a PM2_PUBLIC_KEY environment variable')
      process.exit(cst.ERROR_EXIT)
    } else if (!opts.SECRET_KEY) {
      console.error('You must provide a PM2_SECRET_KEY environment variable')
      process.exit(cst.ERROR_EXIT)
    }
    return opts
  }

  /**
   * Start the daemon: launch the RPC server, verify the endpoints (retrying
   * up to 30 times with a growing delay, except when NODE_ENV is 'test'),
   * report the outcome to the parent process, then start the periodic
   * endpoint check, the watchdog and the push / reverse interactors.
   * @param {Function} [cb] invoked without arguments shortly after the interactors are started
   */
  start (cb) {
    let retries = 0
    this._rpc = this.startRPC()
    this.opts.ROOT_URL = cst.KEYMETRICS_ROOT_URL

    const verifyEndpointCallback = (err, result) => {
      if (err) {
        log('Error while trying to retrieve endpoints : ' + (err.message || err))
        if (retries++ < 30 && process.env.NODE_ENV !== 'test') {
          log('Retrying to retrieve endpoints...')
          // the delay grows by 200ms with each attempt
          return setTimeout(_ => {
            return this._verifyEndpoint(verifyEndpointCallback)
          }, 200 * retries)
        }
        this.sendToParent({ error: true, msg: err.message || err })
        return this.exit(new Error('Error retrieving endpoints'))
      }
      if (result === false) {
        log('False returned while trying to retrieve endpoints')
        return this.exit(new Error('Error retrieving endpoints'))
      }

      // send data over IPC for CLI feedback
      this.sendToParent({
        error: false,
        km_data: this.km_data,
        online: true,
        pid: process.pid,
        machine_name: this.opts.MACHINE_NAME,
        public_key: this.opts.PUBLIC_KEY,
        secret_key: this.opts.SECRET_KEY,
        reverse_interaction: this.opts.REVERSE_INTERACT
      })

      if (result && typeof result === 'object' &&
        result.error === true && result.active === false) {
        log(`Error when connecting: ${result.msg}`)
        return this.exit(new Error(`Error when connecting: ${result.msg}`))
      }

      // start workers: re-verify the endpoints every minute
      this._workerEndpoint = setInterval(this._verifyEndpoint.bind(this, (err, result) => {
        if (err) return
        // We need to exit agent if bucket is disabled (trialing)
        if (result && typeof result === 'object' && result.error === true && result.active === false) {
          log(`Error when connecting: ${result.msg}, disconnecting transporters`)
          return this.transport.disconnect()
        }
      }), 60000)
      // start interactors
      this.watchDog = WatchDog

      // the watchdog is only started three minutes after the agent itself
      setTimeout(() => {
        log('PM2 Watchdog started')
        this.watchDog.start({
          conf: {
            ipm2: this.getPM2Client()
          }
        })
      }, 1000 * 60 * 3)

      this.push = new PushInteractor(this.opts, this.getPM2Client(), this.transport)
      this.reverse = new ReverseInteractor(this.opts, this.getPM2Client(), this.transport)
      this.push.start()
      this.reverse.start()
      log('Interactor daemon started')
      if (cb) {
        setTimeout(cb, 20)
      }
    }
    return this._verifyEndpoint(verifyEndpointCallback)
  }
}
402
// If this file is the entry point, launch the daemon inside a domain;
// otherwise it was only required in order to use the exported class.
if (require.main === module) {
  const d = domain.create()
  let daemon = null

  // Terminate the failed agent. `daemon` is still null when the constructor
  // itself threw, in which case only a plain process exit is possible.
  const exitFailedAgent = (exitErr) => {
    if (daemon) return daemon.exit(exitErr)
    return process.exit(1)
  }

  // Any uncaught error raised inside the domain lands here: log it, then try
  // to spawn a fresh agent before shutting this one down.
  d.on('error', function (fatalErr) {
    console.error('-- FATAL EXCEPTION happened --')
    console.error(new Date())
    // non-Error values can be thrown too, so `.stack` may not exist
    console.error((fatalErr && fatalErr.stack) || fatalErr)
    console.log('Re-initiating Agent')

    InteractorClient.getOrSetConf(cst, null, (err, infos) => {
      // Without a configuration there is nothing to relaunch; this must also
      // stop when `infos` is missing and no error was reported, otherwise the
      // property reads on `infos` below would throw.
      if (err || !infos) {
        console.error('[PM2 Agent] Failed to rescue agent :')
        console.error(err || new Error(`Cannot find configuration to connect to backend`))
        return process.exit(1)
      }
      // NOTE(review): this prints the secret key to stdout — confirm that is acceptable.
      console.log(`[PM2 Agent] Using (Public key: ${infos.public_key}) (Private key: ${infos.secret_key}) (Info node: ${infos.info_node})`)

      // Exit anyway the errored agent
      const timeout = setTimeout(_ => {
        console.error('Daemonization of failsafe agent did not worked')
        exitFailedAgent(err)
      }, 2000)

      InteractorClient.daemonize(cst, infos, (err) => {
        if (err) {
          log('[PM2 Agent] Failed to rescue agent :')
          log(err)
        } else {
          log(`Succesfully launched new agent`)
        }
        clearTimeout(timeout)
        exitFailedAgent(err)
      })
    })
  })

  d.run(_ => {
    daemon = new InteractorDaemon()

    process.title = `PM2 Agent v${pkg.version}: (${cst.PM2_HOME})`

    log('[PM2 Agent] Launching agent')
    daemon.start()
  })
}
453
\No newline at end of file