1 | ;
|
2 |
|
3 | Object.defineProperty(exports, "__esModule", {
|
4 | value: true
|
5 | });
|
6 | exports.default = void 0;
|
7 |
|
8 | require("@babel/polyfill");
|
9 |
|
10 | var _lodash = _interopRequireDefault(require("lodash"));
|
11 |
|
12 | var _compressionUtils = require("./compressionUtils");
|
13 |
|
14 | var _asyncUtils = require("./asyncUtils");
|
15 |
|
16 | var _System = _interopRequireDefault(require("./System"));
|
17 |
|
18 | var _Module = _interopRequireDefault(require("./Module"));
|
19 |
|
20 | var _errorUtils = require("./errorUtils");
|
21 |
|
/**
 * Babel interop helper.
 *
 * Returns `obj` untouched when it is flagged as an ES module (`__esModule` is truthy);
 * otherwise wraps it so the value is reachable as `.default`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return { default: obj };
}
|
23 |
|
/**
 * Babel class-property helper: sets `obj[key]` to `value` and returns `obj`.
 *
 * When `key` is already visible on `obj` (own or inherited) the property is defined
 * explicitly as an enumerable, configurable, writable own data property; otherwise a
 * plain assignment is used.
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value: value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}
|
25 |
|
/**
 * Marks `value` as an internal "await" so the async-generator driver resolves it and
 * feeds the result back into the generator instead of handing it to the consumer.
 */
function _awaitAsyncGenerator(value) {
  return new _AwaitValue(value);
}

/**
 * Turns a plain generator function into a function that returns an `_AsyncGenerator`
 * driving that generator. `this` and the arguments are forwarded unchanged.
 */
function _wrapAsyncGenerator(fn) {
  return function () {
    var generator = fn.apply(this, arguments);
    return new _AsyncGenerator(generator);
  };
}

/**
 * Async-generator emulation on top of a synchronous generator `gen`.
 *
 * Calls to next/throw/return are queued as requests (a singly linked list) and served
 * one at a time; each request's promise is settled once the generator produces a value
 * that is not an `_AwaitValue`.
 */
function _AsyncGenerator(gen) {
  var head = null; // request currently being served
  var tail = null; // last queued request

  // Queue a request; start driving the generator if nothing is in flight.
  function enqueue(key, arg) {
    return new Promise(function (resolve, reject) {
      var request = {
        key: key,
        arg: arg,
        resolve: resolve,
        reject: reject,
        next: null
      };

      if (tail) {
        tail.next = request;
        tail = request;
        return;
      }

      head = tail = request;
      step(key, arg);
    });
  }

  // Advance the generator once and route the produced value.
  function step(key, arg) {
    try {
      var result = gen[key](arg);
      var produced = result.value;
      var isAwait = produced instanceof _AwaitValue;
      var pending = Promise.resolve(isAwait ? produced.wrapped : produced);
      pending.then(function (resolved) {
        if (isAwait) {
          // internal await: feed the resolved value straight back in
          step("next", resolved);
          return;
        }

        finish(result.done ? "return" : "normal", resolved);
      }, function (err) {
        step("throw", err);
      });
    } catch (err) {
      finish("throw", err);
    }
  }

  // Settle the request being served, then move on to the next queued one (if any).
  function finish(type, value) {
    if (type === "throw") {
      head.reject(value);
    } else {
      head.resolve({
        value: value,
        done: type === "return"
      });
    }

    head = head.next;

    if (head) {
      step(head.key, head.arg);
    } else {
      tail = null;
    }
  }

  this._invoke = enqueue;

  // Hide `return` when the underlying generator does not support it.
  if (typeof gen.return !== "function") {
    this.return = undefined;
  }
}

if (typeof Symbol === "function" && Symbol.asyncIterator) {
  _AsyncGenerator.prototype[Symbol.asyncIterator] = function () {
    return this;
  };
}

_AsyncGenerator.prototype.next = function (arg) {
  return this._invoke("next", arg);
};

_AsyncGenerator.prototype.throw = function (arg) {
  return this._invoke("throw", arg);
};

_AsyncGenerator.prototype.return = function (arg) {
  return this._invoke("return", arg);
};

/** Wrapper type used to tag awaited values inside an emulated async generator. */
function _AwaitValue(value) {
  this.wrapped = value;
}
|
41 |
|
/**
 * Returns an iterator for `iterable`, preferring its `Symbol.asyncIterator` method and
 * falling back to `Symbol.iterator`.
 *
 * @throws {TypeError} when neither method is available.
 */
function _asyncIterator(iterable) {
  if (typeof Symbol === "function") {
    var protocolKeys = [Symbol.asyncIterator, Symbol.iterator];

    for (var i = 0; i < protocolKeys.length; i++) {
      var protocolKey = protocolKeys[i];
      if (!protocolKey) continue; // protocol not supported by this runtime

      var factory = iterable[protocolKey];
      if (factory != null) return factory.call(iterable);
    }
  }

  throw new TypeError("Object is not async iterable");
}
|
43 |
|
// Logging / metrics helpers obtained from a Module instance created for this file.
// Not all of them are used below, hence the eslint suppressions.
const moduleHelpers = new _Module.default(__filename);
const log = moduleHelpers.log; // eslint-disable-line no-unused-vars
const warn = moduleHelpers.warn; // eslint-disable-line no-unused-vars
const error = moduleHelpers.error;
const noteGauge = moduleHelpers.noteGauge; // eslint-disable-line no-unused-vars
const noteCount = moduleHelpers.noteCount;
const noteTimer = moduleHelpers.noteTimer;
const trackOp = moduleHelpers.trackOp;

// Upper bound on the number of entries AthenaClient keeps in its added-partitions cache.
const MAX_PARTITION_CACHE_SIZE = 1000;
|
55 |
|
/**
 * Client for writing Athena-readable data files to S3 and for running Athena queries
 * through the injected AWS client.
 */
