UNPKG

17.3 kBJavaScriptView Raw
1'use strict'
2
3const log = require('debug')('interactor:client')
4const fs = require('fs')
5const path = require('path')
6const rpc = require('pm2-axon-rpc')
7const axon = require('pm2-axon')
8const chalk = require('chalk')
9const os = require('os')
10const constants = require('../constants')
11const childProcess = require('child_process')
12
/**
 * Print a message on stderr unless PM2 runs silently or programmatically.
 * An Error instance is reduced to its message; any other call forwards
 * every argument untouched to console.error.
 * @param {Error|*} msg error or first value to print
 * @return {Boolean|undefined} false when output is muted, otherwise the console.error result
 */
const printError = function (msg) {
  const muted = Boolean(process.env.PM2_SILENT || process.env.PM2_PROGRAMMATIC)
  if (muted) return false
  return msg instanceof Error
    ? console.error(msg.message)
    : Reflect.apply(console.error, console, arguments)
}
/**
 * Print a message on stdout unless PM2 runs silently or programmatically.
 * Every argument is forwarded untouched to console.log.
 * @param {*} msg first value to print
 * @return {Boolean|undefined} false when output is muted, otherwise the console.log result
 */
const printOut = function (msg) {
  const { PM2_SILENT: silent, PM2_PROGRAMMATIC: programmatic } = process.env
  if (silent || programmatic) return false
  return Reflect.apply(console.log, console, arguments)
}
22
23module.exports = class InteractorDaemonizer {
24 /**
25 * Ping the Interactor to see if its online
26 * @param {Object} opts global constants
27 * @param {String} opts.INTERACTOR_RPC_PORT path used to connect to the interactor
28 * @param {Function} cb invoked with <err, result>
29 */
30 static ping (opts, cb) {
31 if (typeof cb !== 'function') {
32 throw new Error('Missing parameters')
33 } else if (typeof opts !== 'object' || !opts || !opts.INTERACTOR_RPC_PORT) {
34 return cb(new Error('Missing parameters'))
35 }
36 const req = axon.socket('req')
37 const client = new rpc.Client(req)
38
39 log('[PING INTERACTOR] Trying to connect to Interactor daemon')
40
41 client.sock.once('reconnect attempt', _ => {
42 client.sock.close()
43 log('Interactor Daemon not launched')
44 return cb(null, false)
45 })
46
47 client.sock.once('connect', _ => {
48 client.sock.once('close', _ => {
49 return cb(null, true)
50 })
51 client.sock.close()
52 log('Interactor Daemon alive')
53 })
54
55 client.sock.once('error', (e) => {
56 if (e.code === 'EACCES') {
57 fs.stat(opts.INTERACTOR_RPC_PORT, (e, stats) => {
58 if (stats.uid === 0) {
59 console.error('Permission denied, activate current user')
60 return process.exit(1)
61 }
62 })
63 } else {
64 console.error('unexpected error')
65 console.error(e)
66 }
67 })
68
69 req.connect(opts.INTERACTOR_RPC_PORT)
70 }
71
72 /**
73 * Try to kill the interactor daemon via RPC
74 * @param {Object} conf global constants
75 * @param {String} conf.INTERACTOR_RPC_PORT path used to connect to the interactor
76 * @param {Function} cb invoked with <err>
77 */
78 static killInteractorDaemon (conf, cb) {
79 process.env.PM2_INTERACTOR_PROCESSING = true
80
81 log('Killing interactor #1 ping')
82 this.ping(conf, (err, online) => {
83 log(`Interactor is ${!online || err ? 'offline' : 'online'}`)
84
85 if (!online || err) {
86 return cb ? err ? cb(err) : cb(new Error('Interactor not launched')) : printError('Interactor not launched')
87 }
88
89 this.launchRPC(conf, (err, data) => {
90 if (err) {
91 setTimeout(_ => {
92 this.disconnectRPC(cb)
93 }, 100)
94 return false
95 }
96 this.rpc.kill((err) => {
97 if (err) printError(err)
98 setTimeout(_ => {
99 this.disconnectRPC(cb)
100 }, 100)
101 })
102 return false
103 })
104 return false
105 })
106 }
107
108 /**
109 * Start a RPC client that connect to the InteractorDaemon
110 * @param {Object} conf global constants
111 * @param {Function} cb invoked with <err>
112 */
113 static launchRPC (conf, cb) {
114 const req = axon.socket('req')
115 this.rpc = {}
116 this.client = new rpc.Client(req)
117
118 log('Generating Interactor methods of RPC client')
119
120 // attach known methods to RPC client
121 const generateMethods = (cb) => {
122 this.client.methods((err, methods) => {
123 if (err) return cb(err)
124 Object.keys(methods).forEach((key) => {
125 let method = methods[key]
126 log('+ Adding %s method to interactor RPC client', method.name);
127 ((name) => {
128 let self = this
129 this.rpc[name] = function () {
130 let args = Array.prototype.slice.call(arguments)
131 args.unshift(name)
132 self.client.call.apply(self.client, args)
133 }
134 })(method.name)
135 })
136 return cb()
137 })
138 }
139
140 this.client.sock.once('reconnect attempt', (err) => {
141 this.client.sock.removeAllListeners()
142 return cb(err, { success: false, msg: 'reconnect attempt' })
143 })
144
145 this.client.sock.once('error', (err) => {
146 log('-- Error in error catch all on Interactor --', err)
147 return cb(err, { success: false, msg: 'reconnect attempt' })
148 })
149
150 this.client.sock.once('connect', () => {
151 this.client.sock.removeAllListeners()
152 generateMethods(_ => {
153 log('Methods of RPC client for Interaction ready.')
154 return cb(null, { success: true })
155 })
156 })
157
158 this.client_sock = req.connect(conf.INTERACTOR_RPC_PORT)
159 }
160
161 /**
162 * Start or Restart the Interaction Daemon depending if its online or not
163 * @private
164 * @param {Object} conf global constants
165 * @param {Object} infos data used to start the interactor [can be recovered from FS]
166 * @param {String} infos.secret_key the secret key used to cipher data
167 * @param {String} infos.public_key the public key used identify the user
168 * @param {String} infos.machine_name [optional] override name of the machine
169 * @param {Function} cb invoked with <err, msg, process>
170 */
171 static daemonize (cst, conf, cb) {
172 const InteractorJS = path.resolve(path.dirname(module.filename), 'InteractorDaemon.js')
173
174 // Redirect PM2 internal err and out
175 // to STDERR STDOUT when running with Travis
176 const testEnv = process.env.TRAVIS || (process.env.NODE_ENV && process.env.NODE_ENV.match(/test/))
177 const out = testEnv ? 1 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
178 const err = testEnv ? 2 : fs.openSync(constants.INTERACTOR_LOG_FILE_PATH, 'a')
179
180 let binary = process.execPath
181 if (binary.indexOf('node') === -1) {
182 binary = 'node'
183 }
184 if (process.env.NODEJS_EXECUTABLE) {
185 binary = process.env.NODEJS_EXECUTABLE
186 }
187
188 const child = childProcess.spawn(binary, [InteractorJS], {
189 silent: false,
190 detached: true,
191 cwd: process.cwd(),
192 env: Object.assign({
193 PM2_HOME: cst.PM2_HOME,
194 PM2_MACHINE_NAME: conf.machine_name,
195 PM2_SECRET_KEY: conf.secret_key,
196 PM2_PUBLIC_KEY: conf.public_key,
197 PM2_REVERSE_INTERACT: conf.reverse_interact,
198 KEYMETRICS_NODE: conf.info_node,
199 AGENT_TRANSPORT_AXON: conf.agent_transport_axon,
200 AGENT_TRANSPORT_WEBSOCKET: conf.agent_transport_websocket,
201 PM2_VERSION: conf.pm2_version,
202 DEBUG: process.env.DEBUG || 'interactor:*,-interactor:axon,-interactor:websocket,-interactor:pm2:client,-interactor:push'
203 }, process.env),
204 stdio: ['ipc', out, err]
205 })
206
207 try {
208 let prevPid = fs.readFileSync(constants.INTERACTOR_PID_PATH)
209 prevPid = parseInt(prevPid)
210 process.kill(prevPid)
211 } catch (e) {
212 }
213
214 fs.writeFileSync(cst.INTERACTOR_PID_PATH, child.pid)
215
216 child.on('close', (status) => {
217 if (status === constants.ERROR_EXIT) {
218 return cb(new Error('Agent has shutdown for unknown reason'))
219 }
220 return cb()
221 })
222
223 child.once('error', (err) => {
224 log('Error when launching Interactor, please check the agent logs')
225 return cb(err)
226 })
227
228 child.unref()
229
230 const timeout = setTimeout(_ => {
231 printOut(`${chalk.yellow('[PM2.IO][WARNING]')} Not managed to connect to PM2 Plus, retrying in background.`)
232 child.removeAllListeners()
233 child.disconnect()
234 return cb(null, {}, child)
235 }, 7000)
236
237 child.once('message', (msg) => {
238 clearTimeout(timeout)
239 log('Interactor daemon launched :', msg)
240
241 if (msg.log) {
242 return cb(null, msg, child)
243 }
244
245 child.removeAllListeners('error')
246 child.disconnect()
247
248 // Handle and show to user the different error message that can happen
249 if (msg.km_data && msg.km_data.error === true) {
250 if (!process.env.PM2_SILENT) {
251 console.log(chalk.red('[PM2.IO][ERROR]'), msg.km_data.msg)
252 console.log(chalk.cyan('[PM2.IO]') + ' Contact support contact@keymetrics.io and send us the error message')
253 }
254 return cb(msg)
255 } else if (msg.km_data && msg.km_data.disabled === true) {
256 if (!process.env.PM2_SILENT) {
257 console.log(chalk.cyan('[PM2.IO]') + ' Server DISABLED BY ADMINISTRATION contact support contact@keymetrics.io with reference to your public and secret keys)')
258 }
259 return cb(msg)
260 } else if (msg.km_data && msg.km_data.error === true) {
261 if (!process.env.PM2_SILENT) {
262 console.log('%s %s (Public: %s) (Secret: %s) (Machine name: %s)', chalk.red('[PM2.IO][ERROR]'),
263 msg.km_data.msg, msg.public_key, msg.secret_key, msg.machine_name)
264 }
265 return cb(msg)
266 } else if (msg.km_data && msg.km_data.active === false && msg.km_data.pending === true) {
267 if (!process.env.PM2_SILENT) {
268 console.log('%s You must upgrade your bucket in order to monitor more servers.', chalk.red('[PM2.IO]'))
269 }
270 return cb(msg)
271 }
272
273 return cb(null, msg, child)
274 })
275 }
276
277 /**
278 * Start or Restart the Interaction Daemon depending if its online or not
279 * @private
280 * @param {Object} conf global constants
281 * @param {Object} infos data used to start the interactor [can be recovered from FS]
282 * @param {String} infos.secret_key the secret key used to cipher data
283 * @param {String} infos.public_key the public key used identify the user
284 * @param {String} infos.machine_name [optional] override name of the machine
285 * @param {Function} cb invoked with <err, msg, process>
286 */
287 static launchOrAttach (conf, infos, cb) {
288 this.ping(conf, (err, online) => {
289 if (!err && online) {
290 log('Interactor online, restarting it...')
291 this.launchRPC(conf, _ => {
292 this.rpc.kill((ignoredErr) => {
293 this.daemonize(conf, infos, cb)
294 })
295 })
296 } else {
297 log('Interactor offline, launching it...')
298 this.daemonize(conf, infos, cb)
299 }
300 })
301 }
302
303 /**
304 * Restart the Interactor Daemon
305 * @param {Object} conf global constants
306 * @param {Function} cb invoked with <err, msg>
307 */
308 static update (conf, cb) {
309 this.ping(conf, (err, online) => {
310 if (err || !online) {
311 return cb ? cb(new Error('Interactor not launched')) : printError('Interactor not launched')
312 }
313 this.launchRPC(conf, _ => {
314 this.rpc.kill((err) => {
315 if (err) {
316 return cb ? cb(err) : printError(err)
317 }
318 printOut('Interactor successfully killed')
319 setTimeout(_ => {
320 this.launchAndInteract(conf, {}, _ => {
321 return cb(null, { msg: 'Daemon launched' })
322 })
323 }, 500)
324 })
325 })
326 })
327 }
328
329 /**
330 * Retrieve Interactor configuration from env, params and filesystem.
331 * @param {Object} cst global constants
332 * @param {Object} infos data used to start the interactor [optional]
333 * @param {String} infos.secret_key the secret key used to cipher data [optional]
334 * @param {String} infos.public_key the public key used identify the user [optional]
335 * @param {String} infos.machine_name override name of the machine [optional]
336 * @param {Function} cb invoked with <err, configuration>
337 */
338 static getOrSetConf (cst, infos, cb) {
339 infos = infos || {}
340 let configuration = {
341 version_management: {
342 active: true
343 }
344 }
345 let confFS = {}
346
347 // Try loading configuration file on FS
348 try {
349 let fileContent = fs.readFileSync(cst.INTERACTION_CONF).toString()
350 // Handle old configuration with json5
351 fileContent = fileContent.replace(/\s(\w+):/g, '"$1":')
352 // parse
353 confFS = JSON.parse(fileContent)
354
355 if (confFS.version_management) {
356 configuration.version_management.active = confFS.version_management.active
357 }
358 } catch (e) {
359 log('Interaction file does not exists')
360 }
361
362 // load the configration (first have priority)
363 // -> from env variable
364 // -> from params (eg. CLI)
365 // -> from configuration on FS
366 configuration.public_key = process.env.PM2_PUBLIC_KEY || process.env.KEYMETRICS_PUBLIC || infos.public_key || confFS.public_key
367 configuration.secret_key = process.env.PM2_SECRET_KEY || process.env.KEYMETRICS_SECRET || infos.secret_key || confFS.secret_key
368 configuration.machine_name = process.env.PM2_MACHINE_NAME || process.env.INSTANCE_NAME || infos.machine_name || confFS.machine_name || os.hostname()
369 configuration.pm2_version = process.env.PM2_VERSION || infos.pm2_version || confFS.pm2_version
370 configuration.reverse_interact = confFS.reverse_interact || true
371 // is setup empty ? use the one provided in env OR root OTHERWISE get the one on FS conf OR fallback on root
372 configuration.info_node = process.env.KEYMETRICS_NODE || infos.info_node || confFS.info_node || cst.KEYMETRICS_ROOT_URL
373 configuration.agent_transport_websocket = process.env.AGENT_TRANSPORT_WEBSOCKET || infos.agent_transport_websocket || confFS.agent_transport_websocket || 'false'
374 configuration.agent_transport_axon = process.env.AGENT_TRANSPORT_AXON || infos.agent_transport_axon || confFS.agent_transport_axon || 'true'
375
376 if (!configuration.secret_key) {
377 log('Secret key is not defined in configuration', configuration)
378 return cb(new Error('secret key is not defined'))
379 }
380 if (!configuration.public_key) {
381 log('Public key is not defined in configuration', configuration)
382 return cb(new Error('public key is not defined'))
383 }
384
385 // write configuration on FS
386 try {
387 fs.writeFileSync(cst.INTERACTION_CONF, JSON.stringify(configuration, null, 4))
388 } catch (e) {
389 console.error('Error when writting configuration file %s', cst.INTERACTION_CONF)
390 return cb(e)
391 }
392 if (configuration.info_node.indexOf('http') === -1) { // handle old file
393 configuration.info_node = `https://${configuration.info_node}`
394 }
395 return cb(null, configuration)
396 }
397
398 /**
399 * Disconnect the RPC client from Interactor Daemon
400 * @param {Function} cb invoked with <err, msg>
401 */
402 static disconnectRPC (cb) {
403 log('Disconnect RPC')
404 if (!this.client_sock || !this.client_sock.close) {
405 log('RPC not launched')
406 return cb(null, {
407 success: false,
408 msg: 'RPC connection to Interactor Daemon is not launched'
409 })
410 }
411
412 if (this.client_sock.closing === true) {
413 log('RPC already closed')
414 return cb(null, {
415 success: false,
416 msg: 'RPC closed'
417 })
418 }
419
420 try {
421 let timer
422
423 log('Closing RPC INTERACTOR')
424
425 this.client_sock.once('close', _ => {
426 log('RPC INTERACTOR cleanly closed')
427 clearTimeout(timer)
428 return cb ? cb(null, { success: true }) : false
429 })
430
431 timer = setTimeout(_ => {
432 if (this.client_sock.destroy) {
433 this.client_sock.destroy()
434 }
435 return cb ? cb(null, { success: true }) : false
436 }, 200)
437
438 this.client_sock.close()
439 } catch (err) {
440 log('Error while closing RPC INTERACTOR : %s', err.message || err)
441 return cb ? cb(err) : false
442 }
443 }
444
445 /**
446 * Start the Interactor Daemon
447 * @param {Object} cst global constants
448 * @param {Object} infos data used to start the interactor [can be recovered from FS]
449 * @param {String} infos.secret_key the secret key used to cipher data
450 * @param {String} infos.public_key the public key used identify the user
451 * @param {String} infos.machine_name [optional] override name of the machine
452 * @param {Function} cb invoked with <err, msg, process>
453 */
454 static launchAndInteract (cst, opts, cb) {
455 // For Watchdog
456 if (process.env.PM2_AGENT_ONLINE) {
457 return cb()
458 }
459
460 process.env.PM2_INTERACTOR_PROCESSING = true
461
462 this.getOrSetConf(Object.assign(cst, constants), opts, (err, conf) => {
463 if (err || !conf) return cb(err || new Error('Cant retrieve configuration'))
464
465 if (!process.env.PM2_SILENT) {
466 console.log(chalk.cyan('[PM2 I/O]') + ' Using: Public key: %s | Private key: %s | Machine name: %s', conf.public_key, conf.secret_key, conf.machine_name)
467 }
468 return this.launchOrAttach(cst, conf, cb)
469 })
470 }
471
472 /**
473 * Retrieve configuration used by the Interaction Daemon
474 * @param {Object} cst global constants
475 * @param {Function} cb invoked with <err, data>
476 */
477 static getInteractInfo (cst, cb) {
478 log('Getting interaction info')
479 if (process.env.PM2_NO_INTERACTION) return cb(new Error('PM2_NO_INTERACTION set'))
480
481 this.ping(cst, (err, online) => {
482 if (err || !online) return cb(new Error('Interactor is offline'))
483
484 this.launchRPC(cst, _ => {
485 this.rpc.getInfos((err, infos) => {
486 if (err) return cb(err)
487
488 // Avoid general CLI to interfere with Keymetrics CLI commands
489 if (process.env.PM2_INTERACTOR_PROCESSING) return cb(null, infos)
490
491 this.disconnectRPC(() => {
492 return cb(null, infos)
493 })
494 })
495 })
496 })
497 }
498}