// Retrieved from the UNPKG file viewer (36.6 kB, JavaScript).
"use strict";

// Flag the CommonJS export object as a transpiled ES module, so that interop
// helpers such as _interopRequireDefault return it as-is instead of wrapping it.
Object.defineProperty(exports, "__esModule", {
  value: true
});
// Placeholder value; the real default export is assigned further down the file.
exports.default = void 0;

// Required only for its side effects (the return value is unused).
require("@babel/polyfill");

var _lodash = _interopRequireDefault(require("lodash"));

var _compressionUtils = require("./compressionUtils");

var _asyncUtils = require("./asyncUtils");

var _System = _interopRequireDefault(require("./System"));

var _Module = _interopRequireDefault(require("./Module"));

var _errorUtils = require("./errorUtils");
21
/**
 * Normalizes a `require()` result so callers can always read `.default`.
 *
 * @param {*} obj - the loaded module value.
 * @returns {Object} `obj` itself when it is truthy and flagged `__esModule`;
 *   otherwise a fresh `{ default: obj }` wrapper.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return { default: obj };
}
23
/**
 * Sets `obj[key]` to `value` and returns `obj`.
 *
 * When `key` is already reachable on `obj` (own or inherited), the property is
 * defined directly on `obj` as an enumerable, configurable, writable data
 * property, so an inherited setter or read-only property is shadowed rather
 * than triggered. Otherwise a plain assignment is used.
 *
 * @param {Object} obj - target object.
 * @param {string|symbol} key - property key.
 * @param {*} value - value to store.
 * @returns {Object} the same `obj`.
 */
function _defineProperty(obj, key, value) {
  if (key in obj) {
    Object.defineProperty(obj, key, {
      value: value,
      enumerable: true,
      configurable: true,
      writable: true
    });
  } else {
    obj[key] = value;
  }

  return obj;
}
25
/**
 * Wraps a value so that `_AsyncGenerator` treats the enclosing `yield` as an
 * `await` (resolve it and feed the result back in) rather than as a value to
 * hand to the consumer.
 */
function _awaitAsyncGenerator(value) {
  return new _AwaitValue(value);
}

/**
 * Turns a plain generator function into a function that returns an
 * `_AsyncGenerator` driving that generator. `this` and the call arguments are
 * forwarded to `fn` unchanged.
 */
function _wrapAsyncGenerator(fn) {
  return function () {
    return new _AsyncGenerator(fn.apply(this, arguments));
  };
}

/**
 * Drives a synchronous generator `gen` as an async generator.
 *
 * Calls to next/throw/return are queued and served one at a time; each call
 * returns a promise that settles with `{ value, done }` or rejects.
 */
function _AsyncGenerator(gen) {
  // Singly linked queue of pending requests: `front` is the one currently
  // being served, `back` is the most recently enqueued one.
  var front, back;

  // Enqueues a request; starts driving the generator when the queue was empty.
  function send(key, arg) {
    return new Promise(function (resolve, reject) {
      var request = {
        key: key,
        arg: arg,
        resolve: resolve,
        reject: reject,
        next: null
      };

      if (back) {
        back = back.next = request;
      } else {
        front = back = request;
        resume(key, arg);
      }
    });
  }

  // Advances the generator by calling gen.next / gen.throw / gen.return.
  function resume(key, arg) {
    try {
      var result = gen[key](arg);
      var value = result.value;
      var wrappedAwait = value instanceof _AwaitValue;
      Promise.resolve(wrappedAwait ? value.wrapped : value).then(function (arg) {
        if (wrappedAwait) {
          // An awaited value: feed the resolved result straight back in.
          resume("next", arg);
          return;
        }

        settle(result.done ? "return" : "normal", arg);
      }, function (err) {
        // A rejected promise is thrown into the generator so it can catch it.
        resume("throw", err);
      });
    } catch (err) {
      settle("throw", err);
    }
  }

  // Settles the request at the front of the queue, then serves the next one.
  function settle(type, value) {
    switch (type) {
      case "return":
        front.resolve({ value: value, done: true });
        break;

      case "throw":
        front.reject(value);
        break;

      default:
        front.resolve({ value: value, done: false });
        break;
    }

    front = front.next;

    if (front) {
      resume(front.key, front.arg);
    } else {
      back = null;
    }
  }

  this._invoke = send;

  // Hide `return` when the underlying generator object does not support it.
  if (typeof gen.return !== "function") {
    this.return = undefined;
  }
}

if (typeof Symbol === "function" && Symbol.asyncIterator) {
  _AsyncGenerator.prototype[Symbol.asyncIterator] = function () {
    return this;
  };
}

_AsyncGenerator.prototype.next = function (arg) {
  return this._invoke("next", arg);
};

_AsyncGenerator.prototype.throw = function (arg) {
  return this._invoke("throw", arg);
};

