UNPKG

17.5 kBJavaScriptView Raw
1"use strict";
2
3var EventEmitter = require ('events').EventEmitter,
4 http = require ('http'),
5 util = require ('util'),
6 url = require ('url'),
7 path = require ('path'),
8 os = require ('os'),
9 dataflows = require ('../index'),
10 flow = require ('../flow'),
11 common = dataflows.common,
12 paint = dataflows.color;
13
14var mime, memoize;
15
16try {
17 mime = require ('mime');
18} catch (e) {
19 console.error (paint.error ('cannot find mime module'));
20}
21
22try {
23 memoize = require ('memoizee');
24} catch (e) {
25 console.error ('memoizee module not found. it provide optimized path lookups');
26}
27
28/**
29 * @class initiator.httpdi
30 * @extends events.EventEmitter
31 *
32 * Initiates HTTP server-related dataflows.
33 */
34var httpdi = module.exports = function httpdIConstructor (config) {
35 // we need to launch httpd
36
37 this.host = config.host;
38 if (!config.port)
39 throw "you must define 'port' key for http initiator";
40 else
41 this.port = config.port;
42
43 this.flows = config.workflows || config.dataflows || config.flows;
44
45 // I don't want to serve static files by default
46 if (config.static) {
47 this.static = config.static === true ? {} : config.static;
48 // - change static root by path
49 if (typeof project !== "undefined") {
50 this.static.root = project.root.fileIO (this.static.root || 'www');
51 } else {
52 var io = require (path.join (__dirname, '../io/easy'));
53 this.static.root = new io (this.static.root || 'www');
54 }
55
56 this.static.index = this.static.index || "index.html";
57 this.static.headers = this.static.headers || {};
58 }
59
60 // - - - prepare configs
61 this.prepare = config.prepare;
62
63 //
64
65 if (config.router && process.mainModule.exports[config.router]) {
66 this.router = process.mainModule.exports[config.router];
67 } else {
68 this.router = this.defaultRouter;
69 }
70
71 // TODO: use 0.0.0.0 instead of auto
72// if (this.host == "auto") {
73// this.detectIP (this.listen);
74// } else {
75 this.listen ();
76// }
77
78 return this;
79};
80
81util.inherits (httpdi, EventEmitter);
82
83httpdi.connections = {};
84
85httpdi.prototype.started = function () {
86 // called from server listen
87 var listenHost = this.host ? this.host : '127.0.0.1';
88 var listenPort = this.port === 80 ? '' : ':'+this.port;
89 console.log(
90 'http initiator running at',
91 paint.path (
92 'http://'+listenHost+listenPort+'/'
93 ),
94 this.static
95 ? "and serving static files from " + paint.path (this.static.root.path) // project.root.relative (this.static.root)
96 : ""
97 );
98
99 httpdi.connections[this.host+":"+this.port] = this.server;
100
101 this.ready = true;
102
103 this.emit ('ready');
104};
105
106httpdi.prototype.runPrepare = function (df, request, response, prepareCfg) {
107
108 var self = this;
109
110 var prepare = this.prepare;
111
112 if (prepare) {
113
114 var dfChain = [];
115
116 // create chain of wfs
117
118 var prepareFailure = false;
119
120 prepareCfg.forEach(function(p, index, arr) {
121
122 var innerDfConfig = util.extend (true, {}, prepare[p]);
123
124 if (!innerDfConfig || !innerDfConfig.tasks) {
125 console.error (paint.error('request canceled:'), 'no prepare task named "'+p+'"');
126// self.emit ('error', 'no prepare task named "'+p+'"');
127 prepareFailure = true;
128 var presenter = self.createPresenter({}, request, response, 'failed');
129// var presenter = self.createPresenter(cDF, 'failed');
130 if (presenter)
131 presenter.runDelayed ();
132 return;
133 }
134
135 innerDfConfig.stage = 'prepare';
136
137 innerDfConfig.idPrefix = df.coloredId + '>';
138
139 var innerDf = new flow(innerDfConfig, {
140 request: request,
141 response: response
142 });
143
144 dfChain.push(innerDf);
145
146 });
147
148 if (prepareFailure) {
149 return;
150 }
151
152 // push main df to chain
153
154 dfChain.push(df);
155
156 // subscribe they
157
158 for (var i = 0; i < dfChain.length-1; i++) {
159
160 var currentDf = dfChain[i];
161 currentDf.nextDf = dfChain[i+1];
162
163 currentDf.on('completed', function(cDF) {
164 setTimeout(cDF.nextDf.runDelayed.bind (cDF.nextDf), 0);
165 });
166
167 currentDf.on('failed', function(cDF) {
168 var presenter = self.createPresenter(cDF, request, response, 'failed');
169 if (presenter)
170 presenter.runDelayed ();
171 });
172
173 }
174
175 dfChain[0].runDelayed();
176
177 } else {
178
179 throw "Config doesn't contain such prepare type: " + df.prepare;
180
181 }
182};
183
184
185httpdi.prototype.createPresenter = function (df, request, response, state) {
186 var self = this;
187 // presenter can be:
188 // {completed: ..., failed: ..., failedRequire: ...} — succeeded or failed tasks in dataflow or failed require step
189 // "template.name" — template file for presenter
190 // {"type": "json"} — presenter config
191 // TODO: [{...}, {...}] — presentation dataflow
192
193 if (!df.presenter) {
194 this.finishRequest (response);
195 return;
196 }
197 // TODO: emit SOMETHING
198
199 var presenter = df.presenter;
200
201 //console.log ('running presenter on state: ', state, presenter[state]);
202
203 // {completed: ..., failed: ..., failedRequire: ...}
204 if (presenter[state])
205 presenter = presenter[state];
206
207 var tasks = [];
208
209 if (Object.is('String', presenter)) {
210 // "template.name"
211 // WTF? not sure about use case. we want all data declared explicitly.
212 // but here is no data passed to presenter
213 tasks.push ({
214 file: presenter,
215 //vars: "{$vars}",
216 response: "{$response}",
217 $class: "presenter",
218 $important: true
219 });
220 } else if (Object.is('Array', presenter)) {
221 // TODO: [{...}, {...}]
222 presenter.map (function (item) {
223 var task = {};
224 util.extend (true, task, item);
225 task.response = "{$response}";
226 task.vars = task.vars || task.data || {};
227 if (!Object.keys (task.vars).length && task.dump)
228 task.vars = df.data;
229 if (!task.functionName || !task.$function) {
230 task.className = task.$class || task.className ||
231 "presenter";
232 task.$important = true;
233 }
234 tasks.push (task);
235 });
236 } else {
237 // {"type": "json"}
238 presenter.response = "{$response}";
239
240 if (!presenter.vars && !presenter.data && presenter.dump) {
241 presenter.vars = {};
242 var skip = {};
243 "request|response|global|appMain|project".split ('|').forEach (function (k) {
244 skip[k] = true;
245 });
246 // WHY IS DF.DATA IS FILLED WITH JUNK???
247 for (var k in df.data) {
248 if (!skip[k]) {
249 presenter.vars[k] = df.data[k];
250 }
251 }
252 } else {
253 presenter.vars = presenter.vars || presenter.data || {};
254 }
255
256 if (!presenter.functionName || !presenter.$function) {
257 presenter.className = presenter.$class || presenter.className ||
258 "presenter";
259 presenter.$important = true;
260 }
261
262 tasks.push (presenter);
263 }
264
265 var reqParams = util.extend(true, {
266 error: df.error,
267 request: request,
268 response: response
269 }, df.data);
270
271 var presenterDf = new flow ({
272 id: df.id,
273 tasks: tasks,
274 stage: 'presentation'
275 }, reqParams);
276
277 presenterDf.on ('completed', function () {
278 //self.log ('presenter done');
279 self.finishRequest (response);
280 });
281
282 presenterDf.on ('failed', function () {
283 presenterDf.log ('Presenter failed: ' + request.method + ' to ' + request.url.pathname);
284 var df500 = self.createFlowByCode(500, request, response);
285 if (df500) {
286 df500.on ('completed', self.finishRequest.bind (self, response));
287 df500.on ('failed', self.finishRequest.bind (self, response));
288 } else {
289 self.finishRequest (response);
290 }
291 });
292
293 return presenterDf;
294};
295
296httpdi.prototype.finishRequest = function (res) {
297 if (!res.finished)
298 res.end ();
299};
300
301httpdi.prototype.createFlow = function (cfg, req, res) {
302 var self = this;
303
304 if (cfg.static) {
305 return false;
306 }
307
308 // task MUST contain tasks or presenter
309 if (!cfg.tasks) {
310 if (!cfg.presenter) {
311 return;
312 } else {
313 var df = {
314 //id:,
315 data: {},
316 //error: ,
317 presenter: cfg.presenter
318 };
319 var presenter = self.createPresenter(df, req, res, 'completed');
320 if (presenter) {
321 presenter.runDelayed ();
322 self.emit('detected', req, res, presenter);
323 return presenter;
324 }
325 }
326 }
327
328
329 var df = new flow(
330 util.extend (true, {}, cfg),
331 { request: req, response: res }
332 );
333
334 if (cfg.presenter) {
335 df.presenter = cfg.presenter;
336 }
337
338 console.log ('dataflow', req.method, req.url.pathname, df.coloredId);
339
340 df.on('completed', function (df) {
341 var presenter = self.createPresenter(df, req, res, 'completed');
342 if (presenter) {
343 presenter.runDelayed ();
344 }
345 });
346
347 df.on('failed', function (df) {
348 var presenter = self.createPresenter(df, req, res, 'failed');
349 if (presenter) {
350 presenter.runDelayed ();
351 }
352
353 });
354
355 self.emit('detected', req, res, df);
356
357 if (cfg.prepare) {
358 self.runPrepare(df, req, res, cfg.prepare);
359 } else {
360 df.runDelayed();
361 }
362
363 return df;
364};
365
366httpdi.prototype.createFlowByCode = function (code, req, res) {
367 res.statusCode = code;
368 // find a flow w/ presenter by HTTP response code
369 if (!this.flows._codeFlows) {
370 this.flows._codeFlows = {};
371 }
372 if (!(res.statusCode in this.flows._codeFlows)) {
373 this.flows._codeFlows[
374 res.statusCode
375 ] = this.flows.filter(function (df) {
376 return df.code == res.statusCode;
377 })[0];
378 }
379 var codeDfConfig = this.flows._codeFlows[res.statusCode];
380 if (codeDfConfig) {
381 if (!codeDfConfig.tasks) { codeDfConfig.tasks = []; }
382 var df = this.createFlow(codeDfConfig, req, res);
383 if (df) {
384 df.on ('completed', this.finishRequest.bind (this, res));
385 df.on ('failed', this.finishRequest.bind (this, res));
386 return true;
387 }
388 }
389
390 this.finishRequest (res);
391 return false;
392};
393
394httpdi.prototype.initFlow = function (wfConfig, req) {
395};
396
397// hierarchical router
398// TODO: igzakt match
399// TODO: dirInfo, fileName, fileExtension, fullFileName
400httpdi.prototype.hierarchical = function (req, res) {
401 var pathName = req.url.pathname;
402
403 // strip trailing slashes
404 if (pathName.length > 1) {
405 pathName = pathName.replace(/\/+$/, '');
406 }
407
408 var pathParts = pathName.split(/\/+/).slice(1);
409
410 var capture = [];
411 this.hierarchical.tree = this;
412 this.hierarchical.path = [];
413 var routeFinder = this.hierarchical.findByPath.bind (this.hierarchical);
414 if (memoize)
415 routeFinder = memoize (routeFinder);
416 var config = routeFinder (
417 null, pathParts, 0, capture
418 );
419
420 var pathPartsRemains = pathParts.slice (this.hierarchical.checkedLevel + 1);
421
422 // console.log (this.hierarchical.path, this.hierarchical.checkedLevel, pathParts, pathPartsRemains);
423
424 if (config) {
425 req.capture = capture;
426 req.pathInfo = pathPartsRemains.join ('/');
427 return this.createFlow (config, req, res);
428 }
429
430 return null;
431};
432
433httpdi.prototype.hierarchical.walkList = function (
434 list, pathParts, level, callback
435) {
436 var pathLen = pathParts.length;
437 var listLen = list && list.length;
438 outer: for (var i = 0; i < listLen; i += 1) {
439 var tree = list[i];
440
441 for (var j = pathLen; j > level; j -= 1) {
442 var pathFragment = pathParts.slice(level, j).join('/');
443
444 if (callback(tree, pathFragment, j - 1)) {
445 break outer;
446 }
447 }
448 }
449};
450
451httpdi.prototype.hierarchical.findByPath = function (
452 tree, pathParts, level, capture
453) {
454 if (!tree)
455 tree = this.tree;
456 var list = tree.workflows || tree.dataflows || tree.flows;
457 this.checkedLevel = level;
458 var branch = null;
459
460 // exact match
461 this.walkList(
462 list, pathParts, level,
463 function (tree, pathFragment, index) {
464 // console.log ('PATH', tree.path, 'FRAGMENT', pathFragment);
465 if (tree.path == pathFragment) {
466 this.checkedLevel = index;
467 branch = tree;
468 this.path.push (tree.path);
469 return true;
470 }
471 return false;
472 }.bind (this)
473 );
474
475 // pattern match
476 !branch && this.walkList(
477 list, pathParts, level,
478 function (tree, pathFragment, index) {
479 // console.log ('PATTERN', tree.pattern, 'FRAGMENT', pathFragment);
480 var match = tree.pattern && pathFragment.match(tree.pattern);
481 if (match) {
482 this.checkedLevel = index;
483 branch = tree;
484 capture.push.apply(capture, match.slice(1));
485 this.path.push (tree.path);
486 return true;
487 }
488 return false;
489 }.bind (this)
490 );
491
492 if ((branch && branch.static && this.checkedLevel >= 0) || this.checkedLevel >= pathParts.length - 1) {
493 return branch;
494 } else {
495 return branch && this.findByPath(
496 branch, pathParts, this.checkedLevel + 1, capture
497 );
498 }
499};
500
501httpdi.prototype.defaultRouter = httpdi.prototype.hierarchical;
502
503httpdi.prototype.httpDate = function (date) {
504 date = date || new Date ();
505 var fstr = "%a, %d %b %Y %H:%M:%S UTC";
506 var utc = 'getUTC';
507 //utc = utc ? 'getUTC' : 'get';
508 var shortDayNames = 'Sun Mon Tue Wed Thu Fri Sat'.split (' ');
509 var shortMonNames = 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split (' ');
510 return fstr.replace (/%[YmdHMSab]/g, function (m) {
511 switch (m) {
512 case '%Y': return date[utc + 'FullYear'] (); // no leading zeros required
513 case '%m': m = 1 + date[utc + 'Month'] (); break;
514 case '%d': m = date[utc + 'Date'] (); break;
515 case '%H': m = date[utc + 'Hours'] (); break;
516 case '%M': m = date[utc + 'Minutes'] (); break;
517 case '%S': m = date[utc + 'Seconds'] (); break;
518 case '%a': return shortDayNames[date[utc + 'Day'] ()]; // no leading zeros required
519 case '%b': return shortMonNames[date[utc + 'Month'] ()]; // no leading zeros required
520 default: return m.slice (1); // unknown code, remove %
521 }
522 // add leading zero if required
523 return ('0' + m).slice (-2);
524 });
525};
526
527httpdi.prototype.findHandler = function (req, res) {
528 var df = this.router (req, res);
529
530 if (df) {
531 if (!df.ready) {
532 console.error ("flow not ready and cannot be started");
533 }
534 return;
535 }
536
537 // console.log ('httpdi not detected: ' + req.method + ' to ' + req.url.pathname);
538 this.emit ("unknown", req, res);
539
540 // NOTE: we don't want to serve static files using nodejs.
541 // NOTE: but for rapid development this is acceptable.
542 // NOTE: you MUST write static: false for production
543 if (this.static) {
544 this.handleStatic (req, res);
545 } else {
546 this.createFlowByCode (404, req, res) || res.end();
547 }
548
549};
550
551httpdi.prototype.handleStatic = function (req, res) {
552 var self = this;
553
554 var isIndex = /\/$/.test(req.url.pathname) ? self.static.index : '';
555
556 var fileObject = self.static.root.fileIO (req.url.pathname.substr (1), isIndex);
557
558 console.log ('filesys ', req.method, req.url.pathname, isIndex ? '=> '+ isIndex : '');
559
560 var contentType, charset;
561 // make sure html return fast as possible
562 // if ('.html' == path.extname(pathName)) {
563 // contentType = 'text/html';
564 // charset = 'utf-8';
565 // } else
566 // TODO: maybe use extension based mapping, mime is slow because we need to read
567 // and analyze magic numbers
568 if (mime && mime.lookup) {
569 contentType = mime.lookup (fileObject.path);
570 // The logic for charset lookups is pretty rudimentary.
571 if (contentType.match (/^text\//))
572 charset = mime.charsets.lookup(contentType, 'utf-8');
573 if (charset) contentType += '; charset='+charset;
574 } else if (!contentType) {
575 console.error(
576 'sorry, there is no content type for %s', fileObject.path
577 );
578 }
579
580 var fileOptions = {flags: "r"};
581
582 var statusCode = 200;
583 var start = 0;
584 var end = 0;
585 var rangeHeader = req.headers.range;
586 if (rangeHeader != null) {
587 // console.log (rangeHeader);
588 var range = rangeHeader.split ('bytes=')[1].split ('-');
589 start = parseInt(range[0]);
590 end = parseInt(range[1]);
591 if (!isNaN(start)) {
592 if (!isNaN(end) && start > end) {
593 // error, return 200
594 } else {
595 statusCode = 206;
596 fileOptions.start = start;
597 if (!isNaN(end))
598 fileOptions.end = end;
599 // console.log (
600 // 'Browser requested bytes from %d to %d of file %s',
601 // start, end, file.name
602 // );
603
604
605 }
606 }
607 }
608
609 fileObject.readStream (fileOptions, function (readStream, stats) {
610
611 if (!stats) {
612 self.createFlowByCode (404, req, res);
613 return;
614 }
615
616 // if (isNaN(end) || end == 0) end = stat.size-1;
617 if (stats.isDirectory() && !readStream) {
618
619 res.statusCode = 303;
620 res.setHeader('Location', req.url.pathname +'/');
621 res.end('Redirecting to ' + req.url.pathname +'/');
622 return;
623
624 } else if (stats.isFile() && readStream) {
625 var headers = {};
626
627 var uri = req.url.pathname;
628 while (uri.length > 1) {
629 var h = self.static.headers[uri];
630 headers = util.extend(headers, h || {});
631 uri = path.dirname(uri);
632 }
633
634 var headersExtend = {
635 'Content-Type': contentType,
636 'Content-Length': stats.size,
637 'Date': self.httpDate (stats.mtime),
638 };
639
640 if (typeof project !== "undefined" && project.config.debug) {
641 headersExtend['Cache-Control'] = 'no-store, no-cache';
642 }
643
644 if (statusCode == 206) {
645 end = fileOptions.end ? fileOptions.end : stats.size-1;
646 headersExtend['Content-Range'] = 'bytes '+fileOptions.start+'-'+(end)+'/'+stats.size;
647 headersExtend["Accept-Ranges"] = "bytes";
648 headersExtend["Content-Length"] = end - fileOptions.start + 1;
649
650 // console.log (headersExtend);
651 }
652
653 headers = util.extend (headers, headersExtend);
654
655 req.on('close', function() {
656 readStream.destroy();
657 });
658
659 res.writeHead (statusCode, headers);
660 readStream.pipe (res);
661 readStream.resume ();
662 return;
663 }
664
665 self.handleFileStream (stats, readStream, req, res);
666 });
667
668};
669
670httpdi.prototype.listen = function () {
671
672 var self = this;
673
674 this.server = http.createServer (function (req, res) {
675 req.pause ();
676 // console.log ('serving: ' + req.method + ' ' + req.url + ' for ', req.connection.remoteAddress + ':' + req.connection.remotePort);
677
678 // here we need to find matching flows
679 // for received request
680
681 req.url = url.parse (req.url, true);
682 // use for flow match
683 req[req.method] = true;
684
685 self.findHandler (req, res);
686 });
687
688 var listenArgs = [this.port];
689
690 if (this.host) {
691 listenArgs.push (this.host);
692 }
693
694 listenArgs.push (function () {
695 self.started ();
696 });
697
698 this.server.listen.apply (this.server, listenArgs);
699};