UNPKG

4.33 kBJavaScriptView Raw
1(function() {
2 var DoLater, EventEmitter, mongoPool, now;
3
4 mongoPool = require('mongo-pool2');
5
6 EventEmitter = require('events').EventEmitter;
7
// Current wall-clock time as epoch milliseconds.
now = function() {
  return Date.now();
};
11
12 module.exports = DoLater = (function() {
13 function DoLater(config) {
14 this.config = config;
15 this._runningCheck = false;
16 this._poolReady = false;
17 this._buffer = [];
18 this._eventsHub = new EventEmitter;
19 this._jobNames = {};
20 mongoPool.create(this.config, this._onPoolReady.bind(this));
21 this._checkInterval = this.config.interval || 1000;
22 this._pollInterval = 100;
23 }
24
25 DoLater.prototype.doLater = function(waitSec, jobName) {
26 var job, params, tm;
27 params = Array.prototype.slice.call(arguments, 1);
28 tm = now();
29 console.log('DoLater adding job', jobName, 'at', Date(tm));
30 job = {
31 createdAt: tm,
32 when: tm + waitSec * 1000,
33 jobName: jobName,
34 params: params
35 };
36 if (!this._poolReady) {
37 return this._addBuffered(job);
38 } else {
39 return this._addJob(job);
40 }
41 };
42
43 DoLater.prototype._addBuffered = function(job) {
44 return this._buffer.push(job);
45 };
46
47 DoLater.prototype._onPoolReady = function(err, pool) {
48 var jobSpec, _i, _len, _ref;
49 if (err) {
50 throw err;
51 }
52 this._pool = pool;
53 this._poolReady = true;
54 _ref = this._buffer;
55 for (_i = 0, _len = _ref.length; _i < _len; _i++) {
56 jobSpec = _ref[_i];
57 this._addJob(jobSpec);
58 }
59 this._buffer = [];
60 return this._check();
61 };
62
63 DoLater.prototype._coll = function() {
64 var conn;
65 conn = this._pool.acquire();
66 return conn.collection(this.config.collection);
67 };
68
69 DoLater.prototype._addJob = function(job) {
70 var coll;
71 if (!this._poolReady) {
72 return this._addBuffered(job);
73 }
74 coll = this._coll();
75 return coll.insert(job, function() {
76 var tm;
77 tm = now();
78 return console.log('DoLater job', job.jobName, 'added at', Date(tm));
79 });
80 };
81
82 DoLater.prototype._check = function() {
83 var checkFn, coll, query, sort;
84 checkFn = this._check.bind(this);
85 if (this._runningCheck) {
86 return setTimeout(checkFn, this._pollInterval);
87 }
88 this._runningCheck = true;
89 coll = this._coll();
90 query = {
91 when: {
92 $lt: now()
93 },
94 jobName: {
95 $in: this.jobNames()
96 }
97 };
98 sort = [['when', 1], ['createdAt', 1]];
99 return coll.findAndRemove(query, sort, (function(_this) {
100 return function(err, job) {
101 _this._runningCheck = false;
102 if (err) {
103 console.log('DoLater error', err);
104 return setTimeout(checkFn, _this._pollInterval);
105 } else if (job) {
106 _this._onJob(job);
107 return setTimeout(checkFn, _this._pollInterval);
108 } else {
109 return setTimeout(checkFn, _this._checkInterval);
110 }
111 };
112 })(this));
113 };
114
115 DoLater.prototype._onJob = function(job) {
116 var tm;
117 tm = now();
118 console.log('DoLater executing job', job.jobName, 'at', Date(tm));
119 return this._eventsHub.emit.apply(this._eventsHub, job.params);
120 };
121
122 DoLater.prototype.on = function(jobName, fn) {
123 var _base;
124 if ((_base = this._jobNames)[jobName] == null) {
125 _base[jobName] = 0;
126 }
127 this._jobNames[jobName] += 1;
128 return this._eventsHub.on(jobName, fn);
129 };
130
131 DoLater.prototype.off = function(jobName, fn) {
132 if (fn != null) {
133 this._jobNames[jobName] -= 1;
134 if (!this._jobNames[jobName]) {
135 delete this._jobNames[jobName];
136 }
137 return this._eventsHub.removeListener(jobName, fn);
138 } else {
139 if (jobName) {
140 delete this._jobNames[jobName];
141 } else {
142 this._jobNames = {};
143 }
144 return this._eventsHub.removeAllListeners(jobName);
145 }
146 };
147
148 DoLater.prototype.jobNames = function() {
149 return Object.keys(this._jobNames);
150 };
151
152 return DoLater;
153
154 })();
155
// Convenience durations expressed in seconds, each one derived from the
// previous unit.
DoLater.SECOND = 1;
DoLater.MINUTE = 60 * DoLater.SECOND;
DoLater.HOUR = 60 * DoLater.MINUTE;
DoLater.DAY = 24 * DoLater.HOUR;
163
164}).call(this);