UNPKG

15.5 kBPlain TextView Raw
1import { Request as BaseRequest, WorkerRouter as Router, Callback, CallbackResponse, workerErrorHandler, workerHandler, workerRequestHandler } from './router';
2import * as akala from '@akala/core';
3import * as express from 'express';
4import * as stream from 'stream';
5export { Router, Callback, workerHandler as RouterHandler, workerErrorHandler as ErrorHandler, workerRequestHandler as RequestHandler };
6import * as jsonrpc from '@akala/json-rpc-ws'
7import * as send from 'send';
8import * as onFinished from 'on-finished';
9import { PayloadDataType } from '@akala/json-rpc-ws';
10const log = akala.log('akala:worker');
11
12export { CallbackResponse }
13
14export function createClient<TConnection extends jsonrpc.Connection>(namespace: string): PromiseLike<jsonrpc.Client<TConnection>>
15{
16 return akala.resolve('$agent.' + namespace);
17}
18
19export type MasterRegistration = (from?: string, masterPath?: string, workerPath?: string) => void;
20export type IsModule = (moduleName: string) => boolean;
21
22export interface resolve
23{
24 (param: '$http'): akala.Http
25 (param: '$request'): Request
26 (param: '$callback'): Callback
27 (param: '$router'): Router
28 (param: '$io'): <TConnection extends jsonrpc.Connection>(namespace: string) => PromiseLike<jsonrpc.Client<TConnection>>
29 (param: '$bus'): jsonrpc.Client<jsonrpc.Connection>
30 (param: '$master'): MasterRegistration
31 (param: '$isModule'): IsModule
32 (param: string): any
33}
34
35export interface WorkerInjector extends akala.Injector
36{
37 resolve: resolve;
38}
39const masterPrefixes = ['header', 'route', 'query', 'body'];
40
41export class WorkerInjectorImpl extends akala.Injector
42{
43 constructor(private request: BaseRequest & { body?: any })
44 {
45 super();
46 }
47
48 public resolve<T = any>(name: string): T
49 {
50 let indexOfDot = name.indexOf('.');
51
52 if (~indexOfDot)
53 {
54 let master = name.substr(0, indexOfDot);
55 if (~masterPrefixes.indexOf(master))
56 {
57 log(`resolving ${name}`)
58 switch (master)
59 {
60 case 'header':
61 return this.request.headers[name.substr(indexOfDot + 1)] as any;
62 case 'query':
63 return this.request.query && this.request.query[name.substr(indexOfDot + 1)] as any;
64 case 'route':
65 log(this.request.params)
66 return this.request.params && this.request.params[name.substr(indexOfDot + 1)] as any;
67 case 'body':
68 return this.request.body && this.request.body[name.substr(indexOfDot + 1)];
69 }
70 }
71 }
72 return super.resolve<T>(name);
73 }
74}
75
76export interface Request extends BaseRequest
77{
78 injector?: WorkerInjector;
79 [key: string]: any;
80 body: any;
81}
82
83export interface SendFileOptions extends send.SendOptions
84{
85 headers?: { [key: string]: string }
86}
87
88export function expressWrap(handler: express.Handler)
89{
90 return function (req: Request, next: akala.NextFunction)
91 {
92 var callback = req.injector.resolve('$callback');
93 var headers: any = {};
94 var response = buildResponse(req, callback, next);
95 handler(<any>req, response, next);
96 }
97}
98
99function buildResponse(req: Request, callback: Callback, next: akala.NextFunction): express.Response
100{
101 return <any>new MyResponse(req, callback, next);
102}
103
104class MyResponse extends stream.Transform implements CallbackResponse
105{
106 constructor(private req: Request, callback: Callback, next: akala.NextFunction)
107 {
108 super({
109 transform: (chunk, _encoding, callback) =>
110 {
111 callback(null, chunk);
112 }, decodeStrings: true
113 });
114 this.headers = {};
115 this.sendStatus = callback
116 this.status = callback
117 this.send = callback
118 this.json = callback
119 this.on('pipe', () =>
120 {
121 this.isStream = true;
122 });
123 this.on('end', () => { this.send(this); });
124 }
125
126 public isStream = false;
127 data: any;
128 headers = {};
129 sendStatus: Callback
130 status: Callback
131 links = undefined
132 send: Callback
133 json: Callback
134 jsonp = undefined
135 sendFile(path: string, options?: SendFileOptions, callback?: akala.NextFunction)
136 {
137 var encodedPath = encodeURI(path);
138 var done = callback;
139 var req = this.req;
140 var res = this;
141 var next = function (err?)
142 {
143 if (err)
144 res.send(500, err);
145 else
146 res.send(200);
147 };
148 var opts: SendFileOptions = options || {};
149
150 if (!path)
151 {
152 throw new TypeError('path argument is required to res.sendFile');
153 }
154
155 // support function as second arg
156 if (typeof options === 'function')
157 {
158 done = options;
159 opts = {};
160 }
161
162 // create file stream
163 var pathname = encodeURI(path);
164 var file = send(req, pathname, opts);
165
166 // transfer
167 sendfile(res, file, opts, function (err)
168 {
169 if (done) return done(err);
170 if (err && err.code === 'EISDIR') return next();
171
172 // next() all but write errors
173 if (err && err.code !== 'ECONNABORTED' && err.syscall !== 'write')
174 {
175 next(err);
176 }
177 });
178 }
179 sendfile = undefined
180 download = undefined
181 contentType(type: string) { this.setHeader('contentType', type); return this; }
182 type = undefined
183 format = undefined
184 attachment = undefined
185 set(field: any): this
186 set(field: string, value?: string): this
187 set(field: string | any, value?: string): this
188 {
189 return this.header(field, value);
190 }
191
192 header(field: any): this
193 header(field: string, value?: string): this
194 header(field: string | any, value?: string): this
195 {
196 if (typeof field == 'string')
197 {
198 if (typeof (value) == 'undefined')
199 return this.headers[field];
200 this.setHeader(field, value);
201 return this;
202 }
203 else
204 {
205 var self = this;
206 if (field)
207 akala.each(field, function (value, key)
208 {
209 self.setHeader(key as string, value);
210 });
211 }
212 }
213 headersSent = false
214 get = undefined
215 clearCookie = undefined
216 cookie = undefined
217 location(location: string) { this.setHeader('location', location); return this; }
218 setHeader(field: string, value: string)
219 {
220 this.headers[field] = value;
221 }
222 /**
223 * Redirect to the given `url` with optional response `status`
224 * defaulting to 302.
225 *
226 * The resulting `url` is determined by `res.location()`, so
227 * it will play nicely with mounted apps, relative paths,
228 * `"back"` etc.
229 *
230 * Examples:
231 *
232 * res.redirect('/foo/bar');
233 * res.redirect('http://example.com');
234 * res.redirect(301, 'http://example.com');
235 * res.redirect('http://example.com', 301);
236 * res.redirect('../login'); // /blog/post/1 -> /blog/login
237 */
238 redirect(url: string): void
239 redirect(status: number, url: string): void
240 redirect(url: string, status: number): void
241 redirect(url: string | number, status?: number | string): void
242 {
243 if (typeof (status) == 'undefined')
244 status = 302
245 if (typeof (url) == 'number' && typeof (status) == 'string')
246 {
247 var swap = url;
248 url = status;
249 status = swap;
250 }
251 this.setHeader('location', <string>url);
252 this.send(status);
253 }
254
255 /**
256 * Render `view` with the given `options` and optional callback `fn`.
257 * When a callback function is given a response will _not_ be made
258 * automatically, otherwise a response of _200_ and _text/html_ is given.
259 *
260 * Options:
261 *
262 * - `cache` boolean hinting to the engine it should cache
263 * - `filename` filename of the view being rendered
264 */
265 render = undefined;
266
267 locals: any;
268
269 charset: string;
270
271 /**
272 * Adds the field to the Vary response header, if it is not there already.
273 * Examples:
274 *
275 * res.vary('User-Agent').render('docs');
276 *
277 */
278 vary = undefined;
279
280 app = undefined;
281 setTimeout = undefined;
282 addTrailers = undefined;
283
284 writeContinue = undefined;
285 finished = false;
286 writable = true;
287 writeHead(statusCode: number, reasonPhrase?: string, headers?: any)
288 writeHead(statusCode: number, headers?: any): void
289 writeHead(statusCode: number, reasonPhrase?: string | any, headers?: any): void
290 {
291 log('writeHead');
292 this.statusCode = statusCode;
293 if (typeof reasonPhrase != 'string')
294 {
295 headers = reasonPhrase;
296 reasonPhrase = null;
297 }
298 if (reasonPhrase)
299 this.statusMessage = reasonPhrase;
300 this.header(headers);
301 this.send(this);
302 this.headersSent = true;
303 }
304 statusCode: number;
305 statusMessage: string;
306 sendDate: boolean;
307 getHeader(name: string)
308 {
309 return this.headers[name];
310 };
311 removeHeader(name: string): void
312 {
313 delete this.headers[name];
314 }
315
316 public _write(chunk: any, encoding?: string, callback?: (err: Error) => void): void
317 {
318 if (!this.headersSent)
319 this.writeHead(this.statusCode);
320
321 super['_write'](chunk, encoding, callback);
322 }
323}
324
325export function handle(app: Router, root: string)
326{
327 return function handle(request: Request, next?: akala.NextFunction): PromiseLike<CallbackResponse>
328 {
329 return new Promise((resolve, reject) =>
330 {
331 var callback: Callback = <any>function callback(status, data?: PayloadDataType | string)
332 {
333 var response: CallbackResponse;
334 if (arguments.length == 0)
335 return reject();
336 if (arguments.length == 1 && status === 'route')
337 return reject(status);
338
339 if (isNaN(Number(status)) || Array.isArray(status))
340 {
341 response = status;
342 if (typeof (data) == 'undefined')
343 {
344 if (typeof (status) == 'undefined')
345 response = { statusCode: 404, data: 'Not found' };
346 else if (isNaN(Number(response.statusCode)) && !(response instanceof stream.Readable))
347 {
348 data = response as any;
349 response = { statusCode: 200, data: data };
350 status = null;
351 }
352 else
353 data = undefined;
354 status = null;
355 }
356 }
357 else
358 response = { statusCode: status, data: 'No data' };
359
360 response.statusCode = response.statusCode || 200;
361
362 if (!(data instanceof stream.Readable) && !Buffer.isBuffer(data) && typeof (data) !== 'string' && typeof data != 'number' && typeof (data) != 'undefined')
363 {
364 if (!response.headers)
365 response.headers = {};
366 if (data instanceof Error)
367 {
368 if (!response.headers['Content-Type'])
369 response.headers['Content-Type'] = 'text/text';
370 data = data.stack;
371 }
372 else
373 {
374 if (!response.headers['Content-Type'])
375 response.headers['Content-Type'] = 'application/json';
376 data = JSON.stringify(data);
377 }
378 }
379 if (typeof (data) != 'undefined')
380 response.data = data;
381
382 resolve(response);
383 }
384
385 callback.redirect = function (url: string)
386 {
387 return callback({ status: 302, headers: { location: url } })
388 }
389
390 var requestInjector: WorkerInjector = new WorkerInjectorImpl(request);
391 requestInjector.register('$request', request);
392 requestInjector.register('$callback', callback);
393 // log(request);
394 Object.defineProperty(request, 'injector', { value: requestInjector, enumerable: false, configurable: false, writable: false });
395
396 log(request.url);
397 app.handle(request, callback, function (err, _req, _callback)
398 {
399 reject(err);
400 });
401 })
402 }
403}
404
405
406function sendfile(res, file, options: SendFileOptions, callback)
407{
408 var done = false;
409 var streaming;
410
411 // request aborted
412 function onaborted()
413 {
414 if (done) return;
415 done = true;
416
417 var err = new Error('Request aborted');
418 err['code'] = 'ECONNABORTED';
419 callback(err);
420 }
421
422 // directory
423 function ondirectory()
424 {
425 if (done) return;
426 done = true;
427
428 var err = new Error('EISDIR, read');
429 err['code'] = 'EISDIR';
430 callback(err);
431 }
432
433 // errors
434 function onerror(err)
435 {
436 if (done) return;
437 done = true;
438 callback(err);
439 }
440
441 // ended
442 function onend()
443 {
444 if (done) return;
445 done = true;
446 callback();
447 }
448
449 // file
450 function onfile()
451 {
452 streaming = false;
453 }
454
455 // finished
456 function onfinish(err)
457 {
458 if (err && err.code === 'ECONNRESET') return onaborted();
459 if (err) return onerror(err);
460 if (done) return;
461
462 setImmediate(function ()
463 {
464 if (streaming !== false && !done)
465 {
466 onaborted();
467 return;
468 }
469
470 if (done) return;
471 done = true;
472 callback();
473 });
474 }
475
476 // streaming
477 function onstream()
478 {
479 streaming = true;
480 }
481
482 file.on('directory', ondirectory);
483 file.on('end', onend);
484 file.on('error', onerror);
485 file.on('file', onfile);
486 file.on('stream', onstream);
487 onFinished(res, onfinish);
488
489 if (options.headers)
490 {
491 // set headers on successful transfer
492 file.on('headers', function headers(res)
493 {
494 var obj = options.headers;
495 var keys = Object.keys(obj);
496
497 for (var i = 0; i < keys.length; i++)
498 {
499 var k = keys[i];
500 res.setHeader(k, obj[k]);
501 }
502 });
503 }
504
505 // pipe
506 file.pipe(res);
507}
\No newline at end of file