UNPKG

6.82 kBJavaScriptView Raw
1var express = require('express');
2var bodyParser = require('body-parser');
3var util = require('util');
4var fs = require('fs');
5var Promise = require('bluebird');
6var spawn = require('child_process').spawn;
7var through = require('through2').obj;
8var stream = require('stream');
9var uuid = require('uuid');
10
11var cwd = process.cwd();
12
13module.exports = makeApp;
14module.exports.NotFound = NotFound;
15module.exports.UnprocessableEntity = UnprocessableEntity;
16
17
18function NotFound(message) {
19 Error.call(this);
20 this.message = message;
21 this.statusCode = 404;
22}
23util.inherits(NotFound, Error);
24
25function UnprocessableEntity(message) {
26 Error.call(this);
27 this.message = message;
28 this.statusCode = 422;
29}
30util.inherits(UnprocessableEntity, Error);
31
32function logWebError(req, err) {
33 console.error(req.originalUrl, err.stack || err);
34}
35
36function catchWebErrors(res) {
37 return function(err){
38 if (err.statusCode) {
39 res.sendStatus(err.statusCode);
40 if (err.statusCode >= 500) {
41 logWebError(req, err);
42 }
43 } else {
44 res.sendStatus(500);
45 logWebError(res, err);
46 }
47 };
48}
49
50function api(fn){
51 return function(req, res){
52 Promise.resolve()
53 .then(function(){
54 return fn(req);
55 })
56 .then(function(result){
57 res.json(result);
58 })
59 .catch(catchWebErrors(res));
60 };
61}
62
63function isString(thing) {
64 return typeof thing === 'string';
65}
66
67function idDir(id) {
68 return cwd+'/remote-task-'+id;
69}
70
71function makeApp() {
72 var tasks = {};
73 var stdin = {};
74
75 function getTasks() {
76 return tasks;
77 }
78
79 function getTask(req) {
80 if (typeof tasks[req.params.id] === 'undefined') {
81 throw new NotFound('task not found');
82 }
83
84 return tasks[req.params.id];
85 }
86
87 function createTask(req) {
88 if (typeof req.body !== 'object') {
89 throw new UnprocessableEntity('trying to create task without a body');
90 }
91
92 var commands = req.body.commands || [];
93 if (!Array.isArray(commands) || commands.filter(isString).length !== commands.length) {
94 throw new UnprocessableEntity('req.body.commands must be an array of strings');
95 }
96
97 var timeout = req.body.timeout;
98 if (typeof timeout !== 'undefined' && typeof timeout !== 'number') {
99 throw new UnprocessableEntity('req.body.timeout must be a number, or undefined');
100 }
101 timeout = timeout || 0;
102
103 var shell = spawn('bash');
104 var pid = shell.pid;
105 var id = uuid.v4();
106
107 stdin[id] = new stream.PassThrough();
108
109 fs.mkdir(idDir(id), function(err){
110 if (err) {
111 console.error('Error making directory', idDir(id), err);
112 }
113
114 shell.stdout.pipe(fs.createWriteStream(idDir(id)+'/stdout.log'));
115 shell.stderr.pipe(fs.createWriteStream(idDir(id)+'/stderr.log'));
116
117 stdin[id].pipe(shell.stdin);
118 stdin[id].pipe(fs.createWriteStream(idDir(id)+'/stdin.log'));
119 stdin[id].on('end', function(){
120 delete stdin[id];
121 })
122 });
123
124 commands.forEach(function(cmd){
125 stdin[id].write(cmd+'\n');
126 });
127
128 if (req.body.end === true) {
129 stdin[id].end();
130 }
131
132 var task = tasks[id] = {
133 id: id,
134 pid: pid,
135 startTime: Date.now(),
136 running: true,
137 writable: req.body.end !== true,
138 errors: []
139 };
140 if (timeout) {
141 task.timeout = timeout;
142 }
143
144 shell.on('error', function(err){
145 task.errors.push(err.stack || err);
146 });
147
148 var killTimeout;
149 if (timeout) {
150 killTimeout = setTimeout(function(){
151 stopTaskById(id).catch(function(err) {
152 console.error('failed to kill '+id+' after '+timeout+'ms timeout', err && err.stack ? err.stack : err);
153 });
154 }, timeout);
155 }
156
157 shell.on('exit', function(code, signal){
158 if (killTimeout) {
159 clearTimeout(killTimeout);
160 }
161 task.running = false;
162 task.code = code;
163 task.signal = signal;
164 });
165
166 return task;
167 }
168
169 function addCommands(req) {
170 if (typeof req.body !== 'object') {
171 throw new UnprocessableEntity('trying to create task without a body');
172 }
173
174 if (!/^[0-9a-z\-]{36}$/.test(req.params.id)) {
175 throw new UnprocessableEntity('req.body.id must be a uuid');
176 }
177 var id = req.params.id;
178
179 var commands = req.body.commands || [];
180 if (!Array.isArray(commands) || commands.filter(isString).length !== commands.length) {
181 throw new UnprocessableEntity('req.body.commands must be an array of strings');
182 }
183
184 var task = tasks[id];
185 if (typeof task === 'undefined') {
186 throw new NotFound('task not found');
187 }
188
189 if (!task.writable) {
190 throw new Error('trying to write to a closed stdin');
191 }
192
193 if (typeof stdin[id] === 'undefined' || !(stdin[id] instanceof stream.Stream)) {
194 throw new Error('invalid stdin stream');
195 }
196
197 commands.forEach(function(cmd){
198 stdin[id].write(cmd+'\n');
199 });
200
201 if (req.body.end === true) {
202 tasks[id].writable = false;
203 stdin[id].end();
204 delete stdin[id];
205 }
206
207 return task;
208 }
209
210 function stopTaskById(id) {
211 return new Promise(function(resolve, reject){
212 var timeouts = [];
213 var task = tasks[id];
214 if (!task) {
215 throw new Error('no such task '+id);
216 }
217
218 function forget(){
219 clearInterval(monitor);
220 timeouts.forEach(clearTimeout);
221 }
222
223 function check(){
224 try {
225 process.kill(task.pid, 0);
226 } catch (err) {
227 if (err.message !== 'kill ESRCH') {
228 throw err;
229 }
230 task.writable = false;
231 delete stdin[id];
232 forget();
233 resolve();
234 return true;
235 }
236 }
237
238 var monitor = setInterval(check, 20);
239
240 if (check()) return;
241 process.kill(task.pid, 'SIGHUP');
242 if (check()) return;
243
244 timeouts.push(setTimeout(function(){
245 process.kill(task.pid, 'SIGTERM');
246 check();
247 }, 5000));
248
249 timeouts.push(setTimeout(function(){
250 process.kill(task.pid, 'SIGKILL');
251 check();
252 }, 10000));
253
254 timeouts.push(setTimeout(function(){
255 forget();
256 reject(new Error('unable to kill process '+task.pid));
257 }, 15000));
258 });
259 }
260
261 function stopTask(req) {
262 if (!/^[0-9a-z\-]{36}$/.test(req.params.id)) {
263 throw new UnprocessableEntity('req.body.id must be a uuid');
264 }
265 var id = req.params.id;
266
267 return stopTaskById(id);
268 }
269
270 function stopTasks() {
271 var ids = Object.keys(tasks);
272
273 return Promise.all(ids.map(stopTaskById));
274 }
275
276 var app = express();
277 app.use(bodyParser.json());
278
279 app.get('/tasks', api(getTasks));
280 app.get('/tasks/:id', api(getTask));
281 app.post('/tasks', api(createTask));
282 app.post('/tasks/:id', api(addCommands));
283 app.delete('/tasks', api(stopTasks));
284 app.delete('/tasks/:id', api(stopTask));
285
286 return app;
287}