// UNPKG · 22.7 kB · JavaScript · View Raw
/**
 *  joola.io
 *
 *  Copyright Joola Smart Solutions, Ltd. <info@joo.la>
 *
 *  Licensed under GNU General Public License 3.0 or later.
 *  Some rights reserved. See LICENSE, AUTHORS.
 *
 *  @license GPL-3.0+ <http://spdx.org/licenses/GPL-3.0+>
 */

// Module dependencies: project helpers first, then the mongo driver and node's path module.
var utils = require('../shared/utils');
var _datasources = require('../objects/datasources');
var _datatables = require('../objects/datatables');
var connector = require('../connectors/connector');
var _mongo = require('mongodb');
var path = require('path');

20exports.minCacheDate = function (datatable, callback) {
21 joola.logger.debug('Checking min cache date on [' + datatable.id + ']...');
22 var connstring = datatable.caching.system.url;
23 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
24
25 //noinspection JSPotentiallyInvalidConstructorUsage
26 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
27 if (err)
28 return callback(err);
29 db.createCollection(datatable.id, null, function (err, collection) {
30 if (err) {
31 db.close();
32 return callback(err);
33 }
34 collection.find().sort({date: 1}).limit(1).toArray(function (err, result) {
35 db.close();
36 if (err)
37 return callback(err);
38
39 return callback(null, new Date(result[0].date));
40 });
41 });
42 });
43};
44
45exports.maxCacheDate = function (datatable, callback) {
46 var connstring = datatable.caching.system.url;
47 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
48
49 //noinspection JSPotentiallyInvalidConstructorUsage
50 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
51 if (err)
52 return callback(err);
53
54 db.createCollection(datatable.id, null, function (err, collection) {
55 if (err) {
56 db.close();
57 return callback(err);
58 }
59 collection.find().sort({date: -1}).limit(1).toArray(function (err, result) {
60 db.close();
61 if (err)
62 return callback(err);
63
64 return callback(null, new Date(result[0].date));
65 });
66 });
67 });
68};
69
70exports.maxNotHandledCacheDate = function (datatable, callback) {
71 joola.logger.debug('Checking max not handled cache date on [' + datatable.id + ']...');
72 var connstring = datatable.caching.system.url;
73 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
74
75 //noinspection JSPotentiallyInvalidConstructorUsage
76 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
77 db.createCollection(datatable.id, null, function (err, collection) {
78 if (err)
79 return callback(err);
80 collection.find({handled: false}).sort({date: -1}).limit(1).toArray(function (err, result) {
81 db.close();
82 if (err)
83 return callback(null);
84
85 if (result && result.length > 0) {
86 joola.logger.debug('Found max not handled cache date [' + datatable.id + ']: ' + new Date(result[0].date));
87 return callback(new Date(result[0].date));
88 }
89 else
90 return callback(null);
91 });
92 });
93 });
94};
95
96exports.minNotHandledCacheDate = function (datatable, callback) {
97 joola.logger.debug('Checking min not handled cache date on [' + datatable.id + ']...');
98 var connstring = datatable.caching.system.url;
99 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
100
101 //noinspection JSPotentiallyInvalidConstructorUsage
102 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
103 db.createCollection(datatable.id, null, function (err, collection) {
104 if (err)
105 return callback(err);
106
107 collection.find({handled: false}).sort({date: 1}, null, null).limit(1).toArray(function (err, result) {
108 db.close();
109 if (err)
110 return callback(null);
111
112 if (result && result.length > 0) {
113 joola.logger.debug('Found min not handled cache date [' + datatable.id + ']: ' + new Date(result[0].date));
114 return callback(new Date(result[0].date));
115 }
116 else
117 return callback(null);
118 });
119 });
120 });
121};
122
/**
 * Opens (creating if needed) the collection named `datatable.id` and hands it
 * to `verifyIndex` together with the remaining arguments.
 *
 * @param {Object} db - open mongo database handle; only `createCollection` is used.
 * @param {Object} datatable - reads `datatable.id`.
 * @param {Object} query - passed through untouched.
 * @param {Object} data - passed through untouched.
 * @param {Function} callback - receives the error when the collection cannot be
 *   opened; otherwise it is forwarded to `verifyIndex`.
 */
