engine.js

var nconf = require('nconf')
  , redis = require("redis")
  , _ = require('underscore')._
  , neuron = require('neuron')

, crypto = require('crypto') , sha = crypto.createHash('sha1') , zlib = require('zlib')

;
  
require('date-utils');
  

Use nconf to read the command-line parameters and load config.json

// Layer the configuration sources: command-line arguments first, then config.json.
nconf.argv();
nconf.file({ file: 'config.json' });

// If --dev is passed, use the "development" config section; otherwise use "live".
var config = nconf.get(nconf.get('dev') ? 'development' : 'live');

Connect to redis

// Connect to Redis using the port and address from the selected config section.
var client = redis.createClient(config.redis.port, config.redis.address);

// Authenticate only when a password is configured. The check is made on the
// configured value itself rather than by looking the password up as an nconf key.
if (config.redis.auth) {
	client.auth(config.redis.auth, function (err) {
		if (err) {
			// Report the failure instead of dropping it silently.
			console.error('Redis authentication failed:', err);
		}
	});
}

Recursive function to remove unwanted elements from API response

/**
 * Recursively remove, in place, every property of `source` that is not
 * listed in `map`.
 *
 * The map mirrors the shape of the data:
 *   - `true`, `undefined` or `null` keeps the value untouched;
 *   - an object keeps only the keys it defines (own, non-undefined) and
 *     filters each kept value against the matching map entry;
 *   - when `source` is an array, every element is filtered against `map[0]`.
 *
 * Strings, numbers, booleans and null sources are left alone.
 *
 * @param {*} source - value to filter; objects and arrays are mutated.
 * @param {*} map - description of what to keep (see above).
 */
var filter = function( source, map) {
	// No map (or an explicit `true`) means "keep everything below this point".
	if (map === true || map === undefined || map === null) return;

	if (Array.isArray(source)) {
		source.forEach(function(item) { filter( item, map[0]); });
		return;
	}

	// Only plain containers can be filtered; primitives pass through.
	if (source === null || typeof source !== 'object') return;

	// Object.keys snapshots the own keys, so deleting while iterating is safe and
	// objects that happen to carry a numeric `length` are not mistaken for arrays.
	Object.keys(source).forEach(function(key) {
		// Own-property check keeps inherited names (e.g. "constructor") from
		// being treated as whitelisted.
		var listed = Object.prototype.hasOwnProperty.call(map, key) && map[key] !== undefined;
		if (!listed) {
			delete source[key];
		} else {
			filter( source[key], map[key]);
		}
	});
};

Perform scheduled refresh

exports.refresh = function(api, key, bid, bundle) {

We're forcing a refresh of the content so run the api.code

	api.resource( api.params, api.credentials, function( err, res ) {
		if ( err ) {

We got an error so set our output object to be the error and expire immediately

			api.expires = ( new Date() );
  			var tout = {
  				expires: api.expires,
  				result: err,
  				iid: bid+key,
  				cname: key,
  				scheduled: true
  			};
				
		} else {
			

Filter the response

  			if (_.has(api.filter)) {
	  			filter ( res, api.filter );
  			}
			

Perform cleanup function on API response

  			if (_.has(api, 'cleanup')) {
	  			res = api.cleanup(res);
  			}
			

Build the stored response

  			api.expires = ( new Date() ).addSeconds( api.cacheduration );
  			bundle[key] = api;
  		
  			var tout = {
  				expires: api.expires,
  				result: res,
  				iid: api.iid,
  				cname: key,
  				scheduled: true
  			};
	   		

Save response to Redis

	   		client.set(bid+key, JSON.stringify(tout));
	   		

Update the bundle

	   		client.del('bundle'+bid);
		}
  	});
	
}

Retrieve the requested bundle

