//@flow import AWS from 'aws-sdk' import System from './System' import Module from './Module' import { throwUnsetArg } from './errorUtils' const { MODULE_NAME, debug, log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars class AWSError extends Error { message: string code: string time: string statusCode: number retryable: boolean data: ?any constructor(err: Error, opName: string, data: ?any) { // $FlowIgnore const message = `AWS Op '${opName}' threw '${err.code}': ${err.message}` super(message) this.message = message // $FlowIgnore this.code = err.code // $FlowIgnore this.time = err.time // $FlowIgnore this.statusCode = err.statusCode // $FlowIgnore this.retryable = err.retryable this.data = data } } type ClientConfig = { region: string, } type AWSOpOptions = { args?: ?Array, opName?: ?string, region?: ?string, errorHandlers?: ?{ [code: string]: { type: 'ignore' | 'retry', tries?: ?number, action?: ?(err: AWSError) => Promise, } }, noLog?: ?boolean, } export default class AWSClient { constructor() { const awsConfig = System.getConfig().aws if (!awsConfig) throw new Error('aws system config is unset') AWS.config.region = awsConfig.defaultRegion // support local execution if (awsConfig.profile) { log('Using profile', { profile: awsConfig.profile }) AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: awsConfig.profile }) } } async runSES ( func: (service: AWS.SES) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'SES', (clientConfig: ClientConfig) => new AWS.SES(clientConfig), func, options) } async runS3 ( func: (service: AWS.S3) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'S3', (clientConfig: ClientConfig) => new AWS.S3(clientConfig), func, options) } async runDocumentClient ( func: (service: AWS.DynamoDB.DocumentClient) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'DocumentClient', (clientConfig: ClientConfig) => new AWS.DynamoDB.DocumentClient({ convertEmptyValues: true, ...clientConfig }), func, options) } async runDynamoDB ( func: (service: AWS.DynamoDB) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'DynamoDB', (clientConfig: ClientConfig) => new AWS.DynamoDB(clientConfig), func, options) } async runApplicationAutoScaling ( func: (service: AWS.ApplicationAutoScaling) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'ApplicationAutoScaling', (clientConfig: ClientConfig) => new AWS.ApplicationAutoScaling(clientConfig), func, options) } async runCloudFront ( func: (service: AWS.CloudFront) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'CloudFront', (clientConfig: ClientConfig) => new AWS.CloudFront(clientConfig), func, options) } async runAthena ( func: (service: AWS.Athena) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'Athena', (clientConfig: ClientConfig) => new AWS.Athena(clientConfig), func, options) } async runSQS ( func: (service: AWS.SQS) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'SQS', (clientConfig: ClientConfig) => new AWS.SQS(clientConfig), func, options) } async runCloudWatch ( func: (service: AWS.SQS) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'CloudWatch', (clientConfig: ClientConfig) => new AWS.CloudWatch(clientConfig), func, options) } async runFirehose ( func: (service: AWS.SQS) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'Firehose', (clientConfig: ClientConfig) => new AWS.Firehose(clientConfig), func, options) } async runES( func: (service: AWS.ES) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { return await this._runOp( 'Firehose', (clientConfig: ClientConfig) => new AWS.ES(clientConfig), func, options) } async _runOp ( serviceName: string, serviceGenerator: (clientConfig: ClientConfig) => Object, func: (service: Object) => AWS.Request, options: ?AWSOpOptions, ): Promise<{ result: D, code?: ?string }> { const awsConfig = System.getConfig().aws if (!awsConfig) throw new Error('aws config not set in system config') const region = awsConfig.defaultRegion || (options || {}).region || throwUnsetArg('aws region') options = options || {} options.errorHandlers = options.errorHandlers || {} const credentials = AWS.config.credentials let start = 0 let opName = 'unknown-operation' try { // TODO: recreate only when creds expire? const service = serviceGenerator({ credentials: credentials, region: region }) const req = func(service) if (req === undefined) throw new Error(`AWS runXXX method call on service '${serviceName}' is missing, did you user 'return'?`) // $FlowIgnore opName = options.opName || req['operation'] if (!options.noLog) debug(`AWS operation started...`, {serviceName, opName} ) let result: D let success = false let code: ?string do { try { start = Date.now() result = await req.promise() success = true } catch (err) { // $FlowIgnore const errorHandler = options.errorHandlers[err.code] if (!errorHandler) throw err switch (errorHandler.type) { case 'retry': try { errorHandler.tries = errorHandler.tries || 1 if (errorHandler.tries-- > 0 && errorHandler.action) { await errorHandler.action(err) } } catch (err2) { err2.leadingError = err throw err2 } break case 'ignore': code = err.code success = true log(`${serviceName} op '${opName}' completed with an ignorable error code`, { code: code, ms: Date.now() - start }) break default: throw new Error(`Unexpected errorHandler type: '${errorHandler.type}'`) } } } while (!success) if (!options.noLog) debug(`AWS operation completed`, { serviceName, opName, ms: Date.now() - start }) noteCount(`${MODULE_NAME}.${serviceName}.${opName}.success`, 1) // $FlowIgnore return { result: result, code: code, } } catch (err) { log(`${serviceName} op '${opName}' failed`, { err, args: options.args }) noteCount(`${MODULE_NAME}.${serviceName}.${opName}.fail`, 1) throw new AWSError(err, `${serviceName}.${opName}`, { args: options.args }) } finally { noteTimer(`${MODULE_NAME}.${serviceName}.${opName}`, Date.now() - start) } } }