_AsyncGenerator.prototype.return = function (arg) {
  return this._invoke("return", arg);
};

/** Marker wrapper created by `_awaitAsyncGenerator`; holds the awaited value in `wrapped`. */
function _AwaitValue(value) {
  this.wrapped = value;
}
41
/**
 * Obtains an iterator from `iterable`, preferring its `Symbol.asyncIterator`
 * method and falling back to `Symbol.iterator`.
 *
 * @param {Object} iterable - object to iterate.
 * @returns {Object} whatever the selected iterator method returns.
 * @throws {TypeError} when neither method is available on `iterable`
 *   (including when `Symbol` itself is not a function).
 */
function _asyncIterator(iterable) {
  var method;

  if (typeof Symbol === "function") {
    if (Symbol.asyncIterator) {
      method = iterable[Symbol.asyncIterator];
      if (method != null) return method.call(iterable);
    }

    if (Symbol.iterator) {
      method = iterable[Symbol.iterator];
      if (method != null) return method.call(iterable);
    }
  }

  throw new TypeError("Object is not async iterable");
}
43
// Helper functions destructured from a `_Module` instance that is constructed
// with this file's path; not all of them are used in this file.
const {
  log,
  warn,
  error,
  noteGauge,
  noteCount,
  noteTimer,
  trackOp
} = new _Module.default(__filename); // eslint-disable-line no-unused-vars

// Maximum number of partition descriptors AthenaClient keeps in its in-memory
// `_addedPartitions` list before dropping the oldest entry.
const MAX_PARTITION_CACHE_SIZE = 1000;
55
/**
 * Writes gzipped newline-delimited JSON files to S3, registers the matching
 * Athena partitions, and runs Athena queries through the injected AWS client.
 */
class AthenaClient {
  /**
   * @param {Object} dependencies
   * @param {Object} dependencies.awsClient - used through its `runS3` and
   *   `runAthena` methods.
   */
  constructor(dependencies) {
    _defineProperty(this, "_awsClient", void 0);

    // Partition descriptors already submitted to Athena, newest first.
    _defineProperty(this, "_addedPartitions", void 0);

    /**
     * Uploads `items` as a gzipped `.jsons.gz` object and, when
     * `options.athenaFullTableName` is set, registers the partition derived
     * from `options.dirPath`. Both steps run concurrently.
     *
     * @param {Array} items - records; each is JSON-stringified onto its own line.
     * @param {Object} options - reads `bucket`, `dirPath`, `fileName` and the
     *   optional `athenaFullTableName`.
     * @returns {Promise<void>}
     */
    _defineProperty(this, "writeAthenaFile", async (items, options) => {
      // persist data to S3
      const persistItems = async () => {
        const payload = items.map(item => JSON.stringify(item)).join('\n');
        const gzippedBody = await (0, _compressionUtils.gzipCompress)(payload);
        await this._awsClient.runS3(s3 => s3.putObject({
          Bucket: options.bucket,
          Key: `${options.dirPath}/${options.fileName}.jsons.gz`,
          Body: gzippedBody,
          ContentType: 'text/plain',
          ContentEncoding: 'gzip',
          Metadata: {
            'x-amz-meta-items': items.length.toString()
          }
        }), {
          noLog: true
        });
      };

      // create athena partition
      const registerPartition = async () => {
        const athenaFullTableName = options.athenaFullTableName;
        if (!athenaFullTableName) return;

        // Every `field=value` path segment becomes one `field = 'value'` term.
        const partitionDesc = options.dirPath
          .split('/')
          .map(part => part.split('='))
          .filter(partParts => partParts.length === 2)
          .map(([field, value]) => `${field} = '${value}'`)
          .join(', ');

        // `includes` (not a truthiness test on the found element) so that an
        // empty descriptor is also recognised as already cached.
        if (this._addedPartitions.includes(partitionDesc)) return;

        const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';`;
        await this.executeAthenaQuery(query, 'add-athena-partition', {
          dontWaitForCompletion: true,
          noLog: true
        });

        // maintain partition cache
        this._addedPartitions.unshift(partitionDesc);

        if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE) this._addedPartitions.pop();
      };

      await Promise.all([persistItems(), registerPartition()]);
    });