class AthenaClient {
  /**
   * @param {{awsClient: Object}} dependencies - `awsClient` is stored and used through
   *   its `runS3` and `runAthena` methods.
   */
  constructor(dependencies) {
    _defineProperty(this, "_awsClient", void 0);

    // Newest-first list of "<table>|<partition spec>" keys for partitions this instance
    // has already asked Athena to add; capped at MAX_PARTITION_CACHE_SIZE entries.
    _defineProperty(this, "_addedPartitions", void 0);

    /**
     * Writes `items` as a gzipped, newline-delimited JSON file to S3 and, in parallel,
     * registers the matching Athena partition when `options.athenaFullTableName` is set.
     *
     * @param {Array<Object>} items - objects to serialise, one JSON document per line.
     * @param {Object} options
     * @param {string} options.bucket - target S3 bucket.
     * @param {string} options.dirPath - key prefix; `field=value` path segments are
     *   interpreted as the partition spec.
     * @param {string} options.fileName - file name without the `.jsons.gz` suffix.
     * @param {?string} [options.athenaFullTableName] - table to add the partition to;
     *   partition registration is skipped when falsy.
     * @returns {Promise<void>}
     */
    _defineProperty(this, "writeAthenaFile", async (items, options) => {
      // persist data to S3
      const persistFile = async () => {
        const gzippedBody = await (0, _compressionUtils.gzipCompress)(items.map(item => JSON.stringify(item)).join('\n'));
        await this._awsClient.runS3(s3 => s3.putObject({
          Bucket: options.bucket,
          Key: `${options.dirPath}/${options.fileName}.jsons.gz`,
          Body: gzippedBody,
          ContentType: 'text/plain',
          ContentEncoding: 'gzip',
          Metadata: {
            // NOTE(review): the S3 SDK usually adds the `x-amz-meta-` prefix to Metadata
            // keys itself, so this may be stored double-prefixed - confirm before
            // changing, readers may depend on the current name.
            'x-amz-meta-items': items.length.toString()
          }
        }), {
          noLog: true
        });
      };

      // create athena partition
      const registerPartition = async () => {
        const athenaFullTableName = options.athenaFullTableName;
        if (!athenaFullTableName) return;

        const partitionDesc = options.dirPath
          .split('/')
          .map(part => part.split('='))
          .filter(partParts => partParts.length === 2)
          .map(([field, value]) => `${field} = '${value}'`)
          .join(', ');

        // No `field=value` segments: there is nothing to register and `PARTITION ()`
        // would not be a valid statement.
        if (partitionDesc === '') return;

        // The cache key includes the table name: the same partition values may need to
        // be registered on several tables.
        const cacheKey = `${athenaFullTableName}|${partitionDesc}`;
        if (this._addedPartitions.includes(cacheKey)) return;

        const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';`;
        await this.executeAthenaQuery(query, 'add-athena-partition', {
          dontWaitForCompletion: true,
          noLog: true
        });

        // maintain partition cache (newest first, oldest evicted)
        this._addedPartitions.unshift(cacheKey);
        if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE) this._addedPartitions.pop();
      };

      // Both tasks are started before either is awaited, so they run concurrently.
      await Promise.all([persistFile(), registerPartition()]);
    });

    /**
     * Starts an Athena query and collects its result rows.
     *
     * @param {string} query - SQL text to execute.
     * @param {string} queryName - name used for the tracked operation, logs and the
     *   result output location.
     * @param {?Object} [options]
     * @param {boolean} [options.dontWaitForCompletion=false] - return `[]` as soon as
     *   the query has been started, without reading any results.
     * @param {number} [options.maxResults=0] - maximum number of rows to return;
     *   `0` means no limit.
     * @param {boolean} [options.noLog=false] - passed (negated) as the `log` option of
     *   the tracked operation.
     * @returns {Promise<Array<Object>>} result rows as objects keyed by column name.
     */
    _defineProperty(this, "executeAthenaQuery", async (query, queryName, options) => {
      options = options || {};
      const opts = {
        dontWaitForCompletion: options.dontWaitForCompletion || false,
        maxResults: options.maxResults || 0,
        noLog: options.noLog || false
      };
      return await trackOp(async () => {
        try {
          const pages = await this.executeAthenaQueryAndGetResultIterator(query, queryName);
          if (opts.dontWaitForCompletion) return [];

          const results = [];

          // Manual async-iteration protocol (this build targets runtimes without
          // `for await`). `midIteration` is true while a page is being processed, i.e.
          // whenever leaving the loop early must close the iterator.
          const iterator = _asyncIterator(pages);

          let midIteration = false;
          let failed = false;
          let failure;

          try {
            for (;;) {
              const step = await iterator.next();
              if (step.done) break;
              midIteration = true;
              const items = await step.value;
              const remaining = opts.maxResults > 0 ? opts.maxResults - results.length : Infinity;

              if (items.length >= remaining) {
                // Cap reached (possibly exactly at the page end): take what fits and
                // stop without requesting another page.
                for (const item of items.slice(0, remaining)) results.push(item);
                break;
              }

              // Append in place; re-spreading the accumulated array per page is O(n^2).
              for (const item of items) results.push(item);
              midIteration = false;
            }
          } catch (err) {
            failed = true;
            failure = err;
          } finally {
            try {
              if (midIteration && iterator.return != null) {
                await iterator.return();
              }
            } finally {
              // The iteration error takes precedence over any error from closing.
              if (failed) {
                throw failure;
              }
            }
          }

          return results;
        } catch (err) {
          error('Athena query execution failed', {
            queryName: queryName,
            query: query,
            err
          });
          throw err;
        }
      }, `athena-query-${queryName}`, null, {
        log: !opts.noLog
      });
    });

