1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | import { ConsoleLogger as Logger } from '@aws-amplify/core';
|
15 | import { AWSKinesisProvider } from './AWSKinesisProvider';
|
16 | import {
|
17 | PutRecordBatchCommand,
|
18 | FirehoseClient,
|
19 | } from '@aws-sdk/client-firehose';
|
20 | import { fromUtf8 } from '@aws-sdk/util-utf8-browser';
|
21 |
|
22 | const logger = new Logger('AWSKineisFirehoseProvider');
|
23 |
|
24 | export class AWSKinesisFirehoseProvider extends AWSKinesisProvider {
|
25 | private _kinesisFirehose: FirehoseClient;
|
26 |
|
27 | constructor(config?) {
|
28 | super(config);
|
29 | }
|
30 |
|
31 | |
32 |
|
33 |
|
34 | public getProviderName(): string {
|
35 | return 'AWSKinesisFirehose';
|
36 | }
|
37 |
|
38 | protected _sendEvents(group) {
|
39 | if (group.length === 0) {
|
40 | return;
|
41 | }
|
42 |
|
43 | const { config, credentials } = group[0];
|
44 |
|
45 | const initClients = this._init(config, credentials);
|
46 | if (!initClients) return false;
|
47 |
|
48 | const records = {};
|
49 |
|
50 | group.map(params => {
|
51 |
|
52 | const evt = params.event;
|
53 | const { streamName, data } = evt;
|
54 | if (records[streamName] === undefined) {
|
55 | records[streamName] = [];
|
56 | }
|
57 |
|
58 | const bufferData =
|
59 | data && typeof data !== 'string' ? JSON.stringify(data) : data;
|
60 | const Data = fromUtf8(bufferData);
|
61 | const record = { Data };
|
62 | records[streamName].push(record);
|
63 | });
|
64 |
|
65 | Object.keys(records).map(streamName => {
|
66 | logger.debug(
|
67 | 'putting records to kinesis',
|
68 | streamName,
|
69 | 'with records',
|
70 | records[streamName]
|
71 | );
|
72 |
|
73 | this._kinesisFirehose
|
74 | .send(
|
75 | new PutRecordBatchCommand({
|
76 | Records: records[streamName],
|
77 | DeliveryStreamName: streamName,
|
78 | })
|
79 | )
|
80 | .then(res => logger.debug('Upload records to stream', streamName))
|
81 | .catch(err => logger.debug('Failed to upload records to Kinesis', err));
|
82 | });
|
83 | }
|
84 |
|
85 | protected _init(config, credentials) {
|
86 | logger.debug('init clients');
|
87 |
|
88 | if (
|
89 | this._kinesisFirehose &&
|
90 | this._config.credentials &&
|
91 | this._config.credentials.sessionToken === credentials.sessionToken &&
|
92 | this._config.credentials.identityId === credentials.identityId
|
93 | ) {
|
94 | logger.debug('no change for analytics config, directly return from init');
|
95 | return true;
|
96 | }
|
97 |
|
98 | this._config.credentials = credentials;
|
99 | const { region } = config;
|
100 |
|
101 | return this._initFirehose(region, credentials);
|
102 | }
|
103 |
|
104 | private _initFirehose(region, credentials) {
|
105 | logger.debug('initialize kinesis firehose with credentials', credentials);
|
106 | this._kinesisFirehose = new FirehoseClient({
|
107 | apiVersion: '2015-08-04',
|
108 | region,
|
109 | credentials,
|
110 | });
|
111 | return true;
|
112 | }
|
113 | }
|
114 |
|
115 |
|
116 |
|
117 |
|
118 | export default AWSKinesisFirehoseProvider;
|