// UNPKG
//
// 6.77 kB · JavaScript · View Raw
1var express = require('express');
2var bodyParser = require('body-parser');
3var util = require('util');
4var fs = require('fs');
5var Promise = require('bluebird');
6var spawn = require('child_process').spawn;
7var through = require('through2').obj;
8var stream = require('stream');
9var uuid = require('uuid');
10
// Working directory at load time; idDir() builds per-task directory paths from it.
var cwd = process.cwd();

// The module's export is the app factory itself; the two error constructors
// are attached to it as properties so callers can reach them from the export.
makeApp.NotFound = NotFound;
makeApp.UnprocessableEntity = UnprocessableEntity;
module.exports = makeApp;
16
17
/**
 * Error for a resource that does not exist.
 *
 * Instances carry `statusCode = 404`, which `catchWebErrors` sends as the
 * HTTP response status.
 *
 * @param {string} message - description of what was not found
 */
function NotFound(message) {
  Error.call(this);
  this.name = 'NotFound';
  this.message = message;
  this.statusCode = 404;
  // `Error.call(this)` does not populate `this`, so without this call the
  // instance has no `stack`. Name and message are set first so the captured
  // stack header reflects them.
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, NotFound);
  }
}
util.inherits(NotFound, Error);
24
/**
 * Error for a request that is well-formed but cannot be processed.
 *
 * Instances carry `statusCode = 422`, which `catchWebErrors` sends as the
 * HTTP response status.
 *
 * @param {string} message - description of what is wrong with the request
 */
function UnprocessableEntity(message) {
  Error.call(this);
  this.name = 'UnprocessableEntity';
  this.message = message;
  this.statusCode = 422;
  // `Error.call(this)` does not populate `this`, so without this call the
  // instance has no `stack`. Name and message are set first so the captured
  // stack header reflects them.
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, UnprocessableEntity);
  }
}
util.inherits(UnprocessableEntity, Error);
31
/**
 * Writes a failed request to stderr: the request's original URL followed by
 * the error's stack, or the error value itself when it has no stack.
 *
 * @param {object} req - request object; only `originalUrl` is read
 * @param {*} err - the error (or other thrown value) to report
 */
function logWebError(req, err) {
  var url = req.originalUrl;
  var detail = err.stack || err;
  console.error(url, detail);
}
35
/**
 * Builds a rejection handler that turns an error into an HTTP response.
 *
 * The response status is `err.statusCode` when set, otherwise 500. Errors
 * answered with a status of 500 or above are also passed to `logWebError`.
 *
 * @param {object} res - response object; `sendStatus` is called on it and
 *   `res.req` supplies the request used for logging
 * @returns {function(*): void} handler suitable for `promise.catch(...)`
 */
function catchWebErrors(res) {
  return function(err){
    // Only `res` is available here, so the request is taken from `res.req`
    // (Express links the two). The empty-object fallback keeps logging from
    // throwing if that link is missing.
    var req = res.req || {};

    // A promise can reject with no reason at all; substitute a real error so
    // the property reads below and in logWebError cannot throw.
    if (err === null || typeof err === 'undefined') {
      err = new Error('request failed without an error value');
    }

    var status = err.statusCode || 500;
    res.sendStatus(status);
    if (status >= 500) {
      logWebError(req, err);
    }
  };
}
49
/**
 * Adapts a handler of the form `fn(req)` into a `(req, res)` route callback.
 *
 * `fn` runs inside a promise chain, so it may return a plain value or a
 * promise. The settled value is sent with `res.json`; anything thrown or
 * rejected goes to the handler produced by `catchWebErrors(res)`.
 *
 * @param {function(object): *} fn - handler that receives the request
 * @returns {function(object, object): void} route callback
 */
function api(fn){
  return function handleRequest(req, res){
    var onFailure = catchWebErrors(res);

    function invoke() {
      return fn(req);
    }

    function respond(result) {
      res.json(result);
    }

    Promise.resolve().then(invoke).then(respond).catch(onFailure);
  };
}
62
/**
 * Tells whether a value is a primitive string.
 *
 * @param {*} thing - value to inspect
 * @returns {boolean} true only when `typeof thing` is 'string'
 */