    /**
     * Starts an Athena query and returns an async iterator over its result pages
     * (each page being an array of row objects). Waiting for completion and fetching
     * results only happen once the iterator is advanced.
     *
     * @param {string} query - SQL text to execute.
     * @param {string} queryName - used in the result output location and in logs.
     * @returns {Promise<Object>} async iterator produced by `iterateAthenaQueryResults`.
     * @throws {Error} when the system config has no `aws` section; errors from the
     *   unset-argument helper or from starting the query are logged and rethrown.
     */
    _defineProperty(this, "executeAthenaQueryAndGetResultIterator", async (query, queryName) => {
      try {
        const awsConfig = _System.default.getConfig().aws;

        if (!awsConfig) throw new Error('aws config not set in system config');
        const startMS = Date.now();
        const OUTPUT_LOCATION = `s3://aws-athena-query-results-${awsConfig.accountId || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.accountId')}-${awsConfig.defaultRegion || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.defaultRegion')}/${_System.default.getConfig().env}/${queryName}/`;

        // execute query
        const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({
          QueryString: query,
          ResultConfiguration: {
            OutputLocation: OUTPUT_LOCATION
          }
        }));
        return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS);
      } catch (err) {
        error('Athena query execution failed', {
          queryName: queryName,
          query: query,
          err
        });
        throw err;
      }
    });

    this._awsClient = dependencies.awsClient;
    this._addedPartitions = [];
  }

}
|
184 |
|
185 | exports.default = AthenaClient;
|
186 |
|
/**
 * Returns an async iterator that waits for the Athena query `queryExecutionId` to
 * finish and then yields its results page by page (each page is an array of row
 * objects keyed by column name).
 *
 * @param {Object} awsClient - used through `runAthena(fn)`.
 * @param {string} queryExecutionId - id returned by `startQueryExecution`.
 * @param {string} queryName - prefix for the metrics and context for logs/errors.
 * @param {string} query - SQL text, only used for logging and error context.
 * @param {number} startMS - epoch milliseconds at which the query was started; used
 *   for the duration metric.
 */
function iterateAthenaQueryResults(awsClient, queryExecutionId, queryName, query, startMS) {
  return _iterateAthenaQueryResults.apply(this, arguments);
}

// Lazily replaces itself with the wrapped generator on first call (Babel's pattern for
// hoisted async generator declarations).
function _iterateAthenaQueryResults() {
  _iterateAthenaQueryResults = _wrapAsyncGenerator(function* (awsClient, queryExecutionId, queryName, query, startMS) {
    // GetQueryResults accepts between 1 and 1000 rows per call.
    const MAX_ROWS_PER_PAGE = 1000;

    try {
      // wait for query execution to end
      let dataScannedInBytes = 0;

      while (true) {
        const statusRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryExecution({
          QueryExecutionId: queryExecutionId
        })));
        const {
          State,
          StateChangeReason
        } = statusRes.result.QueryExecution.Status;

        if (State === 'SUCCEEDED') {
          // Default to 0 so missing statistics do not turn the metrics below into NaN.
          dataScannedInBytes = _lodash.default.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes', 0);
          break;
        } else if (['QUEUED', 'RUNNING'].includes(State)) {
          yield _awaitAsyncGenerator((0, _asyncUtils.sleep)(500));
        } else {
          throw new _errorUtils.ExtendedError('Athena query failed', {
            queryName: queryName,
            state: State,
            reason: StateChangeReason,
            query: query
          });
        }
      }

      const kbScanned = Math.round(dataScannedInBytes / 1000);
      const kbCharged = Math.max(kbScanned, 10 * 1000); // Athena charges a minimum of 10mb for every query

      const costDollar = 5 * kbCharged / 1000000000; // Athena charges 5$ for a TB of data

      noteCount(`${queryName}.kbScanned`, kbScanned);
      noteCount(`${queryName}.kbCharged`, kbCharged);
      noteCount(`${queryName}.dollarsCharged`, costDollar);

      // fetch response
      let nextToken = null;
      // Only the first page starts with the column-header row; later pages start
      // directly with data, so the header must not be skipped there.
      let isFirstPage = true;

      do {
        const resultsRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryResults({
          QueryExecutionId: queryExecutionId,
          MaxResults: MAX_ROWS_PER_PAGE,
          NextToken: nextToken
        })));
        const athenaResults = getAthenaQueryResultLines(resultsRes.result, isFirstPage);
        const newResultObjects = csvArrayToObjects(athenaResults);
        isFirstPage = false;
        yield newResultObjects;
        nextToken = resultsRes.result.NextToken;
      } while (nextToken);

      const durationMS = Date.now() - startMS;
      noteTimer(`${queryName}.duration`, durationMS);
    } catch (err) {
      error('Athena query execution failed', {
        queryName: queryName,
        query: query,
        err
      });
      throw err;
    }
  });
  return _iterateAthenaQueryResults.apply(this, arguments);
}

/**
 * Removes one pair of surrounding double quotes from a string.
 *
 * Non-strings are returned as-is, the empty string becomes `null`, and strings that
 * are not wrapped in double quotes are returned unchanged.
 */
function stripQuotes(input) {
  if (typeof input !== 'string') return input;
  if (input === '') return null;
  if (input.length < 2 || !input.startsWith('"') || !input.endsWith('"')) return input;
  return input.slice(1, -1);
}

/**
 * Converts one GetQueryResults response into an array of lines: the first line holds
 * the column names, every following line the values of one row.
 *
 * Values of columns typed `integer`, `bigint` or `double` are converted with
 * `Number(...)`; all other values are wrapped in double quotes (undone later by
 * `stripQuotes`). Missing values become `null`. The response object is not modified.
 *
 * @param {Object} res - response with `ResultSet.ResultSetMetadata.ColumnInfo` and
 *   `ResultSet.Rows`.
 * @param {boolean} [hasHeaderRow=true] - whether `Rows[0]` is the column-header row
 *   and must be dropped. Pass `false` for pages after the first one.
 * @returns {Array<Array>} header line followed by the data lines.
 */
