'use strict';

/** Delay between two consecutive polls of the query state, in milliseconds. */
const POLL_INTERVAL_MS = 200;

/**
 * Starts a query execution and returns its id.
 *
 * @param {object} params
 * @param {object} params.athena - Client exposing `startQueryExecution`.
 * @param {string} params.sql - Query string to run.
 * @param {string[]} [params.executionParameters] - Already formatted parameter literals.
 * @param {string} [params.workgroup] - Falls back to "primary" when falsy.
 * @param {string} [params.db] - Falls back to "default" when falsy.
 * @param {string} [params.catalog]
 * @param {string} [params.outputLocation]
 * @returns {Promise<string>} The QueryExecutionId from the response.
 * @throws {Error} When the response carries no QueryExecutionId.
 */
async function startQueryExecution(params) {
  const output = await params.athena.startQueryExecution({
    QueryString: params.sql,
    ExecutionParameters: params.executionParameters,
    WorkGroup: params.workgroup || "primary",
    QueryExecutionContext: {
      Database: params.db || "default",
      Catalog: params.catalog,
    },
    ResultConfiguration: {
      OutputLocation: params.outputLocation,
    },
  });

  if (!output.QueryExecutionId) {
    throw new Error("No QueryExecutionId was responded.");
  }

  return output.QueryExecutionId;
}

/**
 * Polls `getQueryExecution` until the query reaches a terminal state.
 *
 * Resolves on "SUCCEEDED". Rejects on "FAILED" and on "CANCELLED"; any other
 * state is treated as "still running" and polled again after a short delay.
 *
 * @param {object} params
 * @param {object} params.athena - Client exposing `getQueryExecution`.
 * @param {string} params.QueryExecutionId
 * @returns {Promise<void>}
 * @throws {Error} With the reported StateChangeReason (or a generic message
 *   when none is reported) if the query failed or was cancelled.
 */
async function waitExecutionCompleted(params) {
  // Iterative polling: a long-running query must not grow a chain of nested
  // async calls, one per poll.
  for (;;) {
    const data = await params.athena.getQueryExecution({
      QueryExecutionId: params.QueryExecutionId,
    });
    const status = data.QueryExecution?.Status;
    const state = status?.State;
    const reason = status?.StateChangeReason;

    if (state === "SUCCEEDED") {
      return;
    }
    if (state === "FAILED") {
      throw new Error(reason ?? "Query failed.");
    }
    if (state === "CANCELLED") {
      // A cancelled query never becomes SUCCEEDED or FAILED; without this
      // branch the loop would poll forever.
      throw new Error(reason ?? "Query was cancelled.");
    }

    await wait(POLL_INTERVAL_MS);
  }
}

/**
 * Fetches one page of results and converts it to typed row objects.
 *
 * @param {object} params
 * @param {object} params.athena - Client exposing `getQueryResults`.
 * @param {string} params.QueryExecutionId
 * @param {number} [params.MaxResults]
 * @param {string} [params.NextToken] - Token of the page to fetch; omitted for the first page.
 * @returns {Promise<{items: object[], nextToken: (string|undefined)}>}
 */
async function getQueryResults(params) {
  const queryResults = await params.athena.getQueryResults({
    QueryExecutionId: params.QueryExecutionId,
    MaxResults: params.MaxResults,
    NextToken: params.NextToken,
  });

  // If NextToken is not given, ignore first data.
  // Because the first data is header info.
  const ignoreFirstData = !params.NextToken;

  return {
    items: cleanUpPaginatedDML(queryResults, ignoreFirstData),
    nextToken: queryResults.NextToken,
  };
}

/**
 * Turns the rows of a result page into plain objects keyed by column name,
 * with values converted according to the column types.
 *
 * Cells without a VarCharValue are omitted from the row object.
 *
 * @param {object} queryResults - Response of `getQueryResults`.
 * @param {boolean} ignoreFirstData - Skip the first row (header row).
 * @returns {object[]} One object per data row; empty when no column metadata is present.
 */
function cleanUpPaginatedDML(queryResults, ignoreFirstData) {
  const dataTypes = getDataTypes(queryResults);
  if (!dataTypes) return [];

  // Column names are taken by position from the metadata so that cell N is
  // always paired with column N. Deriving them from Object.keys(dataTypes)
  // breaks that pairing when a column has no Type, when names repeat, or when
  // a name looks like an integer (such keys are enumerated first).
  const columnInfo = queryResults.ResultSet?.ResultSetMetadata?.ColumnInfo ?? [];
  const columnNames = columnInfo.map(({ Name }) => Name);

  const rows = queryResults.ResultSet?.Rows ?? [];
  const items = [];

  for (const [rowIndex, { Data }] of rows.entries()) {
    if (ignoreFirstData && rowIndex === 0) continue;
    if (!Data) continue;

    const rowObject = {};
    for (const [columnIndex, cell] of Data.entries()) {
      const columnName = columnNames[columnIndex];
      if (!columnName) continue; // no usable name for this position
      if (cell.VarCharValue !== undefined && cell.VarCharValue !== null) {
        rowObject[columnName] = cell.VarCharValue;
      }
    }

    items.push(addDataType(rowObject, dataTypes));
  }

  return items;
}