    /**
     * Starts an Athena query and collects its result pages into one array.
     *
     * @param {string} query - query text.
     * @param {string} queryName - label used for the tracked operation name and in error logs.
     * @param {Object} [options]
     * @param {boolean} [options.dontWaitForCompletion=false] - return `[]` right
     *   after the query has been started, without reading results.
     * @param {number} [options.maxResults=0] - stop after this many items; 0 means no limit.
     * @param {boolean} [options.noLog=false] - passed to `trackOp` as `log: false`.
     * @returns {Promise<Array>} the collected items.
     * @throws rethrows any failure after logging it.
     */
    _defineProperty(this, "executeAthenaQuery", async (query, queryName, options) => {
      const given = options || {};
      const opts = {
        dontWaitForCompletion: given.dontWaitForCompletion || false,
        maxResults: given.maxResults || 0,
        noLog: given.noLog || false
      };
      return await trackOp(async () => {
        try {
          const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName);
          if (opts.dontWaitForCompletion) return [];

          const results = [];
          const iterator = _asyncIterator(asyncIterator);
          // True while a page has been received but not fully processed; in
          // that state the iterator must be closed if we leave the loop.
          let mustClose = false;

          try {
            while (true) {
              const step = await iterator.next();
              const items = await step.value;
              if (step.done) break;
              mustClose = true;

              const remaining = opts.maxResults - results.length;
              const reachedMaxResults = opts.maxResults > 0 && items.length > remaining;
              const itemsToAdd = reachedMaxResults ? items.slice(0, remaining) : items;

              // Append in place; rebuilding the array per page is quadratic.
              for (const item of itemsToAdd) results.push(item);

              if (reachedMaxResults) break;
              mustClose = false;
            }
          } catch (loopErr) {
            if (mustClose && iterator.return != null) {
              try {
                await iterator.return();
              } catch (closeErr) {
                // The original failure takes precedence over a close failure.
              }
            }

            throw loopErr;
          }

          if (mustClose && iterator.return != null) {
            await iterator.return();
          }

          return results;
        } catch (err) {
          error('Athena query execution failed', {
            queryName: queryName,
            query: query,
            err
          });
          throw err;
        }
      }, `athena-query-${queryName}`, null, {
        log: !opts.noLog
      });
    });

    /**
     * Starts an Athena query and returns an async iterator over its result
     * pages (see `iterateAthenaQueryResults`).
     *
     * @param {string} query - query text.
     * @param {string} queryName - label used in the output location and logs.
     * @returns {Promise<Object>} async iterator yielding arrays of result objects.
     * @throws when the system config has no `aws` section, a required field is
     *   unset, or starting the query fails; the error is logged and rethrown.
     */
    _defineProperty(this, "executeAthenaQueryAndGetResultIterator", async (query, queryName) => {
      try {
        const awsConfig = _System.default.getConfig().aws;

        if (!awsConfig) throw new Error('aws config not set in system config');
        const startMS = Date.now();
        const OUTPUT_LOCATION = `s3://aws-athena-query-results-${awsConfig.accountId || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.accountId')}-${awsConfig.defaultRegion || (0, _errorUtils.throwUnsetArg)('systemConfig.aws.defaultRegion')}/${_System.default.getConfig().env}/${queryName}/`;

        // execute query
        const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({
          QueryString: query,
          ResultConfiguration: {
            OutputLocation: OUTPUT_LOCATION
          }
        }));
        return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS);
      } catch (err) {
        error('Athena query execution failed', {
          queryName: queryName,
          query: query,
          err
        });
        throw err;
      }
    });

    this._awsClient = dependencies.awsClient;
    this._addedPartitions = [];
  }

}
184
// Publish the client class as this module's default export.
exports.default = AthenaClient;
186
/**
 * Async generator over the result pages of an already-started Athena query.
 *
 * Polls the query status every 500 ms until it succeeds, records scan-size and
 * cost metrics, then yields one array of row objects per result page and
 * finally records the total duration.
 *
 * @param awsClient - client exposing `runAthena(fn)`.
 * @param queryExecutionId - id of the started query execution.
 * @param queryName - label used as the metric prefix and in error details.
 * @param query - query text, used only in error details and logs.
 * @param startMS - `Date.now()` value taken when the query was started.
 * @returns async iterator yielding arrays of result objects.
 * @throws an ExtendedError when the query ends in a state other than
 *   SUCCEEDED / QUEUED / RUNNING; any failure is logged and rethrown.
 */
function iterateAthenaQueryResults(_x, _x2, _x3, _x4, _x5) {
  return _iterateAthenaQueryResults.apply(this, arguments);
}

