const fs_extra = require('fs-extra');
const fs = require('fs');
const path = require('path');
const fetch = require("node-fetch");
const shell = require("shelljs");
const line_by_line = require('line-by-line');
const zlib = require("zlib");
const jsome = require('jsome');

const Promises = require('@opentribe/packages_promises');

const LogInsights = require('./insights.js');
const LogAdapters = require('./adapters.js');

const DataDir = path.resolve(__dirname, './data');
const ArchiveDir = path.resolve(__dirname, './s3-log-archive');
const BucketName = "open-tribe-logdna-archive";

const moveFolders = (oldPath, newPath, callback) => {
  // Try a fast rename first; fall back to copy + unlink when the destination
  // is on another device (rename fails with EXDEV across filesystems).
  // Note: the stream fallback handles a single file, not a directory tree.
  const copy = () => {
    const readStream = fs.createReadStream(oldPath);
    const writeStream = fs.createWriteStream(newPath);
    readStream.on('error', callback);
    writeStream.on('error', callback);
    readStream.on('close', () => fs.unlink(oldPath, callback));
    readStream.pipe(writeStream);
  };
  fs.rename(oldPath, newPath, function (err) {
    if (err) {
      if (err.code === 'EXDEV') {
        return copy();
      }
      return callback(err);
    }
    callback();
  });
};

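// Usage sketch for moveFolders (the paths below are hypothetical): rename when
// source and destination share a device, stream-copy and unlink across devices.
//
//   moveFolders('/tmp/staging', DataDir + '/staging', (err) => {
//     if (err) return console.error('move failed:', err);
//     console.log('moved');
//   });
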
const safe_fetch = (...args) => new Promise(async (resolve, reject) => { try {
  const response = await fetch(...args);
  if (response.ok) {
    const text = await response.text();
    // Return here so a successful fetch does not also hit the reject below.
    return resolve(text);
  }
  reject(response);
} catch(err) { reject(err) } });

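// Usage sketch (hypothetical URL): safe_fetch resolves with the body text on a
// 2xx response, and rejects with the Response object (non-2xx) or the thrown
// network error.
//
//   safe_fetch('https://example.com/health')
//     .then(body => console.log(body))
//     .catch(err => console.error(err instanceof Error ? err.message : err.status));
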
const get_last_5000_logs = (service_key, target) => new Promise(async (resolve, reject) => { try {
  // Pull recent lines from the LogDNA export endpoint (basic auth via the
  // service key) and write them to the target file, one JSON line per log.
  const text = await safe_fetch(`https://user:${service_key}@api.logdna.com/v1/export?to=${Date.now()}&from=${0}`);
  fs.writeFileSync(target, text);
  resolve();
} catch(err) { reject(err) } });

const sync_archived_logs = () => new Promise(async (resolve, reject) => { try {
  // shell.exec is synchronous by default; surface a non-zero exit code as an error.
  const child = shell.exec('aws s3 sync s3://' + BucketName + ' ' + ArchiveDir + '/');
  if (child.code !== 0) return reject(new Error('aws s3 sync failed with exit code ' + child.code));
  resolve();
} catch(err) { reject(err) } });

const resync_archived_logs = () => new Promise(async (resolve, reject) => { try {
  // Drop the local archive and pull a fresh copy of the whole bucket.
  fs_extra.removeSync(ArchiveDir);
  fs.mkdirSync(ArchiveDir);
  const child = shell.exec('aws s3 sync s3://' + BucketName + ' ' + ArchiveDir + '/');
  if (child.code !== 0) return reject(new Error('aws s3 sync failed with exit code ' + child.code));
  resolve();
} catch(err) { reject(err) } });

const segregate_relevant_logs = () => new Promise(async (resolve, reject) => { try {
  // Truncate one shard file per adapter type before re-filling them.
  Object.keys(LogAdapters).forEach(type => {
    fs.writeFileSync(DataDir + '/' + type + '.txt', '');
  });
  const log_line_by_line = new line_by_line(DataDir + '/raw.txt');
  let log_stats = {};
  log_line_by_line.on('line', (line) => {
    if (line.length > 0) {
      let line_json = JSON.parse(line);
      Object.keys(LogAdapters)
        .filter(type => LogAdapters[type].condition(line_json))
        .map(type => ([type, LogAdapters[type].adapter(line_json)]))
        .forEach(([type, mapped_data]) => {
          // Tally how many lines landed in each shard, starting from zero.
          log_stats[type] = (log_stats[type] || 0) + 1;
          fs.appendFileSync(DataDir + '/' + type + '.txt', JSON.stringify(mapped_data) + '\n');
        });
    }
  });
  log_line_by_line.on('error', (err) => {
    console.log(err);
    reject(err);
  });
  log_line_by_line.on('end', () => {
    jsome({
      available_shards: log_stats
    });
    resolve();
  });
} catch(err) { reject(err) } });

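// For reference, segregate_relevant_logs assumes each entry in LogAdapters
// (./adapters.js, not shown here) provides a condition and an adapter. A
// hypothetical entry -- the field names are illustrative, not the real schema:
//
//   module.exports.http_errors = {
//     // decide whether a parsed raw log line belongs to this shard
//     condition: (line) => line._app === 'api' && line.status >= 500,
//     // project the raw line down to just the fields the insights need
//     adapter: (line) => ({ ts: line._ts, path: line.path, status: line.status }),
//   };
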
const filepathList = (dir, filelist) => {
  // Recursively collect every file path under dir.
  const files = fs.readdirSync(dir);
  filelist = filelist || [];
  files.forEach(file => {
    if (fs.statSync(dir + '/' + file).isDirectory()) {
      filelist = filepathList(dir + '/' + file, filelist);
      return;
    }
    filelist.push(dir + '/' + file);
  });
  return filelist;
};

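// Usage sketch: flatten the synced archive into a list of chunk paths (the
// layout shown is hypothetical and depends on how the bucket is organised).
//
//   const chunks = filepathList(ArchiveDir);
//   // => ['.../s3-log-archive/2020/01/chunk-0001.json.gz', ...]
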
const unzip_merge_logs = (targetdir, outputfile) => new Promise(async (resolve, reject) => { try {
  fs.writeFileSync(outputfile, '');
  const filesToUnzip = filepathList(targetdir);
  // Gunzip each archived chunk in sequence and append it to the merged raw log.
  await Promises.series(filesToUnzip.map(filePath => () => new Promise(async (resolve, reject) => { try {
    const _d = fs.readFileSync(filePath);
    zlib.gunzip(_d, (err, decoded) => {
      if (err) {
        console.log(err);
        // Bail out here so we never call decoded.toString() on undefined.
        return reject(err);
      }
      console.log('Appending log chunk of size: ' + (fs.statSync(filePath).size / 1000000).toFixed(2) + ' MB');
      fs.appendFileSync(outputfile, decoded.toString());
      resolve();
    });
  } catch(err) { reject(err) } })));
  resolve();
} catch(err) { reject(err) } });

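// Promises.series (from @opentribe/packages_promises) is assumed to run an
// array of promise-returning factories one at a time; roughly equivalent to:
//
//   const series = async (factories) => {
//     const results = [];
//     for (const factory of factories) results.push(await factory());
//     return results;
//   };
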
// Wipe the local archive, re-download everything, then rebuild the shards.
exports.prepare_resync = () => new Promise(async (resolve, reject) => { try {
  await resync_archived_logs();
  await unzip_merge_logs(ArchiveDir, DataDir + '/raw.txt');
  await segregate_relevant_logs();
  resolve();
} catch(err) { reject(err) } });

// Fetch the most recent logs straight from the LogDNA API, then shard them.
exports.prepare_live = (service_key) => new Promise(async (resolve, reject) => { try {
  await get_last_5000_logs(service_key, DataDir + '/raw.txt');
  await segregate_relevant_logs();
  resolve();
} catch(err) { reject(err) } });

// Incrementally sync the S3 archive, then rebuild the shards from it.
exports.prepare_archived = () => new Promise(async (resolve, reject) => { try {
  await sync_archived_logs();
  await unzip_merge_logs(ArchiveDir, DataDir + '/raw.txt');
  await segregate_relevant_logs();
  resolve();
} catch(err) { reject(err) } });

// Good for iterating on how to map a particular log type without incurring a
// request-limit penalty: reuses the raw.txt already on disk.
exports.prepare_from_cache = () => new Promise(async (resolve, reject) => { try {
  await segregate_relevant_logs();
  resolve();
} catch(err) { reject(err) } });

// Run a named insight query from ./insights.js with optional arguments.
exports.insight = (query_name, args) => LogInsights[query_name](...(args || []));
exports.insight_list = () => new Promise((resolve, reject) => {
  console.log(JSON.stringify(Object.keys(LogInsights), null, 2));
  resolve();
});
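
// End-to-end usage sketch. The module path, environment variable, and insight
// name below are placeholders, not names defined by this package:
//
//   const logs = require('./this_module');
//   logs.prepare_live(process.env.LOGDNA_SERVICE_KEY) // fetch + shard recent logs
//     .then(() => logs.insight_list())                // print available queries
//     .then(() => logs.insight('some_insight_name'))  // run one of them
//     .catch(console.error);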