/**
 * Converts the string values of a row object according to the column types.
 *
 * - "boolean": parsed case-insensitively via JSON.parse
 * - "bigint": BigInt
 * - "integer", "tinyint", "smallint", "int", "float", "double": Number
 * - "json": JSON.parse
 * - "varchar" and every other type: left as the original string
 * - null / undefined values become null
 *
 * @param {Object<string, string>} input - Row object with raw string values.
 * @param {Object<string, string>} dataTypes - Column name to type name.
 * @returns {object} New object with converted values.
 */
function addDataType(input, dataTypes) {
  const typed = {};

  for (const [key, value] of Object.entries(input)) {
    if (value === null || value === undefined) {
      typed[key] = null;
      continue;
    }

    switch (dataTypes[key]) {
      case "boolean":
        typed[key] = JSON.parse(value.toLowerCase());
        break;
      case "bigint":
        typed[key] = BigInt(value);
        break;
      case "integer":
      case "tinyint":
      case "smallint":
      case "int":
      case "float":
      case "double":
        typed[key] = Number(value);
        break;
      case "json":
        typed[key] = JSON.parse(value);
        break;
      case "varchar":
      default:
        typed[key] = value;
    }
  }

  return typed;
}

/**
 * Builds a column-name to type-name map from the result metadata.
 *
 * Columns lacking a Name or a Type are left out of the map.
 *
 * @param {object} queryResults - Response of `getQueryResults`.
 * @returns {(Object<string, string>|undefined)} undefined when the response
 *   has no ColumnInfo.
 */
function getDataTypes(queryResults) {
  const columnInfoArray = queryResults.ResultSet?.ResultSetMetadata?.ColumnInfo;

  return columnInfoArray?.reduce((acc, columnInfo) => {
    if (columnInfo.Name && columnInfo.Type) {
      acc[columnInfo.Name] = columnInfo.Type;
    }
    return acc;
  }, {});
}

/** Resolves after `ms` milliseconds. */
const wait = (ms) => new Promise((res) => setTimeout(res, ms));

/**
 * Formats one JavaScript value as an execution parameter literal.
 *
 * Numbers and bigints are stringified; strings are wrapped in single quotes
 * with embedded single quotes doubled so that a value such as `O'Brien`
 * yields a well-formed literal instead of terminating the string early.
 *
 * @param {(string|number|bigint)} param
 * @returns {string}
 * @throws {Error} For any other value type.
 */
function formatExecutionParameter(param) {
  const typeOfParam = typeof param;
  switch (typeOfParam) {
    case "bigint":
    case "number":
      return param.toString();
    case "string":
      return `'${param.replace(/'/g, "''")}'`;
    default:
      throw new Error(`${typeOfParam} type is not allowed.`);
  }
}

class AthenaQuery {
  /**
   * @param athena Client exposing `startQueryExecution`, `getQueryExecution`
   *   and `getQueryResults`.
   * @param options Optional settings spread into the start request
   *   (`workgroup`, `db`, `catalog`, `outputLocation` are read there).
   */
  constructor(athena, options) {
    this.athena = athena;
    this.options = options;
  }

  /**
   * Runs `sql`, waits for it to finish, and yields the result rows one by one,
   * fetching further pages as long as the service returns a next token.
   *
   * @see https://github.com/classmethod/athena-query#usage
   *
   * @param sql Query string.
   * @param options Optional `executionParameters` (strings, numbers, bigints)
   *   and `maxResults` (page size passed to `getQueryResults`).
   */
  async *query(sql, options) {
    const QueryExecutionId = await startQueryExecution({
      athena: this.athena,
      sql,
      executionParameters: options?.executionParameters?.map(
        formatExecutionParameter
      ),
      ...this.options,
    });

    await waitExecutionCompleted({
      athena: this.athena,
      QueryExecutionId,
    });

    let nextToken;
    do {
      const queryResults = await getQueryResults({
        athena: this.athena,
        NextToken: nextToken,
        MaxResults: options?.maxResults,
        QueryExecutionId,
      });

      yield* queryResults.items;

      nextToken = queryResults.nextToken;
    } while (nextToken);
  }
}

// Guarded so that evaluating this file where no CommonJS `module` object
// exists does not throw; under CommonJS the export is unchanged.
if (typeof module !== "undefined") {
  module.exports = AthenaQuery;
}