Coverage

100%
168
168
0

/Users/ryan/Dev/node/agenda/lib/agenda.js

100%
107
107
0
LineHitsSource
11var Job = require('./job.js'),
2 humanInterval = require('human-interval'),
3 mongo = require('mongoskin');
4
51var Agenda = module.exports = function(config) {
63 if(!(this instanceof Agenda)) return new Agenda(config);
71 if(!config) config = {};
81 this._processEvery = humanInterval(config.processEvery) || humanInterval('5 seconds');
91 this._defaultConcurrency = config.defaultConcurrency || 5;
101 this._maxConcurrency = config.maxConcurrency || 20;
111 this._definitions = {};
121 this._runningJobs = 0;
131 if(config.db)
141 this.database(config.db.address, config.db.collection);
15};
16
17// Configuration Methods
18
191Agenda.prototype.database = function(url, collection) {
204 collection = collection || 'agendaJobs';
214 this._db = mongo.db(url, {w: 0}).collection(collection);
224 return this;
23};
24
251Agenda.prototype.processEvery = function(time) {
263 this._processEvery = humanInterval(time);
273 return this;
28};
29
301Agenda.prototype.maxConcurrency = function(num) {
312 this._maxConcurrency = num;
322 return this;
33};
34
351Agenda.prototype.defaultConcurrency = function(num) {
362 this._defaultConcurrency = num;
372 return this;
38};
39
40// Job Methods
411Agenda.prototype.create = function(name, data) {
4214 var priority = this._definitions[name] ? this._definitions[name].priority : 0;
4314 var job = new Job({name: name, data: data, type: 'normal', priority: priority, agenda: this});
4414 return job;
45};
46
471Agenda.prototype.jobs = function() {
482 var args = Array.prototype.slice.call(arguments);
492 var self = this;
50
512 if(typeof args[args.length -1] == 'function') {
522 var fn = args.pop();
532 var wrapJobs = function(err, jobs) {
542 if(err) fn(err, null);
55 else {
562 jobs = jobs.map(function(j) {
572 j.agenda = self;
582 return new Job(j);
59 });
602 fn(err, jobs);
61 }
62 };
632 args.push(wrapJobs);
64 }
65
662 return this._db.findItems.apply(this._db, args);
67};
68
691Agenda.prototype.define = function(name, options, processor) {
705 if(!processor) {
714 processor = options;
724 options = {};
73 }
745 this._definitions[name] = {
75 fn: processor,
76 concurrency: options.concurrency || this._defaultConcurrency,
77 priority: options.priority || 0,
78 running: 0
79 };
80};
81
821Agenda.prototype.every = function(interval, name, data) {
834 var job;
844 job = this.create(name, data);
854 job.attrs.type = 'single';
864 job.repeatEvery(interval);
874 job.save();
884 return job;
89};
90
911Agenda.prototype.schedule = function(when, name, data) {
922 var job = this.create(name, data);
932 job.schedule(when);
942 job.save();
952 return job;
96};
97
981Agenda.prototype.saveJob = function(job, cb) {
9913 var fn = cb;
10013 var props = getJobProperties(job);
10113 if(props.type == 'single')
1025 this._db.findAndModify({name: props.name, type: 'single'}, {}, {$set: props}, {upsert: true, new: true}, processDbResult);
103 else {
1048 var query = {};
1058 if(job.attrs._id) query._id = job.attrs._id;
1068 this._db.findAndModify(query, {$set: props}, {upsert: true, new: true}, processDbResult);
107 }
108
10913 function processDbResult(err, res) {
11012 if(err) throw(err);
11112 else if(res && typeof res == 'object')
11212 job.attrs._id = res._id;
113
11412 if(fn)
1155 fn(err, job);
116 }
117};
118
119// Job Flow Methods
120
1211Agenda.prototype.start = function() {
1221 var self = this;
1231 this._processInterval = setInterval(function() {
1241 processJobs.call(self);
125 }, this._processEvery);
126};
127
1281Agenda.prototype.stop = function() {
1291 clearInterval(this._processInterval);
1301 this._processInterval = undefined;
131};
132
1331function processJobs() {
1341 var definitions = this._definitions,
135 self = this;
1361 var now = new Date();
1371 this.jobs({nextRunAt: {$lte: now}}, {sort: {'priority': -1}}, function(err, jobs) {
1381 if(err) throw(err);
1391 else fillJobQueue();
140
1411 function fillJobQueue() {
1421 if(!jobs.length) return;
1431 while(self._runningJobs < self._maxConcurrency) {
1442 var job = getNextProcessableJob();
1452 if(job) {
1461 job.run(fillJobQueue);
147 } else
1481 break;
149 }
150 }
151
1521 function getNextProcessableJob() {
1532 var definition, job, index;
1542 for(index = 0; index < jobs.length; ++index) {
1551 definition = definitions[jobs[index].attrs.name];
1561 if(definition.concurrency > definition.running) {
1571 job = jobs[index];
1581 break;
159 }
160 }
1613 if(job) return jobs.splice(index, 1)[0];
1621 return undefined;
163 }
164 });
165}
166
1671function getJobProperties(job) {
16813 var props = {};
16913 for(var key in job.attrs) {
170119 props[key] = job.attrs[key];
171 }
17213 delete props._id;
17313 return props;
174}
175

