UNPKG

17.3 kBJavaScriptView Raw
1'use strict'
2
3const log = require('debug')('interactor:client')
4const fs = require('fs')
5const path = require('path')
6const rpc = require('pm2-axon-rpc')
7const axon = require('pm2-axon')
8const chalk = require('chalk')
9const os = require('os')
10const constants = require('../constants')
11const childProcess = require('child_process')
12
13const printError = function (msg) {
14 if (process.env.PM2_SILENT || process.env.PM2_PROGRAMMATIC) return false
15 if (msg instanceof Error) return console.error(msg.message)
16 return console.error.apply(console, arguments)
17}
18const printOut = function (msg) {
19 if (process.env.PM2_SILENT || process.env.PM2_PROGRAMMATIC) return false
20 return console.log.apply(console, arguments)
21}
22
23module.exports = class InteractorDaemonizer {
24 /**
25 * Ping the Interactor to see if its online
26 * @param {Object} opts global constants
27 * @param {String} opts.INTERACTOR_RPC_PORT path used to connect to the interactor
28 * @param {Function} cb invoked with <err, result>
29 */
30 static ping (opts, cb) {
31 if (typeof cb !== 'function') {
32 throw new Error('Missing parameters')
33 } else if (typeof opts !== 'object' || !opts || !opts.INTERACTOR_RPC_PORT) {
34 return cb(new Error('Missing parameters'))
35 }
36 const req = axon.socket('req')
37 const client = new rpc.Client(req)
38
39 log('[PING INTERACTOR] Trying to connect to Interactor daemon')
40
41 client.sock.once('reconnect attempt', _ => {
42 client.sock.close()
43 log('Interactor Daemon not launched')
44 return cb(null, false)
45 })
46
47 client.sock.once('connect', _ => {
48 client.sock.once('close', _ => {
49 return cb(null, true)
50 })
51 client.sock.close()
52 log('Interactor Daemon alive')
53 })
54
55 client.sock.once('error', (e) => {
56 if (e.code === 'EACCES') {
57 fs.stat(opts.INTERACTOR_RPC_PORT, (e, stats) => {
58 if (stats.uid === 0) {
59 console.error('Permission denied, activate current user')
60 return process.exit(1)
61 }
62 })
63 } else {
64 console.error('unexpected error')
65 console.error(e)
66 }
67 })
68
69 req.connect(opts.INTERACTOR_RPC_PORT)
70 }
71
72 /**
73 * Try to kill the interactor daemon via RPC
74 * @param {Object} conf global constants
75 * @param {String} conf.INTERACTOR_RPC_PORT path used to connect to the interactor
76 * @param {Function} cb invoked with <err>
77 */
78 static killInteractorDaemon (conf, cb) {
79 process.env.PM2_INTERACTOR_PROCESSING = true
80
81 log('Killing interactor #1 ping')
82 this.ping(conf, (err, online) => {
83 log(`Interactor is ${!online || err ? 'offline' : 'online'}`)
84
85 if (!online || err) {
86 return cb ? err ? cb(err) : cb(new Error('Interactor not launched')) : printError('Interactor not launched')
87 }
88
89 this.launchRPC(conf, (err, data) => {
90 if (err) {
91 setTimeout(_ => {
92 this.disconnectRPC(cb)
93 }, 100)
94 return false
95 }
96 this.rpc.kill((err) => {
97 if (err) printError(err)
98 setTimeout(_ => {
99 this.disconnectRPC(cb)
100 }, 100)
101 })
102 return false
103 })
104 return false
105 })
106 }
107
108 /**
109 * Start a RPC client that connect to the InteractorDaemon
110 * @param {Object} conf global constants
111 * @param {Function} cb invoked with <err>
112 */
113 static launchRPC (conf, cb) {
114 const req = axon.socket('req')
115 this.rpc = {}
116 this.client = new rpc.Client(req)
117
118 log('Generating Interactor methods of RPC client')
119
120 // attach known methods to RPC client
121 const generateMethods = (cb) => {
122 this.client.methods((err, methods) => {
123 if (err) return cb(err)
124 Object.keys(methods).forEach((key) => {
125 let method = methods[key]
126 log('+ Adding %s method to interactor RPC client', method.name);
127 ((name) => {
128 let self = this
129 this.rpc[name] = function () {
130 let args = Array.prototype.slice.call(arguments)
131 args.unshift(name)
132 self.client.call.apply(self.client, args)
133 }
134 })(method.name)
135 })
136 return cb()
137 })
138 }
139
140 this.client.sock.once('reconnect attempt', (err) => {
141 this.client.sock.removeAllListeners()
142 return cb(err, { success: false, msg: 'reconnect attempt' })
143 })
144
145 this.client.sock.once('error', (err) => {
146 log('-- Error in error catch all on Interactor --', err)
147 return cb(err, { success: false, msg: 'reconnect attempt' })
148 })
149
150 this.client.sock.once('connect', () => {
151 this.client.sock.removeAllListeners()
152 generateMethods(_ => {
153 log('Methods of RPC client for Interaction ready.')
154 return cb(null, { success: true })
155 })
156 })
157
158 this.client_sock = req.connect(conf.INTERACTOR_RPC_PORT)
159 }
160
161 /**
162 * Start or Restart the Interaction Daemon depending if its online or not
163 * @private
164 * @param {Object} conf global constants
165 * @param {Object} infos data used to start the interactor [can be recovered from FS]
166 * @param {String} infos.secret_key the secret key used to cipher data
167 * @param {String} infos.public_key the public key used identify the user
168 * @param {String} infos.machine_name [optional] override name of the machine
169 * @param {Function} cb invoked with <err, msg, process>
170 */
171 static daemonize (cst, conf, cb) {
172 const InteractorJS = path.resolve(path.dirname(module.filename), 'InteractorDaemon.js')
173
174 // Redirect PM2 internal err and out
175 // to STDERR STDOUT when running with Travis
176 const testEnv = process.env.TRAVIS || (process.env.NODE_ENV && process.env.NODE_ENV.match(/test/))
177 const out = testEnv ? 1 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
178 const err = testEnv ? 2 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
179
180 let binary = process.execPath
181 if (binary.indexOf('node') === -1) {
182 binary = 'node'
183 }
184 if (process.env.NODEJS_EXECUTABLE) {
185 binary = process.env.NODEJS_EXECUTABLE
186 }
187
188 const child = childProcess.spawn(binary, [InteractorJS], {
189 silent: false,
190 detached: true,
191 cwd: process.cwd(),
192 env: Object.assign({
193 PM2_HOME: cst.PM2_HOME,
194 PM2_MACHINE_NAME: conf.machine_name,
195 PM2_SECRET_KEY: conf.secret_key,
196 PM2_PUBLIC_KEY: conf.public_key,
197 PM2_REVERSE_INTERACT: conf.reverse_interact,
198 KEYMETRICS_NODE: conf.info_node,
199 AGENT_TRANSPORT_AXON: conf.agent_transport_axon,
200 AGENT_TRANSPORT_WEBSOCKET: conf.agent_transport_websocket,
201 PM2_VERSION: conf.pm2_version,
202 DEBUG: process.env.DEBUG || 'interactor:*,-interactor:axon,-interactor:websocket,-interactor:pm2:client,-interactor:push'
203 }, process.env),
204 stdio: ['ipc', out, err]
205 })
206
207 try {
208 let prevPid = fs.readFileSync(constants.INTERACTOR_PID_PATH)
209 prevPid = parseInt(prevPid)
210 process.kill(prevPid)
211 } catch (e) {
212 }
213
214 let pid = ''
215
216 if (child.pid)
217 pid = child.pid.toString()
218
219 fs.writeFileSync(cst.INTERACTOR_PID_PATH, pid)
220
221 child.on('close', (status) => {
222 if (status === constants.ERROR_EXIT) {
223 return cb(new Error('Agent has shutdown for unknown reason'))
224 }
225 return cb()
226 })
227
228 child.once('error', (err) => {
229 log('Error when launching Interactor, please check the agent logs')
230 return cb(err)
231 })
232
233 child.unref()
234
235 const timeout = setTimeout(_ => {
236 printOut(`${chalk.yellow('[PM2.IO][WARNING]')} Not managed to connect to PM2 Plus, retrying in background.`)
237 child.removeAllListeners()
238 child.disconnect()
239 return cb(null, {}, child)
240 }, 7000)
241
242 child.once('message', (msg) => {
243 clearTimeout(timeout)
244 log('Interactor daemon launched :', msg)
245
246 if (msg.log) {
247 return cb(null, msg, child)
248 }
249
250 child.removeAllListeners('error')
251 child.disconnect()
252
253 // Handle and show to user the different error message that can happen
254 if (msg.km_data && msg.km_data.error === true) {
255 if (!process.env.PM2_SILENT) {
256 console.log(chalk.red('[PM2.IO][ERROR]'), msg.km_data.msg)
257 console.log(chalk.cyan('[PM2.IO]') + ' Contact support contact@keymetrics.io and send us the error message')
258 }
259 return cb(msg)
260 } else if (msg.km_data && msg.km_data.disabled === true) {
261 if (!process.env.PM2_SILENT) {
262 console.log(chalk.cyan('[PM2.IO]') + ' Server DISABLED BY ADMINISTRATION contact support contact@keymetrics.io with reference to your public and secret keys)')
263 }
264 return cb(msg)
265 } else if (msg.km_data && msg.km_data.error === true) {
266 if (!process.env.PM2_SILENT) {
267 console.log('%s %s (Public: %s) (Secret: %s) (Machine name: %s)', chalk.red('[PM2.IO][ERROR]'),
268 msg.km_data.msg, msg.public_key, msg.secret_key, msg.machine_name)
269 }
270 return cb(msg)
271 } else if (msg.km_data && msg.km_data.active === false && msg.km_data.pending === true) {
272 if (!process.env.PM2_SILENT) {
273 console.log('%s You must upgrade your bucket in order to monitor more servers.', chalk.red('[PM2.IO]'))
274 }
275 return cb(msg)
276 }
277
278 return cb(null, msg, child)
279 })
280 }
281
282 /**
283 * Start or Restart the Interaction Daemon depending if its online or not
284 * @private
285 * @param {Object} conf global constants
286 * @param {Object} infos data used to start the interactor [can be recovered from FS]
287 * @param {String} infos.secret_key the secret key used to cipher data
288 * @param {String} infos.public_key the public key used identify the user
289 * @param {String} infos.machine_name [optional] override name of the machine
290 * @param {Function} cb invoked with <err, msg, process>
291 */
292 static launchOrAttach (conf, infos, cb) {
293 this.ping(conf, (err, online) => {
294 if (!err && online) {
295 log('Interactor online, restarting it...')
296 this.launchRPC(conf, _ => {
297 this.rpc.kill((ignoredErr) => {
298 this.daemonize(conf, infos, cb)
299 })
300 })
301 } else {
302 log('Interactor offline, launching it...')
303 this.daemonize(conf, infos, cb)
304 }
305 })
306 }
307
308 /**
309 * Restart the Interactor Daemon
310 * @param {Object} conf global constants
311 * @param {Function} cb invoked with <err, msg>
312 */
313 static update (conf, cb) {
314 this.ping(conf, (err, online) => {
315 if (err || !online) {
316 return cb ? cb(new Error('Interactor not launched')) : printError('Interactor not launched')
317 }
318 this.launchRPC(conf, _ => {
319 this.rpc.kill((err) => {
320 if (err) {
321 return cb ? cb(err) : printError(err)
322 }
323 printOut('Interactor successfully killed')
324 setTimeout(_ => {
325 this.launchAndInteract(conf, {}, _ => {
326 return cb(null, { msg: 'Daemon launched' })
327 })
328 }, 500)
329 })
330 })
331 })
332 }
333
334 /**
335 * Retrieve Interactor configuration from env, params and filesystem.
336 * @param {Object} cst global constants
337 * @param {Object} infos data used to start the interactor [optional]
338 * @param {String} infos.secret_key the secret key used to cipher data [optional]
339 * @param {String} infos.public_key the public key used identify the user [optional]
340 * @param {String} infos.machine_name override name of the machine [optional]
341 * @param {Function} cb invoked with <err, configuration>
342 */
343 static getOrSetConf (cst, infos, cb) {
344 infos = infos || {}
345 let configuration = {
346 version_management: {
347 active: true
348 }
349 }
350 let confFS = {}
351
352 // Try loading configuration file on FS
353 try {
354 let fileContent = fs.readFileSync(cst.INTERACTION_CONF).toString()
355 // Handle old configuration with json5
356 fileContent = fileContent.replace(/\s(\w+):/g, '"$1":')
357 // parse
358 confFS = JSON.parse(fileContent)
359
360 if (confFS.version_management) {
361 configuration.version_management.active = confFS.version_management.active
362 }
363 } catch (e) {
364 log('Interaction file does not exists')
365 }
366
367 // load the configration (first have priority)
368 // -> from env variable
369 // -> from params (eg. CLI)
370 // -> from configuration on FS
371 configuration.public_key = process.env.PM2_PUBLIC_KEY || process.env.KEYMETRICS_PUBLIC || infos.public_key || confFS.public_key
372 configuration.secret_key = process.env.PM2_SECRET_KEY || process.env.KEYMETRICS_SECRET || infos.secret_key || confFS.secret_key
373 configuration.machine_name = process.env.PM2_MACHINE_NAME || process.env.INSTANCE_NAME || infos.machine_name || confFS.machine_name || os.hostname()
374 configuration.pm2_version = process.env.PM2_VERSION || infos.pm2_version || confFS.pm2_version
375 configuration.reverse_interact = confFS.reverse_interact || true
376 // is setup empty ? use the one provided in env OR root OTHERWISE get the one on FS conf OR fallback on root
377 configuration.info_node = process.env.KEYMETRICS_NODE || infos.info_node || confFS.info_node || cst.KEYMETRICS_ROOT_URL
378 configuration.agent_transport_websocket = process.env.AGENT_TRANSPORT_WEBSOCKET || infos.agent_transport_websocket || confFS.agent_transport_websocket || 'false'
379 configuration.agent_transport_axon = process.env.AGENT_TRANSPORT_AXON || infos.agent_transport_axon || confFS.agent_transport_axon || 'true'
380
381 if (!configuration.secret_key) {
382 log('Secret key is not defined in configuration', configuration)
383 return cb(new Error('secret key is not defined'))
384 }
385 if (!configuration.public_key) {
386 log('Public key is not defined in configuration', configuration)
387 return cb(new Error('public key is not defined'))
388 }
389
390 // write configuration on FS
391 try {
392 fs.writeFileSync(cst.INTERACTION_CONF, JSON.stringify(configuration, null, 4))
393 } catch (e) {
394 console.error('Error when writting configuration file %s', cst.INTERACTION_CONF)
395 return cb(e)
396 }
397 if (configuration.info_node.indexOf('http') === -1) { // handle old file
398 configuration.info_node = `https://${configuration.info_node}`
399 }
400 return cb(null, configuration)
401 }
402
403 /**
404 * Disconnect the RPC client from Interactor Daemon
405 * @param {Function} cb invoked with <err, msg>
406 */
407 static disconnectRPC (cb) {
408 log('Disconnect RPC')
409 if (!this.client_sock || !this.client_sock.close) {
410 log('RPC not launched')
411 return cb(null, {
412 success: false,
413 msg: 'RPC connection to Interactor Daemon is not launched'
414 })
415 }
416
417 if (this.client_sock.closing === true) {
418 log('RPC already closed')
419 return cb(null, {
420 success: false,
421 msg: 'RPC closed'
422 })
423 }
424
425 try {
426 let timer
427
428 log('Closing RPC INTERACTOR')
429
430 this.client_sock.once('close', _ => {
431 log('RPC INTERACTOR cleanly closed')
432 clearTimeout(timer)
433 return cb ? cb(null, { success: true }) : false
434 })
435
436 timer = setTimeout(_ => {
437 if (this.client_sock.destroy) {
438 this.client_sock.destroy()
439 }
440 return cb ? cb(null, { success: true }) : false
441 }, 200)
442
443 this.client_sock.close()
444 } catch (err) {
445 log('Error while closing RPC INTERACTOR : %s', err.message || err)
446 return cb ? cb(err) : false
447 }
448 }
449
450 /**
451 * Start the Interactor Daemon
452 * @param {Object} cst global constants
453 * @param {Object} infos data used to start the interactor [can be recovered from FS]
454 * @param {String} infos.secret_key the secret key used to cipher data
455 * @param {String} infos.public_key the public key used identify the user
456 * @param {String} infos.machine_name [optional] override name of the machine
457 * @param {Function} cb invoked with <err, msg, process>
458 */
459 static launchAndInteract (cst, opts, cb) {
460 // For Watchdog
461 if (process.env.PM2_AGENT_ONLINE) {
462 return cb()
463 }
464
465 process.env.PM2_INTERACTOR_PROCESSING = true
466
467 this.getOrSetConf(Object.assign(cst, constants), opts, (err, conf) => {
468 if (err || !conf) return cb(err || new Error('Cant retrieve configuration'))
469
470 if (!process.env.PM2_SILENT) {
471 console.log(chalk.cyan('[PM2 I/O]') + ' Using: Public key: %s | Private key: %s | Machine name: %s', conf.public_key, conf.secret_key, conf.machine_name)
472 }
473 return this.launchOrAttach(cst, conf, cb)
474 })
475 }
476
477 /**
478 * Retrieve configuration used by the Interaction Daemon
479 * @param {Object} cst global constants
480 * @param {Function} cb invoked with <err, data>
481 */
482 static getInteractInfo (cst, cb) {
483 log('Getting interaction info')
484 if (process.env.PM2_NO_INTERACTION) return cb(new Error('PM2_NO_INTERACTION set'))
485
486 this.ping(cst, (err, online) => {
487 if (err || !online) return cb(new Error('Interactor is offline'))
488
489 this.launchRPC(cst, _ => {
490 this.rpc.getInfos((err, infos) => {
491 if (err) return cb(err)
492
493 // Avoid general CLI to interfere with Keymetrics CLI commands
494 if (process.env.PM2_INTERACTOR_PROCESSING) return cb(null, infos)
495
496 this.disconnectRPC(() => {
497 return cb(null, infos)
498 })
499 })
500 })
501 })
502 }
503}