1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 | var
|
13 | utils = require('../shared/utils'),
|
14 | _datasources = require('../objects/datasources'),
|
15 | _datatables = require('../objects/datatables'),
|
16 | connector = require('../connectors/connector'),
|
17 | _mongo = require('mongodb'),
|
18 | path = require('path');
|
19 |
|
20 | exports.minCacheDate = function (datatable, callback) {
|
21 | joola.logger.debug('Checking min cache date on [' + datatable.id + ']...');
|
22 | var connstring = datatable.caching.system.url;
|
23 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
24 |
|
25 |
|
26 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
27 | if (err)
|
28 | return callback(err);
|
29 | db.createCollection(datatable.id, null, function (err, collection) {
|
30 | if (err) {
|
31 | db.close();
|
32 | return callback(err);
|
33 | }
|
34 | collection.find().sort({date: 1}).limit(1).toArray(function (err, result) {
|
35 | db.close();
|
36 | if (err)
|
37 | return callback(err);
|
38 |
|
39 | return callback(null, new Date(result[0].date));
|
40 | });
|
41 | });
|
42 | });
|
43 | };
|
44 |
|
45 | exports.maxCacheDate = function (datatable, callback) {
|
46 | var connstring = datatable.caching.system.url;
|
47 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
48 |
|
49 |
|
50 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
51 | if (err)
|
52 | return callback(err);
|
53 |
|
54 | db.createCollection(datatable.id, null, function (err, collection) {
|
55 | if (err) {
|
56 | db.close();
|
57 | return callback(err);
|
58 | }
|
59 | collection.find().sort({date: -1}).limit(1).toArray(function (err, result) {
|
60 | db.close();
|
61 | if (err)
|
62 | return callback(err);
|
63 |
|
64 | return callback(null, new Date(result[0].date));
|
65 | });
|
66 | });
|
67 | });
|
68 | };
|
69 |
|
70 | exports.maxNotHandledCacheDate = function (datatable, callback) {
|
71 | joola.logger.debug('Checking max not handled cache date on [' + datatable.id + ']...');
|
72 | var connstring = datatable.caching.system.url;
|
73 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
74 |
|
75 |
|
76 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
77 | db.createCollection(datatable.id, null, function (err, collection) {
|
78 | if (err)
|
79 | return callback(err);
|
80 | collection.find({handled: false}).sort({date: -1}).limit(1).toArray(function (err, result) {
|
81 | db.close();
|
82 | if (err)
|
83 | return callback(null);
|
84 |
|
85 | if (result && result.length > 0) {
|
86 | joola.logger.debug('Found max not handled cache date [' + datatable.id + ']: ' + new Date(result[0].date));
|
87 | return callback(new Date(result[0].date));
|
88 | }
|
89 | else
|
90 | return callback(null);
|
91 | });
|
92 | });
|
93 | });
|
94 | };
|
95 |
|
96 | exports.minNotHandledCacheDate = function (datatable, callback) {
|
97 | joola.logger.debug('Checking min not handled cache date on [' + datatable.id + ']...');
|
98 | var connstring = datatable.caching.system.url;
|
99 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
100 |
|
101 |
|
102 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
103 | db.createCollection(datatable.id, null, function (err, collection) {
|
104 | if (err)
|
105 | return callback(err);
|
106 |
|
107 | collection.find({handled: false}).sort({date: 1}, null, null).limit(1).toArray(function (err, result) {
|
108 | db.close();
|
109 | if (err)
|
110 | return callback(null);
|
111 |
|
112 | if (result && result.length > 0) {
|
113 | joola.logger.debug('Found min not handled cache date [' + datatable.id + ']: ' + new Date(result[0].date));
|
114 | return callback(new Date(result[0].date));
|
115 | }
|
116 | else
|
117 | return callback(null);
|
118 | });
|
119 | });
|
120 | });
|
121 | };
|
122 |
|
/**
 * Makes sure the datatable's cache collection exists, then continues with index
 * verification (verifyIndex, which goes on to process the rows).
 *
 * @param {Object} db - connected mongo database handle.
 * @param {Object} datatable - datatable definition; `datatable.id` names the collection.
 * @param {Object} query - forwarded unchanged.
 * @param {Object} data - forwarded unchanged.
 * @param {Function} callback - completion callback, forwarded to verifyIndex; invoked
 *   directly with the error when the collection cannot be created.
 */
