UNPKG

7.94 kBJavaScriptView Raw
1//@flow
2
3import AWS from 'aws-sdk'
4
5import System from './System'
6import Module from './Module'
7import { throwUnsetArg } from './errorUtils'
8
// Helpers handed out by a Module instance created for this file; presumably logging
// and metrics functions scoped to this module (see ./Module to confirm).
// Not every helper is used below, hence the unused-vars suppression.
/* eslint-disable no-unused-vars */
const {
  MODULE_NAME,
  debug,
  log,
  warn,
  error,
  noteGauge,
  noteCount,
  noteTimer,
  trackOp,
} = new Module(__filename)
/* eslint-enable no-unused-vars */
10
11class AWSError extends Error {
12
13 message: string
14 code: string
15 time: string
16 statusCode: number
17 retryable: boolean
18 data: ?any
19
20
21 constructor(err: Error, opName: string, data: ?any) {
22 // $FlowIgnore
23 const message = `AWS Op '${opName}' threw '${err.code}': ${err.message}`
24
25 super(message)
26
27 this.message = message
28 // $FlowIgnore
29 this.code = err.code
30 // $FlowIgnore
31 this.time = err.time
32 // $FlowIgnore
33 this.statusCode = err.statusCode
34 // $FlowIgnore
35 this.retryable = err.retryable
36
37 this.data = data
38 }
39}
40
41type ClientConfig = {
42 region: string,
43}
44
45type AWSOpOptions = {
46 args?: ?Array<any>,
47 opName?: ?string,
48 region?: ?string,
49 errorHandlers?: ?{
50 [code: string]: {
51 type: 'ignore' | 'retry',
52 tries?: ?number,
53 action?: ?(err: AWSError) => Promise<void>,
54 }
55 },
56 noLog?: ?boolean,
57}
58
59export default class AWSClient {
60
61 constructor() {
62 const awsConfig = System.getConfig().aws
63 if (!awsConfig)
64 throw new Error('aws system config is unset')
65
66 AWS.config.region = awsConfig.defaultRegion
67
68 // support local execution
69 if (awsConfig.profile) {
70 log('Using profile', { profile: awsConfig.profile })
71 AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: awsConfig.profile })
72 }
73 }
74
75 async runSES<D, E> (
76 func: (service: AWS.SES) => AWS.Request<D, E>,
77 options: ?AWSOpOptions,
78 ): Promise<{ result: D, code?: ?string }> {
79 return await this._runOp(
80 'SES',
81 (clientConfig: ClientConfig) => new AWS.SES(clientConfig),
82 func,
83 options)
84 }
85
86 async runS3<D, E> (
87 func: (service: AWS.S3) => AWS.Request<D, E>,
88 options: ?AWSOpOptions,
89 ): Promise<{ result: D, code?: ?string }> {
90 return await this._runOp(
91 'S3',
92 (clientConfig: ClientConfig) => new AWS.S3(clientConfig),
93 func,
94 options)
95 }
96
97 async runDocumentClient<D, E> (
98 func: (service: AWS.DynamoDB.DocumentClient) => AWS.Request<D, E>,
99 options: ?AWSOpOptions,
100 ): Promise<{ result: D, code?: ?string }> {
101 return await this._runOp(
102 'DocumentClient',
103 (clientConfig: ClientConfig) => new AWS.DynamoDB.DocumentClient({ convertEmptyValues: true, ...clientConfig }),
104 func,
105 options)
106 }
107
108 async runDynamoDB<D, E> (
109 func: (service: AWS.DynamoDB) => AWS.Request<D, E>,
110 options: ?AWSOpOptions,
111 ): Promise<{ result: D, code?: ?string }> {
112 return await this._runOp(
113 'DynamoDB',
114 (clientConfig: ClientConfig) => new AWS.DynamoDB(clientConfig),
115 func,
116 options)
117 }
118
119 async runApplicationAutoScaling<D, E> (
120 func: (service: AWS.ApplicationAutoScaling) => AWS.Request<D, E>,
121 options: ?AWSOpOptions,
122 ): Promise<{ result: D, code?: ?string }> {
123 return await this._runOp(
124 'ApplicationAutoScaling',
125 (clientConfig: ClientConfig) => new AWS.ApplicationAutoScaling(clientConfig),
126 func,
127 options)
128 }
129
130 async runCloudFront<D, E> (
131 func: (service: AWS.CloudFront) => AWS.Request<D, E>,
132 options: ?AWSOpOptions,
133 ): Promise<{ result: D, code?: ?string }> {
134 return await this._runOp(
135 'CloudFront',
136 (clientConfig: ClientConfig) => new AWS.CloudFront(clientConfig),
137 func,
138 options)
139 }
140
141 async runAthena<D, E> (
142 func: (service: AWS.Athena) => AWS.Request<D, E>,
143 options: ?AWSOpOptions,
144 ): Promise<{ result: D, code?: ?string }> {
145 return await this._runOp(
146 'Athena',
147 (clientConfig: ClientConfig) => new AWS.Athena(clientConfig),
148 func,
149 options)
150 }
151
152 async runSQS<D, E> (
153 func: (service: AWS.SQS) => AWS.Request<D, E>,
154 options: ?AWSOpOptions,
155 ): Promise<{ result: D, code?: ?string }> {
156 return await this._runOp(
157 'SQS',
158 (clientConfig: ClientConfig) => new AWS.SQS(clientConfig),
159 func,
160 options)
161 }
162
163 async runCloudWatch<D, E> (
164 func: (service: AWS.SQS) => AWS.Request<D, E>,
165 options: ?AWSOpOptions,
166 ): Promise<{ result: D, code?: ?string }> {
167 return await this._runOp(
168 'CloudWatch',
169 (clientConfig: ClientConfig) => new AWS.CloudWatch(clientConfig),
170 func,
171 options)
172 }
173
174 async runFirehose<D, E> (
175 func: (service: AWS.SQS) => AWS.Request<D, E>,
176 options: ?AWSOpOptions,
177 ): Promise<{ result: D, code?: ?string }> {
178 return await this._runOp(
179 'Firehose',
180 (clientConfig: ClientConfig) => new AWS.Firehose(clientConfig),
181 func,
182 options)
183 }
184
185 async runES<D, E>(
186 func: (service: AWS.ES) => AWS.Request<D, E>,
187 options: ?AWSOpOptions,
188 ): Promise<{ result: D, code?: ?string }> {
189 return await this._runOp(
190 'Firehose',
191 (clientConfig: ClientConfig) => new AWS.ES(clientConfig),
192 func,
193 options)
194 }
195
196 async _runOp<D, E> (
197 serviceName: string,
198 serviceGenerator: (clientConfig: ClientConfig) => Object,
199 func: (service: Object) => AWS.Request<D, E>,
200 options: ?AWSOpOptions,
201 ): Promise<{ result: D, code?: ?string }> {
202 const awsConfig = System.getConfig().aws
203 if (!awsConfig)
204 throw new Error('aws config not set in system config')
205 const region = awsConfig.defaultRegion || (options || {}).region || throwUnsetArg('aws region')
206
207 options = options || {}
208 options.errorHandlers = options.errorHandlers || {}
209
210 const credentials = AWS.config.credentials
211 let start = 0
212 let opName = 'unknown-operation'
213 try {
214 // TODO: recreate only when creds expire?
215 const service = serviceGenerator({ credentials: credentials, region: region })
216 const req = func(service)
217 if (req === undefined)
218 throw new Error(`AWS runXXX method call on service '${serviceName}' is missing, did you user 'return'?`)
219 // $FlowIgnore
220 opName = options.opName || req['operation']
221
222 if (!options.noLog)
223 debug(`AWS operation started...`, {serviceName, opName} )
224
225 let result: D
226 let success = false
227 let code: ?string
228 do {
229 try {
230 start = Date.now()
231 result = await req.promise()
232 success = true
233 } catch (err) {
234 // $FlowIgnore
235 const errorHandler = options.errorHandlers[err.code]
236 if (!errorHandler)
237 throw err
238 switch (errorHandler.type) {
239 case 'retry':
240 try {
241 errorHandler.tries = errorHandler.tries || 1
242 if (errorHandler.tries-- > 0 && errorHandler.action) {
243 await errorHandler.action(err)
244 }
245 } catch (err2) {
246 err2.leadingError = err
247 throw err2
248 }
249 break
250 case 'ignore':
251 code = err.code
252 success = true
253 log(`${serviceName} op '${opName}' completed with an ignorable error code`, { code: code, ms: Date.now() - start })
254 break
255 default:
256 throw new Error(`Unexpected errorHandler type: '${errorHandler.type}'`)
257 }
258 }
259 } while (!success)
260
261 if (!options.noLog)
262 debug(`AWS operation completed`, { serviceName, opName, ms: Date.now() - start })
263 noteCount(`${MODULE_NAME}.${serviceName}.${opName}.success`, 1)
264 // $FlowIgnore
265 return {
266 result: result,
267 code: code,
268 }
269 } catch (err) {
270 log(`${serviceName} op '${opName}' failed`, { err, args: options.args })
271 noteCount(`${MODULE_NAME}.${serviceName}.${opName}.fail`, 1)
272 throw new AWSError(err, `${serviceName}.${opName}`, { args: options.args })
273 } finally {
274 noteTimer(`${MODULE_NAME}.${serviceName}.${opName}`, Date.now() - start)
275 }
276 }
277}
\No newline at end of file