// Built lazily: the first call replaces this function with the wrapped generator.
function _iterateAthenaQueryResults() {
  _iterateAthenaQueryResults = _wrapAsyncGenerator(function* (awsClient, queryExecutionId, queryName, query, startMS) {
    // Largest page size GetQueryResults accepts (documented range is 1-1000).
    const ATHENA_MAX_PAGE_SIZE = 1000;

    try {
      // wait for query execution to end
      let dataScannedInBytes = 0;

      while (true) {
        const statusRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryExecution({
          QueryExecutionId: queryExecutionId
        })));
        const {
          State,
          StateChangeReason
        } = statusRes.result.QueryExecution.Status;

        if (State === 'SUCCEEDED') {
          // Default to 0 so a response without statistics does not turn the
          // metrics below into NaN.
          dataScannedInBytes = _lodash.default.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes', 0);
          break;
        } else if (['QUEUED', 'RUNNING'].includes(State)) {
          yield _awaitAsyncGenerator((0, _asyncUtils.sleep)(500));
        } else {
          throw new _errorUtils.ExtendedError('Athena query failed', {
            queryName: queryName,
            state: State,
            reason: StateChangeReason,
            query: query
          });
        }
      }

      const kbScanned = Math.round(dataScannedInBytes / 1000);
      const kbCharged = Math.max(kbScanned, 10 * 1000); // Athena charges a minimum of 10mb for every query

      const costDollar = 5 * kbCharged / 1000000000; // Athena charges 5$ for a TB of data

      noteCount(`${queryName}.kbScanned`, kbScanned);
      noteCount(`${queryName}.kbCharged`, kbCharged);
      noteCount(`${queryName}.dollarsCharged`, costDollar);

      // fetch response, one page per iteration, following NextToken
      let nextToken = null;

      do {
        const resultsRes = yield _awaitAsyncGenerator(awsClient.runAthena(athena => athena.getQueryResults({
          QueryExecutionId: queryExecutionId,
          MaxResults: ATHENA_MAX_PAGE_SIZE,
          NextToken: nextToken
        })));
        const athenaResults = getAthenaQueryResultLines(resultsRes.result);
        const newResultObjects = csvArrayToObjects(athenaResults);
        yield newResultObjects;
        nextToken = resultsRes.result.NextToken;
      } while (nextToken);

      const durationMS = Date.now() - startMS;

      noteTimer(`${queryName}.duration`, durationMS);
    } catch (err) {
      error('Athena query execution failed', {
        queryName: queryName,
        query: query,
        err
      });
      throw err;
    }
  });
  return _iterateAthenaQueryResults.apply(this, arguments);
}
258
/**
 * Removes one pair of surrounding double quotes from a string.
 *
 * @param {*} input - value to unquote.
 * @returns {*} non-strings unchanged; `null` for the empty string; the inner
 *   text when `input` both starts and ends with `"` (and is at least two
 *   characters long); otherwise `input` unchanged.
 */
function stripQuotes(input) {
  if (typeof input !== 'string') return input;
  if (input === '') return null;

  const isQuoted = input.length >= 2 && input.startsWith('"') && input.endsWith('"');
  // `slice` instead of the deprecated `substr`.
  return isQuoted ? input.slice(1, -1) : input;
}
266
/**
 * Converts one Athena GetQueryResults page into an array of lines: the first
 * line holds the column names, every following line holds one row's cells.
 *
 * Cells of `integer` / `bigint` / `double` columns are converted with
 * `Number(...)`, other cells are wrapped in double quotes, and cells without a
 * `VarCharValue` become `null`.
 *
 * The first row is skipped only when it repeats the column names. This keeps
 * the first data row of pages that carry no header row (presumably every page
 * after the first — confirm against the Athena API reference). The input
 * response is not modified.
 *
 * @param {Object} res - response containing `ResultSet.ResultSetMetadata.ColumnInfo`
 *   and `ResultSet.Rows`.
 * @returns {Array<Array>} header line followed by the data lines.
 */
function getAthenaQueryResultLines(res) {
  const numericTypeNames = ['integer', 'bigint', 'double'];
  const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo;
  const columnNames = columnMetas.map(cm => cm.Name);
  const rows = res.ResultSet.Rows;

  const firstRow = rows[0];
  const startsWithHeader = firstRow !== undefined &&
    firstRow.Data.length === columnNames.length &&
    firstRow.Data.every((datum, i) => datum.VarCharValue === columnNames[i]);
  const dataRows = startsWithHeader ? rows.slice(1) : rows;

  const toCell = (datum, i) => {
    if (datum.VarCharValue === undefined) return null;
    return numericTypeNames.includes(columnMetas[i].Type)
      ? Number(datum.VarCharValue)
      : `"${datum.VarCharValue}"`;
  };

  return [columnNames, ...dataRows.map(row => row.Data.map(toCell))];
}
277
/**
 * Converts a header-plus-rows array into an array of objects.
 *
 * @param {Array<Array>} csvArray - first element is the list of property
 *   names; every following element is one row of values in the same order.
 * @returns {Array<Object>} one object per row, mapping each property name to
 *   the row's value at the same index after passing it through `stripQuotes`.
 */
function csvArrayToObjects(csvArray) {
  const propNames = csvArray[0];
  const valueLines = csvArray.slice(1);

  return valueLines.map(valueLine => {
    const obj = {};
    valueLine.forEach((value, i) => {
      obj[propNames[i]] = stripQuotes(value);
    });
    return obj;
  });
}
//# sourceMappingURL=data:application/json;charset=utf-8;base64,