function isString(thing) {
  var kind = typeof thing;
  return 'string' === kind;
}
66
/**
 * Builds the path of the directory that belongs to a task.
 *
 * @param {string} id - task id
 * @returns {string} `cwd` followed by '/remote-task-' and the id
 */
function idDir(id) {
  var prefix = cwd+'/remote-task-';
  return prefix+id;
}
70
71function makeApp() {
// Task records created by this app instance, keyed by task id.
var tasks = {};
// Input streams feeding each task's shell, keyed by task id; an entry is
// removed once that task's stdin has been ended or the task was stopped.
var stdin = {};
74
/**
 * Lists every known task.
 *
 * @returns {object} the live `tasks` map (task id -> task record)
 */
function getTasks() {
  var registry = tasks;
  return registry;
}
78
/**
 * Looks up a single task by the `:id` route parameter.
 *
 * @param {object} req - request; `req.params.id` is the task id
 * @returns {object} the matching task record
 * @throws {NotFound} when no task is registered under that id
 */
function getTask(req) {
  var id = req.params.id;

  // `tasks` is a plain object, so a bare `tasks[id]` also finds inherited
  // members such as `constructor` or `__proto__`. Only own keys are tasks.
  var known = Object.prototype.hasOwnProperty.call(tasks, id);
  if (!known || typeof tasks[id] === 'undefined') {
    throw new NotFound('task not found');
  }

  return tasks[id];
}
86
/**
 * Spawns a `bash` shell for a new task, feeds it the requested commands and
 * registers the task record in `tasks`.
 *
 * Request body fields read here:
 *   - `commands` (optional): array of strings; each is written to the shell's
 *     stdin followed by a newline.
 *   - `timeout` (optional): number of milliseconds after which the task is
 *     stopped through `stopTaskById`; absent or 0 means no timeout.
 *   - `end` (optional): when exactly `true`, stdin is ended right after the
 *     commands and the task is created non-writable.
 *
 * The shell's stdin, stdout and stderr are copied to `stdin.log`,
 * `stdout.log` and `stderr.log` inside `idDir(id)`.
 *
 * @param {object} req - request whose `body` holds the fields above
 * @returns {object} the newly registered task record
 * @throws {UnprocessableEntity} when the body or one of its fields is invalid
 * @throws {Error} when the task directory cannot be created (no shell is
 *   spawned in that case)
 */
function createTask(req) {
  // `typeof null` is 'object', so null has to be rejected explicitly.
  if (req.body === null || typeof req.body !== 'object') {
    throw new UnprocessableEntity('trying to create task without a body');
  }

  var commands = req.body.commands || [];
  if (!Array.isArray(commands) || !commands.every(isString)) {
    throw new UnprocessableEntity('req.body.commands must be an array of strings');
  }

  var timeout = req.body.timeout;
  if (typeof timeout !== 'undefined' && typeof timeout !== 'number') {
    throw new UnprocessableEntity('req.body.timeout must be a number, or undefined');
  }
  timeout = timeout || 0;

  var id = uuid.v4();
  var dir = idDir(id);

  // The log files are opened immediately below, so the directory must exist
  // first. Creating it synchronously, before spawning, removes the race
  // between an async mkdir and the file opens, and means a failure here
  // leaves no orphaned shell behind.
  fs.mkdirSync(dir);

  var shell = spawn('bash');

  var task = tasks[id] = {
    id: id,
    pid: shell.pid,
    startTime: Date.now(),
    running: true,
    writable: req.body.end !== true,
    errors: []
  };
  if (timeout) {
    task.timeout = timeout;
  }

  // Collects stream/process failures on the task record. A stream that emits
  // 'error' with no listener would otherwise take the whole process down.
  function recordError(err) {
    task.errors.push(err.stack || err);
  }

  // Opens one of the task's log files with an error listener attached.
  function openLog(name) {
    var log = fs.createWriteStream(dir+'/'+name);
    log.on('error', recordError);
    return log;
  }

  shell.on('error', recordError);
  // Writing to a shell that already exited surfaces as an error on its stdin.
  shell.stdin.on('error', recordError);

  shell.stdout.pipe(openLog('stdout.log'));
  shell.stderr.pipe(openLog('stderr.log'));

  // Commands go through a PassThrough so the same bytes reach both the shell
  // and stdin.log.
  var input = stdin[id] = new stream.PassThrough();
  input.pipe(shell.stdin);
  input.pipe(openLog('stdin.log'));

  commands.forEach(function(cmd){
    input.write(cmd+'\n');
  });

  if (req.body.end === true) {
    input.end();
    delete stdin[id];
  }

  var killTimeout;
  if (timeout) {
    killTimeout = setTimeout(function(){
      stopTaskById(id).catch(function(err) {
        console.error('failed to kill '+id+' after '+timeout+'ms timeout', err && err.stack ? err.stack : err);
      });
    }, timeout);
  }

  shell.on('exit', function(code, signal){
    if (killTimeout) {
      clearTimeout(killTimeout);
    }
    task.running = false;
    task.code = code;
    task.signal = signal;

    // Nothing can be written to an exited shell. Ending the input stream
    // also ends stdin.log so its file descriptor is released.
    task.writable = false;
    input.end();
    delete stdin[id];
  });

  return task;
}
165
/**
 * Writes further commands to the stdin of an existing task.
 *
 * Request fields read here:
 *   - `req.params.id`: id of the task, 36 characters of `0-9`, `a-z` or `-`.
 *   - `req.body.commands` (optional): array of strings; each is written to
 *     the task's stdin followed by a newline.
 *   - `req.body.end` (optional): when exactly `true`, stdin is ended after
 *     the commands and the task becomes non-writable.
 *
 * @param {object} req - request carrying the fields above
 * @returns {object} the task record
 * @throws {UnprocessableEntity} when the body, id or commands are invalid, or
 *   when the task's stdin has already been closed
 * @throws {NotFound} when no task is registered under the id
 * @throws {Error} when the task is marked writable but has no stdin stream
 */
function addCommands(req) {
  // `typeof null` is 'object', so null has to be rejected explicitly.
  if (req.body === null || typeof req.body !== 'object') {
    throw new UnprocessableEntity('trying to add commands without a body');
  }

  var id = req.params.id;
  if (!/^[0-9a-z\-]{36}$/.test(id)) {
    throw new UnprocessableEntity('req.params.id must be a uuid');
  }

  var commands = req.body.commands || [];
  if (!Array.isArray(commands) || !commands.every(isString)) {
    throw new UnprocessableEntity('req.body.commands must be an array of strings');
  }

  var task = tasks[id];
  if (typeof task === 'undefined') {
    throw new NotFound('task not found');
  }

  // Writing to a closed task is a problem with the request, not with the
  // server, so it is reported with a client-error status rather than 500.
  if (!task.writable) {
    throw new UnprocessableEntity('trying to write to a closed stdin');
  }

  // A writable task without a stream is an internal inconsistency; this one
  // stays a plain Error.
  var input = stdin[id];
  if (typeof input === 'undefined' || !(input instanceof stream.Stream)) {
    throw new Error('invalid stdin stream');
  }

  commands.forEach(function(cmd){
    input.write(cmd+'\n');
  });

  if (req.body.end === true) {
    task.writable = false;
    input.end();
    delete stdin[id];
  }

  return task;
}
206
/**
 * Stops the shell of a task, escalating the signal over time.
 *
 * SIGHUP is sent immediately, SIGTERM after 5 seconds and SIGKILL after 10
 * seconds. The process is polled every 20 ms; as soon as it is gone the task
 * is marked non-writable, its stdin entry is dropped and the promise
 * resolves. If the process is still there after 15 seconds the promise
 * rejects.
 *
 * @param {string} id - id of the task to stop
 * @returns {Promise<undefined>} resolves once the process no longer exists
 *   (immediately for a task that has already exited); rejects with `NotFound`
 *   for an unknown id, with the underlying error when signalling fails for a
 *   reason other than the process being gone, or with an `Error` when the
 *   process survives all signals
 */
function stopTaskById(id) {
  return new Promise(function(resolve, reject){
    var task = tasks[id];
    if (!task) {
      // Carries statusCode 404, so the HTTP layer answers "not found"
      // instead of a generic 500.
      throw new NotFound('no such task '+id);
    }

    var timeouts = [];
    var monitor = null;
    var settled = false;

    // Cancels the poller and every pending escalation timer.
    function forget(){
      if (monitor !== null) {
        clearInterval(monitor);
      }
      timeouts.forEach(clearTimeout);
    }

    // Settles successfully: the process is gone.
    function succeed(){
      if (settled) return;
      settled = true;
      forget();
      task.writable = false;
      delete stdin[id];
      resolve();
    }

    // Settles with a failure. Timers are cancelled first so no callback can
    // throw outside the promise afterwards.
    function fail(err){
      if (settled) return;
      settled = true;
      forget();
      reject(err);
    }

    // Returns true once the promise has been settled (either way).
    function check(){
      if (settled) return true;

      // A task known to have exited must never be signalled or probed by
      // pid: the pid may already belong to an unrelated process.
      if (task.running === false || typeof task.pid !== 'number') {
        succeed();
        return true;
      }

      try {
        // Signal 0 only tests for existence.
        process.kill(task.pid, 0);
      } catch (err) {
        if (err.code === 'ESRCH') {
          succeed();
        } else {
          fail(err);
        }
        return true;
      }
      return false;
    }

    // Sends one signal, then re-checks. Returns true once settled.
    function signal(name){
      if (check()) return true;
      try {
        process.kill(task.pid, name);
      } catch (err) {
        // ESRCH means the process vanished between the check and the kill;
        // the check below reports that as success.
        if (err.code !== 'ESRCH') {
          fail(err);
          return true;
        }
      }
      return check();
    }

    if (signal('SIGHUP')) return;

    monitor = setInterval(check, 20);

    timeouts.push(setTimeout(function(){
      signal('SIGTERM');
    }, 5000));

    timeouts.push(setTimeout(function(){
      signal('SIGKILL');
    }, 10000));

    timeouts.push(setTimeout(function(){
      fail(new Error('unable to kill process '+task.pid));
    }, 15000));
  });
}
257
/**
 * Stops the task named by the `:id` route parameter.
 *
 * @param {object} req - request; `req.params.id` is the task id and must be
 *   36 characters of `0-9`, `a-z` or `-`
 * @returns {*} whatever `stopTaskById` returns for that id
 * @throws {UnprocessableEntity} when the id does not have the expected shape
 */
function stopTask(req) {
  var id = req.params.id;

  // The id comes from the route parameter, and the message now says so.
  if (!/^[0-9a-z\-]{36}$/.test(id)) {
    throw new UnprocessableEntity('req.params.id must be a uuid');
  }

  return stopTaskById(id);
}
266
/**
 * Stops every task currently held in `tasks`.
 *
 * @returns {Promise<Array>} settles like `Promise.all` over one
 *   `stopTaskById` call per task id
 */
function stopTasks() {
  var pending = [];

  Object.keys(tasks).forEach(function(taskId){
    pending.push(stopTaskById(taskId));
  });

  return Promise.all(pending);
}
272
273 var app = express();
274 app.use(bodyParser.json());
275
276 app.get('/tasks', api(getTasks));
277 app.get('/tasks/:id', api(getTask));
278 app.post('/tasks', api(createTask));
279 app.post('/tasks/:id', api(addCommands));
280 app.delete('/tasks', api(stopTasks));
281 app.delete('/tasks/:id', api(stopTask));
282
283 return app;
284}