function verifyCollection(db, datatable, query, data, callback) {
  joola.logger.silly('Creating collection [' + datatable.id + ']...');
  db.createCollection(datatable.id, null, function (err, collection) {
    // Report through the callback: throwing here would escape from the driver's
    // async callback and the caller would never learn about the failure.
    if (err)
      return callback(err);

    joola.logger.silly('...Created collection [' + datatable.id + ']');

    verifyIndex({key: 1}, collection, datatable, query, data, callback);
  });
}

/**
 * Ensures the two indexes used by the cache collection exist, then continues
 * with `processRows`.
 *
 * First `indexKey` is ensured as a unique index, then `{date: 1}` as a
 * non-unique one. Index failures are logged and do not stop processing.
 *
 * @param {Object} indexKey - index specification for the unique index.
 * @param {Object} collection - mongo collection; only `ensureIndex` is used here.
 * @param {Object} datatable - passed through to `processRows`.
 * @param {Object} query - passed through to `processRows`.
 * @param {Object} data - passed through to `processRows`.
 * @param {Function} callback - passed through to `processRows`.
 */
function verifyIndex(indexKey, collection, datatable, query, data, callback) {
  // Flatten the spec (e.g. {key: 1} -> "key1") purely for log output.
  var label = JSON.stringify(indexKey).replace(/[":{} .,]/g, '');
  joola.logger.debug('Verifying index [' + label + ']...');

  collection.ensureIndex(indexKey, { unique: true }, function (err) {
    joola.logger.debug('... Index verified [' + label + ']' + (err ? ' Error: ' + err : ''));

    collection.ensureIndex({date: 1}, { unique: false }, function (err) {
      // Best effort, like the unique index above, but no longer silent.
      if (err)
        joola.logger.warn('Failed to verify index [date1]: ' + err);

      processRows(collection, datatable, query, data, callback);
    });
  });
}

/**
 * Stores `data.rows` in the cache collection and marks them as handled.
 *
 * Steps, as implemented below:
 *  1. split the rows into chunks of `datatable.caching.chunkSize` via `chunk`;
 *  2. for every chunk, fork the `fork_processDocument` worker, send it the chunk
 *     plus the dimensions/metrics from `_datatables.basequery(datatable)`, and
 *     insert every `document` message the worker sends back;
 *  3. once `fork` reports completion, look up the min/max not-handled dates and,
 *     unless `query.balanced` is set, flag all `handled: false` documents as handled.
 *
 * @param {Object} collection - mongo collection; `insert` and `update` are used.
 * @param {Object} datatable - reads `datatable.caching.chunkSize`.
 * @param {Object} query - reads `query.balanced`.
 * @param {Object} data - reads `data.rows`; a missing `data`/`rows` is treated as empty.
 * @param {Function} callback - `callback()` when there is nothing to store,
 *   `callback(err)` when marking rows as handled fails, otherwise
 *   `callback(null, firstDate, lastDate)`.
 */
function processRows(collection, datatable, query, data, callback) {
  var rows = data && data.rows ? data.rows : [];
  joola.logger.silly('Processing rows, total count: ' + rows.length);

  if (rows.length === 0) {
    joola.logger.debug('Nothing to do here.');
    return callback();
  }

  var chunks = chunk(rows, datatable.caching.chunkSize);
  joola.logger.debug('Broke ' + rows.length + ' rows into ' + chunks.length + ' chunks of data [chunkSize:' + datatable.caching.chunkSize + '].');

  // The chunks now carry the rows; drop the local references to the full set.
  rows = null;
  data = null;

  var insertOptions = {w: 1};
  var tasks = [];

  _.each(chunks, function (rowsChunk) {
    joola.logger.debug('Processing chunk...');

    tasks.push(function (next) {
      try {
        var worker = path.resolve(__dirname + '/fork_processDocument');
        var basequery = _datatables.basequery(datatable);
        var args = {
          dimensions: basequery.dimensionsUsed,
          metrics: basequery.metricsUsed,
          chunk: rowsChunk
        };

        var saved = 0;
        var childProcess = require('child_process').fork(worker);

        childProcess.on('message', function (message) {
          if (message.type === 'init') {
            childProcess.send(require('JASON').stringify(args));
            return;
          }
          if (message.type !== 'document')
            return;

          var document = message.document;
          // The date arrives serialized over IPC; turn it back into a Date.
          document.date = new Date(document.date);

          collection.insert(document, insertOptions, function (err) {
            if (saved % 100 === 0)
              joola.logger.silly('Document saved [' + saved + '/' + rowsChunk.length + '].');

            if (err) {
              var reason = String(err.message || err);
              if (reason.indexOf('E11000') !== -1) {
                //TODO: check if we need to update docs in this case.
                if (saved % 50 === 0)
                  joola.logger.warn('Found a duplicate document, key: ' + document.key);
              }
              else
                joola.logger.error('Failed to save document [' + saved + '/' + rowsChunk.length + ', ' + document.key + ']: ' + err);
            }

            // Failed inserts count too, so the chunk always completes.
            saved++;
            if (saved === rowsChunk.length) {
              childProcess.kill();
              return next(null);
            }
          });
        });
      }
      catch (ex) {
        joola.logger.error('Failed to process chunk: ' + (ex.stack || ex));
        return next(ex);
      }
    });
  });
  chunks = null;

  fork(tasks, function () {
    tasks = null;
    joola.logger.debug('Finished parsing documents.');

    exports.minNotHandledCacheDate(datatable, function (firstDate) {
      exports.maxNotHandledCacheDate(datatable, function (lastDate) {
        //TODO: If we extend on both sides, we should not mark as handled the entire lot.
        if (query.balanced)
          return callback(null, firstDate, lastDate);

        collection.update(
          {handled: false},
          {$set: {handled: true}},
          {w: 1, multi: true, verbose: true},
          function (err) {
            // Report through the callback: throwing here would escape from
            // the driver's async callback.
            if (err)
              return callback(err);

            joola.logger.debug('Processed rows marked as handled.');
            joola.logger.debug(firstDate ? 'First date: ' + firstDate : 'Missing first date');
            joola.logger.debug(lastDate ? 'Last date: ' + lastDate : 'Missing last date');

            return callback(null, firstDate, lastDate);
          });
      });
    });
  });
}

283exports.saveData = function (query, datatable, data, callback) {
284 var connstring = datatable.caching.system.url;
285 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
286
287 //noinspection JSPotentiallyInvalidConstructorUsage
288 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
289 if (err)
290 return callback(err);
291
292 joola.logger.debug('Connected to mongo @ ' + connstring);
293
294 verifyCollection(db, datatable, query, data, function (err, firstDate, lastDate) {
295 db.close();
296 return callback(err, firstDate, lastDate);
297 });
298 });
299};
300
301exports.fetch = function (query, callback) {
302 var connstring = query.datatable.caching.system.url;
303 joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
304 //noinspection JSPotentiallyInvalidConstructorUsage
305 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
306 if (err)
307 return callback(err);
308
309 joola.logger.silly('Verifying collection [' + query.datatable.id + '.' + query.resolution + ']...');
310 db.createCollection(query.datatable.id, {strict: false}, function (err, collection) {
311 var processFetch = function (collection) {
312 var fields = {};
313 _.each(query.dimensions, function (d) {
314 fields[d.name] = 1;
315 });
316
317 joola.logger.debug('Executing fetch from [' + query.datatable.id + '], range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss') + ']...');
318
319 var ds = _datasources.get(query.datatable.datasourceid);
320 var dbquery = connector.createQuery();
321
322 query.datatable.datasource = ds;
323 query.datatable.query = _datatables.basequery(query.datatable);
324
325 var basequery = query.datatable.query;
326 dbquery.sql = basequery.sql;
327 dbquery.datasource = ds;
328
329 dbquery.enddate = new Date(query.enddate).fixDate();
330 dbquery.startdate = new Date(query.startdate).fixDate();
331 dbquery.resolution = query.resolution;
332 dbquery._query = query;
333
334 try {
335 var manager = require('./manager'); //avoid circular requirement
336 //manager.cacheTable(query.datatable, ce.clone(dbquery), function () {
337 var groupBy = {};
338 _.each(query.dimensions, function (d) {
339 if (d.type == 'date') {
340 switch (query.resolution) {
341 case 'second':
342 groupBy['date'] = '$timebucket.second';
343 break;
344 case 'minute':
345 groupBy['date'] = '$timebucket.minute';
346 break;
347 case 'hour':
348 groupBy['date'] = '$timebucket.hour';
349 break;
350 case 'day':
351 groupBy['date'] = '$timebucket.day';
352 break;
353 case 'week':
354 groupBy['date'] = '$timebucket.day';
355 break;
356 case 'month':
357 groupBy['date'] = '$timebucket.month';
358 break;
359 case 'year':
360 groupBy['date'] = '$timebucket.year';
361 break;
362 default:
363 break;
364 }
365 }
366 else
367 groupBy[d.name] = '$' + d.name;
368 });
369
370 var $group = {};
371 $group._id = groupBy;
372
373 if (!query.distinctCount) {
374 $group.rowcount = {$sum: 1};
375
376 _.each(query.metrics, function (m) {
377 var exist = _.find(query.datatable.metrics, function (m) {
378 return _.find(query.metrics, function (m2) {
379 return m2.id == m.id;
380 })
381 });
382 if (exist) {
383 $group[m.name] = {$sum: '$' + m.name};
384 }
385 });
386 }
387 else {
388
389 }
390
391 var $sortkey = {};
392
393 if (query.sortKey) {
394 if ($group[query.sortKey.name])
395 $sortkey[query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
396 else {
397 $sortkey['_id.' + query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
398 }
399 }
400 else {
401 if (query.distinctCount) {
402 if (Object.keys($group._id).length > 1) {
403 if ($group._id.date)
404 $sortkey['_id.date'] = (query.sortDir == 'DESC' ? -1 : 1);
405 else
406 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
407 }
408 else
409 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
410 }
411 else {
412 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
413 }
414 }
415
416 var $match = {
417 date: { $gte: new Date(query.startdate), $lte: new Date(query.enddate)}
418 };
419
420 _.each(query.filters, function (filter) {
421 if (filter.dimension.type != 'date') {
422 if (filter.operator == '=')
423 $match[filter.dimension.name] = filter.value;
424 else if (filter.operator == '*=')
425 $match[filter.dimension.name] = eval('/' + filter.value + '/gi');
426 else
427 $match[filter.dimension.name] = filter.value;
428 }
429 });
430
431 $match['handled'] = true;
432
433
434 if (!query.distinctCount) {
435 //console.log('match', $match);
436 //console.log('g', $group);
437 //console.log('sort', $sortkey);
438
439 joola.logger.debug('Executing mongodb aggregate [' + query.startdate + ']-[' + query.enddate + ']...');
440 collection.aggregate(
441 {$match: $match},
442 {$group: $group},
443 {$sort: $sortkey},
444 //{$limit: 500},
445 function (err, result) {
446 db.close();
447 if (!err) {
448 joola.logger.debug('Found [' + result.length + ' documents] in cached collection [' + query.datatable.id + '.' + query.resolution + ']');
449
450 function countKeys(obj) {
451 return Object.keys(obj).length;
452 }
453
454 if (result.length == 1) {
455 var r = result[0];
456 if (countKeys(r._id) == 0) {
457 joola.logger.debug('Found a missing dimension in collection.');
458 r._id = $group._id;
459 _.each(r._id, function (value, key) {
460 r._id[key] = '(not set)';
461 });
462 }
463 }
464
465 return callback(err, result, query);
466 }
467 else
468 return callback(err, null, query);
469
470 });
471 }
472 else {
473 joola.logger.debug('Executing mongodb distinct aggregate [' + query.startdate + ']-[' + query.enddate + ']...');
474 groupBy = {};
475 $group = {};
476 var $unwind = {};
477 _.each(query.dimensions, function (d) {
478 if (d.type == 'date') {
479 switch (query.resolution) {
480 case 'second':
481 groupBy['date'] = '$timebucket.second';
482 break;
483 case 'minute':
484 groupBy['date'] = '$timebucket.minute';
485 break;
486 case 'hour':
487 groupBy['date'] = '$timebucket.hour';
488 break;
489 case 'day':
490 groupBy['date'] = '$timebucket.day';
491 break;
492 case 'week':
493 groupBy['date'] = '$timebucket.day';
494 break;
495 case 'month':
496 groupBy['date'] = '$timebucket.month';
497 break;
498 case 'year':
499 groupBy['date'] = '$timebucket.year';
500 break;
501 default:
502 break;
503 }
504 }
505 else if (d.id != query.metrics[0].dimension) {
506 groupBy[d.name] = '$' + d.name;
507 }
508 else {
509 $group[d.name] = {'$addToSet': '$' + d.name};
510 $unwind = '$' + d.name;
511 }
512 });
513 $group._id = groupBy;
514 //
515
516 var $group2 = {
517 _id: '$_id'
518 };
519 $group2[query.metrics[0].name] = {$sum: 1};
520
521 //console.log('m', $match);
522 //console.log('g1', $group);
523 //console.log('un', $unwind);
524 //console.log('g2', $group2);
525 $sortkey = {};
526
527 if (query.sortKey) {
528 if ($group[query.sortKey.name])
529 $sortkey[query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
530 else {
531 $sortkey[ query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
532 }
533 }
534 else {
535 if (query.distinctCount) {
536 if (Object.keys($group._id).length > 1) {
537 if ($group._id.date)
538 $sortkey['_id.date'] = (query.sortDir == 'DESC' ? -1 : 1);
539 else
540 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
541 }
542 else
543 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
544 }
545 else {
546 $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
547 }
548 }
549
550 //console.log('m', $match);
551 //console.log('g', $group);
552 //console.log('u', $unwind);
553 //console.log('g2', $group2);
554 //console.log('sort', $sortkey);
555
556 collection.aggregate(
557 {$match: $match},
558 {$group: $group},
559 {$unwind: $unwind},
560 {$group: $group2},
561 {$sort: $sortkey},
562 //{$limit: 500},
563 function (err, result) {
564 db.close();
565 if (!err) {
566 joola.logger.debug('Found [' + result.length + ' documents] in cached collection [' + query.datatable.id + '.' + query.resolution + ']');
567
568 // console.log(result);
569
570 function countKeys(obj) {
571 return Object.keys(obj).length;
572 }
573
574 if (result.length == 1) {
575 var r = result[0];
576 if (countKeys(r._id) == 0) {
577 joola.logger.debug('Found a missing dimension in collection.');
578 r._id = $group._id;
579 _.each(r._id, function (value, key) {
580 r._id[key] = '(not set)';
581 })
582 }
583 }
584
585 return callback(err, result, query);
586 }
587 else {
588 return callback(err, null, query);
589 }
590 }
591 );
592 }
593 //});
594 }
595 catch (err) {
596 joola.logger.error('Error while calling cacheTable: ' + ex.message);
597 return callback(err);
598 }
599 };
600
601 if (err) {
602 db.collection(query.datatable.id, function (err, collection) {
603 if (err)
604 throw err;
605 processFetch(collection);
606 });
607 }
608 else {
609 processFetch(collection);
610 }
611 });
612 });
613};
614
615exports.flushCache = function (datatable, callback) {
616 var connstring = datatable.caching.system.url;
617 joola.logger.silly('Flushing mongo @ ' + connstring + '...');
618
619 var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
620 if (err) throw err;
621
622 joola.logger.warn('Flushed mongo @ ' + connstring);
623
624 db.dropDatabase(function (err) {
625 return callback(err);
626 });
627 });
628};
\No newline at end of file