exports.fulfill = function ( myRes, bid, bundle, callback, override ) {
	
	if (typeof bundle === 'undefined') {
		myRes.writeHead(404);
		myRes.end();
		return false;
	}
	

Count the number of queries in this bundle so we know when we are ready to respond

	var queriesInThisBundle = _.size(bundle),
		thisResponse = {};
		

cleanup is not an API request

	if(_.has(bundle, 'cleanup')) {
		queriesInThisBundle--;
	}
	

If override was not passed

	if( _.isUndefined( override )) {
		

Retrieve bundle response from Redis

		client.get('bid'+bid, function ( err, doc ) {	
			
			if ( err || doc === null ) {

There was an error so force refresh on bundle

				exports.fulfill( myRes, bid, bundle, callback, true );
				
			} else {
				
				doc = JSON.parse( doc );
				

If there is a valid expiration date for the bundle

				doc.expires = new Date(doc.expires);
				if ( 'expires' in doc && _.isDate(doc.expires) ) {
					doc.secleft = doc.expires.getSecondsBetween( new Date() ) * -1;
				} else {

This should never happen

					doc.secleft = 0;
				}
				
				if (Number(doc.secleft) < 0 ) {
					

The bundle has expired. Force a refresh

					exports.fulfill( myRes, bid, bundle, callback, true );
					
				} else {
					

Respond with the cached data

					doc.fromcache = true;
					var responseType = callback ? 'application/javascript' : 'application/json';
					myRes.writeHead(200, {'Content-Type': responseType, 'max-age': doc.secleft, 'cache-control': 'public, max-age='+doc.secleft+', no-transform', "Expires": doc.expires });
					

If a callback name was passed, use it. Otherwise, just output the object

					var tbuf = new Buffer(  ( !callback ) ? JSON.stringify( doc ) : callback + '('  + JSON.stringify( doc ) +  ');'  );
					myRes.end(tbuf);
				}
			}
		});	
	
	} else {
		

Override was passed so wew ar eforcing a refresh

		var manager = new neuron.JobManager();
		
		manager.on('empty', function (job) {
	
			if (job.name === 'finishRequest' && queriesInThisBundle === 0) {
				var done = true;
				
				_.each(manager.job, function(job, key) {
					if (_.size(job.waiting)  !== 0 || _.size(job.running) !== 0 || job.queue.length !== 0) {
        				done = false;
        			}
        		});
        		
        		if (done) manager.enqueue('sendResponse', bid);
			}
		});
		
		manager.addJob('fulfillPart', {
			work: function(api, key, override, cachedPart) {

				var self = this;
				
				if ( _.isUndefined( override ) ) {
					

Load the cached api response from Redis

			  		client.get(bid+key, function (err, doc) {
			  			if (err || doc === null){ 
			  				self.finished = true;
			  				manager.enqueue('fulfillPart', api, key, true );
			  			} else {
				  			
				  			doc = JSON.parse( doc );
				  			doc.expires = new Date(doc.expires);
				  			if ( ('expires' in doc) && _.isDate(doc.expires) ) {
				  				var secleft = doc.expires.getSecondsBetween( new Date() ) * -1;
				  			}
				  			if (secleft < 0) {
				  				self.finished = true;
				  				manager.enqueue('fulfillPart', api, key, true, doc );
				  			} else {
				  				doc.fromcache = true;				  			
				  				manager.enqueue('finishRequest', doc );	
				  				self.finished = true;	 			
				  			}
				  		}
				  	}); 
				  	
				} else {

					if (_.has( api, 'auth')) { 

If the API request object has an auth scheme defined

						api.auth.type.authorize (api, bid, key, function( result ) { 
							if (result === true) {
								manager.enqueue('startRequest', api, key, cachedPart);
							} else {
								manager.enqueue('finishRequest', result );
							}
							self.finished = true;
						});
					} else {

Authentication is not needed

						self.finished = true;
						manager.enqueue('startRequest', api, key, cachedPart);
					}
						
				}
				
			}
		});
		
		manager.addJob('startRequest', {
			work: function( api, key, cachedPart ) {
				
				var self = this;
				
				if (_.has( api, 'timeout')) {
					self.timeout = setTimeout(function(self) {
						manager.enqueue('finishRequest', cachedPart );	
						if(_.has(cachedPart)) {
							cachedPart.timeout = true;
							cachedPart.fromcache = true;
						};
						self.finished = true;
					}, api.timeout, self)
				}
				
				api.resource( api.params, api.credentials, function( err, res ) {
			  			
		  			clearTimeout(self.timeout)
		  			delete self.timeout;
		  			
		  			if ( err ) {
		  				
		  				api.expires = ( new Date() );
			  			tout = _.isUndefined(cachedPart) ? {} : cachedPart;
			  			tout.cname = key;
			  			tout.expires = api.expires;
			  			tout.fromcache = true;
			  			tout.err = err;
		  				
		  			} else {
		  			

Filter the response

			  			if (_.has(api, 'filter')) {
				  			filter ( res, api.filter );
			  			}
			  			

Perform cleanup function on API response

			  			if (_.has(api, 'cleanup')) {
				  			res = api.cleanup(res);
			  			}
			  			

Build the stored response

			  			api.expires = ( new Date() ).addSeconds( api.cacheduration );
			  			bundle[key] = api;

client.set('bundle'+bid, JSON.stringify(bundle));

			  			var tout = {
			  				expires: api.expires,
			  				result: res,
			  				iid: api.iid,
			  				cname: key
			  			};
				   		

Save response to Redis

				   		client.set(bid+key, JSON.stringify(tout));
				   	}
					manager.enqueue('finishRequest', tout );	
	  				self.finished = true;
			  	});
			}
		})
		
		manager.addJob('finishRequest', {
			work: function(apiResponse) {
				
				queriesInThisBundle--;
				thisResponse[apiResponse.cname] = apiResponse;
			  	
			  	this.finished = true;
			}
		});
		
		manager.addJob('sendResponse', {
			work: function() {

Update the expiration date on the bundle

				var tout = {
					expires: _.min( thisResponse, function( val ) { return val.expires } ).expires, 
				};
				
				client.set('bundle'+bid, JSON.stringify(bundle));
				

Insert api responses into bundle

				_.each( thisResponse, function( val, idx ) {
					tout[val.cname] = val;
				});
				

Perform cleanup function on bundle

			  	if (_.has(bundle, 'cleanup')) {
				  	tout = bundle.cleanup(tout);
				}
	

Save cached bundle in Redis

				client.set('bid'+bid, JSON.stringify(tout));
				

Determine the seconds left before expiry

				if ( 'expires' in tout && _.isDate(tout.expires) ) {
					tout.secleft = tout.expires.getSecondsBetween( new Date() ) * -1;
				} else {
					tout.secleft = 3600;
				}
	

Send the results

				var responseType = callback ? 'application/javascript' : 'application/json';
				myRes.writeHead(200, {'Content-Type': responseType, 'max-age': tout.secleft, 'cache-control': 'public, max-age='+tout.secleft+', no-transform', "Expires": tout.expires });
				var tbuf = new Buffer(!callback ? JSON.stringify(tout) : callback + '(' + JSON.stringify(tout) + ');' );
				myRes.end(tbuf);
				
			}
		});
		
		_.each( bundle, function( api, key ) {	
			if (key !== 'cleanup') {
				if (_.isUndefined(api, 'credentials')) {
					api.credentials = {};
				}	
				manager.enqueue('fulfillPart', api, key);
			}
		});
		
	}
}