UNPKG

22 kBJavaScriptView Raw
1/**
2 * joola.io
3 *
4 * Copyright Joola Smart Solutions, Ltd. <info@joo.la>
5 *
6 * Licensed under GNU General Public License 3.0 or later.
7 * Some rights reserved. See LICENSE, AUTHORS.
8 *
9 * @license GPL-3.0+ <http://spdx.org/licenses/GPL-3.0+>
10 */
11
12var
13 _datasources = require('../objects/datasources'),
14 _datatables = require('../objects/datatables'),
15 connector = require('../connectors/connector'),
16 utils = require('../shared/utils'),
17 mysql = require('./mysql'),
18 mongo = require('./mongo'),
19 zlib = require('zlib'),
20 path = require('path');
21
22var createConnection = function (datatable) {
23 var connection = null;
24 switch (datatable.caching.system.type) {
25 case 'mysql':
26 joola.logger.debug('open connection to mysql ' + datatable.datasource.name);
27 connection = mysql;
28 break;
29 case 'mongo':
30 joola.logger.debug('open connection to mongo ' + datatable.datasource.name);
31 connection = mongo;
32 break;
33 case 'mssql':
34 joola.logger.debug('open connection to mssql ' + datatable.datasource.name);
35 break;
36 default:
37 break;
38 }
39
40 return connection;
41};
42
43var exports = {};
44
45exports.flushCache = function (datatable, callback) {
46 if (datatable.caching) {
47 var db = createConnection(datatable);
48 db.flushCache(datatable, callback);
49 }
50 else
51 return callback();
52};
53
54exports.minCacheDate = function (datatable, callback) {
55 var db = createConnection(datatable);
56 db.minCacheDate(datatable, callback);
57};
58
59exports.maxCacheDate = function (datatable, callback) {
60 var db = createConnection(datatable);
61 db.maxCacheDate(datatable, callback);
62};
63
64exports.lock = function (key, rangeKey, query, callback) {
65 var startTimer = new Date().getTime();
66 var _self = this;
67 var lockKey = query.lockKey;
68 var lockRangeKey = function (key, value, callback) {
69 joola.cache.save('locks', key.clean(), value, joola.config.integration.cache.lockTimeout, function (err) {
70 joola.cache.load('locks', key.clean(), function (err, _value) {
71 if (_value && _value.lockKey == value.lockKey) {
72 joola.logger.silly('RangeKey [' + 'joola:cachedTable:' + key.clean() + ':' + value.lockKey + '][tout:' + joola.config.integration.cache.lockTimeout + '] locked!');
73 return callback(true);
74 }
75 else {
76 joola.logger.error('Locked by another node [' + key + ']');
77 return callback(false);
78 //throw new Error("Locked by another node");
79 }
80 })
81 });
82 };
83
84 joola.cache.load('locks', key.clean(), function (err, value) {
85 if (value) {
86 if (value.lockKey != null && value.lockKey != lockKey) {
87 if (value.nextLock != lockKey) {
88 joola.logger.silly('Setting nextLock [' + lockKey + ']');
89 value.nextLock = lockKey;
90 //cache.save('joola:cachedTable:lock:' + key.clean(), value, joola.config.cache.lockTimeout, function (err) {
91
92 //});
93 }
94 setTimeout(function () {
95 if (new Date() - startTimer % 100 == 0)
96 joola.logger.silly('Waiting on cache for table [' + lockKey + ']');
97 if (new Date() - startTimer > joola.config.integration.cache.waitOnCache) {
98 joola.logger.debug('Timeout while waiting for cache [' + lockKey + ']');
99 return callback(false);
100 }
101 else
102 _self.lock(key, rangeKey, query, callback);
103 }, joola.config.integration.cache.waitInterval);
104 }
105 else if (value.lockKey == lockKey) {
106 return callback(true, lockKey);
107 }
108 else if (value.lockKey == null) {
109 value = {};
110 value.lockKey = lockKey;
111 value.nextLock = null;
112 lockRangeKey(key, value, function (success) {
113 if (success) {
114 return callback(true, lockKey);
115 }
116 else
117 return callback(false);
118 });
119 }
120 else {
121 return callback(false);
122 }
123 }
124 else {
125 value = {};
126 value.lockKey = lockKey;
127 value.nextLock = null;
128 lockRangeKey(key, value, function (success) {
129 if (success)
130 return callback(true, lockKey);
131 else
132 return callback(false);
133 });
134 }
135 });
136};
137
138exports.unlock = function (key, rangeKey, lockKey, callback) {
139 var unlockRangeKey = function (key, value, callback) {
140 joola.cache.save('locks', key.clean(), value, function () {
141 joola.logger.silly('RangeKey [' + 'joola:cachedTable:' + key.clean() + ':' + lockKey + '] unlocked!');
142 return callback();
143 });
144 };
145
146 joola.cache.load('locks', key.clean(), function (error, value) {
147 if (value) {
148 if (value.lockKey == lockKey) {
149 value.lockKey = value.nextLock;
150 value.nextLock = null;
151 unlockRangeKey(key, value, function () {
152 return callback();
153 });
154 }
155 else {
156
157 return callback();
158 }
159 }
160 else {
161
162 return callback();
163 }
164
165 });
166};
167
168exports.cacheTable = function (datatable, query, callback) {
169 var _self = this;
170
171 /*
172 if (query._query.type == 'distinct') {
173 joola.logger.error('DISTINCT');
174 callback();
175 return;
176 }*/
177
178 var ds = datatable.datasource;
179 //datatable.query = _datatables.basequery(datatable);
180 query.lockKey = require('node-uuid').v4();
181 query.startdate = query.startdate.fixDate(true, true);
182 query.enddate = query.enddate.fixDate(true, true);
183
184 _self.lock(datatable.id, '', query, function (success, lockKey) {
185 if (!success) {
186 joola.logger.error('Failed to obtain lock on [' + lockKey + '].');
187 //throw 'Failed to obtain lock on [' + datatable.id + '][' + query[0].cache.range.key + '].';
188 return callback();
189 }
190 _self.check(datatable, query, true, function () {
191 if (query.cache.state == 'match') {
192 joola.logger.silly('Table [' + datatable.id + '] for range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + '] already available.');
193 _self.unlock(datatable.id, '', lockKey, function () {
194 return callback(null);
195 });
196 return;
197 }
198
199 if (query.cache.state == 'extendleft') {
200 query.extension = 'left';
201 query.enddate = ce.clone(query.cache.range.startdate);
202 query.enddate.setMilliseconds(query.enddate.getMilliseconds() - 1);
203 joola.logger.debug('Table [' + datatable.id + '] can be extended to the left, new enddate: ' + query.enddate);
204 }
205 else if (query.cache.state == 'extendright') {
206 query.extension = 'right';
207 query.startdate = ce.clone(query.cache.range.enddate);
208 query.startdate.setMilliseconds(query.startdate.getMilliseconds() + 1);
209 joola.logger.debug('Table [' + datatable.id + '] can be extended to the right, new startdate: ' + query.startdate);
210 }
211 else if (query.cache.state == 'extendboth') {
212 joola.logger.debug('Table [' + datatable.id + '] can be extended in both directions.');
213 query.extension = 'both';
214 var _basequery = ce.clone(query);
215 var _query = ce.clone(query);
216 _query.enddate = ce.clone(_basequery.cache.range.startdate);
217 _query.enddate.setMilliseconds(_basequery.enddate.getMilliseconds() - 1);
218 query = [];
219 query.push(ce.clone(_query));
220 _query = ce.clone(_basequery);
221 _query.startdate = ce.clone(_basequery.cache.range.enddate);
222 _query.startdate.setMilliseconds(_basequery.startdate.getMilliseconds() + 1);
223 query.push(ce.clone(_query));
224 }
225 else {
226 joola.logger.debug('Table [' + datatable.id + '] check/extend check found nothing.');
227 }
228
229 if (!Array.isArray(query)) {
230 var _query = ce.clone(query);
231 query = [];
232 query.push(_query);
233 }
234 query.forEach(function (q) {
235 q.startdate = utils.formatDate(q.startdate, 'yyyy-mm-dd hh:nn:ss.fff', true);
236 q.enddate = utils.formatDate(q.enddate, 'yyyy-mm-dd hh:nn:ss.fff', true);
237
238 q.sql = q.sql.replace('%STARTDATE%', q.startdate);
239 q.sql = q.sql.replace('%ENDDATE%', q.enddate);
240
241 q.datatable = datatable;
242 q.datasource = ds;
243 q.limit = q.datasource.limit;
244 q.lockKey = require('node-uuid').v4();
245
246 //_self.lock(datatable.id, query[0].cache.range.key, q, function (success, lockKey) {
247
248 var calls = [];
249
250 if (q.startdate > q.enddate) {
251 joola.logger.debug('Avoiding query due to mismatching start/end dates');
252 }
253 else {
254 var call = function (callback) {
255 joola.logger.info('Executing source query [' + datatable.id + '] for range [' + q.startdate + ']-[' + q.enddate + '][lim:' + q.limit + ']...');
256
257 connector.executeQuery(q, function (query, rows, fields, err) {
258 if (err) {
259 return callback(err);
260 }
261
262 var data = {};
263 data.rows = rows.rows;
264 data.fields = rows.fields;
265
266 if (data.rows.length == 0) {
267 joola.logger.debug('Nothing to do here.');
268 return callback(null);
269 }
270 else {
271 /* if (data.rows.length > 100000) {
272 var limitCalls = [];
273 var iCount = Math.floor((data.rows.length / 100000) + 1);
274
275 query.balanced = true;
276
277 joola.logger.debug('Balanced query for cache data on table [' + datatable.id + '] has finished (' + data.rows.length + '), saving to cache store...');
278 joola.logger.debug('Setting up calls for rows chunk [' + iCount.toString() + ' chunks]...');
279 for (var i = 0; i < iCount; i++) {
280 joola.logger.debug('Setting up call for rows chunk [' + i.toString() + ']...');
281 var limitCall = function (callback) {
282 var _data = ce.clone(data);
283 _data.rows.splice(100000, data.rows.length);
284 joola.logger.debug('Processing data on table [' + datatable.id + '], length: ' + _data.rows.length + '<-' + data.rows.length + '), saving to cache store...');
285 if (_data.rows.length < 100000) {
286 query.balanced = false;
287 }
288 _self.saveData(datatable, ce.clone(query), _data, function () {
289 _data = null;
290 data.rows.splice(0, 100000);
291 return callback(null);
292 });
293 };
294
295 limitCalls.push(limitCall);
296 }
297
298 forkSeries(limitCalls, function () {
299 return callback(null);
300 })
301 }
302 else {*/
303 joola.logger.debug('Query for cache data on table [' + datatable.id + '] has finished (' + data.rows.length + '), saving to cache store...');
304 if (data.rows.length > 0) {
305 _self.saveData(datatable, query, data, function (datatable, data, err) {
306 return callback(err);
307 });
308 }
309 // }
310 }
311 });
312 };
313 calls.push(call);
314 }
315 fork(calls, function (err) {
316 _self.unlock(datatable.id, query[0].cache.range.key, lockKey, function () {
317 return callback(err);
318 });
319 });
320 });
321 });
322 });
323};
324
325exports.check = function (datatable, query, wait, callback) {
326 var _self = this;
327 joola.logger.debug('Checking table [' + datatable.id + '] for range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + '].');// with resolution [' + query.resolution + ']')
328
329 query.cache = {};
330 joola.cache.load('tables', datatable.id.clean(), function (error, value) {
331 if (value) {
332 _.each(value.ranges, function (range) {
333 range.startdate = new Date(range.startdate);
334 range.enddate = new Date(range.enddate);
335
336 joola.logger.debug('Found cached table [' + datatable.id + '] with range [' + utils.formatDate(range.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + '] - [' + utils.formatDate(range.enddate, 'yyyy-mm-dd hh:nn:ss.fff') + ']');
337
338 //TODO: Better handle finding ranges, currently keeps extending one range.
339 //we have 4 possibilities
340 //the range is already cached
341 if (range.startdate <= query.startdate && range.enddate >= query.enddate) {
342 query.cache.range = range;
343 query.cache.state = 'match';
344 return callback();
345 }
346 //both sides extension
347 else if (query.startdate <= range.startdate && query.enddate >= range.enddate) {
348 query.cache.range = range;
349 query.cache.state = 'extendboth';
350 }
351 //we can extend the table to the left
352 else if (query.startdate < range.startdate && range.enddate >= query.enddate) {
353 //query.cache.key = range.key;
354 query.cache.range = range;
355 query.cache.state = 'extendleft';
356 }
357 //we can extend the table to the right
358 else if (range.startdate <= query.startdate && range.enddate < query.enddate) {
359 //query.cache.key = range.key;
360 query.cache.range = range;
361 query.cache.state = 'extendright';
362 }
363 });
364
365 if (query.cache.state != 'match') {
366 if (!query.cache.range) {
367 query.cache.range = {};
368 query.cache.range.key = utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff').clean() + '_' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff').clean();
369 query.cache.range.startdate = query.startdate;
370 query.cache.range.enddate = query.enddate;
371 }
372 return callback();
373 }
374 }
375 else {
376 query.cache = {};
377 query.cache.range = {};
378 query.cache.range.key = utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss.fff').clean() + '_' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss.fff').clean();
379
380 query.cache.range.startdate = query.startdate;
381 query.cache.range.enddate = query.enddate;
382
383 if (wait)
384 query.cache.range.state = 'building';
385 value = {ranges: [query.cache.range]};
386 joola.cache.save('tables', datatable.id.clean(), value, function (error, value) {
387
388 return callback();
389 });
390 }
391 });
392};
393
394exports.saveData = function (datatable, query, data, callback) {
395 joola.logger.debug('Saving cache data for [' + datatable.id + ']:[ext:' + query.extension + ']...');
396 var db = createConnection(datatable);
397 db.saveData(query, datatable, data, function (error, firstDate, lastDate) {
398 if (error) {
399 return callback(datatable, data, error);
400 }
401 else if (data) {
402 data = data.rows;
403 joola.logger.debug('Cache engine finished storing [' + datatable.id + '], found [' + (data ? data.length : 0) + '] documents, [' + firstDate + ']:[' + lastDate + '].');
404 //db.maxNotHandledCacheDate(datatable, function (date) {
405 if (data && data.length > 0 && typeof(firstDate) != 'undefined' && firstDate != null && typeof(lastDate) != 'undefined' && lastDate != null) {
406 var key = 'joola:cachedTable:' + datatable.id.clean();
407
408 var save = function (key, value, range) {
409 if (!query.balanced) {
410 joola.cache.save('tables', datatable.id.clean(), value, null, function () {
411 joola.logger.debug('Cache key saved for [' + datatable.id.clean() + '] with range [' + utils.formatDate(range.startdate, 'yyyy-mm-dd hh:nn:ss.fff') + ']-[' + utils.formatDate(range.enddate, 'yyyy-mm-dd hh:nn:ss') + '].');
412 return callback(datatable, data, error);
413 });
414 }
415 else
416 return callback(datatable, data, error);
417 };
418 joola.cache.load('tables', datatable.id.clean(), function (error, value) {
419 if (value && query.cache.range.key) {
420
421 var _range = _.find(value.ranges, function (range) {
422 return range.key == query.cache.range.key;
423 });
424
425 if (!_range) {
426 //we have a new range.
427 _range = {};
428 value.ranges.push(_range);
429 }
430 else {
431 // firstDate = (new Date(_range.startdate) < firstDate ? new Date(_range.startdate) : firstDate);
432 // lastDate = (new Date(_range.enddate) > lastDate ? new Date(_range.enddate) : lastDate);
433 }
434
435 _range.extension = query.extension;
436 switch (query.extension) {
437 case null:
438 _range.startdate = firstDate//.fixDate(true);
439 _range.enddate = lastDate//.fixDate(true);
440 break;
441 case 'left':
442 _range.startdate = firstDate//.fixDate(true);
443 break;
444 case 'right':
445 _range.enddate = lastDate//.fixDate(true);
446 break;
447 case 'both':
448 _range.startdate = query.startdate//.fixDate(true);
449 _range.enddate = query.enddate//.fixDate(true);
450 break;
451 default:
452 break;
453 }
454
455 /*
456 if (typeof(_range.startdate) != typeof(new Date()))
457 _range.startdate = new Date(_range.startdate);
458 if (typeof(_range.enddate) != typeof(new Date()))
459 _range.enddate = new Date(_range.enddate);
460
461 if (_range.startdate)
462 firstDate = _range.startdate;
463 if (query.startdate < firstDate)
464 firstDate = query.startdate;
465 //lastDate = _range.enddate;
466 if (query.enddate > lastDate)
467 lastDate = query.enddate;
468 if (_range.enddate > lastDate)
469 lastDate = _range.enddate;
470 */
471
472
473 //_range.enddate = lastDate;
474
475 //_range.startdate = _range.startdate.fixDate(true);
476 //_range.enddate = new Date(_range.enddate).fixDate(true);
477
478 _range.state = 'ready';
479 _range.key = _range.startdate.clean() + '_' + _range.enddate.clean();//_range.enddate.clean()
480 _range.debug = '1';
481 save(key, value, _range);
482 }
483 else if (value) {
484 var rangeToAdd =
485 {
486 key: firstDate.clean() + '_' + lastDate.clean(), //query.enddate.clean(),
487 startdate: firstDate,
488 enddate: lastDate,//new Date(query.enddate),
489 state: 'ready',
490 debug: '2'
491 };
492 value.ranges.push(rangeToAdd);
493 save(key, value, rangeToAdd);
494 }
495 else {
496 var value = {
497 ranges: [
498 {
499 key: firstDate.clean() + '_' + lastDate.clean(),// query.enddate.clean(),
500 startdate: firstDate,
501 enddate: lastDate,//new Date(query.enddate),
502 state: 'ready',
503 debug: '3'
504 }
505 ]};
506 save(key, value, value.ranges[0]);
507 }
508 });
509 }
510 else {
511
512 return callback(datatable, data, error);
513 }
514 //});
515 }
516 else {
517 return callback(datatable, data, error);
518 }
519
520 });
521};
522
523exports.fetch = function (query, callback) {
524 joola.logger.debug('Fetch from cache [' + query.datatable.id + '] with range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss') + ']');
525 try {
526 if (!query.datatable.datasource) query.datatable.datasource = _datasources.get(query.datatable.datasourceid);
527 var db = createConnection(query.datatable);
528
529 db.fetch(query, callback);
530 }
531 catch (ex) {
532 return callback(ex, arguments);
533 }
534};
535
536exports.eagerCache = function (dt, callback) {
537 var self = this;
538 joola.logger.debug('Eager cache running for table [' + dt.id + ']...');
539 var _ds;
540
541 var maxDate = null;
542 //self.maxCacheDate(dt, function (maxDate) {
543 //if (maxDate == null) {
544 _ds = _.find(joola.config.integration.datasources, function (ds) {
545 return ds.id == dt.datasource.id;
546 });
547 if (_ds && _ds.lastEagerCache) {
548 joola.logger.debug('Missing max cache date on [' + dt.id + '], setting to last used: ' + _ds.lastEagerCache);
549 maxDate = _ds.lastEagerCache;
550 }
551 else {
552 joola.logger.debug('Missing max cache date on [' + dt.id + '], setting to: ' + dt.datasource.enddate.value);
553 maxDate = dt.datasource.enddate.value;
554 if (_ds)
555 _ds.lastEagerCache = new Date(maxDate);
556 }
557 //}
558 var _todate = new Date(maxDate);
559 _todate.setMilliseconds(_todate.getMilliseconds() + dt.caching.eager.step);
560
561 maxDate = new Date(_todate);
562 if (_ds)
563 _ds.lastEagerCache = new Date(maxDate);
564
565 joola.logger.debug('Processing ' + dt.name + '[' + maxDate + ']:[' + _todate + ']');
566
567 var query = connector.createQuery();
568 query.type = 'eager';
569 dt.query = _datatables.basequery(dt);
570
571 var basequery = dt.query;
572 query.sql = basequery.sql;
573 query.datasource = dt.datasource;
574
575 var enddate = ce.clone(_todate);
576 var startdate = ce.clone(maxDate);
577 startdate.setMilliseconds(startdate.getMilliseconds() + 1);
578
579 query.enddate = enddate;
580 query.startdate = startdate;
581
582 self.cacheTable(dt, ce.clone(query), function (err) {
583 return callback(err);
584 });
585 //});
586};
587
588
589module.exports = exports;
\No newline at end of file