1 | const fs_extra = require('fs-extra');
|
2 | const fs = require('fs');
|
3 | const path = require('path');
|
4 | const fetch = require("node-fetch");
|
5 | const shell = require("shelljs");
|
6 | const line_by_line = require('line-by-line');
|
7 | const async_series = require('async/series');
|
8 | const zlib = require("zlib");
|
9 | const jsome = require('jsome');
|
10 |
|
11 | const Promises = require('@opentribe/packages_promises');
|
12 |
|
13 | const LogInsights = require('./insights.js');
|
14 | const LogAdapters = require('./adapters.js');
|
15 |
|
16 | const DataDir = path.resolve(__dirname, './data');
|
17 | const ArchiveDir = path.resolve(__dirname, './s3-log-archive');
|
18 | const BucketName = "open-tribe-logdna-archive";
|
19 |
|
20 | const moveFolders = (oldPath, newPath, callback) => {
|
21 | fs.rename(oldPath, newPath, function (err) {
|
22 | if (err) {
|
23 | if (err.code === 'EXDEV') {
|
24 | return copy();
|
25 | }
|
26 | return callback(err);
|
27 | }
|
28 | callback();
|
29 | });
|
30 | const copy = () => {
|
31 | var readStream = fs.createReadStream(oldPath);
|
32 | var writeStream = fs.createWriteStream(newPath);
|
33 | readStream.on('error', callback);
|
34 | writeStream.on('error', callback);
|
35 | readStream.on('close', () => fs.unlink(oldPath, callback));
|
36 | readStream.pipe(writeStream);
|
37 | }
|
38 | };
|
39 |
|
40 | const safe_fetch = (...args) => new Promise(async (resolve, reject) => { try {
|
41 | const response = await fetch(...args);
|
42 | if (response.ok) {
|
43 | const text = await response.text();
|
44 | resolve(text)
|
45 | }
|
46 | reject(response);
|
47 | } catch(err) { reject(err) } });
|
48 |
|
49 | const get_last_5000_logs = (service_key, target) => new Promise(async (resolve, reject) => { try {
|
50 | const text = await safe_fetch(`https://user:${service_key}@api.logdna.com/v1/export?to=${Date.now()}&from=${0}`);
|
51 | fs.writeFileSync(target, text);
|
52 | resolve();
|
53 | } catch(err) { reject(err) } });
|
54 |
|
55 | const sync_archived_logs = () => new Promise(async (resolve, reject) => { try {
|
56 | const child = shell.exec('aws s3 sync s3://' + BucketName + ' ' + ArchiveDir + '/');
|
57 | resolve();
|
58 | } catch(err) { reject(err) } });
|
59 |
|
60 | const resync_archived_logs = () => new Promise(async (resolve, reject) => { try {
|
61 | fs_extra.removeSync(ArchiveDir);
|
62 | fs.mkdirSync(ArchiveDir);
|
63 | const child = shell.exec('aws s3 sync s3://' + BucketName + ' ' + ArchiveDir + '/');
|
64 | resolve();
|
65 | } catch(err) { reject(err) } });
|
66 |
|
67 | const segregate_relevant_logs = () => new Promise(async (resolve, reject) => { try {
|
68 | Object.keys(LogAdapters).map(type => {
|
69 | fs.writeFileSync(DataDir + '/' + type + '.txt', '');
|
70 | });
|
71 | const log_line_by_line = new line_by_line(DataDir + '/raw.txt');
|
72 | let log_stats = {};
|
73 | log_line_by_line.on('line', (line) => {
|
74 | if (line.length > 0) {
|
75 | let line_json = JSON.parse(line);
|
76 | Object.keys(LogAdapters)
|
77 | .filter(type => LogAdapters[type].condition(line_json))
|
78 | .map(type => ([type, LogAdapters[type].adapter(line_json)]))
|
79 | .forEach(([type, mapped_data]) => {
|
80 | if (typeof log_stats[type] === 'undefined') {
|
81 | log_stats[type] = 1;
|
82 | }
|
83 | log_stats[type] = log_stats[type] + 1;
|
84 | fs.appendFileSync(DataDir + '/' + type + '.txt', JSON.stringify(mapped_data) + '\n')
|
85 | })
|
86 | }
|
87 | })
|
88 | log_line_by_line.on('error', (err) => {
|
89 | console.log(err);
|
90 | reject(err);
|
91 | })
|
92 | log_line_by_line.on('end', () => {
|
93 | jsome({
|
94 | available_shards: log_stats
|
95 | });
|
96 | resolve();
|
97 | })
|
98 | } catch(err) { reject(err) } });
|
99 |
|
100 | const filepathList = (dir, filelist) => {
|
101 | files = fs.readdirSync(dir);
|
102 | filelist = filelist || [];
|
103 | files.forEach(file => {
|
104 | if (fs.statSync(dir + '/' + file).isDirectory()) {
|
105 | filelist = filepathList(dir + '/' + file, filelist);
|
106 | return
|
107 | }
|
108 | filelist.push(dir + '/' + file);
|
109 | });
|
110 | return filelist;
|
111 | };
|
112 |
|
113 | const unzip_merge_logs = (targetdir, outputfile) => new Promise(async (resolve, reject) => { try {
|
114 | fs.writeFileSync(outputfile, '');
|
115 | const filesToUnzip = filepathList(targetdir);
|
116 | await Promises.series(filesToUnzip.map(filePath => () => new Promise(async (resolve, reject) => { try {
|
117 | const _d = fs.readFileSync(filePath);
|
118 | zlib.gunzip(_d, (err, decoded) => {
|
119 | if (err) {
|
120 | console.log(err);
|
121 | reject(err)
|
122 | }
|
123 | console.log('Appending log chunk of size: ' + (fs.statSync(filePath).size / 1000000).toFixed(2) + ' MBs');
|
124 | fs.appendFileSync(outputfile, decoded.toString());
|
125 | resolve();
|
126 | });
|
127 | } catch(err) { reject(err) } })));
|
128 | resolve();
|
129 | } catch(err) { reject(err) } });
|
130 |
|
131 | exports.prepare_resync = () => new Promise(async (resolve, reject) => { try {
|
132 | await resync_archived_logs();
|
133 | await unzip_merge_logs(ArchiveDir, DataDir + '/raw.txt');
|
134 | await segregate_relevant_logs();
|
135 | resolve();
|
136 | } catch(err) { reject(err) } });
|
137 |
|
138 | exports.prepare_live = (service_key) => new Promise(async (resolve, reject) => { try {
|
139 | await get_last_5000_logs(service_key, DataDir + '/raw.txt');
|
140 | await segregate_relevant_logs();
|
141 | resolve();
|
142 | } catch(err) { reject(err) } });
|
143 |
|
144 | exports.prepare_archived = () => new Promise(async (resolve, reject) => { try {
|
145 | await sync_archived_logs();
|
146 | await unzip_merge_logs(ArchiveDir, DataDir + '/raw.txt');
|
147 | await segregate_relevant_logs();
|
148 | resolve();
|
149 | } catch(err) { reject(err) } });
|
150 |
|
151 |
|
152 |
|
153 | exports.prepare_from_cache = () => new Promise(async (resolve, reject) => { try {
|
154 | await segregate_relevant_logs();
|
155 | resolve();
|
156 | } catch(err) { reject(err) } });
|
157 |
|
158 | exports.insight = (query_name, args) => LogInsights[query_name](...(args || []));
|
159 | exports.insight_list = () => new Promise((resolve, reject) => {
|
160 | console.log(JSON.stringify(Object.keys(LogInsights), null, 2));
|
161 | resolve();
|
162 | });
|