UNPKG

36.6 kBJavaScriptView Raw
1"use strict";
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6exports.default = void 0;
7
8require("@babel/polyfill");
9
10var _lodash = _interopRequireDefault(require("lodash"));
11
12var _compressionUtils = require("./compressionUtils");
13
14var _asyncUtils = require("./asyncUtils");
15
16var _System = _interopRequireDefault(require("./System"));
17
18var _Module = _interopRequireDefault(require("./Module"));
19
20var _errorUtils = require("./errorUtils");
21
22function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
23
24function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
25
/**
 * Babel async-generator helper: marks a value as "awaited" (as opposed to
 * "yielded") by wrapping it in an `_AwaitValue` instance.
 */
function _awaitAsyncGenerator(value) {
  var awaitMarker = new _AwaitValue(value);
  return awaitMarker;
}
27
/**
 * Babel async-generator helper: turns a plain generator function into a
 * function that returns an `_AsyncGenerator` driving that generator.
 * The receiver (`this`) and arguments are forwarded to `fn` unchanged.
 */
function _wrapAsyncGenerator(fn) {
  return function () {
    var rawGenerator = fn.apply(this, arguments);
    return new _AsyncGenerator(rawGenerator);
  };
}
29
// Babel async-generator helper: drives the synchronous generator `gen` and exposes
// a promise-based interface through `this._invoke(key, arg)`.
//
// - `send` returns a promise per request and appends the request to a singly linked
//   queue (`front` .. `back`); only when the queue was empty does it start work
//   immediately via `resume`.
// - `resume` calls `gen[key](arg)`. If the produced value is an `_AwaitValue`, the
//   wrapped value is awaited and fed back into the generator with "next" (or "throw"
//   on rejection) without settling the pending request. Otherwise the resolved value
//   settles the request as "return" (generator done) or "normal" (a yield).
//   A synchronous exception from the generator settles the request as "throw".
// - `settle` resolves/rejects the request at the front of the queue, advances the
//   queue and resumes the next queued request, if any.
// - When `gen` has no `return` method, the instance's own `return` is set to undefined,
//   shadowing the prototype method.
30function _AsyncGenerator(gen) { var front, back; function send(key, arg) { return new Promise(function (resolve, reject) { var request = { key: key, arg: arg, resolve: resolve, reject: reject, next: null }; if (back) { back = back.next = request; } else { front = back = request; resume(key, arg); } }); } function resume(key, arg) { try { var result = gen[key](arg); var value = result.value; var wrappedAwait = value instanceof _AwaitValue; Promise.resolve(wrappedAwait ? value.wrapped : value).then(function (arg) { if (wrappedAwait) { resume("next", arg); return; } settle(result.done ? "return" : "normal", arg); }, function (err) { resume("throw", err); }); } catch (err) { settle("throw", err); } } function settle(type, value) { switch (type) { case "return": front.resolve({ value: value, done: true }); break; case "throw": front.reject(value); break; default: front.resolve({ value: value, done: false }); break; } front = front.next; if (front) { resume(front.key, front.arg); } else { back = null; } } this._invoke = send; if (typeof gen.return !== "function") { this.return = undefined; } }
31
// Make _AsyncGenerator instances their own async iterator when the runtime
// supports Symbol.asyncIterator.
if (typeof Symbol === "function" && Symbol.asyncIterator) {
  _AsyncGenerator.prototype[Symbol.asyncIterator] = function () {
    return this;
  };
}

// Install the iterator protocol methods; each one simply forwards to the
// per-instance `_invoke` with its own name as the request key.
["next", "throw", "return"].forEach(function (requestKey) {
  _AsyncGenerator.prototype[requestKey] = function (arg) {
    return this._invoke(requestKey, arg);
  };
});
39
40function _AwaitValue(value) { this.wrapped = value; }
41
42function _asyncIterator(iterable) { var method; if (typeof Symbol === "function") { if (Symbol.asyncIterator) { method = iterable[Symbol.asyncIterator]; if (method != null) return method.call(iterable); } if (Symbol.iterator) { method = iterable[Symbol.iterator]; if (method != null) return method.call(iterable); } } throw new TypeError("Object is not async iterable"); }
43
// Helpers taken from a Module instance created for this file (`__filename`).
// Several of them are not used below, hence the lint suppression.
/* eslint-disable no-unused-vars */
const moduleHelpers = new _Module.default(__filename);
const log = moduleHelpers.log;
const warn = moduleHelpers.warn;
const error = moduleHelpers.error;
const noteGauge = moduleHelpers.noteGauge;
const noteCount = moduleHelpers.noteCount;
const noteTimer = moduleHelpers.noteTimer;
const trackOp = moduleHelpers.trackOp;
/* eslint-enable no-unused-vars */

// Upper bound on the number of entries kept in AthenaClient's in-memory
// list of already-added partitions.
const MAX_PARTITION_CACHE_SIZE = 1000;
55
/**
 * Client that writes gzipped, newline-delimited JSON files to S3, registers
 * the matching Athena partitions, and executes Athena queries.
 *
 * All AWS access goes through the injected `dependencies.awsClient`, which is
 * used here via its `runS3(fn, options)` and `runAthena(fn)` methods.
 */
