UNPKG

5.78 kBJavaScriptView Raw
1var express = require('express');
2var bodyParser = require('body-parser');
3var util = require('util');
4var fs = require('fs');
5var spawn = require('child_process').spawn;
6var through = require('through2').obj;
7var Stream = require('stream').Stream;
8var cwd = process.cwd();
9
10module.exports = makeApp;
11module.exports.NotFound = NotFound;
12module.exports.UnprocessableEntity = UnprocessableEntity;
13
14
15function NotFound(message) {
16 Error.call(this);
17 this.message = message;
18 this.statusCode = 404;
19}
20util.inherits(NotFound, Error);
21
22function UnprocessableEntity(message) {
23 Error.call(this);
24 this.message = message;
25 this.statusCode = 422;
26}
27util.inherits(UnprocessableEntity, Error);
28
29function logWebError(req, err) {
30 console.error(req.originalUrl, err.stack || err);
31}
32
33function catchWebErrors(res) {
34 return function(err){
35 if (err.statusCode) {
36 res.sendStatus(err.statusCode);
37 if (err.statusCode >= 500) {
38 logWebError(req, err);
39 }
40 } else {
41 res.sendStatus(500);
42 logWebError(res, err);
43 }
44 };
45}
46
47function api(fn){
48 return function(req, res){
49 Promise.resolve()
50 .then(function(){
51 return fn(req);
52 })
53 .then(function(result){
54 res.json(result);
55 })
56 .catch(catchWebErrors(res));
57 };
58}
59
60function isString(thing) {
61 return typeof thing === 'string';
62}
63
64function pidDir(pid) {
65 return cwd+'/'+pid;
66}
67
68function makeApp() {
69 var tasks = {};
70 var stdin = {};
71
72 function getTasks() {
73 return tasks;
74 }
75
76 function getTask(req) {
77 if (typeof tasks[req.params.pid] === 'undefined') {
78 throw new NotFound('task not found');
79 }
80
81 return tasks[req.params.pid];
82 }
83
84 function createTask(req) {
85 if (typeof req.body !== 'object') {
86 throw new UnprocessableEntity('trying to create task without a body');
87 }
88
89 var commands = req.body.commands || [];
90 if (!Array.isArray(commands) || commands.filter(isString).length !== commands.length) {
91 throw new UnprocessableEntity('req.body.commands must be an array of strings');
92 }
93
94 var shell = spawn('bash');
95 var pid = shell.pid;
96
97 fs.mkdir(pidDir(pid), function(err){
98 if (err) {
99 console.error('Error making directory', pidDir(pid), err);
100 }
101
102 shell.stdout.pipe(fs.createWriteStream(pidDir(pid)+'/stdout.log'));
103 shell.stderr.pipe(fs.createWriteStream(pidDir(pid)+'/stderr.log'));
104 });
105
106 stdin[pid] = shell.stdin;
107
108 commands.forEach(function(cmd){
109 stdin[pid].write(cmd+'\n');
110 });
111
112 if (req.body.end === true) {
113 stdin[pid].end();
114 delete stdin[pid];
115 }
116
117 var task = tasks[pid] = {
118 pid: pid,
119 startTime: Date.now(),
120 running: true,
121 writable: req.body.end !== true,
122 errors: []
123 };
124
125 shell.on('error', function(err){
126 task.errors.push(err.stack || err);
127 });
128
129 shell.on('exit', function(code, signal){
130 task.running = false;
131 task.code = code;
132 task.signal = signal;
133 });
134
135 return task;
136 }
137
138 function addCommands(req) {
139 if (typeof req.body !== 'object') {
140 throw new UnprocessableEntity('trying to create task without a body');
141 }
142
143 if (!/^[0-9]+$/.test(req.params.pid)) {
144 throw new UnprocessableEntity('req.body.pid must be a number');
145 }
146 var pid = parseInt(req.params.pid);
147
148 var commands = req.body.commands || [];
149 if (!Array.isArray(commands) || commands.filter(isString).length !== commands.length) {
150 throw new UnprocessableEntity('req.body.commands must be an array of strings');
151 }
152
153 var task = tasks[pid];
154 if (typeof task === 'undefined') {
155 throw new NotFound('task not found');
156 }
157
158 if (!task.writable) {
159 throw new Error('trying to write to a closed stdin');
160 }
161
162 if (typeof stdin[pid] === 'undefined' || !(stdin[pid] instanceof Stream)) {
163 throw new Error('invalid stdin stream');
164 }
165
166 commands.forEach(function(cmd){
167 stdin[pid].write(cmd+'\n');
168 });
169
170 if (req.body.end === true) {
171 tasks[pid].writable = false;
172 stdin[pid].end();
173 delete stdin[pid];
174 }
175
176 return task;
177 }
178
179 function stopTaskByPid(pid) {
180 return new Promise(function(resolve, reject){
181 var timeouts = [];
182
183 function forget(){
184 clearInterval(monitor);
185 timeouts.forEach(clearTimeout);
186 }
187
188 function check(){
189 try {
190 process.kill(pid, 0);
191 } catch (err) {
192 if (err.message !== 'kill ESRCH') {
193 throw err;
194 }
195 delete tasks[pid];
196 delete stdin[pid];
197 forget();
198 resolve();
199 }
200 }
201
202 var monitor = setInterval(check, 20);
203
204 check();
205 process.kill(pid, 'SIGHUP');
206 check();
207
208 timeouts.push(setTimeout(function(){
209 process.kill(pid, 'SIGTERM');
210 check();
211 }, 5000));
212
213 timeouts.push(setTimeout(function(){
214 process.kill(pid, 'SIGKILL');
215 check();
216 }, 10000));
217
218 timeouts.push(setTimeout(function(){
219 forget();
220 reject(new Error('unable to kill process '+pid));
221 }, 15000));
222 });
223 }
224
225 function stopTask(req) {
226 if (!/^[0-9]+$/.test(req.params.pid)) {
227 throw new UnprocessableEntity('req.body.pid must be a number');
228 }
229 var pid = parseInt(req.params.pid);
230
231 return stopTaskByPid(pid);
232 }
233
234 function stopTasks() {
235 var pids = Object.keys(tasks);
236
237 return Promise.all(pids.map(stopTaskByPid));
238 }
239
240 var app = express();
241 app.use(bodyParser.json());
242
243 app.get('/tasks', api(getTasks));
244 app.get('/tasks/:pid', api(getTask));
245 app.post('/tasks', api(createTask));
246 app.post('/tasks/:pid', api(addCommands));
247 app.delete('/tasks', api(stopTasks));
248 app.delete('/tasks/:pid', api(stopTask));
249
250 return app;
251}