UNPKG

3.13 kBPlain TextView Raw
1/*
2 * Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
5 * the License. A copy of the License is located at
6 *
7 * http://aws.amazon.com/apache2.0/
8 *
9 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
11 * and limitations under the License.
12 */
13
14import { ConsoleLogger as Logger } from '@aws-amplify/core';
15import { AWSKinesisProvider } from './AWSKinesisProvider';
16import {
17 PutRecordBatchCommand,
18 FirehoseClient,
19} from '@aws-sdk/client-firehose';
20import { fromUtf8 } from '@aws-sdk/util-utf8-browser';
21
22const logger = new Logger('AWSKineisFirehoseProvider');
23
24export class AWSKinesisFirehoseProvider extends AWSKinesisProvider {
25 private _kinesisFirehose: FirehoseClient;
26
27 constructor(config?) {
28 super(config);
29 }
30
31 /**
32 * get provider name of the plugin
33 */
34 public getProviderName(): string {
35 return 'AWSKinesisFirehose';
36 }
37
38 protected _sendEvents(group) {
39 if (group.length === 0) {
40 return;
41 }
42
43 const { config, credentials } = group[0];
44
45 const initClients = this._init(config, credentials);
46 if (!initClients) return false;
47
48 const records = {};
49
50 group.map(params => {
51 // split by streamName
52 const evt = params.event;
53 const { streamName, data } = evt;
54 if (records[streamName] === undefined) {
55 records[streamName] = [];
56 }
57
58 const bufferData =
59 data && typeof data !== 'string' ? JSON.stringify(data) : data;
60 const Data = fromUtf8(bufferData);
61 const record = { Data };
62 records[streamName].push(record);
63 });
64
65 Object.keys(records).map(streamName => {
66 logger.debug(
67 'putting records to kinesis',
68 streamName,
69 'with records',
70 records[streamName]
71 );
72
73 this._kinesisFirehose
74 .send(
75 new PutRecordBatchCommand({
76 Records: records[streamName],
77 DeliveryStreamName: streamName,
78 })
79 )
80 .then(res => logger.debug('Upload records to stream', streamName))
81 .catch(err => logger.debug('Failed to upload records to Kinesis', err));
82 });
83 }
84
85 protected _init(config, credentials) {
86 logger.debug('init clients');
87
88 if (
89 this._kinesisFirehose &&
90 this._config.credentials &&
91 this._config.credentials.sessionToken === credentials.sessionToken &&
92 this._config.credentials.identityId === credentials.identityId
93 ) {
94 logger.debug('no change for analytics config, directly return from init');
95 return true;
96 }
97
98 this._config.credentials = credentials;
99 const { region } = config;
100
101 return this._initFirehose(region, credentials);
102 }
103
104 private _initFirehose(region, credentials) {
105 logger.debug('initialize kinesis firehose with credentials', credentials);
106 this._kinesisFirehose = new FirehoseClient({
107 apiVersion: '2015-08-04',
108 region,
109 credentials,
110 });
111 return true;
112 }
113}
114
115/**
116 * @deprecated use named import
117 */
118export default AWSKinesisFirehoseProvider;