1 |
|
2 |
|
3 | import AWS from 'aws-sdk'
|
4 |
|
5 | import System from './System'
|
6 | import Module from './Module'
|
7 | import { throwUnsetArg } from './errorUtils'
|
8 |
|
// Helpers bound to this file through Module: MODULE_NAME prefixes the metric
// keys written below, debug/log write log entries, and noteCount/noteTimer
// record metrics. The remaining bindings are not used in the code shown here.
const {
  MODULE_NAME,
  debug,
  log,
  warn,
  error,
  noteGauge,
  noteCount,
  noteTimer,
  trackOp,
} = new Module(__filename)
|
10 |
|
11 | class AWSError extends Error {
|
12 |
|
13 | message: string
|
14 | code: string
|
15 | time: string
|
16 | statusCode: number
|
17 | retryable: boolean
|
18 | data: ?any
|
19 |
|
20 |
|
21 | constructor(err: Error, opName: string, data: ?any) {
|
22 |
|
23 | const message = `AWS Op '${opName}' threw '${err.code}': ${err.message}`
|
24 |
|
25 | super(message)
|
26 |
|
27 | this.message = message
|
28 |
|
29 | this.code = err.code
|
30 |
|
31 | this.time = err.time
|
32 |
|
33 | this.statusCode = err.statusCode
|
34 |
|
35 | this.retryable = err.retryable
|
36 |
|
37 | this.data = data
|
38 | }
|
39 | }
|
40 |
|
41 | type ClientConfig = {
|
42 | region: string,
|
43 | }
|
44 |
|
45 | type AWSOpOptions = {
|
46 | args?: ?Array<any>,
|
47 | opName?: ?string,
|
48 | region?: ?string,
|
49 | errorHandlers?: ?{
|
50 | [code: string]: {
|
51 | type: 'ignore' | 'retry',
|
52 | tries?: ?number,
|
53 | action?: ?(err: AWSError) => Promise<void>,
|
54 | }
|
55 | },
|
56 | noLog?: ?boolean,
|
57 | }
|
58 |
|
59 | export default class AWSClient {
|
60 |
|
61 | constructor() {
|
62 | const awsConfig = System.getConfig().aws
|
63 | if (!awsConfig)
|
64 | throw new Error('aws system config is unset')
|
65 |
|
66 | AWS.config.region = awsConfig.defaultRegion
|
67 |
|
68 |
|
69 | if (awsConfig.profile) {
|
70 | log('Using profile', { profile: awsConfig.profile })
|
71 | AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: awsConfig.profile })
|
72 | }
|
73 | }
|
74 |
|
75 | async runSES<D, E> (
|
76 | func: (service: AWS.SES) => AWS.Request<D, E>,
|
77 | options: ?AWSOpOptions,
|
78 | ): Promise<{ result: D, code?: ?string }> {
|
79 | return await this._runOp(
|
80 | 'SES',
|
81 | (clientConfig: ClientConfig) => new AWS.SES(clientConfig),
|
82 | func,
|
83 | options)
|
84 | }
|
85 |
|
86 | async runS3<D, E> (
|
87 | func: (service: AWS.S3) => AWS.Request<D, E>,
|
88 | options: ?AWSOpOptions,
|
89 | ): Promise<{ result: D, code?: ?string }> {
|
90 | return await this._runOp(
|
91 | 'S3',
|
92 | (clientConfig: ClientConfig) => new AWS.S3(clientConfig),
|
93 | func,
|
94 | options)
|
95 | }
|
96 |
|
97 | async runDocumentClient<D, E> (
|
98 | func: (service: AWS.DynamoDB.DocumentClient) => AWS.Request<D, E>,
|
99 | options: ?AWSOpOptions,
|
100 | ): Promise<{ result: D, code?: ?string }> {
|
101 | return await this._runOp(
|
102 | 'DocumentClient',
|
103 | (clientConfig: ClientConfig) => new AWS.DynamoDB.DocumentClient({ convertEmptyValues: true, ...clientConfig }),
|
104 | func,
|
105 | options)
|
106 | }
|
107 |
|
108 | async runDynamoDB<D, E> (
|
109 | func: (service: AWS.DynamoDB) => AWS.Request<D, E>,
|
110 | options: ?AWSOpOptions,
|
111 | ): Promise<{ result: D, code?: ?string }> {
|
112 | return await this._runOp(
|
113 | 'DynamoDB',
|
114 | (clientConfig: ClientConfig) => new AWS.DynamoDB(clientConfig),
|
115 | func,
|
116 | options)
|
117 | }
|
118 |
|
119 | async runApplicationAutoScaling<D, E> (
|
120 | func: (service: AWS.ApplicationAutoScaling) => AWS.Request<D, E>,
|
121 | options: ?AWSOpOptions,
|
122 | ): Promise<{ result: D, code?: ?string }> {
|
123 | return await this._runOp(
|
124 | 'ApplicationAutoScaling',
|
125 | (clientConfig: ClientConfig) => new AWS.ApplicationAutoScaling(clientConfig),
|
126 | func,
|
127 | options)
|
128 | }
|
129 |
|
130 | async runCloudFront<D, E> (
|
131 | func: (service: AWS.CloudFront) => AWS.Request<D, E>,
|
132 | options: ?AWSOpOptions,
|
133 | ): Promise<{ result: D, code?: ?string }> {
|
134 | return await this._runOp(
|
135 | 'CloudFront',
|
136 | (clientConfig: ClientConfig) => new AWS.CloudFront(clientConfig),
|
137 | func,
|
138 | options)
|
139 | }
|
140 |
|
141 | async runAthena<D, E> (
|
142 | func: (service: AWS.Athena) => AWS.Request<D, E>,
|
143 | options: ?AWSOpOptions,
|
144 | ): Promise<{ result: D, code?: ?string }> {
|
145 | return await this._runOp(
|
146 | 'Athena',
|
147 | (clientConfig: ClientConfig) => new AWS.Athena(clientConfig),
|
148 | func,
|
149 | options)
|
150 | }
|
151 |
|
152 | async runSQS<D, E> (
|
153 | func: (service: AWS.SQS) => AWS.Request<D, E>,
|
154 | options: ?AWSOpOptions,
|
155 | ): Promise<{ result: D, code?: ?string }> {
|
156 | return await this._runOp(
|
157 | 'SQS',
|
158 | (clientConfig: ClientConfig) => new AWS.SQS(clientConfig),
|
159 | func,
|
160 | options)
|
161 | }
|
162 |
|
163 | async runCloudWatch<D, E> (
|
164 | func: (service: AWS.SQS) => AWS.Request<D, E>,
|
165 | options: ?AWSOpOptions,
|
166 | ): Promise<{ result: D, code?: ?string }> {
|
167 | return await this._runOp(
|
168 | 'CloudWatch',
|
169 | (clientConfig: ClientConfig) => new AWS.CloudWatch(clientConfig),
|
170 | func,
|
171 | options)
|
172 | }
|
173 |
|
174 | async runFirehose<D, E> (
|
175 | func: (service: AWS.SQS) => AWS.Request<D, E>,
|
176 | options: ?AWSOpOptions,
|
177 | ): Promise<{ result: D, code?: ?string }> {
|
178 | return await this._runOp(
|
179 | 'Firehose',
|
180 | (clientConfig: ClientConfig) => new AWS.Firehose(clientConfig),
|
181 | func,
|
182 | options)
|
183 | }
|
184 |
|
185 | async runES<D, E>(
|
186 | func: (service: AWS.ES) => AWS.Request<D, E>,
|
187 | options: ?AWSOpOptions,
|
188 | ): Promise<{ result: D, code?: ?string }> {
|
189 | return await this._runOp(
|
190 | 'Firehose',
|
191 | (clientConfig: ClientConfig) => new AWS.ES(clientConfig),
|
192 | func,
|
193 | options)
|
194 | }
|
195 |
|
196 | async _runOp<D, E> (
|
197 | serviceName: string,
|
198 | serviceGenerator: (clientConfig: ClientConfig) => Object,
|
199 | func: (service: Object) => AWS.Request<D, E>,
|
200 | options: ?AWSOpOptions,
|
201 | ): Promise<{ result: D, code?: ?string }> {
|
202 | const awsConfig = System.getConfig().aws
|
203 | if (!awsConfig)
|
204 | throw new Error('aws config not set in system config')
|
205 | const region = awsConfig.defaultRegion || (options || {}).region || throwUnsetArg('aws region')
|
206 |
|
207 | options = options || {}
|
208 | options.errorHandlers = options.errorHandlers || {}
|
209 |
|
210 | const credentials = AWS.config.credentials
|
211 | let start = 0
|
212 | let opName = 'unknown-operation'
|
213 | try {
|
214 |
|
215 | const service = serviceGenerator({ credentials: credentials, region: region })
|
216 | const req = func(service)
|
217 | if (req === undefined)
|
218 | throw new Error(`AWS runXXX method call on service '${serviceName}' is missing, did you user 'return'?`)
|
219 |
|
220 | opName = options.opName || req['operation']
|
221 |
|
222 | if (!options.noLog)
|
223 | debug(`AWS operation started...`, {serviceName, opName} )
|
224 |
|
225 | let result: D
|
226 | let success = false
|
227 | let code: ?string
|
228 | do {
|
229 | try {
|
230 | start = Date.now()
|
231 | result = await req.promise()
|
232 | success = true
|
233 | } catch (err) {
|
234 |
|
235 | const errorHandler = options.errorHandlers[err.code]
|
236 | if (!errorHandler)
|
237 | throw err
|
238 | switch (errorHandler.type) {
|
239 | case 'retry':
|
240 | try {
|
241 | errorHandler.tries = errorHandler.tries || 1
|
242 | if (errorHandler.tries-- > 0 && errorHandler.action) {
|
243 | await errorHandler.action(err)
|
244 | }
|
245 | } catch (err2) {
|
246 | err2.leadingError = err
|
247 | throw err2
|
248 | }
|
249 | break
|
250 | case 'ignore':
|
251 | code = err.code
|
252 | success = true
|
253 | log(`${serviceName} op '${opName}' completed with an ignorable error code`, { code: code, ms: Date.now() - start })
|
254 | break
|
255 | default:
|
256 | throw new Error(`Unexpected errorHandler type: '${errorHandler.type}'`)
|
257 | }
|
258 | }
|
259 | } while (!success)
|
260 |
|
261 | if (!options.noLog)
|
262 | debug(`AWS operation completed`, { serviceName, opName, ms: Date.now() - start })
|
263 | noteCount(`${MODULE_NAME}.${serviceName}.${opName}.success`, 1)
|
264 |
|
265 | return {
|
266 | result: result,
|
267 | code: code,
|
268 | }
|
269 | } catch (err) {
|
270 | log(`${serviceName} op '${opName}' failed`, { err, args: options.args })
|
271 | noteCount(`${MODULE_NAME}.${serviceName}.${opName}.fail`, 1)
|
272 | throw new AWSError(err, `${serviceName}.${opName}`, { args: options.args })
|
273 | } finally {
|
274 | noteTimer(`${MODULE_NAME}.${serviceName}.${opName}`, Date.now() - start)
|
275 | }
|
276 | }
|
277 | } |
\ | No newline at end of file |