engine.js | |
|---|---|
var nconf = require('nconf')
, redis = require("redis")
, _ = require('underscore')._
, neuron = require('neuron') | |
| , crypto = require('crypto') , sha = crypto.createHash('sha1') , zlib = require('zlib') | ;
require('date-utils');
|
| Use nconf to grab commandline params and read config.json | nconf.argv().file({ file: 'config.json' }); |
| If --dev is passed use the development config section | var config = nconf.get('dev') ? nconf.get('development') : nconf.get('live'); |
| Connect to redis | var client = redis.createClient(config.redis.port, config.redis.address);
if (nconf.get(config.redis.auth)) {
client.auth(config.redis.auth, function (err) {
if (err) { |
| handle err; | }
});
} |
Recursive function to remove unwanted elements from API response | var filter = function( source, map) {
if (_.isArray(source)) {
_.each(source, function(item, index) { filter( item, map[0]); });
} else {
if (_.isString(source) || map === true || _.isUndefined(map)) return 0;
_.each(source, function(obj, key, source) {
if (_.isUndefined(map[key])) {
delete source[key];
} else {
filter( obj, map[key]);
}
});
};
} |
Perform scheduled refresh | exports.refresh = function(api, key, bid, bundle) { |
| We're forcing a refresh of the content so run the api.code | api.resource( api.params, api.credentials, function( err, res ) {
if ( err ) { |
| We got an error so set our output object to be the error and expire immediately | api.expires = ( new Date() );
var tout = {
expires: api.expires,
result: err,
iid: bid+key,
cname: key,
scheduled: true
};
} else {
|
| Filter the response | if (_.has(api.filter)) {
filter ( res, api.filter );
}
|
| Perform cleanup function on API response | if (_.has(api, 'cleanup')) {
res = api.cleanup(res);
}
|
| Build the stored response | api.expires = ( new Date() ).addSeconds( api.cacheduration );
bundle[key] = api;
var tout = {
expires: api.expires,
result: res,
iid: api.iid,
cname: key,
scheduled: true
};
|
| Save response to Redis | client.set(bid+key, JSON.stringify(tout));
|
| Update the bundle | client.del('bundle'+bid);
}
});
} |
Retrieve the requested bundle | exports.fulfill = function ( myRes, bid, bundle, callback, override ) {
if (typeof bundle === 'undefined') {
myRes.writeHead(404);
myRes.end();
return false;
}
|
| Count the number of queries in this bundle so we know when we are ready to respond | var queriesInThisBundle = _.size(bundle),
thisResponse = {};
|
| cleanup is not an API request | if(_.has(bundle, 'cleanup')) {
queriesInThisBundle--;
}
|
| If override was not passed | if( _.isUndefined( override )) {
|
| Retrieve bundle response from Redis | client.get('bid'+bid, function ( err, doc ) {
if ( err || doc === null ) { |
| There was an error so force refresh on bundle | exports.fulfill( myRes, bid, bundle, callback, true );
} else {
doc = JSON.parse( doc );
|
| If there is a valid expiration date for the bundle | doc.expires = new Date(doc.expires);
if ( 'expires' in doc && _.isDate(doc.expires) ) {
doc.secleft = doc.expires.getSecondsBetween( new Date() ) * -1;
} else { |
| This should never happen | doc.secleft = 0;
}
if (Number(doc.secleft) < 0 ) {
|
| The bundle has expired. Force a refresh | exports.fulfill( myRes, bid, bundle, callback, true );
} else {
|
| Respond with the cached data | doc.fromcache = true;
var responseType = callback ? 'application/javascript' : 'application/json';
myRes.writeHead(200, {'Content-Type': responseType, 'max-age': doc.secleft, 'cache-control': 'public, max-age='+doc.secleft+', no-transform', "Expires": doc.expires });
|
| If a callback name was passed, use it. Otherwise, just output the object | var tbuf = new Buffer( ( !callback ) ? JSON.stringify( doc ) : callback + '(' + JSON.stringify( doc ) + ');' );
myRes.end(tbuf);
}
}
});
} else {
|
Override was passed so wew ar eforcing a refresh | var manager = new neuron.JobManager();
manager.on('empty', function (job) {
if (job.name === 'finishRequest' && queriesInThisBundle === 0) {
var done = true;
_.each(manager.job, function(job, key) {
if (_.size(job.waiting) !== 0 || _.size(job.running) !== 0 || job.queue.length !== 0) {
done = false;
}
});
if (done) manager.enqueue('sendResponse', bid);
}
});
manager.addJob('fulfillPart', {
work: function(api, key, override, cachedPart) {
var self = this;
if ( _.isUndefined( override ) ) {
|
| Load the cached api response from Redis | client.get(bid+key, function (err, doc) {
if (err || doc === null){
self.finished = true;
manager.enqueue('fulfillPart', api, key, true );
} else {
doc = JSON.parse( doc );
doc.expires = new Date(doc.expires);
if ( ('expires' in doc) && _.isDate(doc.expires) ) {
var secleft = doc.expires.getSecondsBetween( new Date() ) * -1;
}
if (secleft < 0) {
self.finished = true;
manager.enqueue('fulfillPart', api, key, true, doc );
} else {
doc.fromcache = true;
manager.enqueue('finishRequest', doc );
self.finished = true;
}
}
});
} else {
if (_.has( api, 'auth')) { |
| If the API request object has an auth scheme defined | api.auth.type.authorize (api, bid, key, function( result ) {
if (result === true) {
manager.enqueue('startRequest', api, key, cachedPart);
} else {
manager.enqueue('finishRequest', result );
}
self.finished = true;
});
} else { |
| Authentication is not needed | self.finished = true;
manager.enqueue('startRequest', api, key, cachedPart);
}
}
}
});
manager.addJob('startRequest', {
work: function( api, key, cachedPart ) {
var self = this;
if (_.has( api, 'timeout')) {
self.timeout = setTimeout(function(self) {
manager.enqueue('finishRequest', cachedPart );
if(_.has(cachedPart)) {
cachedPart.timeout = true;
cachedPart.fromcache = true;
};
self.finished = true;
}, api.timeout, self)
}
api.resource( api.params, api.credentials, function( err, res ) {
clearTimeout(self.timeout)
delete self.timeout;
if ( err ) {
api.expires = ( new Date() );
tout = _.isUndefined(cachedPart) ? {} : cachedPart;
tout.cname = key;
tout.expires = api.expires;
tout.fromcache = true;
tout.err = err;
} else {
|
| Filter the response | if (_.has(api, 'filter')) {
filter ( res, api.filter );
}
|
| Perform cleanup function on API response | if (_.has(api, 'cleanup')) {
res = api.cleanup(res);
}
|
| Build the stored response | api.expires = ( new Date() ).addSeconds( api.cacheduration );
bundle[key] = api; |
| client.set('bundle'+bid, JSON.stringify(bundle)); | var tout = {
expires: api.expires,
result: res,
iid: api.iid,
cname: key
};
|
| Save response to Redis | client.set(bid+key, JSON.stringify(tout));
}
manager.enqueue('finishRequest', tout );
self.finished = true;
});
}
})
manager.addJob('finishRequest', {
work: function(apiResponse) {
queriesInThisBundle--;
thisResponse[apiResponse.cname] = apiResponse;
this.finished = true;
}
});
manager.addJob('sendResponse', {
work: function() { |
| Update the expiration date on the bundle | var tout = {
expires: _.min( thisResponse, function( val ) { return val.expires } ).expires,
};
client.set('bundle'+bid, JSON.stringify(bundle));
|
| Insert api responses into bundle | _.each( thisResponse, function( val, idx ) {
tout[val.cname] = val;
});
|
| Perform cleanup function on bundle | if (_.has(bundle, 'cleanup')) {
tout = bundle.cleanup(tout);
}
|
| Save cached bundle in Redis | client.set('bid'+bid, JSON.stringify(tout));
|
| Determine the seconds left before expiry | if ( 'expires' in tout && _.isDate(tout.expires) ) {
tout.secleft = tout.expires.getSecondsBetween( new Date() ) * -1;
} else {
tout.secleft = 3600;
}
|
| Send the results | var responseType = callback ? 'application/javascript' : 'application/json';
myRes.writeHead(200, {'Content-Type': responseType, 'max-age': tout.secleft, 'cache-control': 'public, max-age='+tout.secleft+', no-transform', "Expires": tout.expires });
var tbuf = new Buffer(!callback ? JSON.stringify(tout) : callback + '(' + JSON.stringify(tout) + ');' );
myRes.end(tbuf);
}
});
_.each( bundle, function( api, key ) {
if (key !== 'cleanup') {
if (_.isUndefined(api, 'credentials')) {
api.credentials = {};
}
manager.enqueue('fulfillPart', api, key);
}
});
}
}
|