// @flow import "@babel/polyfill" import _ from 'lodash' import { gzipCompress } from './compressionUtils' import { sleep } from './asyncUtils' import System from "./System" import Module from "./Module" import { ExtendedError, throwUnsetArg } from './errorUtils' // eslint-disable-line no-unused-vars import type AWSClient from './AWSClient' const { log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars const MAX_PARTITION_CACHE_SIZE = 1000 export default class AthenaClient { _awsClient: AWSClient _addedPartitions: Array constructor(dependencies: { awsClient: AWSClient }) { this._awsClient = dependencies.awsClient this._addedPartitions = [] } writeAthenaFile = async (items: Array, options: {| bucket: string, dirPath: string, fileName: string, athenaFullTableName?: ?string, noLog?: ?boolean, |}): Promise => { await Promise.all([ // persist data to S3 (async () => { const gzippedBody = await gzipCompress(items.map(item => JSON.stringify(item)).join('\n')) await this._awsClient.runS3(s3 => s3.putObject({ Bucket: options.bucket, Key: `${options.dirPath}/${options.fileName}.jsons.gz`, Body: gzippedBody, ContentType: 'text/plain', ContentEncoding: 'gzip', Metadata: { 'x-amz-meta-items': items.length.toString(), }, }), { noLog: true, }) })(), // create athena partition (async () => { const athenaFullTableName = options.athenaFullTableName if (!athenaFullTableName) return const partitionDesc = options.dirPath .split('/') .map(part => part.split('=')) .filter(partParts => partParts.length === 2) .map(([ field, value ])=> `${field} = '${value}'`) .join(', ') const existingPartition = this._addedPartitions.find(p => p === partitionDesc) if (!existingPartition) { const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';` await this.executeAthenaQuery(query, 'add-athena-partition', { dontWaitForCompletion: true, noLog: true }) // maintain partition cache this._addedPartitions.unshift(partitionDesc) if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE) this._addedPartitions.pop() } })(), ]) } executeAthenaQuery = async (query: string, queryName: string, options: ?{ dontWaitForCompletion?: boolean, maxResults?: number, noLog?: boolean }): Promise> => { options = options || {} const opts = { dontWaitForCompletion: options.dontWaitForCompletion || false, maxResults: options.maxResults || 0, noLog: options.noLog || false, } return await trackOp(async () => { try { let results: Array = [] const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName) if (opts.dontWaitForCompletion) return [] for await (const items of asyncIterator) { const reachedMaxResults = opts.maxResults > 0 && results.length + items.length > opts.maxResults const itemsToAdd = reachedMaxResults ? items.slice(0, opts.maxResults - results.length) : items results = [ ...results, ...itemsToAdd ] if (reachedMaxResults) break } return results } catch (err) { error('Athena query execution failed', { queryName: queryName, query: query, err }) throw err } }, `athena-query-${queryName}`, null, { log: !opts.noLog }) } executeAthenaQueryAndGetResultIterator = async (query: string, queryName: string): Promise, void, any>> => { try { const awsConfig = System.getConfig().aws if (!awsConfig) throw new Error('aws config not set in system config') const startMS = Date.now() const OUTPUT_LOCATION = `s3://aws-athena-query-results-${ awsConfig.accountId || throwUnsetArg('systemConfig.aws.accountId') }-${ awsConfig.defaultRegion || throwUnsetArg('systemConfig.aws.defaultRegion') }/${System.getConfig().env}/${queryName}/` // execute query const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({ QueryString: query, ResultConfiguration: { OutputLocation: OUTPUT_LOCATION, }, })) return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS) } catch (err) { error('Athena query execution failed', { queryName: queryName, query: query, err }) throw err } } } async function* iterateAthenaQueryResults(awsClient: AWSClient, queryExecutionId: string, queryName: string, query: string, startMS: number): AsyncGenerator, void, any> { try { // wait for query execution to end let dataScannedInBytes = 0 while (true) { const statusRes = await awsClient.runAthena(athena => athena.getQueryExecution({ QueryExecutionId: queryExecutionId, })) const { State, StateChangeReason } = statusRes.result.QueryExecution.Status if (State === 'SUCCEEDED') { dataScannedInBytes = _.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes') break } else if (['QUEUED', 'RUNNING'].includes(State)) { await sleep(500) } else { throw new ExtendedError('Athena query failed', { queryName: queryName, state: State, reason: StateChangeReason, query: query }) } } const kbScanned = Math.round(dataScannedInBytes / 1000) const kbCharged = Math.max(kbScanned, 10 * 1000) // Athena charges a minimum of 10mb for every query const costDollar = 5 * kbCharged / 1000000000 // Athena charges 5$ for a TB of data noteCount(`${queryName}.kbScanned`, kbScanned) noteCount(`${queryName}.kbCharged`, kbCharged) noteCount(`${queryName}.dollarsCharged`, costDollar) // fetch response let nextToken: ?string = null do { const resultsRes = await awsClient.runAthena(athena => athena.getQueryResults({ QueryExecutionId: queryExecutionId, MaxResults: Infinity, NextToken: nextToken, })) const athenaResults = getAthenaQueryResultLines(resultsRes.result) const newResultObjects = csvArrayToObjects(athenaResults) yield newResultObjects nextToken = resultsRes.result.NextToken } while (nextToken) const durationMS = Date.now() - startMS // log(`Athena query complete`, { queryName, kbScanned, kbCharged, costDollar, resultsLoaded: resultObjects.length, durationMS: durationMS }) noteTimer(`${queryName}.duration`, durationMS) } catch (err) { error('Athena query execution failed', { queryName: queryName, query: query, err }) throw err } } function stripQuotes(input: *): * { if (typeof input !== 'string') return input if (input === '') return null const length = input.length if (length < 2 || !input.startsWith('"') || !input.endsWith('"')) return input return input.substr(1, length - 2) } function getAthenaQueryResultLines(res: Object): Array> { const rolloutLines: Array> = [] const numericTypeNames = ['integer', 'bigint', 'double'] const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo rolloutLines.push(columnMetas.map(cm => cm.Name)) res.ResultSet.Rows .splice(1) // skip header .forEach( row => rolloutLines.push( row.Data.map( (datum, i) => numericTypeNames .includes(columnMetas[i].Type) ? (datum.VarCharValue === undefined ? null : Number(datum.VarCharValue)) : (datum.VarCharValue === undefined ? null : `"${datum.VarCharValue}"`) ) ) ) const csvContent = rolloutLines return csvContent } function csvArrayToObjects(csvArray: Array>) { const propNames = csvArray[0] const valueLines = csvArray.slice(1) const objects = valueLines.map(valueLine => { const obj = {} valueLine.forEach((value, i) => obj[propNames[i]] = stripQuotes(value)) return obj }) return objects }