UNPKG

8.63 kBJavaScriptView Raw
1'use strict'
2
3const multiaddr = require('multiaddr')
4const fs = require('fs-extra')
5const merge = require('merge-options').bind({ ignoreUndefined: true })
6const debug = require('debug')
7const execa = require('execa')
8const nanoid = require('nanoid')
9const path = require('path')
10const os = require('os')
11const tempWrite = require('temp-write')
12const { checkForRunningApi, repoExists, tmpDir, defaultRepo } = require('./utils')
13
14const daemonLog = {
15 info: debug('ipfsd-ctl:daemon:stdout'),
16 err: debug('ipfsd-ctl:daemon:stderr')
17}
18
19function translateError (err) {
20 // get the actual error message to be the err.message
21 err.message = `${err.stdout} \n\n ${err.stderr} \n\n ${err.message} \n\n`
22
23 return err
24}
25
26/** @typedef {import("./index").ControllerOptions} ControllerOptions */
27
28/**
29 * Controller for daemon nodes
30 * @class
31 *
32 */
33class Daemon {
34 /**
35 * @constructor
36 * @param {ControllerOptions} [opts]
37 */
38 constructor (opts) {
39 /** @type ControllerOptions */
40 this.opts = opts
41 this.path = this.opts.ipfsOptions.repo || (opts.disposable ? tmpDir(opts.type) : defaultRepo(opts.type))
42 this.exec = this.opts.ipfsBin
43 this.env = merge({ IPFS_PATH: this.path }, this.opts.env)
44 this.disposable = this.opts.disposable
45 this.subprocess = null
46 this.initialized = false
47 this.started = false
48 this.clean = true
49 this.apiAddr = null
50 this.gatewayAddr = null
51 this.api = null
52 }
53
54 /**
55 * @private
56 * @param {string} addr
57 */
58 _setApi (addr) {
59 this.apiAddr = multiaddr(addr)
60 this.api = this.opts.ipfsHttpModule(addr)
61 this.api.apiHost = this.apiAddr.nodeAddress().address
62 this.api.apiPort = this.apiAddr.nodeAddress().port
63 }
64
65 /**
66 * @private
67 * @param {string} addr
68 */
69 _setGateway (addr) {
70 this.gatewayAddr = multiaddr(addr)
71 this.api.gatewayHost = this.gatewayAddr.nodeAddress().address
72 this.api.gatewayPort = this.gatewayAddr.nodeAddress().port
73 }
74
75 /**
76 * Initialize a repo.
77 *
78 * @param {Object} [initOptions={}] - @see https://github.com/ipfs/js-ipfs/blob/master/README.md#optionsinit
79 * @returns {Promise<Daemon>}
80 */
81 async init (initOptions) {
82 this.initialized = await repoExists(this.path)
83 if (this.initialized) {
84 this.clean = false
85 return this
86 }
87
88 const opts = merge(
89 {
90 emptyRepo: false,
91 bits: this.opts.test ? 1024 : 2048,
92 profiles: this.opts.test ? ['test'] : []
93 },
94 typeof this.opts.ipfsOptions.init === 'boolean' ? {} : this.opts.ipfsOptions.init,
95 typeof initOptions === 'boolean' ? {} : initOptions
96 )
97
98 const args = ['init']
99
100 // default-config only for JS
101 if (this.opts.ipfsOptions.config && this.opts.type === 'js') {
102 args.push(tempWrite.sync(JSON.stringify(this.opts.ipfsOptions.config)))
103 }
104
105 // Translate ipfs options to cli args
106 if (opts.bits) {
107 args.push('--bits', opts.bits)
108 }
109 if (opts.pass && this.opts.type === 'js') {
110 args.push('--pass', '"' + opts.pass + '"')
111 }
112 if (opts.emptyRepo) {
113 args.push('--empty-repo')
114 }
115 if (Array.isArray(opts.profiles) && opts.profiles.length) {
116 args.push('--profile', opts.profiles.join(','))
117 }
118
119 const { stdout, stderr } = await execa(this.exec, args, {
120 env: this.env
121 })
122 .catch(translateError)
123
124 daemonLog.info(stdout)
125 daemonLog.err(stderr)
126
127 // default-config only for Go
128 if (this.opts.type === 'go') {
129 await this._replaceConfig(merge(
130 await this._getConfig(),
131 this.opts.ipfsOptions.config
132 ))
133 }
134
135 this.clean = false
136 this.initialized = true
137
138 return this
139 }
140
141 /**
142 * Delete the repo that was being used. If the node was marked as disposable this will be called automatically when the process is exited.
143 *
144 * @returns {Promise<Daemon>}
145 */
146 async cleanup () {
147 if (!this.clean) {
148 await fs.remove(this.path)
149 this.clean = true
150 }
151 return this
152 }
153
154 /**
155 * Start the daemon.
156 *
157 * @return {Promise<Daemon>}
158 */
159 async start () {
160 const args = ['daemon']
161 const opts = this.opts.ipfsOptions
162 // add custom args
163 args.push(...this.opts.args)
164
165 if (opts.pass && this.opts.type === 'js') {
166 args.push('--pass', '"' + opts.pass + '"')
167 }
168 if (opts.offline) {
169 args.push('--offline')
170 }
171 if (opts.preload && this.opts.type === 'js') {
172 args.push('--enable-preload', Boolean(opts.preload.enabled))
173 }
174 if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.sharding) {
175 args.push('--enable-sharding-experiment')
176 }
177 if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.ipnsPubsub) {
178 args.push('--enable-namesys-pubsub')
179 }
180
181 // Check if a daemon is already running
182 const api = checkForRunningApi(this.path)
183 if (api) {
184 this._setApi(api)
185 } else {
186 let output = ''
187 const ready = new Promise((resolve, reject) => {
188 this.subprocess = execa(this.exec, args, {
189 env: this.env
190 })
191 this.subprocess.stderr.on('data', data => daemonLog.err(data.toString()))
192 this.subprocess.stdout.on('data', data => daemonLog.info(data.toString()))
193
194 const readyHandler = data => {
195 output += data.toString()
196 const apiMatch = output.trim().match(/API .*listening on:? (.*)/)
197 const gwMatch = output.trim().match(/Gateway .*listening on:? (.*)/)
198
199 if (apiMatch && apiMatch.length > 0) {
200 this._setApi(apiMatch[1])
201 }
202
203 if (gwMatch && gwMatch.length > 0) {
204 this._setGateway(gwMatch[1])
205 }
206
207 if (output.match(/(?:daemon is running|Daemon is ready)/)) {
208 // we're good
209 this.started = true
210 this.subprocess.stdout.off('data', readyHandler)
211 resolve(this.api)
212 }
213 }
214 this.subprocess.stdout.on('data', readyHandler)
215 this.subprocess.catch(err => reject(translateError(err)))
216 })
217 await ready
218 }
219
220 this.started = true
221 // Add `peerId`
222 const id = await this.api.id()
223 this.api.peerId = id
224
225 return this
226 }
227
228 /**
229 * Stop the daemon.
230 *
231 * @return {Promise<Daemon>}
232 */
233 async stop () {
234 if (!this.started) {
235 return this
236 }
237
238 let killTimeout
239 let killed = false
240 if (this.opts.forceKill !== false) {
241 killTimeout = setTimeout(() => {
242 // eslint-disable-next-line no-console
243 console.error(new Error(`Timeout stopping ${this.opts.type} node. Process ${this.subprocess.pid} will be force killed now.`))
244 killed = true
245
246 this.subprocess.kill('SIGKILL')
247 }, this.opts.forceKillTimeout)
248 }
249
250 try {
251 await this.api.stop()
252 } catch (err) {
253 if (!killed) {
254 throw err // if was killed then ignore error
255 }
256
257 daemonLog.info('Daemon was force killed')
258 }
259
260 clearTimeout(killTimeout)
261 this.subprocess.stderr.removeAllListeners()
262 this.subprocess.stdout.removeAllListeners()
263 this.started = false
264
265 if (this.disposable) {
266 await this.cleanup()
267 }
268 return this
269 }
270
271 /**
272 * Get the pid of the `ipfs daemon` process.
273 *
274 * @returns {Promise<Number>}
275 */
276 pid () {
277 if (this.subprocess) {
278 return Promise.resolve(this.subprocess.pid)
279 }
280 throw new Error('Daemon process is not running.')
281 }
282
283 /**
284 * Call `ipfs config`
285 *
286 * If no `key` is passed, the whole config is returned as an object.
287 *
288 * @private
289 * @param {string} [key] - A specific config to retrieve.
290 * @returns {Promise<Object|String>}
291 */
292 async _getConfig (key = 'show') {
293 const {
294 stdout
295 } = await execa(
296 this.exec,
297 ['config', key],
298 {
299 env: this.env
300 })
301 .catch(translateError)
302
303 if (key === 'show') {
304 return JSON.parse(stdout)
305 }
306
307 return stdout.trim()
308 }
309
310 /**
311 * Replace the current config with the provided one
312 *
313 * @private
314 * @param {object} config
315 * @returns {Promise<Daemon>}
316 */
317 async _replaceConfig (config) {
318 const tmpFile = path.join(os.tmpdir(), nanoid())
319
320 await fs.writeFile(tmpFile, JSON.stringify(config))
321 await execa(
322 this.exec,
323 ['config', 'replace', `${tmpFile}`],
324 { env: this.env }
325 )
326 .catch(translateError)
327 await fs.unlink(tmpFile)
328
329 return this
330 }
331
332 /**
333 * Get the version of ipfs
334 *
335 * @returns {Promise<String>}
336 */
337 async version () {
338 const {
339 stdout
340 } = await execa(this.exec, ['version'], {
341 env: this.env
342 })
343 .catch(translateError)
344
345 return stdout.trim()
346 }
347}
348
349module.exports = Daemon