1 |
|
2 |
|
3 | import "@babel/polyfill"
|
4 |
|
5 | import _ from 'lodash'
|
6 |
|
7 | import { gzipCompress } from './compressionUtils'
|
8 | import { sleep } from './asyncUtils'
|
9 | import System from "./System"
|
10 | import Module from "./Module"
|
11 | import { ExtendedError, throwUnsetArg } from './errorUtils'
|
12 |
|
13 | import type AWSClient from './AWSClient'
|
14 |
|
// Logging / metrics helpers for this file; Module is constructed with __filename
// (presumably so output is tagged with this file's name — see Module).
const athenaModule = new Module(__filename)
const { log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = athenaModule

// Upper bound on how many partition descriptors an AthenaClient instance remembers having
// added; beyond this the oldest-inserted entry is forgotten (see writeAthenaFile).
const MAX_PARTITION_CACHE_SIZE = 1000
|
18 |
|
19 | export default class AthenaClient {
|
20 |
|
21 | _awsClient: AWSClient
|
22 | _addedPartitions: Array<string>
|
23 |
|
24 |
|
25 | constructor(dependencies: { awsClient: AWSClient }) {
|
26 | this._awsClient = dependencies.awsClient
|
27 | this._addedPartitions = []
|
28 | }
|
29 |
|
30 |
|
31 | writeAthenaFile = async (items: Array<Object>, options: {|
|
32 | bucket: string,
|
33 | dirPath: string,
|
34 | fileName: string,
|
35 | athenaFullTableName?: ?string,
|
36 | noLog?: ?boolean,
|
37 | |}): Promise<void> => {
|
38 | await Promise.all([
|
39 |
|
40 |
|
41 | (async () => {
|
42 | const gzippedBody = await gzipCompress(items.map(item => JSON.stringify(item)).join('\n'))
|
43 | await this._awsClient.runS3(s3 => s3.putObject({
|
44 | Bucket: options.bucket,
|
45 | Key: `${options.dirPath}/${options.fileName}.jsons.gz`,
|
46 | Body: gzippedBody,
|
47 | ContentType: 'text/plain',
|
48 | ContentEncoding: 'gzip',
|
49 | Metadata: {
|
50 | 'x-amz-meta-items': items.length.toString(),
|
51 | },
|
52 | }), {
|
53 | noLog: true,
|
54 | })
|
55 | })(),
|
56 |
|
57 |
|
58 | (async () => {
|
59 | const athenaFullTableName = options.athenaFullTableName
|
60 | if (!athenaFullTableName)
|
61 | return
|
62 |
|
63 | const partitionDesc = options.dirPath
|
64 | .split('/')
|
65 | .map(part => part.split('='))
|
66 | .filter(partParts => partParts.length === 2)
|
67 | .map(([ field, value ])=> `${field} = '${value}'`)
|
68 | .join(', ')
|
69 |
|
70 | const existingPartition = this._addedPartitions.find(p => p === partitionDesc)
|
71 | if (!existingPartition) {
|
72 | const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';`
|
73 | await this.executeAthenaQuery(query, 'add-athena-partition', { dontWaitForCompletion: true, noLog: true })
|
74 |
|
75 |
|
76 | this._addedPartitions.unshift(partitionDesc)
|
77 | if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE)
|
78 | this._addedPartitions.pop()
|
79 | }
|
80 | })(),
|
81 |
|
82 | ])
|
83 | }
|
84 |
|
85 | executeAthenaQuery = async (query: string, queryName: string, options: ?{ dontWaitForCompletion?: boolean, maxResults?: number, noLog?: boolean }): Promise<Array<Object>> => {
|
86 | options = options || {}
|
87 | const opts = {
|
88 | dontWaitForCompletion: options.dontWaitForCompletion || false,
|
89 | maxResults: options.maxResults || 0,
|
90 | noLog: options.noLog || false,
|
91 | }
|
92 |
|
93 | return await trackOp(async () => {
|
94 | try {
|
95 | let results: Array<Object> = []
|
96 | const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName)
|
97 | if (opts.dontWaitForCompletion)
|
98 | return []
|
99 | for await (const items of asyncIterator) {
|
100 | const reachedMaxResults = opts.maxResults > 0 && results.length + items.length > opts.maxResults
|
101 | const itemsToAdd = reachedMaxResults ? items.slice(0, opts.maxResults - results.length) : items
|
102 | results = [ ...results, ...itemsToAdd ]
|
103 | if (reachedMaxResults)
|
104 | break
|
105 | }
|
106 | return results
|
107 | } catch (err) {
|
108 | error('Athena query execution failed', { queryName: queryName, query: query, err })
|
109 | throw err
|
110 | }
|
111 | }, `athena-query-${queryName}`, null, { log: !opts.noLog })
|
112 | }
|
113 |
|
114 | executeAthenaQueryAndGetResultIterator = async (query: string, queryName: string): Promise<AsyncGenerator<Array<Object>, void, any>> => {
|
115 | try {
|
116 | const awsConfig = System.getConfig().aws
|
117 | if (!awsConfig)
|
118 | throw new Error('aws config not set in system config')
|
119 |
|
120 | const startMS = Date.now()
|
121 | const OUTPUT_LOCATION = `s3://aws-athena-query-results-${ awsConfig.accountId || throwUnsetArg('systemConfig.aws.accountId') }-${ awsConfig.defaultRegion || throwUnsetArg('systemConfig.aws.defaultRegion') }/${System.getConfig().env}/${queryName}/`
|
122 |
|
123 |
|
124 | const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({
|
125 | QueryString: query,
|
126 | ResultConfiguration: {
|
127 | OutputLocation: OUTPUT_LOCATION,
|
128 | },
|
129 | }))
|
130 |
|
131 | return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS)
|
132 | } catch (err) {
|
133 | error('Athena query execution failed', { queryName: queryName, query: query, err })
|
134 | throw err
|
135 | }
|
136 | }
|
137 | }
|
138 |
|
139 |
|
140 |
|
141 | async function* iterateAthenaQueryResults(awsClient: AWSClient, queryExecutionId: string, queryName: string, query: string, startMS: number): AsyncGenerator<Array<Object>, void, any> {
|
142 | try {
|
143 |
|
144 | let dataScannedInBytes = 0
|
145 | while (true) {
|
146 | const statusRes = await awsClient.runAthena(athena => athena.getQueryExecution({
|
147 | QueryExecutionId: queryExecutionId,
|
148 | }))
|
149 |
|
150 | const { State, StateChangeReason } = statusRes.result.QueryExecution.Status
|
151 | if (State === 'SUCCEEDED') {
|
152 | dataScannedInBytes = _.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes')
|
153 | break
|
154 | } else if (['QUEUED', 'RUNNING'].includes(State)) {
|
155 | await sleep(500)
|
156 | } else {
|
157 | throw new ExtendedError('Athena query failed', { queryName: queryName, state: State, reason: StateChangeReason, query: query })
|
158 | }
|
159 | }
|
160 |
|
161 | const kbScanned = Math.round(dataScannedInBytes / 1000)
|
162 | const kbCharged = Math.max(kbScanned, 10 * 1000)
|
163 | const costDollar = 5 * kbCharged / 1000000000
|
164 | noteCount(`${queryName}.kbScanned`, kbScanned)
|
165 | noteCount(`${queryName}.kbCharged`, kbCharged)
|
166 | noteCount(`${queryName}.dollarsCharged`, costDollar)
|
167 |
|
168 |
|
169 | let nextToken: ?string = null
|
170 | do {
|
171 | const resultsRes = await awsClient.runAthena(athena => athena.getQueryResults({
|
172 | QueryExecutionId: queryExecutionId,
|
173 | MaxResults: Infinity,
|
174 | NextToken: nextToken,
|
175 | }))
|
176 |
|
177 | const athenaResults = getAthenaQueryResultLines(resultsRes.result)
|
178 | const newResultObjects = csvArrayToObjects(athenaResults)
|
179 |
|
180 | yield newResultObjects
|
181 |
|
182 | nextToken = resultsRes.result.NextToken
|
183 | } while (nextToken)
|
184 |
|
185 | const durationMS = Date.now() - startMS
|
186 |
|
187 | noteTimer(`${queryName}.duration`, durationMS)
|
188 | } catch (err) {
|
189 | error('Athena query execution failed', { queryName: queryName, query: query, err })
|
190 | throw err
|
191 | }
|
192 | }
|
193 |
|
194 | function stripQuotes(input: *): * {
|
195 | if (typeof input !== 'string')
|
196 | return input
|
197 | if (input === '')
|
198 | return null
|
199 | const length = input.length
|
200 | if (length < 2 || !input.startsWith('"') || !input.endsWith('"'))
|
201 | return input
|
202 | return input.substr(1, length - 2)
|
203 | }
|
204 |
|
205 | function getAthenaQueryResultLines(res: Object): Array<Array<string>> {
|
206 | const rolloutLines: Array<Array<string>> = []
|
207 | const numericTypeNames = ['integer', 'bigint', 'double']
|
208 | const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo
|
209 | rolloutLines.push(columnMetas.map(cm => cm.Name))
|
210 | res.ResultSet.Rows
|
211 | .splice(1)
|
212 | .forEach(
|
213 | row => rolloutLines.push(
|
214 | row.Data.map(
|
215 | (datum, i) => numericTypeNames
|
216 | .includes(columnMetas[i].Type) ?
|
217 | (datum.VarCharValue === undefined ? null : Number(datum.VarCharValue)) :
|
218 | (datum.VarCharValue === undefined ? null : `"${datum.VarCharValue}"`)
|
219 | )
|
220 | )
|
221 | )
|
222 | const csvContent = rolloutLines
|
223 | return csvContent
|
224 | }
|
225 |
|
226 | function csvArrayToObjects(csvArray: Array<Array<string>>) {
|
227 | const propNames = csvArray[0]
|
228 | const valueLines = csvArray.slice(1)
|
229 | const objects = valueLines.map(valueLine => {
|
230 | const obj = {}
|
231 | valueLine.forEach((value, i) => obj[propNames[i]] = stripQuotes(value))
|
232 | return obj
|
233 | })
|
234 | return objects
|
235 | } |
\ | No newline at end of file |