1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 | var
|
13 | _datasources = require('../objects/datasources'),
|
14 | _datatables = require('../objects/datatables'),
|
15 | connector = require('../connectors/connector'),
|
16 | utils = require('../shared/utils'),
|
17 | mysql = require('./mysql'),
|
18 | mongo = require('./mongo'),
|
19 | zlib = require('zlib'),
|
20 | path = require('path');
|
21 |
|
/**
 * Selects the cache-store driver module for a datatable.
 *
 * Reads `datatable.caching.system.type` and returns the matching module
 * that this file required (`mysql` or `mongo`).
 *
 * @param {Object} datatable - Datatable descriptor; must expose
 *   `caching.system.type`, and `datasource.name` for the recognised types.
 * @returns {Object|null} The driver module, or null for 'mssql' (logged but
 *   not backed by a driver here) and for any unrecognised type.
 */
var createConnection = function (datatable) {
  var storeType = datatable.caching.system.type;

  if (storeType === 'mysql') {
    joola.logger.debug('open connection to mysql ' + datatable.datasource.name);
    return mysql;
  }

  if (storeType === 'mongo') {
    joola.logger.debug('open connection to mongo ' + datatable.datasource.name);
    return mongo;
  }

  if (storeType === 'mssql') {
    // Only logged: no driver is assigned for this type, so null is returned.
    joola.logger.debug('open connection to mssql ' + datatable.datasource.name);
  }

  return null;
};
|
42 |
|
43 | var exports = {};
|
44 |
|
45 | exports.flushCache = function (datatable, callback) {
|
46 | if (datatable.caching) {
|
47 | var db = createConnection(datatable);
|
48 | db.flushCache(datatable, callback);
|
49 | }
|
50 | else
|
51 | return callback();
|
52 | };
|
53 |
|
54 | exports.minCacheDate = function (datatable, callback) {
|
55 | var db = createConnection(datatable);
|
56 | db.minCacheDate(datatable, callback);
|
57 | };
|
58 |
|
59 | exports.maxCacheDate = function (datatable, callback) {
|
60 | var db = createConnection(datatable);
|
61 | db.maxCacheDate(datatable, callback);
|
62 | };
|
63 |
|
64 | exports.lock = function (key, rangeKey, query, callback) {
|
65 | var startTimer = new Date().getTime();
|
66 | var _self = this;
|
67 | var lockKey = query.lockKey;
|
68 | var lockRangeKey = function (key, value, callback) {
|
69 | joola.cache.save('locks', key.clean(), value, joola.config.integration.cache.lockTimeout, function (err) {
|
70 | joola.cache.load('locks', key.clean(), function (err, _value) {
|
71 | if (_value && _value.lockKey == value.lockKey) {
|
72 | joola.logger.silly('RangeKey [' + 'joola:cachedTable:' + key.clean() + ':' + value.lockKey + '][tout:' + joola.config.integration.cache.lockTimeout + '] locked!');
|
73 | return callback(true);
|
74 | }
|
75 | else {
|
76 | joola.logger.error('Locked by another node [' + key + ']');
|
77 | return callback(false);
|
78 |
|
79 | }
|
80 | })
|
81 | });
|
82 | };
|
83 |
|
84 | joola.cache.load('locks', key.clean(), function (err, value) {
|
85 | if (value) {
|
86 | if (value.lockKey != null && value.lockKey != lockKey) {
|
87 | if (value.nextLock != lockKey) {
|
88 | joola.logger.silly('Setting nextLock [' + lockKey + ']');
|
89 | value.nextLock = lockKey;
|
90 |
|
91 |
|
92 |
|
93 | }
|
94 | setTimeout(function () {
|
95 | if (new Date() - startTimer % 100 == 0)
|
96 | joola.logger.silly('Waiting on cache for table [' + lockKey + ']');
|
97 | if (new Date() - startTimer > joola.config.integration.cache.waitOnCache) {
|
98 | joola.logger.debug('Timeout while waiting for cache [' + lockKey + ']');
|
99 | return callback(false);
|
100 | }
|
101 | else
|
102 | _self.lock(key, rangeKey, query, callback);
|
103 | }, joola.config.integration.cache.waitInterval);
|
104 | }
|
105 | else if (value.lockKey == lockKey) {
|
106 | return callback(true, lockKey);
|
107 | }
|
108 | else if (value.lockKey == null) {
|
109 | value = {};
|
110 | value.lockKey = lockKey;
|
111 | value.nextLock = null;
|
112 | lockRangeKey(key, value, function (success) {
|
113 | if (success) {
|
114 | return callback(true, lockKey);
|
115 | }
|
116 | else
|
117 | return callback(false);
|
118 | });
|
119 | }
|
120 | else {
|
121 | return callback(false);
|
122 | }
|
123 | }
|
124 | else {
|
125 | value = {};
|
126 | value.lockKey = lockKey;
|
127 | value.nextLock = null;
|
128 | lockRangeKey(key, value, function (success) {
|
129 | if (success)
|
130 | return callback(true, lockKey);
|
131 | else
|
132 | return callback(false);
|
133 | });
|
134 | }
|
135 | });
|
136 | };
|
137 |
|
138 | exports.unlock = function (key, rangeKey, lockKey, callback) {
|
139 | var unlockRangeKey = function (key, value, callback) {
|
140 | joola.cache.save('locks', key.clean(), value, function () {
|
141 | joola.logger.silly('RangeKey [' + 'joola:cachedTable:' + key.clean() + ':' + lockKey + '] unlocked!');
|
142 | return callback();
|
143 | });
|
144 | };
|
145 |
|
146 | joola.cache.load('locks', key.clean(), function (error, value) {
|
147 | if (value) {
|
148 | if (value.lockKey == lockKey) {
|
149 | value.lockKey = value.nextLock;
|
150 | value.nextLock = null;
|
151 | unlockRangeKey(key, value, function () {
|
152 | return callback();
|
153 | });
|
154 | }
|
155 | else {
|
156 |
|
157 | return callback();
|
158 | }
|
159 | }
|
160 | else {
|
161 |
|
162 | return callback();
|
163 | }
|
164 |
|
165 | });
|
166 | };
|
167 |
|
168 | exports.cacheTable = function (datatable, query, callback) {
|
169 | var _self = this;
|
170 |
|
171 | |
172 |
|
173 |
|
174 |
|
175 |
|
176 |
|
177 |
|
178 | var ds = datatable.datasource;
|
179 |
|
180 | query.lockKey = require('node-uuid').v4();
|
181 | query.startdate = query.startdate.fixDate(true, true);
|
182 | query.enddate = query.enddate.fixDate(true, true);
|
183 |
|
184 | _self.lock(datatable.id, '', query, function (success, lockKey) {
|
185 | if (!success) {
|
186 | joola.logger.error('Failed to obtain lock on [' + lockKey + '].');
|
187 |
|
188 | return callback();
|
189 | }
|
190 | _self.check(datatable, query, true, function () {
|
191 | if (query.cache.state == 'match') {
|
192 | joola.logger.silly('Table [' + datatable.id + '] for range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + '] already available.');
|
193 | _self.unlock(datatable.id, '', lockKey, function () {
|
194 | return callback(null);
|
195 | });
|
196 | return;
|
197 | }
|
198 |
|
199 | if (query.cache.state == 'extendleft') {
|
200 | query.extension = 'left';
|
201 | query.enddate = ce.clone(query.cache.range.startdate);
|
202 | query.enddate.setMilliseconds(query.enddate.getMilliseconds() - 1);
|
203 | joola.logger.debug('Table [' + datatable.id + '] can be extended to the left, new enddate: ' + query.enddate);
|
204 | }
|
205 | else if (query.cache.state == 'extendright') {
|
206 | query.extension = 'right';
|
207 | query.startdate = ce.clone(query.cache.range.enddate);
|
208 | query.startdate.setMilliseconds(query.startdate.getMilliseconds() + 1);
|
209 | joola.logger.debug('Table [' + datatable.id + '] can be extended to the right, new startdate: ' + query.startdate);
|
210 | }
|
211 | else if (query.cache.state == 'extendboth') {
|
212 | joola.logger.debug('Table [' + datatable.id + '] can be extended in both directions.');
|
213 | query.extension = 'both';
|
214 | var _basequery = ce.clone(query);
|
215 | var _query = ce.clone(query);
|
216 | _query.enddate = ce.clone(_basequery.cache.range.startdate);
|
217 | _query.enddate.setMilliseconds(_basequery.enddate.getMilliseconds() - 1);
|
218 | query = [];
|
219 | query.push(ce.clone(_query));
|
220 | _query = ce.clone(_basequery);
|
221 | _query.startdate = ce.clone(_basequery.cache.range.enddate);
|
222 | _query.startdate.setMilliseconds(_basequery.startdate.getMilliseconds() + 1);
|
223 | query.push(ce.clone(_query));
|
224 | }
|
225 | else {
|
226 | joola.logger.debug('Table [' + datatable.id + '] check/extend check found nothing.');
|
227 | }
|
228 |
|
229 | if (!Array.isArray(query)) {
|
230 | var _query = ce.clone(query);
|
231 | query = [];
|
232 | query.push(_query);
|
233 | }
|
234 | query.forEach(function (q) {
|
235 | q.startdate = utils.formatDate(q.startdate, 'yyyy-mm-dd hh:nn:ss.fff', true);
|
236 | q.enddate = utils.formatDate(q.enddate, 'yyyy-mm-dd hh:nn:ss.fff', true);
|
237 |
|
238 | q.sql = q.sql.replace('%STARTDATE%', q.startdate);
|
239 | q.sql = q.sql.replace('%ENDDATE%', q.enddate);
|
240 |
|
241 | q.datatable = datatable;
|
242 | q.datasource = ds;
|
243 | q.limit = q.datasource.limit;
|
244 | q.lockKey = require('node-uuid').v4();
|
245 |
|
246 |
|
247 |
|
248 | var calls = [];
|
249 |
|
250 | if (q.startdate > q.enddate) {
|
251 | joola.logger.debug('Avoiding query due to mismatching start/end dates');
|
252 | }
|
253 | else {
|
254 | var call = function (callback) {
|
255 | joola.logger.info('Executing source query [' + datatable.id + '] for range [' + q.startdate + ']-[' + q.enddate + '][lim:' + q.limit + ']...');
|
256 |
|
257 | connector.executeQuery(q, function (query, rows, fields, err) {
|
258 | if (err) {
|
259 | return callback(err);
|
260 | }
|
261 |
|
262 | var data = {};
|
263 | data.rows = rows.rows;
|
264 | data.fields = rows.fields;
|
265 |
|
266 | if (data.rows.length == 0) {
|
267 | joola.logger.debug('Nothing to do here.');
|
268 | return callback(null);
|
269 | }
|
270 | else {
|
271 | |
272 |
|
273 |
|
274 |
|
275 |
|
276 |
|
277 |
|
278 |
|
279 |
|
280 |
|
281 |
|
282 |
|
283 |
|
284 |
|
285 |
|
286 |
|
287 |
|
288 |
|
289 |
|
290 |
|
291 |
|
292 |
|
293 |
|
294 |
|
295 |
|
296 |
|
297 |
|
298 |
|
299 |
|
300 |
|
301 |
|
302 |
|
303 | joola.logger.debug('Query for cache data on table [' + datatable.id + '] has finished (' + data.rows.length + '), saving to cache store...');
|
304 | if (data.rows.length > 0) {
|
305 | _self.saveData(datatable, query, data, function (datatable, data, err) {
|
306 | return callback(err);
|
307 | });
|
308 | }
|
309 |
|
310 | }
|
311 | });
|
312 | };
|
313 | calls.push(call);
|
314 | }
|
315 | fork(calls, function (err) {
|
316 | _self.unlock(datatable.id, query[0].cache.range.key, lockKey, function () {
|
317 | return callback(err);
|
318 | });
|
319 | });
|
320 | });
|
321 | });
|
322 | });
|
323 | };
|
324 |
|
325 | exports.check = function (datatable, query, wait, callback) {
|
326 | var _self = this;
|
327 | joola.logger.debug('Checking table [' + datatable.id + '] for range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + '].');
|
328 |
|
329 | query.cache = {};
|
330 | joola.cache.load('tables', datatable.id.clean(), function (error, value) {
|
331 | if (value) {
|
332 | _.each(value.ranges, function (range) {
|
333 | range.startdate = new Date(range.startdate);
|
334 | range.enddate = new Date(range.enddate);
|
335 |
|
336 | joola.logger.debug('Found cached table [' + datatable.id + '] with range [' + utils.formatDate(range.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + '] - [' + utils.formatDate(range.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + ']');
|
337 |
|
338 |
|
339 |
|
340 |
|
341 | if (range.startdate <= query.startdate && range.enddate >= query.enddate) {
|
342 | query.cache.range = range;
|
343 | query.cache.state = 'match';
|
344 | return callback();
|
345 | }
|
346 |
|
347 | else if (query.startdate <= range.startdate && query.enddate >= range.enddate) {
|
348 | query.cache.range = range;
|
349 | query.cache.state = 'extendboth';
|
350 | }
|
351 |
|
352 | else if (query.startdate < range.startdate && range.enddate >= query.enddate) {
|
353 |
|
354 | query.cache.range = range;
|
355 | query.cache.state = 'extendleft';
|
356 | }
|
357 |
|
358 | else if (range.startdate <= query.startdate && range.enddate < query.enddate) {
|
359 |
|
360 | query.cache.range = range;
|
361 | query.cache.state = 'extendright';
|
362 | }
|
363 | });
|
364 |
|
365 | if (query.cache.state != 'match') {
|
366 | if (!query.cache.range) {
|
367 | query.cache.range = {};
|
368 | query.cache.range.key = utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff').clean() + '_' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff').clean();
|
369 | query.cache.range.startdate = query.startdate;
|
370 | query.cache.range.enddate = query.enddate;
|
371 | }
|
372 | return callback();
|
373 | }
|
374 | }
|
375 | else {
|
376 | query.cache = {};
|
377 | query.cache.range = {};
|
378 | query.cache.range.key = utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff').clean() + '_' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff').clean();
|
379 |
|
380 | query.cache.range.startdate = query.startdate;
|
381 | query.cache.range.enddate = query.enddate;
|
382 |
|
383 | if (wait)
|
384 | query.cache.range.state = 'building';
|
385 | value = {ranges: [query.cache.range]};
|
386 | joola.cache.save('tables', datatable.id.clean(), value, function (error, value) {
|
387 |
|
388 | return callback();
|
389 | });
|
390 | }
|
391 | });
|
392 | };
|
393 |
|
394 | exports.saveData = function (datatable, query, data, callback) {
|
395 | joola.logger.debug('Saving cache data for [' + datatable.id + ']:[ext:' + query.extension + ']...');
|
396 | var db = createConnection(datatable);
|
397 | db.saveData(query, datatable, data, function (error, firstDate, lastDate) {
|
398 | if (error) {
|
399 | return callback(datatable, data, error);
|
400 | }
|
401 | else if (data) {
|
402 | data = data.rows;
|
403 | joola.logger.debug('Cache engine finished storing [' + datatable.id + '], found [' + (data ? data.length : 0) + '] documents, [' + firstDate + ']:[' + lastDate + '].');
|
404 |
|
405 | if (data && data.length > 0 && typeof(firstDate) != 'undefined' && firstDate != null && typeof(lastDate) != 'undefined' && lastDate != null) {
|
406 | var key = 'joola:cachedTable:' + datatable.id.clean();
|
407 |
|
408 | var save = function (key, value, range) {
|
409 | if (!query.balanced) {
|
410 | joola.cache.save('tables', datatable.id.clean(), value, null, function () {
|
411 | joola.logger.debug('Cache key saved for [' + datatable.id.clean() + '] with range [' + utils.formatDate(range.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(range.enddate, 'yyyy-mm-dd hh:nn:ss') + '].');
|
412 | return callback(datatable, data, error);
|
413 | });
|
414 | }
|
415 | else
|
416 | return callback(datatable, data, error);
|
417 | };
|
418 | joola.cache.load('tables', datatable.id.clean(), function (error, value) {
|
419 | if (value && query.cache.range.key) {
|
420 |
|
421 | var _range = _.find(value.ranges, function (range) {
|
422 | return range.key == query.cache.range.key;
|
423 | });
|
424 |
|
425 | if (!_range) {
|
426 |
|
427 | _range = {};
|
428 | value.ranges.push(_range);
|
429 | }
|
430 | else {
|
431 |
|
432 |
|
433 | }
|
434 |
|
435 | _range.extension = query.extension;
|
436 | switch (query.extension) {
|
437 | case null:
|
438 | _range.startdate = firstDate
|
439 | _range.enddate = lastDate
|
440 | break;
|
441 | case 'left':
|
442 | _range.startdate = firstDate
|
443 | break;
|
444 | case 'right':
|
445 | _range.enddate = lastDate
|
446 | break;
|
447 | case 'both':
|
448 | _range.startdate = query.startdate
|
449 | _range.enddate = query.enddate
|
450 | break;
|
451 | default:
|
452 | break;
|
453 | }
|
454 |
|
455 | |
456 |
|
457 |
|
458 |
|
459 |
|
460 |
|
461 |
|
462 |
|
463 |
|
464 |
|
465 |
|
466 |
|
467 |
|
468 |
|
469 |
|
470 |
|
471 |
|
472 |
|
473 |
|
474 |
|
475 |
|
476 |
|
477 |
|
478 | _range.state = 'ready';
|
479 | _range.key = _range.startdate.clean() + '_' + _range.enddate.clean();
|
480 | _range.debug = '1';
|
481 | save(key, value, _range);
|
482 | }
|
483 | else if (value) {
|
484 | var rangeToAdd =
|
485 | {
|
486 | key: firstDate.clean() + '_' + lastDate.clean(),
|
487 | startdate: firstDate,
|
488 | enddate: lastDate,
|
489 | state: 'ready',
|
490 | debug: '2'
|
491 | };
|
492 | value.ranges.push(rangeToAdd);
|
493 | save(key, value, rangeToAdd);
|
494 | }
|
495 | else {
|
496 | var value = {
|
497 | ranges: [
|
498 | {
|
499 | key: firstDate.clean() + '_' + lastDate.clean(),
|
500 | startdate: firstDate,
|
501 | enddate: lastDate,
|
502 | state: 'ready',
|
503 | debug: '3'
|
504 | }
|
505 | ]};
|
506 | save(key, value, value.ranges[0]);
|
507 | }
|
508 | });
|
509 | }
|
510 | else {
|
511 |
|
512 | return callback(datatable, data, error);
|
513 | }
|
514 |
|
515 | }
|
516 | else {
|
517 | return callback(datatable, data, error);
|
518 | }
|
519 |
|
520 | });
|
521 | };
|
522 |
|
523 | exports.fetch = function (query, callback) {
|
524 | joola.logger.debug('Fetch from cache [' + query.datatable.id + '] with range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss') + ']');
|
525 | try {
|
526 | if (!query.datatable.datasource) query.datatable.datasource = _datasources.get(query.datatable.datasourceid);
|
527 | var db = createConnection(query.datatable);
|
528 |
|
529 | db.fetch(query, callback);
|
530 | }
|
531 | catch (ex) {
|
532 | return callback(ex, arguments);
|
533 | }
|
534 | };
|
535 |
|
536 | exports.eagerCache = function (dt, callback) {
|
537 | var self = this;
|
538 | joola.logger.debug('Eager cache running for table [' + dt.id + ']...');
|
539 | var _ds;
|
540 |
|
541 | var maxDate = null;
|
542 |
|
543 |
|
544 | _ds = _.find(joola.config.integration.datasources, function (ds) {
|
545 | return ds.id == dt.datasource.id;
|
546 | });
|
547 | if (_ds && _ds.lastEagerCache) {
|
548 | joola.logger.debug('Missing max cache date on [' + dt.id + '], setting to last used: ' + _ds.lastEagerCache);
|
549 | maxDate = _ds.lastEagerCache;
|
550 | }
|
551 | else {
|
552 | joola.logger.debug('Missing max cache date on [' + dt.id + '], setting to: ' + dt.datasource.enddate.value);
|
553 | maxDate = dt.datasource.enddate.value;
|
554 | if (_ds)
|
555 | _ds.lastEagerCache = new Date(maxDate);
|
556 | }
|
557 |
|
558 | var _todate = new Date(maxDate);
|
559 | _todate.setMilliseconds(_todate.getMilliseconds() + dt.caching.eager.step);
|
560 |
|
561 | maxDate = new Date(_todate);
|
562 | if (_ds)
|
563 | _ds.lastEagerCache = new Date(maxDate);
|
564 |
|
565 | joola.logger.debug('Processing ' + dt.name + '[' + maxDate + ']:[' + _todate + ']');
|
566 |
|
567 | var query = connector.createQuery();
|
568 | query.type = 'eager';
|
569 | dt.query = _datatables.basequery(dt);
|
570 |
|
571 | var basequery = dt.query;
|
572 | query.sql = basequery.sql;
|
573 | query.datasource = dt.datasource;
|
574 |
|
575 | var enddate = ce.clone(_todate);
|
576 | var startdate = ce.clone(maxDate);
|
577 | startdate.setMilliseconds(startdate.getMilliseconds() + 1);
|
578 |
|
579 | query.enddate = enddate;
|
580 | query.startdate = startdate;
|
581 |
|
582 | self.cacheTable(dt, ce.clone(query), function (err) {
|
583 | return callback(err);
|
584 | });
|
585 |
|
586 | };
|
587 |
|
588 |
|
589 | module.exports = exports; |
\ | No newline at end of file |