class AthenaClient {
  /**
   * @param dependencies object carrying the `awsClient` to use.
   */
  constructor(dependencies) {
    _defineProperty(this, "_awsClient", void 0);

    _defineProperty(this, "_addedPartitions", void 0);

    /**
     * Uploads `items` as `<dirPath>/<fileName>.jsons.gz` to `options.bucket` and,
     * in parallel, makes sure the Athena partition derived from `options.dirPath`
     * exists on `options.athenaFullTableName` (skipped when no table is given).
     *
     * Partition columns are taken from the `field=value` segments of `dirPath`.
     */
    _defineProperty(this, "writeAthenaFile", async (items, options) => {
      await Promise.all([
      // persist data to S3
      (async () => {
        const gzippedBody = await (0, _compressionUtils.gzipCompress)(items.map(item => JSON.stringify(item)).join('\n'));
        await this._awsClient.runS3(s3 => s3.putObject({
          Bucket: options.bucket,
          Key: `${options.dirPath}/${options.fileName}.jsons.gz`,
          Body: gzippedBody,
          ContentType: 'text/plain',
          ContentEncoding: 'gzip',
          Metadata: {
            // NOTE(review): the S3 SDK normally adds the "x-amz-meta-" prefix itself,
            // so this key may end up double-prefixed. Left unchanged because readers
            // may depend on the stored name - confirm before renaming.
            'x-amz-meta-items': items.length.toString()
          }
        }), {
          noLog: true
        });
      })(),
      // create athena partition
      (async () => {
        const athenaFullTableName = options.athenaFullTableName;
        if (!athenaFullTableName) return;
        const partitionDesc = options.dirPath.split('/').map(part => part.split('=')).filter(partParts => partParts.length === 2).map(([field, value]) => `${field} = '${value}'`).join(', ');

        // No `field=value` segment in the path: there is no partition to add, and
        // "PARTITION ()" would not be a valid statement.
        if (partitionDesc === '') return;

        // The cache key includes the table name so that identical partition values
        // on different tables are tracked independently.
        const partitionCacheKey = `${athenaFullTableName}|${partitionDesc}`;
        if (this._addedPartitions.includes(partitionCacheKey)) return;

        const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';`;
        await this.executeAthenaQuery(query, 'add-athena-partition', {
          dontWaitForCompletion: true,
          noLog: true
        });

        // maintain partition cache: newest first, oldest entry dropped when full
        this._addedPartitions.unshift(partitionCacheKey);

        if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE) this._addedPartitions.pop();
      })()]);
    });

    /**
     * Starts `query` and returns all result objects as one array.
     *
     * @param query      the query string to run
     * @param queryName  name used for the output location, metrics and log context
     * @param options    optional settings:
     *                   - dontWaitForCompletion: resolve with [] right after the query was started
     *                   - maxResults: cap on the number of returned rows (0 / unset = no cap)
     *                   - noLog: passed (negated) as the `log` option of `trackOp`
     * @returns array of row objects (empty when `dontWaitForCompletion` is set)
     * @throws rethrows any failure after reporting it through `error`
     */
    _defineProperty(this, "executeAthenaQuery", async (query, queryName, options) => {
      options = options || {};
      const opts = {
        dontWaitForCompletion: options.dontWaitForCompletion || false,
        maxResults: options.maxResults || 0,
        noLog: options.noLog || false
      };
      return await trackOp(async () => {
        try {
          const results = [];
          const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName);
          if (opts.dontWaitForCompletion) return [];

          // Transpiled `for await (const items of asyncIterator)`: the bookkeeping
          // variables make sure the iterator is closed via `return()` on early exit
          // and that an iteration error wins over an error raised while closing.
          var _iteratorNormalCompletion = true;
          var _didIteratorError = false;

          var _iteratorError;

          try {
            for (var _iterator = _asyncIterator(asyncIterator), _step, _value; _step = await _iterator.next(), _iteratorNormalCompletion = _step.done, _value = await _step.value, !_iteratorNormalCompletion; _iteratorNormalCompletion = true) {
              const items = _value;
              const limited = opts.maxResults > 0;
              const itemsToAdd = limited ? items.slice(0, opts.maxResults - results.length) : items;

              // append in place instead of re-copying the accumulated array per page
              for (const item of itemsToAdd) results.push(item);

              // stop as soon as the cap is reached so that no further page is fetched
              if (limited && results.length >= opts.maxResults) break;
            }
          } catch (err) {
            _didIteratorError = true;
            _iteratorError = err;
          } finally {
            try {
              if (!_iteratorNormalCompletion && _iterator.return != null) {
                await _iterator.return();
              }
            } finally {
              if (_didIteratorError) {
                throw _iteratorError;
              }
            }
          }

          return results;
        } catch (err) {
          error('Athena query execution failed', {
            queryName: queryName,
            query: query,
            err
          });
          throw err;
        }
      }, `athena-query-${queryName}`, null, {
        log: !opts.noLog
      });
    });

    /**
     * Starts `query` on Athena and returns an async iterator over its result pages
     * (each page is an array of row objects).
     *
     * The result location is built from the system config (`aws.accountId`,
     * `aws.defaultRegion`, `env`) and `queryName`.
     *
     * @throws when the aws config (or one of the required values) is missing, or
     *         when starting the query fails; failures are reported through `error`
     *         before being rethrown.
     */
    _defineProperty(this, "executeAthenaQueryAndGetResultIterator", async (query, queryName) => {
      try {
        const awsConfig = _System.default.getConfig().aws;

        if (!awsConfig) throw new Error('aws config not set in system config');
        const startMS = Date.now();
        const OUTPUT_LOCATION = `s3://aws-athena-query-results-${awsConfig.accountId || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.accountId')}-${awsConfig.defaultRegion || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.defaultRegion')}/${_System.default.getConfig().env}/${queryName}/`; // execute query

        const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({
          QueryString: query,
          ResultConfiguration: {
            OutputLocation: OUTPUT_LOCATION
          }
        }));
        return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS);
      } catch (err) {
        error('Athena query execution failed', {
          queryName: queryName,
          query: query,
          err
        });
        throw err;
      }
    });

    this._awsClient = dependencies.awsClient;
    this._addedPartitions = [];
  }

}
184
185exports.default = AthenaClient;
186
/**
 * Async generator over the result pages of an already started Athena query.
 *
 * Positional arguments: (awsClient, queryExecutionId, queryName, query, startMS).
 * Each yielded value is an array of row objects for one result page.
 * The implementation lives in `_iterateAthenaQueryResults`, which is created
 * lazily on first use.
 */
