UNPKG

6.66 kBJavaScriptView Raw
1var generalConfigManager = require('./config-manager')(),
2 loadGlobalAuth = require('./global-auth'),
3 helpers = require('./helpers'),
4 jobDependencyManager = require('./job-dependencies/loader.js');
5/**
6 * Scheduler
7 *
8 * @param {boolean} options.logSuccessfulJob logs output when job is executed succesfully (verbose)
9 * @return {object} scheduler
10 */
11
12
13var scheduler = {
14 globalAuth: loadGlobalAuth(generalConfigManager.get('authentication-file-path')),
15 domain: require('domain').create(),
16 scheduleTasks:{},
17 configurations:{},
18 initialize: function() {
19 this.domain.on("error", function(error) {
20 console.error(error.stack);
21 });
22 },
23 removeJob: function(configKey) {
24 clearTimeout(this.scheduleTasks[configKey].timer);
25 },
26 updateConfig: function(configKey,config) {
27
28 for(p in config) {
29 this.configurations[configKey][p] = config[p];
30 }
31
32
33
34 clearTimeout(this.scheduleTasks[configKey].timer);
35 this.scheduleTasks[configKey].f();
36 },
37 scheduleNext: function(job_worker, widgets) {
38 var thiz = this;
39 var f = function(){
40 thiz.schedule(job_worker, widgets);
41 };
42
43 var timer = setTimeout(f, job_worker.config.interval || 60 * 1000);
44
45
46 if(this.scheduleTasks[job_worker.widget_item.config]) {
47 clearTimeout(this.scheduleTasks[job_worker.widget_item.config].timer);
48 }
49
50
51 this.scheduleTasks[job_worker.widget_item.config] = {timer:timer, f:f};
52 },
53
54 schedule: function(job_worker, widgets) {
55
56 function handleError(err) {
57 job_worker.dependencies.logger.error('executed with errors: ' + err);
58
59 // in case of error retry in one third of the original interval or 1 min, whatever is lower
60 job_worker.config.interval = Math.min(job_worker.config.original_interval / 3, 60000);
61
62 // -------------------------------------------------------------
63 // Decide if we hold error notification according to widget config.
64 // if the retryOnErrorTimes property found in config, the error notification
65 // won´t be sent until we reach that number of consecutive errrors.
66 // This will prevent showing too many error when connection to flaky, unreliable
67 // servers.
68 // -------------------------------------------------------------
69 var sendError = true;
70 if (job_worker.firstRun === false) {
71 if (job_worker.config.retryOnErrorTimes) {
72 job_worker.retryOnErrorCounter = job_worker.retryOnErrorCounter || 0;
73 if (job_worker.retryOnErrorCounter <= job_worker.config.retryOnErrorTimes) {
74 job_worker.dependencies.logger.warn('widget with retryOnErrorTimes. attempts: '
75 + job_worker.retryOnErrorCounter);
76 sendError = false;
77 job_worker.retryOnErrorCounter++;
78 }
79 }
80 }
81 else {
82 // this is the first run for this job so if it fails, we want to inform immediately
83 // since it may be a configuration or dev related problem.
84 job_worker.firstRun = false;
85 }
86
87 if (sendError) {
88 widgets.sendData({error: err, config: {interval: job_worker.config.interval}});
89 }
90 }
91
92 var thiz = this;
93 var task = this.domain.bind(job_worker.task);
94
95 if (!job_worker.config.interval){
96 job_worker.config.interval = 60 * 1000; // default to 60 secs if not provided
97 }
98 else if (job_worker.config.interval < 1000){
99 job_worker.config.interval = 1000; // minium 1 sec
100 }
101
102 if (!job_worker.config.original_interval) // set original interval so we can get back to it
103 job_worker.config.original_interval = job_worker.config.interval;
104
105
106 // Save the job configuration in the singleton
107 if(!this.configurations[job_worker.widget_item.config]) {
108 this.configurations[job_worker.widget_item.config] = job_worker.config;
109 } else {
110 job_worker.config = this.configurations[job_worker.widget_item.config];
111 }
112
113 try {
114
115 // TODO
116 // - We are still passing job_worker.dependencies as a parameter for backwards compatibility
117 // but makes more sense to be passed as a property of job_worker.
118 // - The same with config
119 task.call(job_worker, job_worker.config, job_worker.dependencies, function(err, data){
120 if (err) {
121 handleError(err);
122 }
123 else {
124 job_worker.retryOnErrorCounter = 0; //reset error counter on success
125 if (!data) data = {};
126 data.config = {interval: job_worker.config.interval}; // give access to interval to client
127 widgets.sendData(data);
128 }
129 thiz.scheduleNext(job_worker, widgets);
130 });
131 }
132 catch (e) {
133 job_worker.dependencies.logger.error('Uncaught exception executing job: ' + e);
134 handleError(e);
135 thiz.scheduleNext(job_worker, widgets);
136 }
137 },
138
139 scheduleFirst: function(jobWorker,withDelay) {
140
141 var thiz = this;
142 // unique id for this widget
143 jobWorker.id = jobWorker.dashboard_name + "_" + jobWorker.widget_item.config + "_" +
144 jobWorker.widget_item.widget + "_" + jobWorker.widget_item.job;
145
146 var widgets = { sendData: function(data) { thiz.eventQueue.send(jobWorker.id, data); } };
147
148 // add security info
149 jobWorker.config.globalAuth = this.globalAuth;
150
151 if (jobWorker.widget_item.enabled !== false){
152 if (jobWorker.task){
153
154 // introduce a random delay on job initialization to avoid a concurrency peak on start
155 var rndDelay = helpers.getRndInt(0, 15000);
156
157 jobDependencyManager.fillDependencies(jobWorker, this.io, generalConfigManager);
158
159 setTimeout(function(){
160 //----------------------
161 // schedule job
162 //----------------------
163 thiz.schedule(jobWorker, widgets);
164
165 }, withDelay ? rndDelay:0);
166
167 }
168 else{
169 generalLogger.warn("no job task for " + eventId);
170 }
171 }
172 else { // job is disabled
173 widgets.sendData({error: 'disabled'});
174 }
175 }
176 };
177
178scheduler.initialize();
179
180module.exports.scheduler = scheduler;