UNPKG

7.8 kBtext/coffeescriptView Raw
1# this is a work in progress.
2# it's doing proper cleanup and management of child processes
3# use it with care, not all edge cases are covered.
4#
5# author @devrim
6
7{argv} = require 'optimist'
8traverse = require 'traverse'
9_ = require 'underscore'
10fs = require "fs"
11os = require "os"
12hat = require 'hat'
13{exec,spawn,fork} = require "child_process"
14{EventEmitter} = require "events"
15
16
17################################
18### HANDLE SIGNALS and KILLS ###
19################################
20
21# guaranteed death.
22process.on 'SIGTERM', =>
23 console.log "going down"
24 process.exit(0)
25
26class Processes extends EventEmitter
27
28 constructor:(options={})->
29 {main, processContainerDir} = options
30 @list = {}
31 @child = null
32 @workerProcess = {}
33 @childProcessesContainerDir = processContainerDir or "/tmp/kd/"
34
35 try
36 fs.mkdirSync @childProcessesContainerDir
37 catch e
38 # do nothing, probably dir is already there.
39
40 if main
41 previouschildProcessesContainerDir = @childProcessesContainerDir+"."+Date.now()
42 fs.renameSync @childProcessesContainerDir, previouschildProcessesContainerDir
43 fs.mkdirSync @childProcessesContainerDir
44 exec "rm -rf #{previouschildProcessesContainerDir}",(err,stdout,stderr)->
45 if err
46 console.log "couldn't delete previous process tree at #{previouschildProcessesContainerDir}"
47
48 fork: (options,callback) ->
49 options.fork = yes
50 @run options,callback, (cb) =>
51 callback cb
52
53 spawn: (options,callback)->
54 options.spawn = yes
55 @run options,callback
56
57 exec: (cmd,callback)->
58 @spawn
59 name : Date.now()
60 cmd : cmd
61 restart : no
62 stdout : process.stdout
63 stderr : process.stderr
64 onExit : callback
65
66 run: (options,callback) ->
67 {name, restartTimeout, verbose} = options
68
69 options.name ?= Date.now()
70 options.restartTimeout ?= 1000
71 options.verbose ?= no
72 @child = @startProcess options, callback
73 child = @populateChildObject options, @child
74 callback false, child
75
76 getDate: ->
77 d = new Date()
78 pad = (n) ->
79 (if n < 10 then "0" + n else n)
80 d.getUTCFullYear() + "-" + pad(d.getUTCMonth() + 1) + "-" + pad(d.getUTCDate()) + "T" + pad(d.getUTCHours()) + ":" + pad(d.getUTCMinutes()) + ":" + pad(d.getUTCSeconds()) + "Z"
81
82 startProcess: (options, callback) ->
83 {name, args, opts, cmd, stdout, stderr, verbose} = options
84
85 if "string" is typeof options.cmd
86 cmdA = cmd.split(" ")
87 else if Array.isArray cmd
88 cmdA = cmd
89 else
90 console.log e = "[PROCESSES][ERROR][#{name}] Can't start without a valid cmd param." if verbose
91 return callback e
92
93 if options.fork
94 @child = fork cmdA[0],cmdA[1...],opts
95 else
96 # @child = spawn cmdA[0],cmdA[1...]
97 console.log ">>>", cmd, args
98 @child = spawn cmd, args # cmdA is split wrongly when quotation marks are involved
99 @child.stdout.pipe stdout if stdout
100 @child.stderr.pipe stderr if stderr
101
102 return @child
103
104
105 populateChildObject: (options, @child) ->
106 {name,onMessage,modulePath,args,restart,restartTimeout,cmd,log,onExit,stdout,stderr,verbose,die} = options
107 onExit ?= ->
108 console.log "[PROCESSES][#{name}] started the process: #{name} with pid: #{@child.pid}" if verbose
109 @createPidFile @child.pid,name
110
111 childObj =
112 name : name
113 process : @child
114 pid : @child.pid
115 verbose : verbose
116 startedAt: Date.now()
117
118 if cmd
119 childObj.cmd = cmd
120 else if modulePath
121 childObj.fork = {modulePath,args,options}
122
123 if @list[name]
124 d = Date.now()
125 console.log "[WARN][Processes] process name '#{name}' not unique. to be able to kill it properly it is now: #{name}-#{d}" if verbose
126 name += "-"+d
127
128 @list[name] = childObj
129
130 if die?.after
131 console.log "[PROCESSES][#{name}]i will kill this process:#{name} every #{die.after}msecs" if verbose
132 setTimeout =>
133 console.log "[PROCESSES][#{name}]it's time you die process:#{name}" if verbose
134 @kill name
135 ,die.after
136
137 @child.stdout.on 'data', (data)->
138 process.stdout.write("[PROCESSES][#{name}]" + data)
139
140 @child.stderr.on 'data', (data)->
141 console.log("[PROCESSES][ERROR][#{name}]" + data)
142
143 @child.on 'exit', ()=>
144 console.log "[PROCESSES][#{name}] did exit." if verbose
145 @emit "exit",@child.pid,name
146 delete @list[name]
147 if options.kontrol?.enabled
148 @workerProcess[name].stopReporter()
149 @workerProcess[name].connection.end()
150 delete @workerProcess[name]
151
152 @child.on 'close', () =>
153 console.log "[PROCESSES][#{name}] did close." if verbose
154 @emit "close",@child.pid,name
155 onExit @child.pid,name
156 if restart is yes
157 restartTimeout = 1 unless restartTimeout
158 setTimeout =>
159 @run options
160 ,restartTimeout
161 console.log "[PROCESSES][#{name}] restarting the process in #{restartTimeout} msecs." if verbose
162
163 @child.on 'message',(msg) =>
164 onMessage msg if "function" is typeof onMessage
165 if msg['process-monitor']
166 delete msg['process-monitor']
167 if options.kontrol?.enabled
168 msg['uuid'] = @workerProcess[name].worker.uuid
169 @workerProcess[name].fetchWorkerExchange (workerExchange) =>
170 @workerProcess[name].worker.message.command = "ack"
171 workerExchange.publish "input.worker", {monitor: msg},
172 appId: "#{@workerProcess[name].worker.uuid}"
173 return childObj
174
175 createPidFile : (pid,name)->
176 console.log "creating pid file"
177 fs.writeFileSync "#{@childProcessesContainerDir}/#{process.pid}.#{pid}",""
178 console.log "pid file created"
179
180 get : (name)->
181 if @list[name]
182 return @list[name].process
183 else
184 console.log "[ERROR][Processes] #{name} Can't be found."
185
186 fetchChildren: (parentPid,callback)->
187 findChildren = (arr, pid) ->
188 findChildrenRec = (pid,root=yes)->
189 c={}
190 pid = ""+pid
191 i = -1
192 arr.forEach (item)->
193 [parent,child] = item.split(".")
194 if parent is pid
195 c[child] = findChildrenRec child,no
196 return c
197
198 arr = traverse(findChildrenRec pid).paths()
199 arr.forEach (group) -> group.reverse()
200 arr = _.sortBy arr,(a)-> a.length*-1
201 return _.uniq _.flatten arr,yes
202
203 fs.readdir @childProcessesContainerDir,(err,list)->
204 if err
205 callback err
206 else
207 callback null,findChildren list,parentPid
208
209 killAllChildren: (parentPid, callback) ->
210 @fetchChildren parentPid,(err,orderedKillList)->
211 console.log "ordered kill list:"+orderedKillList
212 unless err
213 pidPair = []
214 orderedKillList.forEach (pid)->
215 console.log "killing children: #{pid}"
216 try
217 pidPair.push pid
218 process.kill(pid*1)
219 if pidPair.length is 2
220 pidPair.reverse()
221 deleteFile = pidPair.join(".")
222 console.log "deleting"+deleteFile
223 fs.unlinkSync deleteFile
224 pidPair.shift()
225 catch e
226 console.log "child process #{pid} did not exist. not killed by me."
227 callback null
228
229 sendMessage: (name,msg)->
230 if @list[name]?.process?.send
231 @list[name].process.send msg
232
233 kill: (name,callback=->)->
234 unless @list[name]
235 console.log "[PROCESSES][#{name}] cant kill #{name}, couldn't find it."
236 else
237 pid = @list[name].pid
238 console.log "[PROCESSES][#{name}] killing parent: #{pid}"
239 @killAllChildren pid,(err,res)->
240 console.log "[PROCESSES][#{name}] killing #{name} #{pid}"
241 process.kill pid
242
243module.exports = Processes
\No newline at end of file