/Users/ryan/Dev/node/agenda/lib/job.js

100%
61
61
0
LineHitsSource
11var humanInterval = require('human-interval'),
2 date = require('date.js');
3
41var Job = module.exports = function Job(args) {
527 args = args || {};
627 this.agenda = args.agenda;
7
827 var attrs = {};
927 attrs._id = args._id;
1027 attrs.name = args.name;
1127 attrs.data = args.data;
1227 attrs.priority = parsePriority(args.priority) || 0;
13
14 // Set some times
1527 attrs.lastRunAt = args.lastRunAt;
1627 attrs.lastFinishedAt = args.lastFinishedAt;
1727 attrs.nextRunAt = args.nextRunAt || new Date();
18
1927 attrs.repeatInterval = args.repeatInterval;
2027 attrs.type = args.type || 'once';
21
2227 this.attrs = attrs;
23};
24
251Job.prototype.repeatEvery = function(interval) {
267 this.attrs.repeatInterval = humanInterval(interval);
277 return this;
28};
29
301Job.prototype.schedule = function(time) {
314 this.attrs.nextRunAt = date(time);
324 return this;
33};
34
351Job.prototype.priority = function(priority) {
363 this.attrs.priority = parsePriority(priority);
373 return this;
38};
39
401Job.prototype.fail = function(reason) {
413 this.attrs.failReason = reason;
423 this.attrs.failedAt = new Date();
433 return this;
44};
45
461Job.prototype.run = function(cb) {
475 var self = this,
48 agenda = self.agenda,
49 definition = agenda._definitions[self.attrs.name];
50
515 agenda._runningJobs++;
525 definition.running++;
53
545 process.nextTick(function() {
555 var now = new Date();
565 self.attrs.lastRunAt = now;
575 if(self.attrs.repeatInterval) {
582 self.attrs.nextRunAt = new Date(now.valueOf() + self.attrs.repeatInterval);
59 } else {
603 self.attrs.nextRunAt = undefined;
61 }
625 try {
635 definition.fn(self, function() {
644 now = new Date();
654 agenda._runningJobs--;
664 definition.running--;
674 self.attrs.lastFinishedAt = now;
684 self.save(cb);
69 });
70 } catch(e) {
711 now = new Date();
721 self.fail(e.message);
731 agenda._runningJobs--;
741 definition.running--;
751 self.attrs.lastFinishedAt = now;
761 self.save(function(err, job) {
771 if(cb)
781 cb(e, job);
79 });
80 }
81 });
82};
83
841Job.prototype.save = function(cb) {
8514 this.agenda.saveJob(this, cb);
8614 return this;
87};
88
891function parsePriority(priority) {
9030 var priorityMap = {
91 lowest: -20,
92 low: -10,
93 normal: 0,
94 high: 10,
95 highest: 20
96 };
9730 if(typeof priority == 'number')
9817 return priority;
99 else
10013 return priorityMap[priority];
101}
102