function verifyCollection(db, datatable, query, data, callback) {
  joola.logger.silly('Creating collection [' + datatable.id + ']...');
  db.createCollection(datatable.id, null, function (err, collection) {
    // Report the failure instead of throwing: a throw from this driver callback cannot be
    // caught by the caller, which then never gets a reply (and never closes its connection).
    if (err)
      return callback(err);

    joola.logger.silly('...Created collection [' + datatable.id + ']');

    // verifyIndex requests a unique index on this key.
    verifyIndex({key: 1}, collection, datatable, query, data, callback);
  });
}
|
136 |
|
/**
 * Requests the indexes the cache collection relies on, then hands off to processRows.
 *
 * First a unique index on `indexKey` is requested; its outcome is only logged. Then a
 * non-unique index on `date` is requested; its outcome is ignored. processRows runs
 * once the second request has answered.
 *
 * @param {Object} indexKey - index specification for the unique index (e.g. {key: 1}).
 * @param {Object} collection - mongo collection handle.
 * @param {Object} datatable - forwarded to processRows.
 * @param {Object} query - forwarded to processRows.
 * @param {Object} data - forwarded to processRows.
 * @param {Function} callback - forwarded to processRows.
 */
function verifyIndex(indexKey, collection, datatable, query, data, callback) {
  // Flatten the JSON spec into a compact label for the log, e.g. {"key":1} -> "key1".
  var strippedChars = [/"/g, /:/g, / /g, /\./g, /,/g];
  var label = strippedChars
    .reduce(function (text, pattern) {
      return text.replace(pattern, '');
    }, JSON.stringify(indexKey))
    .replace('{', '')
    .replace('}', '');

  joola.logger.debug('Verifying index [' + label + ']...');
  collection.ensureIndex(indexKey, { unique: true }, function (uniqueErr) {
    var outcome = uniqueErr ? ' Error: ' + uniqueErr : '';
    joola.logger.debug('... Index verfieid [' + label + ']' + outcome);

    var dateIndex = {'date': 1};
    collection.ensureIndex(dateIndex, { unique: false }, function () {
      processRows(collection, datatable, query, data, callback);
    });
  });
}
|
149 |
|
/**
 * Caches `data.rows` in `collection` and flags the inserted documents as handled.
 *
 * Rows are split by the `chunk` helper (not defined in the visible source) into groups
 * of `datatable.caching.chunkSize`. For each group a task is queued that:
 * - forks the `fork_processDocument` worker;
 * - hands the worker the group plus the datatable's dimensions/metrics (serialized with JASON);
 * - inserts every document the worker sends back.
 *
 * Insert errors are handled as follows:
 * - Duplicate-key errors (E11000) are tolerated and only occasionally warned about.
 * - Any other insert error is logged.
 *
 * A task completes once every document of its group has been acknowledged, at which
 * point the worker is killed.
 *
 * The tasks are run through the `fork` helper (also not defined in the visible source).
 * Afterwards the min/max dates of not-yet-handled documents are looked up. Unless
 * `query.balanced` is set, every `handled: false` document is then updated to
 * `handled: true`.
 *
 * @param {Object} collection - mongo collection receiving the documents.
 * @param {Object} datatable - datatable definition (`caching.chunkSize` is read here).
 * @param {Object} query - originating query; only `query.balanced` is read here.
 * @param {Object} data - object whose `rows` array holds the rows to cache.
 * @param {Function} callback - callback(err, firstDate, lastDate); called with no
 *   arguments when there are no rows.
 */
function processRows(collection, datatable, query, data, callback) {
  joola.logger.silly('Processing rows, total count: ' + data.rows.length);

  if (data.rows.length == 0) {
    joola.logger.debug('Nothing to do here.');
    return callback();
  }

  var chunks = chunk(data.rows, datatable.caching.chunkSize);
  joola.logger.debug('Broke ' + data.rows.length + ' rows into ' + chunks.length + ' chunks of data [chunkSize:' + datatable.caching.chunkSize + '].');

  // Release the reference to the full row set; only the chunks are used from here on.
  data = null;

  var $options = {w: 1};

  var callsb = [];
  _.each(chunks, function (rowChunk) {
    joola.logger.debug('Processing chunk...');
    callsb.push(function (taskDone) {
      try {
        var worker = path.resolve(__dirname + '/fork_processDocument');
        // Evaluate the base query once per chunk (it used to be built twice).
        var basequery = _datatables.basequery(datatable);
        var args = {dimensions: basequery.dimensionsUsed, metrics: basequery.metricsUsed, chunk: rowChunk};

        var finishChunk = function (err) {
          if (err)
            console.log(err);

          return taskDone(null);
        };

        var counter = 0;
        var childProcess;

        // Stores one document produced by the worker. Once every document of the chunk has
        // been acknowledged (successfully or not), the worker is killed and the task completes.
        var insertDocument = function (document) {
          document.date = new Date(document.date);

          collection.insert(document, $options, function (err) {
            if (counter % 100 == 0)
              joola.logger.silly('Document saved [' + counter + '/' + rowChunk.length + '].');
            if (err) {
              if (err.message.indexOf('E11000') != -1) {
                // Duplicate key: the document is already cached; warn only occasionally to limit noise.
                if (counter % 50 == 0)
                  joola.logger.warn('Found a duplicate document, key: ' + document.key);
              }
              else
                joola.logger.error('Failed to save document [' + counter + '/' + rowChunk.length + ', ' + document.key + ']: ' + err);
            }
            counter++;
            if (counter == rowChunk.length) {
              childProcess.kill();
              return finishChunk();
            }
          });
        };

        childProcess = require('child_process').fork(worker);
        childProcess.on('message', function (message) {
          // The worker announces itself with 'init', then sends back 'document' messages —
          // presumably one per row, since completion is detected by counting to rowChunk.length.
          if (message.type == 'init')
            childProcess.send(require('JASON').stringify(args));
          else if (message.type == 'document')
            insertDocument(message.document);
        });
      }
      catch (ex) {
        console.log(ex);
        console.log(ex.stack);
        return taskDone(ex);
      }
    });
  });
  chunks = null;

  fork(callsb, function () {
    callsb = null;
    joola.logger.debug('Finished parsing documents.');

    exports.minNotHandledCacheDate(datatable, function (firstDate) {
      exports.maxNotHandledCacheDate(datatable, function (lastDate) {
        // Balanced queries leave the freshly inserted documents unhandled.
        if (query.balanced)
          return callback(null, firstDate, lastDate);

        collection.update(
          {handled: false},
          {$set: {handled: true}},
          {w: 1, multi: true, verbose: true},
          function (err) {
            // Hand the error to the caller: throwing from this driver callback (as before)
            // could not be caught and left the caller without a reply.
            if (err)
              return callback(err);
            joola.logger.debug('Processed rows marked as handled.');

            if (firstDate)
              joola.logger.debug('First date: ' + firstDate);
            else
              joola.logger.debug('Missing first date');

            if (lastDate)
              joola.logger.debug('Last date: ' + lastDate);
            else
              joola.logger.debug('Missing last date');

            return callback(null, firstDate, lastDate);
          });
      });
    });
  });
}
|
282 |
|
283 | exports.saveData = function (query, datatable, data, callback) {
|
284 | var connstring = datatable.caching.system.url;
|
285 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
286 |
|
287 |
|
288 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
289 | if (err)
|
290 | return callback(err);
|
291 |
|
292 | joola.logger.debug('Connected to mongo @ ' + connstring);
|
293 |
|
294 | verifyCollection(db, datatable, query, data, function (err, firstDate, lastDate) {
|
295 | db.close();
|
296 | return callback(err, firstDate, lastDate);
|
297 | });
|
298 | });
|
299 | };
|
300 |
|
301 | exports.fetch = function (query, callback) {
|
302 | var connstring = query.datatable.caching.system.url;
|
303 | joola.logger.silly('Connecting to mongo @ ' + connstring + '...');
|
304 |
|
305 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
306 | if (err)
|
307 | return callback(err);
|
308 |
|
309 | joola.logger.silly('Verifying collection [' + query.datatable.id + '.' + query.resolution + ']...');
|
310 | db.createCollection(query.datatable.id, {strict: false}, function (err, collection) {
|
311 | var processFetch = function (collection) {
|
312 | var fields = {};
|
313 | _.each(query.dimensions, function (d) {
|
314 | fields[d.name] = 1;
|
315 | });
|
316 |
|
317 | joola.logger.debug('Executing fetch from [' + query.datatable.id + '], range [' + utils.formatDate(query.startdate, 'yyyy-mm-dd hh:nn:ss') + ']-[' + utils.formatDate(query.enddate, 'yyyy-mm-dd hh:nn:ss') + ']...');
|
318 |
|
319 | var ds = _datasources.get(query.datatable.datasourceid);
|
320 | var dbquery = connector.createQuery();
|
321 |
|
322 | query.datatable.datasource = ds;
|
323 | query.datatable.query = _datatables.basequery(query.datatable);
|
324 |
|
325 | var basequery = query.datatable.query;
|
326 | dbquery.sql = basequery.sql;
|
327 | dbquery.datasource = ds;
|
328 |
|
329 | dbquery.enddate = new Date(query.enddate).fixDate();
|
330 | dbquery.startdate = new Date(query.startdate).fixDate();
|
331 | dbquery.resolution = query.resolution;
|
332 | dbquery._query = query;
|
333 |
|
334 | try {
|
335 | var manager = require('./manager');
|
336 |
|
337 | var groupBy = {};
|
338 | _.each(query.dimensions, function (d) {
|
339 | if (d.type == 'date') {
|
340 | switch (query.resolution) {
|
341 | case 'second':
|
342 | groupBy['date'] = '$timebucket.second';
|
343 | break;
|
344 | case 'minute':
|
345 | groupBy['date'] = '$timebucket.minute';
|
346 | break;
|
347 | case 'hour':
|
348 | groupBy['date'] = '$timebucket.hour';
|
349 | break;
|
350 | case 'day':
|
351 | groupBy['date'] = '$timebucket.day';
|
352 | break;
|
353 | case 'week':
|
354 | groupBy['date'] = '$timebucket.day';
|
355 | break;
|
356 | case 'month':
|
357 | groupBy['date'] = '$timebucket.month';
|
358 | break;
|
359 | case 'year':
|
360 | groupBy['date'] = '$timebucket.year';
|
361 | break;
|
362 | default:
|
363 | break;
|
364 | }
|
365 | }
|
366 | else
|
367 | groupBy[d.name] = '$' + d.name;
|
368 | });
|
369 |
|
370 | var $group = {};
|
371 | $group._id = groupBy;
|
372 |
|
373 | if (!query.distinctCount) {
|
374 | $group.rowcount = {$sum: 1};
|
375 |
|
376 | _.each(query.metrics, function (m) {
|
377 | var exist = _.find(query.datatable.metrics, function (m) {
|
378 | return _.find(query.metrics, function (m2) {
|
379 | return m2.id == m.id;
|
380 | })
|
381 | });
|
382 | if (exist) {
|
383 | $group[m.name] = {$sum: '$' + m.name};
|
384 | }
|
385 | });
|
386 | }
|
387 | else {
|
388 |
|
389 | }
|
390 |
|
391 | var $sortkey = {};
|
392 |
|
393 | if (query.sortKey) {
|
394 | if ($group[query.sortKey.name])
|
395 | $sortkey[query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
|
396 | else {
|
397 | $sortkey['_id.' + query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
|
398 | }
|
399 | }
|
400 | else {
|
401 | if (query.distinctCount) {
|
402 | if (Object.keys($group._id).length > 1) {
|
403 | if ($group._id.date)
|
404 | $sortkey['_id.date'] = (query.sortDir == 'DESC' ? -1 : 1);
|
405 | else
|
406 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
407 | }
|
408 | else
|
409 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
410 | }
|
411 | else {
|
412 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
413 | }
|
414 | }
|
415 |
|
416 | var $match = {
|
417 | date: { $gte: new Date(query.startdate), $lte: new Date(query.enddate)}
|
418 | };
|
419 |
|
420 | _.each(query.filters, function (filter) {
|
421 | if (filter.dimension.type != 'date') {
|
422 | if (filter.operator == '=')
|
423 | $match[filter.dimension.name] = filter.value;
|
424 | else if (filter.operator == '*=')
|
425 | $match[filter.dimension.name] = eval('/' + filter.value + '/gi');
|
426 | else
|
427 | $match[filter.dimension.name] = filter.value;
|
428 | }
|
429 | });
|
430 |
|
431 | $match['handled'] = true;
|
432 |
|
433 |
|
434 | if (!query.distinctCount) {
|
435 |
|
436 |
|
437 |
|
438 |
|
439 | joola.logger.debug('Executing mongodb aggregate [' + query.startdate + ']-[' + query.enddate + ']...');
|
440 | collection.aggregate(
|
441 | {$match: $match},
|
442 | {$group: $group},
|
443 | {$sort: $sortkey},
|
444 |
|
445 | function (err, result) {
|
446 | db.close();
|
447 | if (!err) {
|
448 | joola.logger.debug('Found [' + result.length + ' documents] in cached collection [' + query.datatable.id + '.' + query.resolution + ']');
|
449 |
|
450 | function countKeys(obj) {
|
451 | return Object.keys(obj).length;
|
452 | }
|
453 |
|
454 | if (result.length == 1) {
|
455 | var r = result[0];
|
456 | if (countKeys(r._id) == 0) {
|
457 | joola.logger.debug('Found a missing dimension in collection.');
|
458 | r._id = $group._id;
|
459 | _.each(r._id, function (value, key) {
|
460 | r._id[key] = '(not set)';
|
461 | });
|
462 | }
|
463 | }
|
464 |
|
465 | return callback(err, result, query);
|
466 | }
|
467 | else
|
468 | return callback(err, null, query);
|
469 |
|
470 | });
|
471 | }
|
472 | else {
|
473 | joola.logger.debug('Executing mongodb distinct aggregate [' + query.startdate + ']-[' + query.enddate + ']...');
|
474 | groupBy = {};
|
475 | $group = {};
|
476 | var $unwind = {};
|
477 | _.each(query.dimensions, function (d) {
|
478 | if (d.type == 'date') {
|
479 | switch (query.resolution) {
|
480 | case 'second':
|
481 | groupBy['date'] = '$timebucket.second';
|
482 | break;
|
483 | case 'minute':
|
484 | groupBy['date'] = '$timebucket.minute';
|
485 | break;
|
486 | case 'hour':
|
487 | groupBy['date'] = '$timebucket.hour';
|
488 | break;
|
489 | case 'day':
|
490 | groupBy['date'] = '$timebucket.day';
|
491 | break;
|
492 | case 'week':
|
493 | groupBy['date'] = '$timebucket.day';
|
494 | break;
|
495 | case 'month':
|
496 | groupBy['date'] = '$timebucket.month';
|
497 | break;
|
498 | case 'year':
|
499 | groupBy['date'] = '$timebucket.year';
|
500 | break;
|
501 | default:
|
502 | break;
|
503 | }
|
504 | }
|
505 | else if (d.id != query.metrics[0].dimension) {
|
506 | groupBy[d.name] = '$' + d.name;
|
507 | }
|
508 | else {
|
509 | $group[d.name] = {'$addToSet': '$' + d.name};
|
510 | $unwind = '$' + d.name;
|
511 | }
|
512 | });
|
513 | $group._id = groupBy;
|
514 |
|
515 |
|
516 | var $group2 = {
|
517 | _id: '$_id'
|
518 | };
|
519 | $group2[query.metrics[0].name] = {$sum: 1};
|
520 |
|
521 |
|
522 |
|
523 |
|
524 |
|
525 | $sortkey = {};
|
526 |
|
527 | if (query.sortKey) {
|
528 | if ($group[query.sortKey.name])
|
529 | $sortkey[query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
|
530 | else {
|
531 | $sortkey[ query.sortKey.name] = (query.sortDir == 'DESC' ? -1 : 1);
|
532 | }
|
533 | }
|
534 | else {
|
535 | if (query.distinctCount) {
|
536 | if (Object.keys($group._id).length > 1) {
|
537 | if ($group._id.date)
|
538 | $sortkey['_id.date'] = (query.sortDir == 'DESC' ? -1 : 1);
|
539 | else
|
540 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
541 | }
|
542 | else
|
543 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
544 | }
|
545 | else {
|
546 | $sortkey['_id'] = (query.sortDir == 'DESC' ? -1 : 1);
|
547 | }
|
548 | }
|
549 |
|
550 |
|
551 |
|
552 |
|
553 |
|
554 |
|
555 |
|
556 | collection.aggregate(
|
557 | {$match: $match},
|
558 | {$group: $group},
|
559 | {$unwind: $unwind},
|
560 | {$group: $group2},
|
561 | {$sort: $sortkey},
|
562 |
|
563 | function (err, result) {
|
564 | db.close();
|
565 | if (!err) {
|
566 | joola.logger.debug('Found [' + result.length + ' documents] in cached collection [' + query.datatable.id + '.' + query.resolution + ']');
|
567 |
|
568 |
|
569 |
|
570 | function countKeys(obj) {
|
571 | return Object.keys(obj).length;
|
572 | }
|
573 |
|
574 | if (result.length == 1) {
|
575 | var r = result[0];
|
576 | if (countKeys(r._id) == 0) {
|
577 | joola.logger.debug('Found a missing dimension in collection.');
|
578 | r._id = $group._id;
|
579 | _.each(r._id, function (value, key) {
|
580 | r._id[key] = '(not set)';
|
581 | })
|
582 | }
|
583 | }
|
584 |
|
585 | return callback(err, result, query);
|
586 | }
|
587 | else {
|
588 | return callback(err, null, query);
|
589 | }
|
590 | }
|
591 | );
|
592 | }
|
593 |
|
594 | }
|
595 | catch (err) {
|
596 | joola.logger.error('Error while calling cacheTable: ' + ex.message);
|
597 | return callback(err);
|
598 | }
|
599 | };
|
600 |
|
601 | if (err) {
|
602 | db.collection(query.datatable.id, function (err, collection) {
|
603 | if (err)
|
604 | throw err;
|
605 | processFetch(collection);
|
606 | });
|
607 | }
|
608 | else {
|
609 | processFetch(collection);
|
610 | }
|
611 | });
|
612 | });
|
613 | };
|
614 |
|
615 | exports.flushCache = function (datatable, callback) {
|
616 | var connstring = datatable.caching.system.url;
|
617 | joola.logger.silly('Flushing mongo @ ' + connstring + '...');
|
618 |
|
619 | var mongo = new _mongo.MongoClient.connect(connstring, {w: 1}, function (err, db) {
|
620 | if (err) throw err;
|
621 |
|
622 | joola.logger.warn('Flushed mongo @ ' + connstring);
|
623 |
|
624 | db.dropDatabase(function (err) {
|
625 | return callback(err);
|
626 | });
|
627 | });
|
628 | }; |
\ | No newline at end of file |