function getAthenaQueryResultLines(res, hasHeaderRow = true) {
  const numericTypeNames = ['integer', 'bigint', 'double'];
  const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo;
  // `slice` (not `splice`) so the caller's response object stays intact.
  const dataRows = hasHeaderRow ? res.ResultSet.Rows.slice(1) : res.ResultSet.Rows;
  const rolloutLines = [columnMetas.map(cm => cm.Name)];

  dataRows.forEach(row => rolloutLines.push(row.Data.map((datum, i) => {
    if (datum.VarCharValue === undefined) return null;
    return numericTypeNames.includes(columnMetas[i].Type) ? Number(datum.VarCharValue) : `"${datum.VarCharValue}"`;
  })));

  return rolloutLines;
}
|
277 |
|
/**
 * Turns an array of lines into an array of objects.
 *
 * The first line supplies the property names; every following line becomes one object
 * whose values are passed through `stripQuotes`.
 */
function csvArrayToObjects(csvArray) {
  const [propNames, ...valueLines] = csvArray;

  return valueLines.map(valueLine => valueLine.reduce((record, value, column) => {
    record[propNames[column]] = stripQuotes(value);
    return record;
  }, {}));
}
|
288 | //# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../src/AthenaClient.js"],"names":["log","warn","error","noteGauge","noteCount","noteTimer","trackOp","Module","__filename","MAX_PARTITION_CACHE_SIZE","AthenaClient","constructor","dependencies","items","options","Promise","all","gzippedBody","map","item","JSON","stringify","join","_awsClient","runS3","s3","putObject","Bucket","bucket","Key","dirPath","fileName","Body","ContentType","ContentEncoding","Metadata","length","toString","noLog","athenaFullTableName","partitionDesc","split","part","filter","partParts","field","value","existingPartition","_addedPartitions","find","p","query","executeAthenaQuery","dontWaitForCompletion","unshift","pop","queryName","opts","maxResults","results","asyncIterator","executeAthenaQueryAndGetResultIterator","reachedMaxResults","itemsToAdd","slice","err","awsConfig","System","getConfig","aws","Error","startMS","Date","now","OUTPUT_LOCATION","accountId","defaultRegion","env","startRes","runAthena","athena","startQueryExecution","QueryString","ResultConfiguration","OutputLocation","iterateAthenaQueryResults","result","QueryExecutionId","awsClient","queryExecutionId","dataScannedInBytes","statusRes","getQueryExecution","State","StateChangeReason","QueryExecution","Status","_","get","includes","ExtendedError","state","reason","kbScanned","Math","round","kbCharged","max","costDollar","nextToken","resultsRes","getQueryResults","MaxResults","Infinity","NextToken","athenaResults","getAthenaQueryResultLines","newResultObjects","csvArrayToObjects","durationMS","stripQuotes","input","startsWith","endsWith","substr","res","rolloutLines","numericTypeNames","columnMetas","ResultSet","ResultSetMetadata","ColumnInfo","push","cm","Name","Rows","splice","forEach","row","Data","datum","i","Type","VarCharValue","undefined","Number","csvContent","csvArray","propNames","valueLines","objects","valueLine","obj"],"mappings":";;;;;;;AAEA;;AAEA;;AAEA;;AACA;;AACA;;AACA
;;AACA;;;;;;;;;;;;;;;;;;;;;;;;AAIA,MAAM;AAAEA,EAAAA,GAAF;AAAOC,EAAAA,IAAP;AAAaC,EAAAA,KAAb;AAAoBC,EAAAA,SAApB;AAA+BC,EAAAA,SAA/B;AAA0CC,EAAAA,SAA1C;AAAqDC,EAAAA;AAArD,IAAiE,IAAIC,eAAJ,CAAWC,UAAX,CAAvE,C,CAA8F;;AAE9F,MAAMC,wBAAwB,GAAG,IAAjC;;AAEe,MAAMC,YAAN,CAAmB;AAMhCC,EAAAA,WAAW,CAACC,YAAD,EAAyC;AAAA;;AAAA;;AAAA,6CAMlC,OAAOC,KAAP,EAA6BC,OAA7B,KAMI;AACpB,YAAMC,OAAO,CAACC,GAAR,CAAY,CAEhB;AACA,OAAC,YAAY;AACX,cAAMC,WAAW,GAAG,MAAM,oCAAaJ,KAAK,CAACK,GAAN,CAAUC,IAAI,IAAIC,IAAI,CAACC,SAAL,CAAeF,IAAf,CAAlB,EAAwCG,IAAxC,CAA6C,IAA7C,CAAb,CAA1B;AACA,cAAM,KAAKC,UAAL,CAAgBC,KAAhB,CAAsBC,EAAE,IAAIA,EAAE,CAACC,SAAH,CAAa;AAC7CC,UAAAA,MAAM,EAAEb,OAAO,CAACc,MAD6B;AAE7CC,UAAAA,GAAG,EAAG,GAAEf,OAAO,CAACgB,OAAQ,IAAGhB,OAAO,CAACiB,QAAS,WAFC;AAG7CC,UAAAA,IAAI,EAAEf,WAHuC;AAI7CgB,UAAAA,WAAW,EAAE,YAJgC;AAK7CC,UAAAA,eAAe,EAAE,MAL4B;AAM7CC,UAAAA,QAAQ,EAAE;AACR,gCAAoBtB,KAAK,CAACuB,MAAN,CAAaC,QAAb;AADZ;AANmC,SAAb,CAA5B,EASF;AACFC,UAAAA,KAAK,EAAE;AADL,SATE,CAAN;AAYD,OAdD,GAHgB,EAmBhB;AACA,OAAC,YAAY;AACX,cAAMC,mBAAmB,GAAGzB,OAAO,CAACyB,mBAApC;AACA,YAAI,CAACA,mBAAL,EACE;AAEF,cAAMC,aAAa,GAAG1B,OAAO,CAACgB,OAAR,CACnBW,KADmB,CACb,GADa,EAEnBvB,GAFmB,CAEfwB,IAAI,IAAIA,IAAI,CAACD,KAAL,CAAW,GAAX,CAFO,EAGnBE,MAHmB,CAGZC,SAAS,IAAIA,SAAS,CAACR,MAAV,KAAqB,CAHtB,EAInBlB,GAJmB,CAIf,CAAC,CAAE2B,KAAF,EAASC,KAAT,CAAD,KAAsB,GAAED,KAAM,OAAMC,KAAM,GAJ3B,EAKnBxB,IALmB,CAKd,IALc,CAAtB;;AAOA,cAAMyB,iBAAiB,GAAG,KAAKC,gBAAL,CAAsBC,IAAtB,CAA2BC,CAAC,IAAIA,CAAC,KAAKV,aAAtC,CAA1B;;AACA,YAAI,CAACO,iBAAL,EAAwB;AACtB,gBAAMI,KAAK,GAAI,eAAcZ,mBAAoB,iCAAgCC,aAAc,oBAAmB1B,OAAO,CAACc,MAAO,IAAGd,OAAO,CAACgB,OAAQ,IAApJ;AACA,gBAAM,KAAKsB,kBAAL,CAAwBD,KAAxB,EAA+B,sBAA/B,EAAuD;AAAEE,YAAAA,qBAAqB,EAAE,IAAzB;AAA+Bf,YAAAA,KAAK,EAAE;AAAtC,WAAvD,CAAN,CAFsB,CAItB;;AACA,eAAKU,gBAAL,CAAsBM,OAAtB,CAA8Bd,aAA9B;;AACA,cAAI,KAAKQ,gBAAL,CAAsBZ,MAAtB,GAA+B3B,wBAAnC,EACE,KAAKuC,gBAAL,CAAsBO,GAAtB;AACH;AACF,OAtBD,GApBgB,CAAZ,CAAN;AA6CD,KA1DmD;;AAAA,gDA4D/B,OAAOJ,KAAP,EAAsBK,SAAtB,EAAyC1C,OAAzC,KAAyJ;AAC5KA,MAAAA,OAAO,GAAGA,OAAO,IAAI,EAArB;AACA,YAAM2C,I
AAI,GAAG;AACXJ,QAAAA,qBAAqB,EAAEvC,OAAO,CAACuC,qBAAR,IAAiC,KAD7C;AAEXK,QAAAA,UAAU,EAAE5C,OAAO,CAAC4C,UAAR,IAAsB,CAFvB;AAGXpB,QAAAA,KAAK,EAAExB,OAAO,CAACwB,KAAR,IAAiB;AAHb,OAAb;AAMA,aAAO,MAAMhC,OAAO,CAAC,YAAY;AAC/B,YAAI;AACF,cAAIqD,OAAsB,GAAG,EAA7B;AACA,gBAAMC,aAAa,GAAG,MAAM,KAAKC,sCAAL,CAA4CV,KAA5C,EAAmDK,SAAnD,CAA5B;AACA,cAAIC,IAAI,CAACJ,qBAAT,EACE,OAAO,EAAP;AAJA;AAAA;;AAAA;;AAAA;AAKF,gDAA0BO,aAA1B,oLAAyC;AAAA,oBAAxB/C,KAAwB;AACvC,oBAAMiD,iBAAiB,GAAGL,IAAI,CAACC,UAAL,GAAkB,CAAlB,IAAuBC,OAAO,CAACvB,MAAR,GAAiBvB,KAAK,CAACuB,MAAvB,GAAgCqB,IAAI,CAACC,UAAtF;AACA,oBAAMK,UAAU,GAAGD,iBAAiB,GAAGjD,KAAK,CAACmD,KAAN,CAAY,CAAZ,EAAeP,IAAI,CAACC,UAAL,GAAkBC,OAAO,CAACvB,MAAzC,CAAH,GAAsDvB,KAA1F;AACA8C,cAAAA,OAAO,GAAG,CAAE,GAAGA,OAAL,EAAc,GAAGI,UAAjB,CAAV;AACA,kBAAID,iBAAJ,EACE;AACH;AAXC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;;AAYF,iBAAOH,OAAP;AACD,SAbD,CAaE,OAAOM,GAAP,EAAY;AACZ/D,UAAAA,KAAK,CAAC,+BAAD,EAAkC;AAAEsD,YAAAA,SAAS,EAAEA,SAAb;AAAwBL,YAAAA,KAAK,EAAEA,KAA/B;AAAsCc,YAAAA;AAAtC,WAAlC,CAAL;AACA,gBAAMA,GAAN;AACD;AACF,OAlBmB,EAkBhB,gBAAeT,SAAU,EAlBT,EAkBY,IAlBZ,EAkBkB;AAAExD,QAAAA,GAAG,EAAE,CAACyD,IAAI,CAACnB;AAAb,OAlBlB,CAApB;AAmBD,KAvFmD;;AAAA,oEAyFX,OAAOa,KAAP,EAAsBK,SAAtB,KAA+F;AACtI,UAAI;AACF,cAAMU,SAAS,GAAGC,gBAAOC,SAAP,GAAmBC,GAArC;;AACA,YAAI,CAACH,SAAL,EACE,MAAM,IAAII,KAAJ,CAAU,qCAAV,CAAN;AAEF,cAAMC,OAAO,GAAGC,IAAI,CAACC,GAAL,EAAhB;AACA,cAAMC,eAAe,GAAI,iCAAiCR,SAAS,CAACS,SAAV,IAAuB,+BAAc,4BAAd,CAA6C,IAAIT,SAAS,CAACU,aAAV,IAA2B,+BAAc,gCAAd,CAAiD,IAAGT,gBAAOC,SAAP,GAAmBS,GAAI,IAAGrB,SAAU,GAArP,CANE,CAQF;;AACA,cAAMsB,QAAQ,GAAG,MAAM,KAAKvD,UAAL,CAAgBwD,SAAhB,CAA0BC,MAAM,IAAIA,MAAM,CAACC,mBAAP,CAA2B;AACpFC,UAAAA,WAAW,EAAE/B,KADuE;AAEpFgC,UAAAA,mBAAmB,EAAE;AACnBC,YAAAA,cAAc,EAAEV;AADG;AAF+D,SAA3B,CAApC,CAAvB;AAOA,eAAOW,yBAAyB,CAAC,KAAK9D,UAAN,EAAkBuD,QAAQ,CAACQ,MAAT,CAAgBC,gBAAlC,EAAoD/B,SAApD,EAA+DL,KAA/D,EAAsEoB,OAAtE,CAAhC;AACD,OAjBD,CAiBE,OAAON,GAAP,EAAY;AACZ/D,QAAAA,KAAK,CAAC,+BAAD,EAAkC;AAAEsD,UAAAA,SAAS,EAAEA,SAAb;AAAwBL,UAAAA,KAAK,EAAEA,KAA/
B;AAAsCc,UAAAA;AAAtC,SAAlC,CAAL;AACA,cAAMA,GAAN;AACD;AACF,KA/GmD;;AAClD,SAAK1C,UAAL,GAAkBX,YAAY,CAAC4E,SAA/B;AACA,SAAKxC,gBAAL,GAAwB,EAAxB;AACD;;AAT+B;;;;SA0HlBqC,yB;;;;;mDAAhB,WAA0CG,SAA1C,EAAgEC,gBAAhE,EAA0FjC,SAA1F,EAA6GL,KAA7G,EAA4HoB,OAA5H,EAAuL;AACrL,QAAI;AACF;AACA,UAAImB,kBAAkB,GAAG,CAAzB;;AACA,aAAO,IAAP,EAAa;AACX,cAAMC,SAAS,8BAASH,SAAS,CAACT,SAAV,CAAoBC,MAAM,IAAIA,MAAM,CAACY,iBAAP,CAAyB;AAC7EL,UAAAA,gBAAgB,EAAEE;AAD2D,SAAzB,CAA9B,CAAT,CAAf;AAIA,cAAM;AAAEI,UAAAA,KAAF;AAASC,UAAAA;AAAT,YAA+BH,SAAS,CAACL,MAAV,CAAiBS,cAAjB,CAAgCC,MAArE;;AACA,YAAIH,KAAK,KAAK,WAAd,EAA2B;AACzBH,UAAAA,kBAAkB,GAAGO,gBAAEC,GAAF,CAAMP,SAAS,CAACL,MAAV,CAAiBS,cAAvB,EAAuC,+BAAvC,CAArB;AACA;AACD,SAHD,MAGO,IAAI,CAAC,QAAD,EAAW,SAAX,EAAsBI,QAAtB,CAA+BN,KAA/B,CAAJ,EAA2C;AAChD,qCAAM,uBAAM,GAAN,CAAN;AACD,SAFM,MAEA;AACL,gBAAM,IAAIO,yBAAJ,CAAkB,qBAAlB,EAAyC;AAAE5C,YAAAA,SAAS,EAAEA,SAAb;AAAwB6C,YAAAA,KAAK,EAAER,KAA/B;AAAsCS,YAAAA,MAAM,EAAER,iBAA9C;AAAiE3C,YAAAA,KAAK,EAAEA;AAAxE,WAAzC,CAAN;AACD;AACF;;AAED,YAAMoD,SAAS,GAAGC,IAAI,CAACC,KAAL,CAAWf,kBAAkB,GAAG,IAAhC,CAAlB;AACA,YAAMgB,SAAS,GAAGF,IAAI,CAACG,GAAL,CAASJ,SAAT,EAAoB,KAAK,IAAzB,CAAlB,CApBE,CAoB+C;;AACjD,YAAMK,UAAU,GAAG,IAAIF,SAAJ,GAAgB,UAAnC,CArBE,CAqB4C;;AAC9CtG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,YAAd,EAA2B+C,SAA3B,CAAT;AACAnG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,YAAd,EAA2BkD,SAA3B,CAAT;AACAtG,MAAAA,SAAS,CAAE,GAAEoD,SAAU,iBAAd,EAAgCoD,UAAhC,CAAT,CAxBE,CA0BF;;AACA,UAAIC,SAAkB,GAAG,IAAzB;;AACA,SAAG;AACD,cAAMC,UAAU,8BAAStB,SAAS,CAACT,SAAV,CAAoBC,MAAM,IAAIA,MAAM,CAAC+B,eAAP,CAAuB;AAC5ExB,UAAAA,gBAAgB,EAAEE,gBAD0D;AAE5EuB,UAAAA,UAAU,EAAEC,QAFgE;AAG5EC,UAAAA,SAAS,EAAEL;AAHiE,SAAvB,CAA9B,CAAT,CAAhB;AAMA,cAAMM,aAAa,GAAGC,yBAAyB,CAACN,UAAU,CAACxB,MAAZ,CAA/C;AACA,cAAM+B,gBAAgB,GAAGC,iBAAiB,CAACH,aAAD,CAA1C;AAEA,cAAME,gBAAN;AAEAR,QAAAA,SAAS,GAAGC,UAAU,CAACxB,MAAX,CAAkB4B,SAA9B;AACD,OAbD,QAaSL,SAbT;;AAeA,YAAMU,UAAU,GAAG/C,IAAI,CAACC,GAAL,KAAaF,OAAhC,CA3CE,CA4CF;;AACAlE,MAAAA,SAAS,CAAE,GAAEmD,SAAU,WAAd,EAA0B+D,UAA1B,CAAT;AACD,KA9CD,CA8CE,OAAOtD,GAAP,EAAY;AACZ/D,MAAAA,KAAK
,CAAC,+BAAD,EAAkC;AAAEsD,QAAAA,SAAS,EAAEA,SAAb;AAAwBL,QAAAA,KAAK,EAAEA,KAA/B;AAAsCc,QAAAA;AAAtC,OAAlC,CAAL;AACA,YAAMA,GAAN;AACD;AACF,G;;;;AAED,SAASuD,WAAT,CAAqBC,KAArB,EAAkC;AAChC,MAAI,OAAOA,KAAP,KAAiB,QAArB,EACE,OAAOA,KAAP;AACF,MAAIA,KAAK,KAAK,EAAd,EACE,OAAO,IAAP;AACF,QAAMrF,MAAM,GAAGqF,KAAK,CAACrF,MAArB;AACA,MAAIA,MAAM,GAAG,CAAT,IAAc,CAACqF,KAAK,CAACC,UAAN,CAAiB,GAAjB,CAAf,IAAwC,CAACD,KAAK,CAACE,QAAN,CAAe,GAAf,CAA7C,EACE,OAAOF,KAAP;AACF,SAAOA,KAAK,CAACG,MAAN,CAAa,CAAb,EAAgBxF,MAAM,GAAG,CAAzB,CAAP;AACD;;AAED,SAASgF,yBAAT,CAAmCS,GAAnC,EAAsE;AACpE,QAAMC,YAAkC,GAAG,EAA3C;AACA,QAAMC,gBAAgB,GAAG,CAAC,SAAD,EAAY,QAAZ,EAAsB,QAAtB,CAAzB;AACA,QAAMC,WAAW,GAAGH,GAAG,CAACI,SAAJ,CAAcC,iBAAd,CAAgCC,UAApD;AACAL,EAAAA,YAAY,CAACM,IAAb,CAAkBJ,WAAW,CAAC9G,GAAZ,CAAgBmH,EAAE,IAAIA,EAAE,CAACC,IAAzB,CAAlB;AACAT,EAAAA,GAAG,CAACI,SAAJ,CAAcM,IAAd,CACGC,MADH,CACU,CADV,EACa;AADb,GAEGC,OAFH,CAGIC,GAAG,IAAIZ,YAAY,CAACM,IAAb,CACLM,GAAG,CAACC,IAAJ,CAASzH,GAAT,CACE,CAAC0H,KAAD,EAAQC,CAAR,KAAcd,gBAAgB,CAC3B5B,QADW,CACF6B,WAAW,CAACa,CAAD,CAAX,CAAeC,IADb,IAEXF,KAAK,CAACG,YAAN,KAAuBC,SAAvB,GAAmC,IAAnC,GAA0CC,MAAM,CAACL,KAAK,CAACG,YAAP,CAFrC,GAGXH,KAAK,CAACG,YAAN,KAAuBC,SAAvB,GAAmC,IAAnC,GAA2C,IAAGJ,KAAK,CAACG,YAAa,GAJtE,CADK,CAHX;AAYA,QAAMG,UAAU,GAAGpB,YAAnB;AACA,SAAOoB,UAAP;AACD;;AAED,SAAS5B,iBAAT,CAA2B6B,QAA3B,EAA2D;AACzD,QAAMC,SAAS,GAAGD,QAAQ,CAAC,CAAD,CAA1B;AACA,QAAME,UAAU,GAAGF,QAAQ,CAACnF,KAAT,CAAe,CAAf,CAAnB;AACA,QAAMsF,OAAO,GAAGD,UAAU,CAACnI,GAAX,CAAeqI,SAAS,IAAI;AAC1C,UAAMC,GAAG,GAAG,EAAZ;AACAD,IAAAA,SAAS,CAACd,OAAV,CAAkB,CAAC3F,KAAD,EAAQ+F,CAAR,KAAcW,GAAG,CAACJ,SAAS,CAACP,CAAD,CAAV,CAAH,GAAoBrB,WAAW,CAAC1E,KAAD,CAA/D;AACA,WAAO0G,GAAP;AACD,GAJe,CAAhB;AAKA,SAAOF,OAAP;AACD","sourcesContent":["// @flow\n\nimport \"@babel/polyfill\"\n\nimport _ from 'lodash'\n\nimport { gzipCompress } from './compressionUtils'\nimport { sleep } from './asyncUtils'\nimport System from \"./System\"\nimport Module from \"./Module\"\nimport { ExtendedError, throwUnsetArg } from './errorUtils' // eslint-disable-line 
no-unused-vars\n\nimport type AWSClient from './AWSClient'\n\nconst { log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars\n\nconst MAX_PARTITION_CACHE_SIZE = 1000\n\nexport default class AthenaClient {\n\n  _awsClient: AWSClient\n  _addedPartitions: Array<string>\n\n\n  constructor(dependencies: { awsClient: AWSClient }) {\n    this._awsClient = dependencies.awsClient\n    this._addedPartitions = []\n  }\n\n\n  writeAthenaFile = async (items: Array<Object>, options: {|\n    bucket: string,\n    dirPath: string,\n    fileName: string,\n    athenaFullTableName?: ?string,\n    noLog?: ?boolean,\n  |}): Promise<void> => {\n    await Promise.all([\n  \n      // persist data to S3\n      (async () => {\n        const gzippedBody = await gzipCompress(items.map(item => JSON.stringify(item)).join('\\n'))\n        await this._awsClient.runS3(s3 => s3.putObject({\n          Bucket: options.bucket,\n          Key: `${options.dirPath}/${options.fileName}.jsons.gz`,\n          Body: gzippedBody, \n          ContentType: 'text/plain',\n          ContentEncoding: 'gzip',\n          Metadata: {\n            'x-amz-meta-items': items.length.toString(),\n          },\n        }), {\n          noLog: true,\n        })\n      })(),\n  \n      // create athena partition\n      (async () => {\n        const athenaFullTableName = options.athenaFullTableName\n        if (!athenaFullTableName)\n          return\n          \n        const partitionDesc = options.dirPath\n          .split('/')\n          .map(part => part.split('='))\n          .filter(partParts => partParts.length === 2)\n          .map(([ field, value ])=> `${field} = '${value}'`)\n          .join(', ')\n\n        const existingPartition = this._addedPartitions.find(p => p === partitionDesc)\n        if (!existingPartition) {\n          const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 
's3://${options.bucket}/${options.dirPath}';`     \n          await this.executeAthenaQuery(query, 'add-athena-partition', { dontWaitForCompletion: true, noLog: true })\n          \n          // maintain partition cache\n          this._addedPartitions.unshift(partitionDesc)\n          if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE)\n            this._addedPartitions.pop()\n        }\n      })(),\n      \n    ])\n  }\n  \n  executeAthenaQuery = async (query: string, queryName: string, options: ?{ dontWaitForCompletion?: boolean, maxResults?: number, noLog?: boolean }): Promise<Array<Object>> => {\n    options = options || {}\n    const opts = {\n      dontWaitForCompletion: options.dontWaitForCompletion || false,\n      maxResults: options.maxResults || 0,\n      noLog: options.noLog || false,\n    }\n    \n    return await trackOp(async () => {\n      try {     \n        let results: Array<Object> = []\n        const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName)\n        if (opts.dontWaitForCompletion)\n          return []\n        for await (const items of asyncIterator) {\n          const reachedMaxResults = opts.maxResults > 0 && results.length + items.length > opts.maxResults\n          const itemsToAdd = reachedMaxResults ? 
items.slice(0, opts.maxResults - results.length) : items\n          results = [ ...results, ...itemsToAdd ]\n          if (reachedMaxResults)\n            break\n        }\n        return results\n      } catch (err) {\n        error('Athena query execution failed', { queryName: queryName, query: query, err })\n        throw err\n      }\n    }, `athena-query-${queryName}`, null, { log: !opts.noLog })\n  }\n  \n  executeAthenaQueryAndGetResultIterator = async (query: string, queryName: string): Promise<AsyncGenerator<Array<Object>, void, any>> => {\n    try {\n      const awsConfig = System.getConfig().aws\n      if (!awsConfig)\n        throw new Error('aws config not set in system config')\n  \n      const startMS = Date.now()\n      const OUTPUT_LOCATION = `s3://aws-athena-query-results-${ awsConfig.accountId || throwUnsetArg('systemConfig.aws.accountId') }-${ awsConfig.defaultRegion || throwUnsetArg('systemConfig.aws.defaultRegion') }/${System.getConfig().env}/${queryName}/`\n  \n      // execute query\n      const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({\n        QueryString: query,\n        ResultConfiguration: {\n          OutputLocation: OUTPUT_LOCATION,\n        },\n      }))\n  \n      return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS)\n    } catch (err) {\n      error('Athena query execution failed', { queryName: queryName, query: query, err })\n      throw err\n    }\n  }\n}\n\n\n\nasync function* iterateAthenaQueryResults(awsClient: AWSClient, queryExecutionId: string, queryName: string, query: string, startMS: number): AsyncGenerator<Array<Object>, void, any> {\n  try {\n    // wait for query execution to end\n    let dataScannedInBytes = 0\n    while (true) {\n      const statusRes = await awsClient.runAthena(athena => athena.getQueryExecution({\n        QueryExecutionId: queryExecutionId,\n      }))\n      \n      const { State, StateChangeReason } = 
statusRes.result.QueryExecution.Status\n      if (State === 'SUCCEEDED') {\n        dataScannedInBytes = _.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes')\n        break\n      } else if (['QUEUED', 'RUNNING'].includes(State)) {\n        await sleep(500)\n      } else {\n        throw new ExtendedError('Athena query failed', { queryName: queryName, state: State, reason: StateChangeReason, query: query })\n      }\n    }\n\n    const kbScanned = Math.round(dataScannedInBytes / 1000)\n    const kbCharged = Math.max(kbScanned, 10 * 1000) // Athena charges a minimum of 10mb for every query\n    const costDollar = 5 * kbCharged / 1000000000 // Athena charges 5$ for a TB of data\n    noteCount(`${queryName}.kbScanned`, kbScanned)\n    noteCount(`${queryName}.kbCharged`, kbCharged)\n    noteCount(`${queryName}.dollarsCharged`, costDollar)\n\n    // fetch response\n    let nextToken: ?string = null\n    do {\n      const resultsRes = await awsClient.runAthena(athena => athena.getQueryResults({\n        QueryExecutionId: queryExecutionId,\n        MaxResults: Infinity,\n        NextToken: nextToken,\n      }))\n\n      const athenaResults = getAthenaQueryResultLines(resultsRes.result)\n      const newResultObjects = csvArrayToObjects(athenaResults)\n\n      yield newResultObjects\n\n      nextToken = resultsRes.result.NextToken      \n    } while (nextToken)\n\n    const durationMS = Date.now() - startMS\n    // log(`Athena query complete`, { queryName, kbScanned, kbCharged, costDollar, resultsLoaded: resultObjects.length, durationMS: durationMS })\n    noteTimer(`${queryName}.duration`, durationMS)\n  } catch (err) {\n    error('Athena query execution failed', { queryName: queryName, query: query, err })\n    throw err\n  }\n}\n\nfunction stripQuotes(input: *): * {\n  if (typeof input !== 'string')\n    return input\n  if (input === '')\n    return null\n  const length = input.length\n  if (length < 2 || !input.startsWith('\"') || 
!input.endsWith('\"'))\n    return input\n  return input.substr(1, length - 2)\n}\n\nfunction getAthenaQueryResultLines(res: Object): Array<Array<string>> {\n  const rolloutLines: Array<Array<string>> = []     \n  const numericTypeNames = ['integer', 'bigint', 'double']\n  const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo\n  rolloutLines.push(columnMetas.map(cm => cm.Name))\n  res.ResultSet.Rows\n    .splice(1) // skip header\n    .forEach(\n      row => rolloutLines.push(\n        row.Data.map(\n          (datum, i) => numericTypeNames\n            .includes(columnMetas[i].Type) ? \n            (datum.VarCharValue === undefined ? null : Number(datum.VarCharValue)) : \n            (datum.VarCharValue === undefined ? null : `\"${datum.VarCharValue}\"`)\n        )\n      )\n    )\n  const csvContent = rolloutLines\n  return csvContent\n}\n\nfunction csvArrayToObjects(csvArray: Array<Array<string>>) {\n  const propNames = csvArray[0]\n  const valueLines = csvArray.slice(1)\n  const objects = valueLines.map(valueLine => {\n    const obj = {}\n    valueLine.forEach((value, i) => obj[propNames[i]] = stripQuotes(value))\n    return obj\n  })\n  return objects\n}"]} |
\ | No newline at end of file |