/** * joola.io * * Copyright Joola Smart Solutions, Ltd. * * Licensed under GNU General Public License 3.0 or later. * Some rights reserved. See LICENSE, AUTHORS. * * @license GPL-3.0+ */ global.logger_component = 'engine'; var logger = require('joola.io.logger'), _datasources = require('./lib/objects/datasources'), _datatables = require('./lib/objects/datatables'), connector = require('./lib/connectors/connector'), caching = require('./lib/caching/manager.js'), _ = global._ = require('underscore'), ce = global.ce = require('cloneextend'), express = require('express'), http = require('http'), https = require('https'), path = require('path'), nconf = require('nconf'), async = require('async'); require('nconf-http'); require('pkginfo')(module, 'version'); require('date-utils'); var joola = {}; global.joola = joola; joola.state = { status: 'init', running: false }; joola.logger = logger; joola.config = nconf; joola.config.server = null; joola.config.auth = null; joola.config.integration = null; joola.config.content = null; var httpServer, httpsServer, app = global.app = express(); joola.config.add('base-config', { type: 'file', file: joola.config.get('confurl') || './config/config.json' }); var fetchConfig = function (name, file, callback) { joola.logger.debug('Fetching configuration from: ' + joola.config.get('config:url') + file); joola.config.add(name, { type: 'http', url: joola.config.get('config:url') + file, callback: function (err) { if (err) { //Fallback to file joola.logger.warn('Failed to fetch config, fallback to local FS: ' + joola.config.get('config:url') + file); joola.config.add(name, { type: 'file', file: joola.config.get('conf') || './config/' + file + '.json' }); } //Validate config if (!joola.config.get(name + ':version')) return callback(new Error('Failed to load configuration file: ' + './config/' + file + '.json')); return callback(null); }}); }; var loadConfig = function (callback) { joola.state.status = 'loadConfig'; joola.config.argv() .env(); //Load integration config //server fetchConfig('server', 'joola.io.engine.server', function (err) { if (err) return callback(err); joola.config.server = joola.config.get('server'); //auth fetchConfig('auth', 'joola.io.engine.auth', function (err) { if (err) return callback(err); joola.config.auth = joola.config.get('auth'); //integration fetchConfig('integration', 'joola.io.engine.integration', function (err) { if (err) return callback(err); joola.config.integration = joola.config.get('integration'); //content fetchConfig('content', 'joola.io.engine.content', function (err) { if (err) return callback(err); joola.config.content = joola.config.get('content'); return callback(null); }); }); }); }); }; var setupCache = function (callback) { var port, host, dbid, options; var errorReported = false; host = joola.config.get('server:redis:host'); port = joola.config.get('server:redis:port'); dbid = joola.config.get('server:redis:DB'); if (!dbid) { joola.config.set('server:redis:DB', 0); } var handleRedisError = function (err) { return joola.logger.error('Redis error: ' + err); } require('./lib/shared/redis').redis(joola, function (err) { if (err) { if (!errorReported) { return handleRedisError(err); //throw err; } else { errorReported = true; return callback(err); } } require('./lib/shared/cache').cache(joola, function (err) { if (err) { if (!errorReported) return handleRedisError(err); //throw err; else { errorReported = true; return callback(err); } } return callback(null); }); }); }; var flushRedis = function (callback) { joola.redis.flushdb(function (err) { if (err) return callback(err); joola.logger.warn('Redis cache flushed'); callback(null); }); }; var processIntegration = function (callback) { joola.state.status = 'processIntegration'; var datasources = joola.config.integration.datasources; datasources.forEach(function (ds) { if (!ds.id) throw 'JSON Config Validation Error: Source ' + ds.name + ' has no id'; _.each(ds.datatables, function (dt) { if (!dt.id) throw 'JSON Config Validation Error: Table ' + dt.name + ' has no id'; dt.datasourceid = ds.id; }); }); return callback(null); }; var verifyDatasources = function (callback) { var datasources = joola.config.integration.datasources; var checks = []; var allChecksPassed = true; datasources.forEach(function (ds) { var check = function (callback) { joola.logger.debug('Checking data source [' + ds.id + ']...'); _datasources.validate(ds.id, function (err, validated) { if (err) { joola.logger.warn('Failed to validate data source: ' + ds.id); allChecksPassed = false; } else if (!validated) { joola.logger.warn('Failed to validate data source: ' + ds.id); allChecksPassed = false; } if (joola.config.get('flushcache')) { ds.datatables.forEach(function (dt) { dt.datasource = ds; _datatables.flushCache(dt, function () { }); }) return callback(); } else return callback(); }); }; checks.push(check); }); async.parallel(checks, function () { if (allChecksPassed) return callback(null); else return callback(new Error('Some data source checks failed, check log for more details.')); }); }; var preFlight = function (callback) { joola.state.status = 'preFlight'; loadConfig(function (err) { if (err) return callback(err); joola.logger.info('Loaded server config version ' + joola.config.get('server:version')); joola.logger.info('Loaded auth config version ' + joola.config.get('auth:version')); joola.logger.info('Loaded integration config version ' + joola.config.get('integration:version')); joola.logger.info('Loaded content config version ' + joola.config.get('content:version')); joola.logger.setLevel(joola.config.server.logLevel); setupCache(function (err) { if (err) { return callback(err); } processIntegration(function (err) { if (err) return callback(err); verifyDatasources(function (err) { if (err) return callback(err); return callback(null); }); }); }); }); }; var start = function (callback) { joola.state.status = 'starting'; joola.state.running = false; setupApplication(function (err) { if (err) return callback(err); setupRoutes(function (err) { if (err) return callback(err); setupControlPort(function () { joola.logger.info('Control port running on port ' + joola.config.get('server:controlPort:port')); startHTTP(function () { joola.logger.debug('HTTP running'); if (joola.config.get('server:secure') === true) { startHTTPS(function () { joola.logger.debug('HTTPS running'); return callback(null); }); } else return callback(null); }); }); }); }); }; var setupApplication = function (callback) { try { var winstonStream = { write: function (message, encoding) { joola.logger.info(message); } }; app.use(express.logger((global.test ? function (req, res) { } : {stream: winstonStream}))); app.set('views', __dirname + '/views'); app.set('view engine', 'jade'); app.use(express.compress()); app.use(express.bodyParser()); app.use(express.methodOverride()); return callback(null); } catch (err) { joola.logger.error('setupApplication: ' + err) return callback(err); } }; var setupRoutes = function (callback) { try { var index = require('./routes/index'); app.configure(function () { app.use(function (req, res, next) { res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'joola-token, Content-Type, origin'); res.setHeader('Access-Control-Max-Age', '86400'); res.setHeader('X-Powered-By', 'joola.io'); return next(); }); }); app.get('/', index.index); app.get('/:resource', index.route); app.get('/:resource/:action', index.route); app.use(express.static(path.join(__dirname, 'public'))); app.use(app.router); //TODO: Setup 500 and 404 routes return callback(null); } catch (err) { joola.logger.error('setupRoutes: ' + err) return callback(err); } }; var startHTTP = function (callback) { var result = {}; try { var _httpServer = http.createServer(app).listen(joola.config.get('server:port'),function (err) { if (err) { result.status = 'Failed: ' + ex.message; return callback(result); } joola.state.status = 'running'; joola.state.running = true; joola.logger.info('Joola Analytics HTTP server listening on port ' + joola.config.get('server:port')); httpServer = _httpServer; return callback(result); }).on('error',function (ex) { result.status = 'Failed: ' + ex.message; return callback(result); }).on('close', function () { joola.state.status = 'stopped'; joola.state.running = false; joola.logger.warn('Joola Analytics HTTP server listening on port ' + (joola.config.get('server:port')).toString() + ' received a CLOSE command.'); }); } catch (ex) { result.status = 'Failed: ' + ex.message; return callback(result); } return null; }; var startHTTPS = function (callback) { var result = {}; try { var secureOptions = { key: fs.readFileSync(joola.config.get('server:keyFile')), cert: fs.readFileSync(joola.config.get('server:certFile')) }; var _httpsServer = https.createServer(secureOptions, app).listen(joola.config.get('server:securePort'),function (err) { if (err) { result.status = 'Failed: ' + ex.message; return callback(result); } joola.logger.info('Joola Analytics HTTPS server listening on port ' + joola.config.get('server:securePort')); result.status = 'Success'; httpsServer = _httpsServer; return callback(result); }).on('error',function (ex) { result.status = 'Failed: ' + ex.message; return callback(result); }).on('close', function () { joola.logger.warn('Joola Analytics HTTPS server listening on port ' + joola.config.get('server:securePort').toString() + ' received a CLOSE command.'); }); } catch (ex) { result.status = 'Failed: ' + ex.message; return callback(result); } return null; }; //Control Port var setupControlPort = function (callback) { var cp = require('node-controlport'); var cp_endpoints = []; cp_endpoints.push({ endpoint: 'status', exec: function (callback) { return callback({status: joola.state.status, pid: process.pid}); } }); cp_endpoints.push({ endpoint: 'start', exec: function (callback) { if (joola.config.get('server:secure') === true) { startHTTP(function () { startHTTPS(callback); }); } else { startHTTP(callback); } } } ); cp_endpoints.push({ endpoint: 'stop', exec: function (callback) { var result = {}; result.status = 'Success'; try { httpServer.close(); if (joola.config.get('server:secure') === true) httpsServer.close(); process.exit(0); } catch (ex) { console.log(ex); result.status = 'Failed: ' + ex.message; return callback(result); } return callback(result); } }); cp.start(joola.config.get('server:controlPort:port'), cp_endpoints, callback); }; <<<<<<< HEAD ======= joola.logger.info('Starting joola.io Engine, version ' + module.exports.version + '...'); preFlight(function () { joola.logger.info('Preflight checks done, starting engine...'); start(function (err) { if (err) throw err; joola.logger.info('joole.io Engine started!'); if (joola.config.get('flushcache')) { flushRedis(function (err) { if (err) throw err; }); } fetchEndDates(function (err) { buildInitialCache(function () { setupEagerCache(function () { }); }); logger.info('Cache ready!'); }); }); }); >>>>>>> develop var fetchEndDates = function (callback) { joola.logger.debug('Fetching end dates...'); var datasources = _datasources.list(true); var async_calls = []; datasources.forEach(function (ds) { if (ds.enddate) { if (ds.enddate.type == 'query') { var query = connector.createQuery(); query.datasource = ds; query.sql = ds.enddate.query; joola.logger.debug('Fetching end dates for datasource - ' + ds.id); var call = function (callback) { connector.executeQuery(query, function (query, rows, fields, error) { if (error) throw error; var row = rows.rows[0]; var col = rows.fields[0].name; var timestamp; try { timestamp = new Date(row[col]); //.fixDate(true, true); } catch (ex) { //no data maybe? timestamp = new Date(); } timestamp.setMilliseconds(timestamp.getMilliseconds() - 1); joola.logger.debug('... [' + ds.id + '] ' + timestamp.format('yyyy-mm-dd hh:nn:ss')); ds.enddate.value = timestamp; return callback(); }); }; async_calls.push(call); } } }); fork(async_calls, function () { joola.logger.debug('...Datasource end dates fetched.'); /* joola.config.content.system.enddate = function () { var _date = ce.clone(datasources[0].enddate.value); _date.addDays(-1); return datasources[0].enddate.value; };*/ return callback(); }); }; var buildInitialCache = function (callback) { var range; //monitor which datatables need interval based caching var found = false; var datatables = _datatables.list(); var calls = []; var errorReported = false; var cacheTable = function (args, callback) { var query = args.query; var dt = args.dt; query.enddate = ce.clone(args.enddate); query.startdate = ce.clone(args.startdate); caching.cacheTable(dt, ce.clone(query), function (err) { return callback(err); }); }; datatables.forEach(function (dt) { if (dt.caching) { if (dt.caching.method == 'persist' || dt.caching.method == 'forward') { found = true; joola.logger.debug('Found [' + dt.id + '] for cache management [' + dt.caching.method + ']...'); var step = dt.caching.step; //first fetch the base line, and then setup the time for the regular forward fetches var ds = _datasources.get(dt.datasourceid); var query = connector.createQuery(); dt.datasource = ds; dt.query = _datatables.basequery(dt); var basequery = dt.query; query.sql = basequery.sql; query.datasource = ds; var enddate = ce.clone(ds.enddate.value); var startdate = ce.clone(enddate); startdate.setMilliseconds(startdate.getMilliseconds() + 1); startdate.addDays(-1 * parseInt((range ? range : dt.caching.baseline), 10)); var _baseline_start = ce.clone(startdate); query.enddate = enddate; query.startdate = startdate; var firstRun = true; if (_baseline_start.dateDiff(enddate) > step) { while (startdate >= _baseline_start) { if (firstRun) { startdate = ce.clone(enddate); startdate.setMilliseconds(startdate.getMilliseconds() + 1); firstRun = false; } startdate.addDays(-1 * parseInt((range ? range : dt.caching.step), 10)); calls.push({query: query, dt: dt, startdate: ce.clone(startdate), enddate: ce.clone(enddate)}); enddate = ce.clone(startdate); enddate.setMilliseconds(enddate.getMilliseconds() - 1); } } else { calls.push({query: query, dt: dt, startdate: ce.clone(startdate), enddate: ce.clone(enddate)}); } } } }); async.mapSeries(calls, cacheTable, function (err, results) { if (err) return callback(err); joola.logger.info('Cache pre-load completed.'); //TODO: add foward caching /* setTimeout(function () { intervalCheck(dt); }, dt.caching.interval); */ return callback(null); }); if (!found) return callback(null); }; var setupEagerCache = function (callback) { var tables = _datatables.list(); _.each(tables, function (dt) { dt.datasource = _datasources.get(dt.datasourceid); if (dt.caching && dt.caching.eager && dt.caching.eager.interval && dt.caching.eager.interval > 0) { joola.logger.debug('Setting up eagerCache for [' + dt.id + '] task with interval: ' + dt.caching.eager.interval); var tFunction = function () { caching.eagerCache(dt, function () { joola.logger.debug('Re-setting eagerCache for [' + dt.id + '] task with interval: ' + dt.caching.eager.interval); setTimeout(tFunction, dt.caching.eager.interval); }); }; setTimeout(tFunction, dt.caching.eager.interval); } }); return callback(); }; joola.logger.info('Starting joola.io Engine, version ' + module.exports.version + '...'); try { preFlight(function (err) { if (err) throw err; else { joola.logger.info('Preflight checks done, starting engine...'); try { start(function (err) { if (err) throw err; joola.logger.info('joole.io Engine started!'); fetchEndDates(function (err) { if (err) throw err; buildInitialCache(function (err) { if (err) throw err; setupEagerCache(function (err) { if (err) throw err; }); }); logger.info('Cache ready!'); }); }); } catch (ex) { joola.logger.error('FATAL EXCEPTION (start): ' + ex.message + '\n' + ex.stack, null, function () { process.exit(1); }); } } }); } catch (ex) { joola.logger.error('FATAL EXCEPTION (preflight): ' + ex.message + '\n' + ex.stack, null, function () { process.exit(1); }); } process.on('uncaughtException', function (exception) { // handle or ignore error console.log('FATAL EXCEPTION: ' + exception.message); console.log(exception.stack); joola.logger.error('FATAL EXCEPTION: ' + exception.message + '\n' + exception.stack, null, function () { process.exit(1); }); });