UNPKG

6.71 kBJavaScriptView Raw
1var generalConfigManager = require('./config-manager')(),
2 loadGlobalAuth = require('./global-auth'),
3 helpers = require('./helpers'),
4 jobDependencyManager = require('./job-dependencies/loader.js');
5/**
6 * Scheduler
7 *
8 * @param {boolean} options.logSuccessfulJob logs output when job is executed succesfully (verbose)
9 * @return {object} scheduler
10 */
11
12
13var scheduler = {
14 globalAuth: loadGlobalAuth(generalConfigManager.get('authentication-file-path')),
15 domain: require('domain').create(),
16 scheduleTasks:{},
17 configurations:{},
18 initialize: function() {
19 this.domain.on("error", function(error) {
20 console.error(error.stack);
21 });
22 },
23 removeJob: function(configKey) {
24 clearTimeout(this.scheduleTasks[configKey].timer);
25 },
26 updateConfig: function(configKey,config) {
27
28 if(this.scheduleTasks[configKey]) {
29 for(p in config) {
30 this.configurations[configKey][p] = config[p];
31 }
32
33 clearTimeout(this.scheduleTasks[configKey].timer);
34 this.scheduleTasks[configKey].f();
35 }
36
37 },
38 scheduleNext: function(job_worker, widgets) {
39 var thiz = this;
40 var f = function(){
41 thiz.schedule(job_worker, widgets);
42 };
43
44 var timer = setTimeout(f, job_worker.config.interval || 60 * 1000);
45
46
47 if(this.scheduleTasks[job_worker.widget_item.config]) {
48 clearTimeout(this.scheduleTasks[job_worker.widget_item.config].timer);
49 }
50
51
52 this.scheduleTasks[job_worker.widget_item.config] = {timer:timer, f:f};
53 },
54
55 schedule: function(job_worker, widgets) {
56
57 function handleError(err) {
58 job_worker.dependencies.logger.error('executed with errors: ' + err);
59
60 // in case of error retry in one third of the original interval or 1 min, whatever is lower
61 job_worker.config.interval = Math.min(job_worker.config.original_interval / 3, 60000);
62
63 // -------------------------------------------------------------
64 // Decide if we hold error notification according to widget config.
65 // if the retryOnErrorTimes property found in config, the error notification
66 // won´t be sent until we reach that number of consecutive errrors.
67 // This will prevent showing too many error when connection to flaky, unreliable
68 // servers.
69 // -------------------------------------------------------------
70 var sendError = true;
71 if (job_worker.firstRun === false) {
72 if (job_worker.config.retryOnErrorTimes) {
73 job_worker.retryOnErrorCounter = job_worker.retryOnErrorCounter || 0;
74 if (job_worker.retryOnErrorCounter <= job_worker.config.retryOnErrorTimes) {
75 job_worker.dependencies.logger.warn('widget with retryOnErrorTimes. attempts: '
76 + job_worker.retryOnErrorCounter);
77 sendError = false;
78 job_worker.retryOnErrorCounter++;
79 }
80 }
81 }
82 else {
83 // this is the first run for this job so if it fails, we want to inform immediately
84 // since it may be a configuration or dev related problem.
85 job_worker.firstRun = false;
86 }
87
88 if (sendError) {
89 widgets.sendData({error: err, config: {interval: job_worker.config.interval}});
90 }
91 }
92
93 var thiz = this;
94 var task = this.domain.bind(job_worker.task);
95
96 if (!job_worker.config.interval){
97 job_worker.config.interval = 60 * 1000; // default to 60 secs if not provided
98 }
99 else if (job_worker.config.interval < 1000){
100 job_worker.config.interval = 1000; // minium 1 sec
101 }
102
103 if (!job_worker.config.original_interval) // set original interval so we can get back to it
104 job_worker.config.original_interval = job_worker.config.interval;
105
106
107 // Save the job configuration in the singleton
108 if(!this.configurations[job_worker.widget_item.config]) {
109 this.configurations[job_worker.widget_item.config] = job_worker.config;
110 } else {
111 job_worker.config = this.configurations[job_worker.widget_item.config];
112 }
113
114 try {
115
116 // TODO
117 // - We are still passing job_worker.dependencies as a parameter for backwards compatibility
118 // but makes more sense to be passed as a property of job_worker.
119 // - The same with config
120 task.call(job_worker, job_worker.config, job_worker.dependencies, function(err, data){
121 if (err) {
122 handleError(err);
123 }
124 else {
125 job_worker.retryOnErrorCounter = 0; //reset error counter on success
126 if (!data) data = {};
127 data.config = {interval: job_worker.config.interval}; // give access to interval to client
128 widgets.sendData(data);
129 }
130 thiz.scheduleNext(job_worker, widgets);
131 });
132 }
133 catch (e) {
134 job_worker.dependencies.logger.error('Uncaught exception executing job: ' + e);
135 handleError(e);
136 thiz.scheduleNext(job_worker, widgets);
137 }
138 },
139
140 scheduleFirst: function(jobWorker,withDelay) {
141
142 var thiz = this;
143 // unique id for this widget
144 jobWorker.id = jobWorker.dashboard_name + "_" + jobWorker.widget_item.config + "_" +
145 jobWorker.widget_item.widget + "_" + jobWorker.widget_item.job;
146
147 var widgets = { sendData: function(data) { thiz.eventQueue.send(jobWorker.id, data); } };
148
149 // add security info
150 jobWorker.config.globalAuth = this.globalAuth;
151
152 if (jobWorker.widget_item.enabled !== false){
153 if (jobWorker.task){
154
155 // introduce a random delay on job initialization to avoid a concurrency peak on start
156 var rndDelay = helpers.getRndInt(0, 15000);
157
158 jobDependencyManager.fillDependencies(jobWorker, this.io, generalConfigManager);
159
160 setTimeout(function(){
161 //----------------------
162 // schedule job
163 //----------------------
164 thiz.schedule(jobWorker, widgets);
165
166 }, withDelay ? rndDelay:0);
167
168 }
169 else{
170 generalLogger.warn("no job task for " + eventId);
171 }
172 }
173 else { // job is disabled
174 widgets.sendData({error: 'disabled'});
175 }
176 }
177 };
178
179scheduler.initialize();
180
181module.exports.scheduler = scheduler;