function iterateAthenaQueryResults(_x, _x2, _x3, _x4, _x5) {
  return _iterateAthenaQueryResults.apply(this, arguments);
}

// On first call this function replaces itself with the wrapped generator and
// then delegates to it; later calls go straight to the wrapped generator.
function _iterateAthenaQueryResults() {
  _iterateAthenaQueryResults = _wrapAsyncGenerator(function* (awsClient, queryExecutionId, queryName, query, startMS) {
    // Largest page size accepted by Athena's GetQueryResults.
    const MAX_RESULTS_PER_PAGE = 1000;

    try {
      // wait for query execution to end
      let dataScannedInBytes = 0;

      while (true) {
        const statusRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryExecution({
          QueryExecutionId: queryExecutionId
        })));
        const {
          State,
          StateChangeReason
        } = statusRes.result.QueryExecution.Status;

        if (State === 'SUCCEEDED') {
          // default to 0 so that missing statistics do not turn the metrics below into NaN
          dataScannedInBytes = _lodash.default.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes', 0);
          break;
        } else if (['QUEUED', 'RUNNING'].includes(State)) {
          // still in progress: poll again after a short pause
          yield _awaitAsyncGenerator((0, _asyncUtils.sleep)(500));
        } else {
          // any other state is treated as a failed query
          throw new _errorUtils.ExtendedError('Athena query failed', {
            queryName: queryName,
            state: State,
            reason: StateChangeReason,
            query: query
          });
        }
      }

      const kbScanned = Math.round(dataScannedInBytes / 1000);
      const kbCharged = Math.max(kbScanned, 10 * 1000); // Athena charges a minimum of 10mb for every query

      const costDollar = 5 * kbCharged / 1000000000; // Athena charges 5$ for a TB of data

      noteCount(`${queryName}.kbScanned`, kbScanned);
      noteCount(`${queryName}.kbCharged`, kbCharged);
      noteCount(`${queryName}.dollarsCharged`, costDollar); // fetch response

      let nextToken = null;

      do {
        const resultsRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryResults({
          QueryExecutionId: queryExecutionId,
          MaxResults: MAX_RESULTS_PER_PAGE,
          NextToken: nextToken
        })));
        const athenaResults = getAthenaQueryResultLines(resultsRes.result);
        const newResultObjects = csvArrayToObjects(athenaResults);
        yield newResultObjects;
        nextToken = resultsRes.result.NextToken;
      } while (nextToken);

      // Only reached when the consumer reads all pages.
      const durationMS = Date.now() - startMS;

      noteTimer(`${queryName}.duration`, durationMS);
    } catch (err) {
      error('Athena query execution failed', {
        queryName: queryName,
        query: query,
        err
      });
      throw err;
    }
  });
  return _iterateAthenaQueryResults.apply(this, arguments);
}
258
/**
 * Removes one pair of surrounding double quotes from a string value.
 *
 * - non-string input is returned unchanged
 * - the empty string is mapped to null
 * - strings that are not wrapped in double quotes (or are shorter than two
 *   characters) are returned unchanged
 *
 * @param input value to unwrap
 * @returns the unwrapped string, null for '', or the input itself
 */
function stripQuotes(input) {
  if (typeof input !== 'string') return input;
  if (input === '') return null;
  const isQuoted = input.length >= 2 && input.startsWith('"') && input.endsWith('"');
  // slice(1, -1) drops the first and last character (replaces the legacy substr call)
  return isQuoted ? input.slice(1, -1) : input;
}
266
267function getAthenaQueryResultLines(res) {
268 const rolloutLines = [];
269 const numericTypeNames = ['integer', 'bigint', 'double'];
270 const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo;
271 rolloutLines.push(columnMetas.map(cm => cm.Name));
272 res.ResultSet.Rows.splice(1) // skip header
273 .forEach(row => rolloutLines.push(row.Data.map((datum, i) => numericTypeNames.includes(columnMetas[i].Type) ? datum.VarCharValue === undefined ? null : Number(datum.VarCharValue) : datum.VarCharValue === undefined ? null : `"${datum.VarCharValue}"`)));
274 const csvContent = rolloutLines;
275 return csvContent;
276}
277
/**
 * Turns an array of lines (header line first) into an array of objects.
 *
 * Every line after the first becomes one object whose keys are the entries of
 * the header line at the same position; each value is passed through
 * `stripQuotes` before being stored.
 */
