UNPKG

8.51 kBJavaScriptView Raw
1// @flow
2
3import "@babel/polyfill"
4
5import _ from 'lodash'
6
7import { gzipCompress } from './compressionUtils'
8import { sleep } from './asyncUtils'
9import System from "./System"
10import Module from "./Module"
11import { ExtendedError, throwUnsetArg } from './errorUtils' // eslint-disable-line no-unused-vars
12
13import type AWSClient from './AWSClient'
14
// Logging / metrics helpers destructured from a Module instance constructed with this file's name.
// Not every helper is referenced in this file, hence the eslint suppression.
15const { log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars
16
// Upper bound on the number of entries AthenaClient keeps in its `_addedPartitions` cache.
17const MAX_PARTITION_CACHE_SIZE = 1000
18
19export default class AthenaClient {
20
21 _awsClient: AWSClient
22 _addedPartitions: Array<string>
23
24
25 constructor(dependencies: { awsClient: AWSClient }) {
26 this._awsClient = dependencies.awsClient
27 this._addedPartitions = []
28 }
29
30
31 writeAthenaFile = async (items: Array<Object>, options: {|
32 bucket: string,
33 dirPath: string,
34 fileName: string,
35 athenaFullTableName?: ?string,
36 noLog?: ?boolean,
37 |}): Promise<void> => {
38 await Promise.all([
39
40 // persist data to S3
41 (async () => {
42 const gzippedBody = await gzipCompress(items.map(item => JSON.stringify(item)).join('\n'))
43 await this._awsClient.runS3(s3 => s3.putObject({
44 Bucket: options.bucket,
45 Key: `${options.dirPath}/${options.fileName}.jsons.gz`,
46 Body: gzippedBody,
47 ContentType: 'text/plain',
48 ContentEncoding: 'gzip',
49 Metadata: {
50 'x-amz-meta-items': items.length.toString(),
51 },
52 }), {
53 noLog: true,
54 })
55 })(),
56
57 // create athena partition
58 (async () => {
59 const athenaFullTableName = options.athenaFullTableName
60 if (!athenaFullTableName)
61 return
62
63 const partitionDesc = options.dirPath
64 .split('/')
65 .map(part => part.split('='))
66 .filter(partParts => partParts.length === 2)
67 .map(([ field, value ])=> `${field} = '${value}'`)
68 .join(', ')
69
70 const existingPartition = this._addedPartitions.find(p => p === partitionDesc)
71 if (!existingPartition) {
72 const query = `ALTER TABLE ${athenaFullTableName} ADD IF NOT EXISTS PARTITION (${partitionDesc}) LOCATION 's3://${options.bucket}/${options.dirPath}';`
73 await this.executeAthenaQuery(query, 'add-athena-partition', { dontWaitForCompletion: true, noLog: true })
74
75 // maintain partition cache
76 this._addedPartitions.unshift(partitionDesc)
77 if (this._addedPartitions.length > MAX_PARTITION_CACHE_SIZE)
78 this._addedPartitions.pop()
79 }
80 })(),
81
82 ])
83 }
84
85 executeAthenaQuery = async (query: string, queryName: string, options: ?{ dontWaitForCompletion?: boolean, maxResults?: number, noLog?: boolean }): Promise<Array<Object>> => {
86 options = options || {}
87 const opts = {
88 dontWaitForCompletion: options.dontWaitForCompletion || false,
89 maxResults: options.maxResults || 0,
90 noLog: options.noLog || false,
91 }
92
93 return await trackOp(async () => {
94 try {
95 let results: Array<Object> = []
96 const asyncIterator = await this.executeAthenaQueryAndGetResultIterator(query, queryName)
97 if (opts.dontWaitForCompletion)
98 return []
99 for await (const items of asyncIterator) {
100 const reachedMaxResults = opts.maxResults > 0 && results.length + items.length > opts.maxResults
101 const itemsToAdd = reachedMaxResults ? items.slice(0, opts.maxResults - results.length) : items
102 results = [ ...results, ...itemsToAdd ]
103 if (reachedMaxResults)
104 break
105 }
106 return results
107 } catch (err) {
108 error('Athena query execution failed', { queryName: queryName, query: query, err })
109 throw err
110 }
111 }, `athena-query-${queryName}`, null, { log: !opts.noLog })
112 }
113
114 executeAthenaQueryAndGetResultIterator = async (query: string, queryName: string): Promise<AsyncGenerator<Array<Object>, void, any>> => {
115 try {
116 const awsConfig = System.getConfig().aws
117 if (!awsConfig)
118 throw new Error('aws config not set in system config')
119
120 const startMS = Date.now()
121 const OUTPUT_LOCATION = `s3://aws-athena-query-results-${ awsConfig.accountId || throwUnsetArg('systemConfig.aws.accountId') }-${ awsConfig.defaultRegion || throwUnsetArg('systemConfig.aws.defaultRegion') }/${System.getConfig().env}/${queryName}/`
122
123 // execute query
124 const startRes = await this._awsClient.runAthena(athena => athena.startQueryExecution({
125 QueryString: query,
126 ResultConfiguration: {
127 OutputLocation: OUTPUT_LOCATION,
128 },
129 }))
130
131 return iterateAthenaQueryResults(this._awsClient, startRes.result.QueryExecutionId, queryName, query, startMS)
132 } catch (err) {
133 error('Athena query execution failed', { queryName: queryName, query: query, err })
134 throw err
135 }
136 }
137}
138
139
140
141async function* iterateAthenaQueryResults(awsClient: AWSClient, queryExecutionId: string, queryName: string, query: string, startMS: number): AsyncGenerator<Array<Object>, void, any> {
142 try {
143 // wait for query execution to end
144 let dataScannedInBytes = 0
145 while (true) {
146 const statusRes = await awsClient.runAthena(athena => athena.getQueryExecution({
147 QueryExecutionId: queryExecutionId,
148 }))
149
150 const { State, StateChangeReason } = statusRes.result.QueryExecution.Status
151 if (State === 'SUCCEEDED') {
152 dataScannedInBytes = _.get(statusRes.result.QueryExecution, 'Statistics.DataScannedInBytes')
153 break
154 } else if (['QUEUED', 'RUNNING'].includes(State)) {
155 await sleep(500)
156 } else {
157 throw new ExtendedError('Athena query failed', { queryName: queryName, state: State, reason: StateChangeReason, query: query })
158 }
159 }
160
161 const kbScanned = Math.round(dataScannedInBytes / 1000)
162 const kbCharged = Math.max(kbScanned, 10 * 1000) // Athena charges a minimum of 10mb for every query
163 const costDollar = 5 * kbCharged / 1000000000 // Athena charges 5$ for a TB of data
164 noteCount(`${queryName}.kbScanned`, kbScanned)
165 noteCount(`${queryName}.kbCharged`, kbCharged)
166 noteCount(`${queryName}.dollarsCharged`, costDollar)
167
168 // fetch response
169 let nextToken: ?string = null
170 do {
171 const resultsRes = await awsClient.runAthena(athena => athena.getQueryResults({
172 QueryExecutionId: queryExecutionId,
173 MaxResults: Infinity,
174 NextToken: nextToken,
175 }))
176
177 const athenaResults = getAthenaQueryResultLines(resultsRes.result)
178 const newResultObjects = csvArrayToObjects(athenaResults)
179
180 yield newResultObjects
181
182 nextToken = resultsRes.result.NextToken
183 } while (nextToken)
184
185 const durationMS = Date.now() - startMS
186 // log(`Athena query complete`, { queryName, kbScanned, kbCharged, costDollar, resultsLoaded: resultObjects.length, durationMS: durationMS })
187 noteTimer(`${queryName}.duration`, durationMS)
188 } catch (err) {
189 error('Athena query execution failed', { queryName: queryName, query: query, err })
190 throw err
191 }
192}
193
194function stripQuotes(input: *): * {
195 if (typeof input !== 'string')
196 return input
197 if (input === '')
198 return null
199 const length = input.length
200 if (length < 2 || !input.startsWith('"') || !input.endsWith('"'))
201 return input
202 return input.substr(1, length - 2)
203}
204
205function getAthenaQueryResultLines(res: Object): Array<Array<string>> {
206 const rolloutLines: Array<Array<string>> = []
207 const numericTypeNames = ['integer', 'bigint', 'double']
208 const columnMetas = res.ResultSet.ResultSetMetadata.ColumnInfo
209 rolloutLines.push(columnMetas.map(cm => cm.Name))
210 res.ResultSet.Rows
211 .splice(1) // skip header
212 .forEach(
213 row => rolloutLines.push(
214 row.Data.map(
215 (datum, i) => numericTypeNames
216 .includes(columnMetas[i].Type) ?
217 (datum.VarCharValue === undefined ? null : Number(datum.VarCharValue)) :
218 (datum.VarCharValue === undefined ? null : `"${datum.VarCharValue}"`)
219 )
220 )
221 )
222 const csvContent = rolloutLines
223 return csvContent
224}
225
226function csvArrayToObjects(csvArray: Array<Array<string>>) {
227 const propNames = csvArray[0]
228 const valueLines = csvArray.slice(1)
229 const objects = valueLines.map(valueLine => {
230 const obj = {}
231 valueLine.forEach((value, i) => obj[propNames[i]] = stripQuotes(value))
232 return obj
233 })
234 return objects
235}
\No newline at end of file