function csvArrayToObjects(csvArray) {
  const header = csvArray[0];
  return csvArray.slice(1).map(cells => cells.reduce((record, cell, column) => {
    record[header[column]] = stripQuotes(cell);
    return record;
  }, {}));
}
288//# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../src/AthenaClient.js"],"names":["log","warn","error","noteGauge","noteCount","noteTimer","trackOp","Module","__filename","MAX_PARTITION_CACHE_SIZE","AthenaClient","constructor","dependencies","items","options","Promise","all","gzippedBody","map","item","JSON","stringify","join","_awsClient","runS3","s3","putObject","Bucket","bucket","Key","dirPath","fileName","Body","ContentType","ContentEncoding","Metadata","length","toString","noLog","athenaFullTableName","partitionDesc","split","part","filter","partParts","field","value","existingPartition","_addedPartitions","find","p","query","executeAthenaQuery","dontWaitForCompletion","unshift","pop","queryName","opts","maxResults","results","asyncIterator","executeAthenaQueryAndGetResultIterator","reachedMaxResults","itemsToAdd","slice","err","awsConfig","System","getConfig","aws","Error","startMS","Date","now","OUTPUT_LOCATION","accountId","defaultRegion","env","startRes","runAthena","athena","startQueryExecution","QueryString","ResultConfiguration","OutputLocation","iterateAthenaQueryResults","result","QueryExecutionId","awsClient","queryExecutionId","dataScannedInBytes","statusRes","getQueryExecution","State","StateChangeReason","QueryExecution","Status","_","get","includes","ExtendedError","state","reason","kbScanned","Math","round","kbCharged","max","costDollar","nextToken","resultsRes","getQueryResults","MaxResults","Infinity","NextToken","athenaResults","getAthenaQueryResultLines","newResultObjects","csvArrayToObjects","durationMS","stripQuotes","input","startsWith","endsWith","substr","res","rolloutLines","numericTypeNames","columnMetas","ResultSet","ResultSetMetadata","ColumnInfo","push","cm","Name","Rows","splice","forEach","row","Data","datum","i","Type","VarCharValue","undefined","Number","csvContent","csvArray","propNames","valueLines","objects","valueLine","obj"],"mappings":";;;;;;;AAEA;;AAEA;;AAEA;;AACA;;AACA;;AACA;;A
ACA;;;;;;;;;;;;;;;;;;;;;;;;AAIA,MAAM;AAAEA,EAAAA,GAAF;AAAOC,EAAAA,IAAP;AAAaC,EAAAA,KAAb;AAAoBC,EAAAA,SAApB;AAA+BC,EAAAA,SAA/B;AAA0CC,EAAAA,SAA1C;AAAqDC,EAAAA;AAArD,IAAiE,IAAIC,eAAJ,CAAWC,UAAX,CAAvE,C,CAA8F;;AAE9F,MAAMC,wBAAwB,GAAG,IAAjC;;AAEe,MAAMC,YAAN,CAAmB;AAMhCC,EAAAA,WAAW,CAACC,YAAD,EAAyC;AAAA;;AAAA;;AAAA,6CAMlC,OAAOC,KAAP,EAA6BC,OAA7B,KAMI;AACpB,YAAMC,OAAO,CAACC,GAAR,CAAY,CAEhB;AACA,OAAC,YAAY;AACX,cAAMC,WAAW,GAAG,MAAM,oCAAaJ,KAAK,CAACK,GAAN,CAAUC,IAAI,IAAIC,IAAI,CAACC,SAAL,CAAeF,IAAf,CAAlB,EAAwCG,IAAxC,CAA6C,IAA7C,CAAb,CAA1B;AACA,cAAM,KAAKC,UAAL,CAAgBC,KAAhB,CAAsBC,EAAE,IAAIA,EAAE,CAACC,SAAH,CAAa;AAC7CC,UAAAA,MAAM,EAAEb,OAAO,CAACc,MAD6B;AAE7CC,UAAAA,GAAG,EAAG,GAAEf,OAAO,CAACgB,OAAQ,IAAGhB,OAAO,CAACiB,QAAS,WAFC;AAG7CC,UAAAA,IAAI,EAAEf,WAHuC;AAI7CgB,UAAAA,WAAW,EAAE,YAJgC;AAK7CC,UAAAA,eAAe,EAAE,MAL4B;AAM7CC,UAAAA,QAAQ,EAAE;AACR,gCAAoBtB,KAAK,CAACuB,MAAN,CAAaC,QAAb;AADZ;AANmC,SAAb,CAA5B,EASF;AACFC,UAAAA,KAAK,EAAE;AADL,SATE,CAAN;AAYD,OAdD,GAHgB,EAmBhB;AACA,OAAC,YAAY;AACX,cAAMC,mBAAmB,GAAGzB,OAAO,CAACyB,mBAApC;AACA,YAAI,CAACA,mBAAL,EACE;AAEF,cAAMC,aAAa,GAAG1B,OAAO,CAACgB,OAAR,CACnBW,KADmB,CACb,GADa,EAEnBvB,GAFmB,CAEfwB,IAAI,IAAIA,IAAI,CAACD,KAAL,CAAW,GAAX,CAFO,EAGnBE,MAHmB,CAGZC,SAAS,IAAIA,SAAS,CAACR,MAAV,KAAqB,CAHtB,EAInBlB,GAJmB,CAIf,CAAC,CAAE2B,KAAF,EAASC,KAAT,CAAD,KAAsB,GAAED,KAAM,OAAMC,KAAM,GAJ3B,EAKnBxB,IALmB,CAKd,IALc,CAAtB;;AAOA,cAAMyB,iBAAiB,GAAG,KAAKC,gBAAL,CAAsBC,IAAtB,CAA2BC,CAAC,IAAIA,CAAC,KAAKV,aAAtC,CAA1B;;AACA,YAAI,CAACO,iBAAL,EAAwB;AACtB,gBAAMI,KAAK,GAAI,eAAcZ,mBAAoB,iCAAgCC,aAAc,oBAAmB1B,OAAO,CAACc,MAAO,IAAGd,OAAO,CAACgB,OAAQ,IAApJ;AACA,gBAAM,KAAKsB,kBAAL,CAAwBD,KAAxB,EAA+B,sBAA/B,EAAuD;AAAEE,YAAAA,qBAAqB,EAAE,IAAzB;AAA+Bf,YAAAA,KAAK,EAAE;AAAtC,WAAvD,CAAN,CAFsB,CAItB;;AACA,eAAKU,gBAAL,CAAsBM,OAAtB,CAA8Bd,aAA9B;;AACA,cAAI,KAAKQ,gBAAL,CAAsBZ,MAAtB,GAA+B3B,wBAAnC,EACE,KAAKuC,gBAAL,CAAsBO,GAAtB;AACH;AACF,OAtBD,GApBgB,CAAZ,CAAN;AA6CD,KA1DmD;;AAAA,gDA4D/B,OAAOJ,KAAP,EAAsBK,SAAtB,EAAyC1C,OAAzC,KAAyJ;AAC5KA,MAAAA,OAAO,GAAGA,OAAO,IAAI,EAArB;AACA,YAAM2C,IAAI
,GAAG;AACXJ,QAAAA,qBAAqB,EAAEvC,OAAO,CAACuC,qBAAR,IAAiC,KAD7C;AAEXK,QAAAA,UAAU,EAAE5C,OAAO,CAAC4C,UAAR,IAAsB,CAFvB;AAGXpB,QAAAA,KAAK,EAAExB,OAAO,CAACwB,KAAR,IAAiB;AAHb,OAAb;AAMA,aAAO,MAAMhC,OAAO,CAAC,YAAY;AAC/B,YAAI;AACF,cAAIqD,OAAsB,GAAG,EAA7B;AACA,gBAAMC,aAAa,GAAG,MAAM,KAAKC,sCAAL,CAA4CV,KAA5C,EAAmDK,SAAnD,CAA5B;AACA,cAAIC,IAAI,CAACJ,qBAAT,EACE,OAAO,EAAP;AAJA;AAAA;;AAAA;;AAAA;AAKF,gDAA0BO,aAA1B,oLAAyC;AAAA,oBAAxB/C,KAAwB;AACvC,oBAAMiD,iBAAiB,GAAGL,IAAI,CAACC,UAAL,GAAkB,CAAlB,IAAuBC,OAAO,CAACvB,MAAR,GAAiBvB,KAAK,CAACuB,MAAvB,GAAgCqB,IAAI,CAACC,UAAtF;AACA,oBAAMK,UAAU,GAAGD,iBAAiB,GAAGjD,KAAK,CAACmD,KAAN,CAAY,CAAZ,EAAeP,IAAI,CAACC,UAAL,GAAkBC,OAAO,CAACvB,MAAzC,CAAH,GAAsDvB,KAA1F;AACA8C,cAAAA,OAAO,GAAG,CAAE,GAAGA,OAAL,EAAc,GAAGI,UAAjB,CAAV;AACA,kBAAID,iBAAJ,EACE;AACH;AAXC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;;AAYF,iBAAOH,OAAP;AACD,SAbD,CAaE,OAAOM,GAAP,EAAY;AACZ/D,UAAAA,KAAK,CAAC,+BAAD,EAAkC;AAAEsD,YAAAA,SAAS,EAAEA,SAAb;AAAwBL,YAAAA,KAAK,EAAEA,KAA/B;AAAsCc,YAAAA;AAAtC,WAAlC,CAAL;AACA,gBAAMA,GAAN;AACD;AACF,OAlBmB,EAkBhB,gBAAeT,SAAU,EAlBT,EAkBY,IAlBZ,EAkBkB;AAAExD,QAAAA,GAAG,EAAE,CAACyD,IAAI,CAACnB;AAAb,OAlBlB,CAApB;AAmBD,KAvFmD;;AAAA,oEAyFX,OAAOa,KAAP,EAAsBK,SAAtB,KAA+F;AACtI,UAAI;AACF,cAAMU,SAAS,GAAGC,gBAAOC,SAAP,GAAmBC,GAArC;;AACA,YAAI,CAACH,SAAL,EACE,MAAM,IAAII,KAAJ,CAAU,qCAAV,CAAN;AAEF,cAAMC,OAAO,GAAGC,IAAI,CAACC,GAAL,EAAhB;AACA,cAAMC,eAAe,GAAI,iCAAiCR,SAAS,CAACS,SAAV,IAAuB,+BAAc,4BAAd,CAA6C,IAAIT,SAAS,CAACU,aAAV,IAA2B,+BAAc,gCAAd,CAAiD,IAAGT,gBAAOC,SAAP,GAAmBS,GAAI,IAAGrB,SAAU,GAArP,CANE,CAQF;;AACA,cAAMsB,QAAQ,GAAG,MAAM,KAAKvD,UAAL,CAAgBwD,SAAhB,CAA0BC,MAAM,IAAIA,MAAM,CAACC,mBAAP,CAA2B;AACpFC,UAAAA,WAAW,EAAE/B,KADuE;AAEpFgC,UAAAA,mBAAmB,EAAE;AACnBC,YAAAA,cAAc,EAAEV;AADG;AAF+D,SAA3B,CAApC,CAAvB;AAOA,eAAOW,yBAAyB,CAAC,KAAK9D,UAAN,EAAkBuD,QAAQ,CAACQ,MAAT,CAAgBC,gBAAlC,EAAoD/B,SAApD,EAA+DL,KAA/D,EAAsEoB,OAAtE,CAAhC;AACD,OAjBD,CAiBE,OAAON,GAAP,EAAY;AACZ/D,QAAAA,KAAK,CAAC,+BAAD,EAAkC;AAAEsD,UAAAA,SAAS,EAAEA,SAAb;AAAwBL,UAAAA,KAAK,EAAEA,KAA/B;A
AAsCc,UAAAA;AAAtC,SAAlC,CAAL;AACA,cAAMA,GAAN;AACD;AACF,KA/GmD;;AAClD,SAAK1C,UAAL,GAAkBX,YAAY,CAAC4E,SAA/B;AACA,SAAKxC,gBAAL,GAAwB,EAAxB;AACD;;AAT+B;;;;SA0HlBqC,yB;;;;;mDAAhB,WAA0CG,SAA1C,EAAgEC,gBAAhE,EAA0FjC,SAA1F,EAA6GL,KAA7G,EAA4HoB,OAA5H,EAAuL;AACrL,QAAI;AACF;AACA,UAAImB,kBAAkB,GAAG,CAAzB;;AACA,aAAO,IAAP,EAAa;AACX,cAAMC,SAAS,8BAASH,SAAS,CAACT,SAAV,CAAoBC,MAAM,IAAIA,MAAM,CAACY,iBAAP,CAAyB;AAC7EL,UAAAA,gBAAgB,EAAEE;AAD2D,SAAzB,CAA9B,CAAT,CAAf;AAIA,cAAM;AAAEI,UAAAA,KAAF;AAASC,UAAAA;AAAT,YAA+BH,SAAS,CAACL,MAAV,CAAiBS,cAAjB,CAAgCC,MAArE;;AACA,YAAIH,KAAK,KAAK,WAAd,EAA2B;AACzBH,UAAAA,kBAAkB,GAAGO,gBAAEC,GAAF,CAAMP,SAAS,CAACL,MAAV,CAAiBS,cAAvB,EAAuC,+BAAvC,CAArB;AACA;AACD,SAHD,MAGO,IAAI,CAAC,QAAD,EAAW,SAAX,EAAsBI,QAAtB,CAA+BN,KAA/B,CAAJ,EAA2C;AAChD,qCAAM,uBAAM,GAAN,CAAN;AACD,SAFM,MAEA;AACL,gBAAM,IAAIO,yBAAJ,CAAkB,qBAAlB,EAAyC;AAAE5C,YAAAA,SAAS,EAAEA,SAAb;AAAwB6C,YAAAA,KAAK,EAAER,KAA/B;AAAsCS,YAAAA,MAAM,EAAER,iBAA9C;AAAiE3C,YAAAA,KAAK,EAAEA;AAAxE,WAAzC,CAAN;AACD;AACF;;AAED,YAAMoD,SAAS,GAAGC,IAAI,CAACC,KAAL,CAAWf,kBAAkB,GAAG,IAAhC,CAAlB;AACA,YAAMgB,SAAS,GAAGF,IAAI,CAACG,GAAL,CAASJ,SAAT,EAAoB,KAAK,IAAzB,CAAlB,CApBE,CAoB+C;;AACjD,YAAMK,UAAU,GAAG,IAAIF,SAAJ,GAAgB,UAAnC,CArBE,CAqB4C;;AAC9CtG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,YAAd,EAA2B+C,SAA3B,CAAT;AACAnG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,YAAd,EAA2BkD,SAA3B,CAAT;AACAtG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,iBAAd,EAAgCoD,UAAhC,CAAT,CAxBE,CA0BF;;AACA,UAAIC,SAAkB,GAAG,IAAzB;;AACA,SAAG;AACD,cAAMC,UAAU,8BAAStB,SAAS,CAACT,SAAV,CAAoBC,MAAM,IAAIA,MAAM,CAAC+B,eAAP,CAAuB;AAC5ExB,UAAAA,gBAAgB,EAAEE,gBAD0D;AAE5EuB,UAAAA,UAAU,EAAEC,QAFgE;AAG5EC,UAAAA,SAAS,EAAEL;AAHiE,SAAvB,CAA9B,CAAT,CAAhB;AAMA,cAAMM,aAAa,GAAGC,yBAAyB,CAACN,UAAU,CAACxB,MAAZ,CAA/C;AACA,cAAM+B,gBAAgB,GAAGC,iBAAiB,CAACH,aAAD,CAA1C;AAEA,cAAME,gBAAN;AAEAR,QAAAA,SAAS,GAAGC,UAAU,CAACxB,MAAX,CAAkB4B,SAA9B;AACD,OAbD,QAaSL,SAbT;;AAeA,YAAMU,UAAU,GAAG/C,IAAI,CAACC,GAAL,KAAaF,OAAhC,CA3CE,CA4CF;;AACAlE,MAAAA,SAAS,CAAE,GAAEmD,SAAU,WAAd,EAA0B+D,UAA1B,CAAT;AACD,KA9CD,CA8CE,OAAOtD,GAAP,EAAY;AACZ/D,MAAAA,KAAK,CA
AC,+BAAD,EAAkC;AAAEsD,QAAAA,SAAS,EAAEA,SAAb;AAAwBL,QAAAA,KAAK,EAAEA,KAA/B;AAAsCc,QAAAA;AAAtC,OAAlC,CAAL;AACA,YAAMA,GAAN;AACD;AACF,G;;;;AAED,SAASuD,WAAT,CAAqBC,KAArB,EAAkC;AAChC,MAAI,OAAOA,KAAP,KAAiB,QAArB,EACE,OAAOA,KAAP;AACF,MAAIA,KAAK,KAAK,EAAd,EACE,OAAO,IAAP;AACF,QAAMrF,MAAM,GAAGqF,KAAK,CAACrF,MAArB;AACA,MAAIA,MAAM,GAAG,CAAT,IAAc,CAACqF,KAAK,CAACC,UAAN,CAAiB,GAAjB,CAAf,IAAwC,CAACD,KAAK,CAACE,QAAN,CAAe,GAAf,CAA7C,EACE,OAAOF,KAAP;AACF,SAAOA,KAAK,CAACG,MAAN,CAAa,CAAb,EAAgBxF,MAAM,GAAG,CAAzB,CAAP;AACD;;AAED,SAASgF,yBAAT,CAAmCS,GAAnC,EAAsE;AACpE,QAAMC,YAAkC,GAAG,EAA3C;AACA,QAAMC,gBAAgB,GAAG,CAAC,SAAD,EAAY,QAAZ,EAAsB,QAAtB,CAAzB;AACA,QAAMC,WAAW,GAAGH,GAAG,CAACI,SAAJ,CAAcC,iBAAd,CAAgCC,UAApD;AACAL,EAAAA,YAAY,CAACM,IAAb,CAAkBJ,WAAW,CAAC9G,GAAZ,CAAgBmH,EAAE,IAAIA,EAAE,CAACC,IAAzB,CAAlB;AACAT,EAAAA,GAAG,CAACI,SAAJ,CAAcM,IAAd,CACGC,MADH,CACU,CADV,EACa;AADb,GAEGC,OAFH,CAGIC,GAAG,IAAIZ,YAAY,CAACM,IAAb,CACLM,GAAG,CAACC,IAAJ,CAASzH,GAAT,CACE,CAAC0H,KAAD,EAAQC,CAAR,KAAcd,gBAAgB,CAC3B5B,QADW,CACF6B,WAAW,CAACa,CAAD,CAAX,CAAeC,IADb,IAEXF,KAAK,CAACG,YAAN,KAAuBC,SAAvB,GAAmC,IAAnC,GAA0CC,MAAM,CAACL,KAAK,CAACG,YAAP,CAFrC,GAGXH,KAAK,CAACG,YAAN,KAAuBC,SAAvB,GAAmC,IAAnC,GAA2C,IAAGJ,KAAK,CAACG,YAAa,GAJtE,CADK,CAHX;AAYA,QAAMG,UAAU,GAAGpB,YAAnB;AACA,SAAOoB,UAAP;AACD;;AAED,SAAS5B,iBAAT,CAA2B6B,QAA3B,EAA2D;AACzD,QAAMC,SAAS,GAAGD,QAAQ,CAAC,CAAD,CAA1B;AACA,QAAME,UAAU,GAAGF,QAAQ,CAACnF,KAAT,CAAe,CAAf,CAAnB;AACA,QAAMsF,OAAO,GAAGD,UAAU,CAACnI,GAAX,CAAeqI,SAAS,IAAI;AAC1C,UAAMC,GAAG,GAAG,EAAZ;AACAD,IAAAA,SAAS,CAACd,OAAV,CAAkB,CAAC3F,KAAD,EAAQ+F,CAAR,KAAcW,GAAG,CAACJ,SAAS,CAACP,CAAD,CAAV,CAAH,GAAoBrB,WAAW,CAAC1E,KAAD,CAA/D;AACA,WAAO0G,GAAP;AACD,GAJe,CAAhB;AAKA,SAAOF,OAAP;AACD","sourcesContent":["// @flow\n\nimport \"@babel/polyfill\"\n\nimport _ from 'lodash'\n\nimport { gzipCompress } from './compressionUtils'\nimport { sleep } from './asyncUtils'\nimport System from \"./System\"\nimport Module from \"./Module\"\nimport { ExtendedError, throwUnsetArg } from './errorUtils' // eslint-disable-line 
no-unused-vars\n\nimport type AWSClient from './AWSClient'\n\nconst { log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars\n\nconst MAX_PARTITION_CACHE_SIZE = 1000\n\nexport default class AthenaClient {\n\n  _awsClient: AWSClient\n  _addedPartitions: Array<string>\n\n\n  constructor(dependencies: { awsClient: AWSClient }) {\n    this._awsClient = dependencies.awsClient\n    this._addedPartitions = []\n  }\n\n\n  writeAthenaFile = async (items: Array<Object>, options: {|\n    bucket: string,\n    dirPath: string,\n    fileName: string,\n    athenaFullTableName?: ?string,\n    noLog?: ?boolean,\n  |}): Promise<void> => {\n    await Promise.all([\n  \n      // persist data to S3\n      (async () => {\n        const gzippedBody = await gzipCompress(items.map(item => JSON.stringify(item)).join('\\n'))\n        await this._awsClient.runS3(s3 => s3.putObject({\n          Bucket: options.bucket,\n          Key: `${options.dirPath}/${options.fileName}.jsons.gz`,\n          Body: gzippedBody, \n          ContentType: 'text/plain',\n          ContentEncoding: 'gzip',\n          Metadata: {\n            'x-amz-meta-items': items.length.toString(),\n          },\n        }), {\n          noLog: true,\n        })\n      })(),\n  \n      // create athena partition\n      (async () => {\n        const athenaFullTableName = options.athenaFullTableName\n        if (!athenaFullTableName)\n          return\n          \n        const partitionDesc = options.dirPath\n          .split('/')\n          .map(part => part.split('='))\n          .filter(partParts => partParts.length === 2)\n          .map(([ field, value ])=> `${field} = '${value}'`)\n          .join(', ')\n\n        const existingPartition = this._addedPartitions.find(p => p === partitionDesc)\n        if (!existingPartition) {\n          const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 
's3://${options.bucket}/${options.dirPath}';`     \n          await this.executeAthenaQuery(query, 'add-athena-partition', { dontWaitForCompletion: true, noLog: true })\n          \n          // maintain partition cache\n          this._addedPartitions.unshift(partitionDesc)\n          if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE)\n            this._addedPartitions.pop()\n        }\n      })(),\n      \n    ])\n  }\n  \n  executeAthenaQuery = async (query: string, queryName: string, options: ?{ dontWaitForCompletion?: boolean, maxResults?: number, noLog?: boolean }): Promise<Array<Object>> => {\n    options = options || {}\n    const opts = {\n      dontWaitForCompletion: options.dontWaitForCompletion || false,\n      maxResults: options.maxResults || 0,\n      noLog: options.noLog || false,\n    }\n    \n    return await trackOp(async () => {\n      try {     \n        let results: Array<Object> = []\n        const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName)\n        if (opts.dontWaitForCompletion)\n          return []\n        for await (const items of asyncIterator) {\n          const reachedMaxResults = opts.maxResults > 0 && results.length + items.length > opts.maxResults\n          const itemsToAdd = reachedMaxResults ? 
items.slice(0, opts.maxResults - results.length) : items\n          results = [ ...results, ...itemsToAdd ]\n          if (reachedMaxResults)\n            break\n        }\n        return results\n      } catch (err) {\n        error('Athena query execution failed', { queryName: queryName, query: query, err })\n        throw err\n      }\n    }, `athena-query-${queryName}`, null, { log: !opts.noLog })\n  }\n  \n  executeAthenaQueryAndGetResultIterator = async (query: string, queryName: string): Promise<AsyncGenerator<Array<Object>, void, any>> => {\n    try {\n      const awsConfig = System.getConfig().aws\n      if (!awsConfig)\n        throw new Error('aws config not set in system config')\n  \n      const startMS = Date.now()\n      const OUTPUT_LOCATION = `s3://aws-athena-query-results-${ awsConfig.accountId || throwUnsetArg('systemConfig.aws.accountId') }-${ awsConfig.defaultRegion || throwUnsetArg('systemConfig.aws.defaultRegion') }/${System.getConfig().env}/${queryName}/`\n  \n      // execute query\n      const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({\n        QueryString: query,\n        ResultConfiguration: {\n          OutputLocation: OUTPUT_LOCATION,\n        },\n      }))\n  \n      return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS)\n    } catch (err) {\n      error('Athena query execution failed', { queryName: queryName, query: query, err })\n      throw err\n    }\n  }\n}\n\n\n\nasync function* iterateAthenaQueryResults(awsClient: AWSClient, queryExecutionId: string, queryName: string, query: string, startMS: number): AsyncGenerator<Array<Object>, void, any> {\n  try {\n    // wait for query execution to end\n    let dataScannedInBytes = 0\n    while (true) {\n      const statusRes = await awsClient.runAthena(athena => athena.getQueryExecution({\n        QueryExecutionId: queryExecutionId,\n      }))\n      \n      const { State, StateChangeReason } = 
statusRes.result.QueryExecution.Status\n      if (State === 'SUCCEEDED') {\n        dataScannedInBytes = _.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes')\n        break\n      } else if (['QUEUED', 'RUNNING'].includes(State)) {\n        await sleep(500)\n      } else {\n        throw new ExtendedError('Athena query failed', { queryName: queryName, state: State, reason: StateChangeReason, query: query })\n      }\n    }\n\n    const kbScanned = Math.round(dataScannedInBytes / 1000)\n    const kbCharged = Math.max(kbScanned, 10 * 1000) // Athena charges a minimum of 10mb for every query\n    const costDollar = 5 * kbCharged / 1000000000 // Athena charges 5$ for a TB of data\n    noteCount(`${queryName}.kbScanned`, kbScanned)\n    noteCount(`${queryName}.kbCharged`, kbCharged)\n    noteCount(`${queryName}.dollarsCharged`, costDollar)\n\n    // fetch response\n    let nextToken: ?string = null\n    do {\n      const resultsRes = await awsClient.runAthena(athena => athena.getQueryResults({\n        QueryExecutionId: queryExecutionId,\n        MaxResults: Infinity,\n        NextToken: nextToken,\n      }))\n\n      const athenaResults = getAthenaQueryResultLines(resultsRes.result)\n      const newResultObjects = csvArrayToObjects(athenaResults)\n\n      yield newResultObjects\n\n      nextToken = resultsRes.result.NextToken      \n    } while (nextToken)\n\n    const durationMS = Date.now() - startMS\n    // log(`Athena query complete`, { queryName, kbScanned, kbCharged, costDollar, resultsLoaded: resultObjects.length, durationMS: durationMS })\n    noteTimer(`${queryName}.duration`, durationMS)\n  } catch (err) {\n    error('Athena query execution failed', { queryName: queryName, query: query, err })\n    throw err\n  }\n}\n\nfunction stripQuotes(input: *): * {\n  if (typeof input !== 'string')\n    return input\n  if (input === '')\n    return null\n  const length = input.length\n  if (length < 2 || !input.startsWith('\"') || 
!input.endsWith('\"'))\n    return input\n  return input.substr(1, length - 2)\n}\n\nfunction getAthenaQueryResultLines(res: Object): Array<Array<string>> {\n  const rolloutLines: Array<Array<string>> = []     \n  const numericTypeNames = ['integer', 'bigint', 'double']\n  const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo\n  rolloutLines.push(columnMetas.map(cm => cm.Name))\n  res.ResultSet.Rows\n    .splice(1) // skip header\n    .forEach(\n      row => rolloutLines.push(\n        row.Data.map(\n          (datum, i) => numericTypeNames\n            .includes(columnMetas[i].Type) ? \n            (datum.VarCharValue === undefined ? null : Number(datum.VarCharValue)) : \n            (datum.VarCharValue === undefined ? null : `\"${datum.VarCharValue}\"`)\n        )\n      )\n    )\n  const csvContent = rolloutLines\n  return csvContent\n}\n\nfunction csvArrayToObjects(csvArray: Array<Array<string>>) {\n  const propNames = csvArray[0]\n  const valueLines = csvArray.slice(1)\n  const objects = valueLines.map(valueLine => {\n    const obj = {}\n    valueLine.forEach((value, i) => obj[propNames[i]] = stripQuotes(value))\n    return obj\n  })\n  return objects\n}"]}
\No newline at end of file