/*
 * Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
 * the License. A copy of the License is located at
 *
 *     http://aws.amazon.com/apache2.0/
 *
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
/**
 * Compiler-emitted helper that drives a generator as if it were an async
 * function: every yielded value is adopted into a promise, and its
 * fulfilment / rejection is fed back into the generator via next / throw.
 *
 * @param {*} thisArg - `this` value the generator function is applied with
 * @param {Array|undefined} _arguments - arguments forwarded to the generator function
 * @param {Function|undefined} P - promise constructor; falls back to the global Promise
 * @param {Function} generator - generator function (or an equivalent returning a { next, throw } object)
 * @returns {Promise} settles with the generator's return value, or rejects with whatever it throws
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Wrap plain values so that `.then` is always available on what was yielded.
    function adopt(value) {
        return value instanceof P ? value : new P(function (resolve) { resolve(value); });
    }
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with the settled value; a synchronous throw rejects the outer promise.
        function fulfilled(value) {
            try { step(generator.next(value)); } catch (e) { reject(e); }
        }
        // Re-raise the rejection inside the generator so its own try/catch can handle it.
        function rejected(value) {
            try { step(generator["throw"](value)); } catch (e) { reject(e); }
        }
        function step(result) {
            result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected);
        }
        // `generator` is rebound from the generator function to the iterator it produces.
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
/**
 * Compiler-emitted helper that emulates a generator on runtimes without
 * native generator support. `body` is a state machine: it is called with the
 * shared state object `_` and returns an instruction `[opcode, operand]`.
 *
 * Opcodes as handled by `step` below:
 *   0 / 1  resume via next / throw (the value is exposed through `_.sent()`)
 *   2      return: finish with `operand` as the final value
 *   3      jump to label `operand`
 *   4      yield `operand` to the caller
 *   5      delegate to the iterator in `operand`
 *   6      an error was raised (produced by the catch clause in `step`)
 *   7      pop the pending op and the innermost try entry, then continue
 *
 * Entries pushed on `_.trys` are read as [t[0], t[1], t[2], t[3]]; the code
 * jumps to t[1] for a raised error and to t[2] before leaving the region.
 *
 * @param {*} thisArg - `this` value `body` is invoked with
 * @param {Function} body - state-machine function receiving `_`
 * @returns {{next: Function, throw: Function, return: Function}} iterator-like object
 */
var __generator = (this && this.__generator) || function (thisArg, body) {
    // _: state shared with body; f: re-entrancy flag; y: delegated iterator; t: scratch; g: returned object.
    var _ = { label: 0, sent: function () { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
    return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function () { return this; }), g;
    function verb(n) { return function (v) { return step([n, v]); }; }
    function step(op) {
        if (f) throw new TypeError("Generator is already executing.");
        // `_` is set to 0 once the generator has finished, which ends this loop.
        while (_) try {
            // While delegating, forward the operation to the inner iterator first.
            if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
            if (y = 0, t) op = [op[0] & 2, t.value];
            switch (op[0]) {
                case 0: case 1: t = op; break;
                case 4: _.label++; return { value: op[1], done: false };
                case 5: _.label++; y = op[1]; op = [0]; continue;
                case 7: op = _.ops.pop(); _.trys.pop(); continue;
                default:
                    // No enclosing try entry and the op is a throw/return: the generator is done.
                    if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
                    if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
                    if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
                    if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
                    if (t[2]) _.ops.pop();
                    _.trys.pop(); continue;
            }
            op = body.call(thisArg, _);
        } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
        if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
    }
};
import { ConsoleLogger as Logger, Credentials, getAmplifyUserAgent, } from '@aws-amplify/core';
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
import { fromUtf8 } from '@aws-sdk/util-utf8-browser';
var logger = new Logger('AWSKinesisProvider');
// Defaults applied by the AWSKinesisProvider constructor when the supplied config omits them.
// events buffer
var BUFFER_SIZE = 1000; // default for config.bufferSize (buffer capacity, in events)
var FLUSH_SIZE = 100; // default for config.flushSize (max events drained per timer tick)
var FLUSH_INTERVAL = 5 * 1000; // 5s; default for config.flushInterval (timer period in ms)
var RESEND_LIMIT = 5; // default for config.resendLimit (stored in config; not read elsewhere in this file)
/**
 * Analytics provider that buffers recorded events in memory and periodically
 * uploads them to Kinesis streams with PutRecords.
 */
var AWSKinesisProvider = /** @class */ (function () {
    /**
     * @param {Object} [config] - provider configuration; missing bufferSize,
     *   flushSize, flushInterval and resendLimit are filled in with the module
     *   defaults (the passed object itself is updated). Starts the flush timer.
     */
    function AWSKinesisProvider(config) {
        this._buffer = [];
        this._config = config || {};
        this._config.bufferSize = this._config.bufferSize || BUFFER_SIZE;
        this._config.flushSize = this._config.flushSize || FLUSH_SIZE;
        this._config.flushInterval = this._config.flushInterval || FLUSH_INTERVAL;
        this._config.resendLimit = this._config.resendLimit || RESEND_LIMIT;
        this._setupTimer();
    }
    /**
     * (Re)start the interval timer that drains up to `flushSize` events from
     * the front of the buffer every `flushInterval` ms and hands them to
     * _sendFromBuffer. Any previously running timer is cleared first.
     */
    AWSKinesisProvider.prototype._setupTimer = function () {
        var _this = this;
        if (this._timer) {
            clearInterval(this._timer);
        }
        // Values are captured once; configure() calls this method again to pick up changes.
        var _a = this._config, flushSize = _a.flushSize, flushInterval = _a.flushInterval;
        this._timer = setInterval(function () {
            var size = _this._buffer.length < flushSize ? _this._buffer.length : flushSize;
            var events = [];
            for (var i = 0; i < size; i += 1) {
                var params = _this._buffer.shift();
                events.push(params);
            }
            _this._sendFromBuffer(events);
        }, flushInterval);
    };
    /**
     * get the category of the plugin
     */
    AWSKinesisProvider.prototype.getCategory = function () {
        return 'Analytics';
    };
    /**
     * get provider name of the plugin
     */
    AWSKinesisProvider.prototype.getProviderName = function () {
        return 'AWSKinesis';
    };
    /**
     * configure the plugin
     * @param {Object} config - configuration, merged over the current one
     * @returns {Object} the merged configuration
     */
    AWSKinesisProvider.prototype.configure = function (config) {
        logger.debug('configure Analytics', config);
        var conf = config || {};
        this._config = Object.assign({}, this._config, conf);
        // Restart the timer so a changed flushSize / flushInterval takes effect.
        this._setupTimer();
        return this._config;
    };
    /**
     * record an event
     * @param {Object} params - the params of an event; `config` and
     *   `credentials` properties are assigned onto this object
     * @returns {Promise<boolean>} resolves to false when no credentials are
     *   available, resolves to true once buffered, rejects with false when
     *   the buffer is full
     */
    AWSKinesisProvider.prototype.record = function (params) {
        return __awaiter(this, void 0, void 0, function () {
            var credentials;
            return __generator(this, function (_a) {
                switch (_a.label) {
                    case 0: return [4 /*yield*/, this._getCredentials()];
                    case 1:
                        credentials = _a.sent();
                        if (!credentials)
                            return [2 /*return*/, Promise.resolve(false)];
                        Object.assign(params, { config: this._config, credentials: credentials });
                        return [2 /*return*/, this._putToBuffer(params)];
                }
            });
        });
    };
    /**
     * Not implemented for this provider; logs and resolves to true.
     */
    AWSKinesisProvider.prototype.updateEndpoint = function () {
        logger.debug('updateEndpoint is not implemented in Kinesis provider');
        return Promise.resolve(true);
    };
    /**
     * @private
     * @param params - params for the event recording
     * Put events into buffer
     * @returns {Promise<boolean>} resolves to true when buffered, rejects
     *   with false when the buffer is at capacity
     */
    AWSKinesisProvider.prototype._putToBuffer = function (params) {
        // Use the configured capacity; the module default only applies when none is set.
        var capacity = this._config.bufferSize || BUFFER_SIZE;
        if (this._buffer.length < capacity) {
            this._buffer.push(params);
            return Promise.resolve(true);
        }
        else {
            logger.debug('exceed analytics events buffer size');
            return Promise.reject(false);
        }
    };
    /**
     * @private
     * Split `events` into runs of consecutive events that share the same
     * credentials (sessionToken and identityId) and send each run separately.
     * @param {Array<Object>} events - buffered params objects, in buffer order
     */
    AWSKinesisProvider.prototype._sendFromBuffer = function (events) {
        var _this = this;
        // collapse events by credentials
        // events = [ {params} ]
        var eventsGroups = [];
        var preCred = null;
        var group = [];
        for (var i = 0; i < events.length; i += 1) {
            var cred = events[i].credentials;
            if (i === 0) {
                group.push(events[i]);
                preCred = cred;
            }
            else {
                if (cred.sessionToken === preCred.sessionToken &&
                    cred.identityId === preCred.identityId) {
                    logger.debug('no change for cred, put event in the same group');
                    group.push(events[i]);
                }
                else {
                    eventsGroups.push(group);
                    group = [];
                    group.push(events[i]);
                    preCred = cred;
                }
            }
        }
        // The last (possibly empty) group is always pushed; _sendEvents ignores empty groups.
        eventsGroups.push(group);
        eventsGroups.forEach(function (evts) {
            _this._sendEvents(evts);
        });
    };
    /**
     * @private
     * Upload one group of events. Records are bucketed by stream name and one
     * PutRecords command is issued per stream; upload failures are logged and
     * not retried here.
     * @param {Array<Object>} group - params objects sharing the same credentials
     * @returns {boolean|undefined} false when the client could not be
     *   initialised, otherwise undefined (uploads complete asynchronously)
     */
    AWSKinesisProvider.prototype._sendEvents = function (group) {
        var _this = this;
        if (group.length === 0) {
            return;
        }
        // Every event in the group shares credentials, so the first one is representative.
        var _a = group[0], config = _a.config, credentials = _a.credentials;
        var initClients = this._init(config, credentials);
        if (!initClients)
            return false;
        var records = {};
        group.forEach(function (params) {
            // split by streamName
            var evt = params.event;
            var streamName = evt.streamName;
            if (records[streamName] === undefined) {
                records[streamName] = [];
            }
            // Non-string payloads are serialised to JSON before being encoded.
            var bufferData = evt.data && typeof evt.data !== 'string'
                ? JSON.stringify(evt.data)
                : evt.data;
            var Data = fromUtf8(bufferData);
            var PartitionKey = evt.partitionKey || 'partition-' + credentials.identityId;
            var record = { Data: Data, PartitionKey: PartitionKey };
            records[streamName].push(record);
        });
        // Each upload handles its own errors inside the try region below,
        // so the returned promises are intentionally not collected.
        Object.keys(records).forEach(function (streamName) {
            __awaiter(_this, void 0, void 0, function () {
                var command, err_1;
                return __generator(this, function (_a) {
                    switch (_a.label) {
                        case 0:
                            logger.debug('putting records to kinesis with records', records[streamName]);
                            _a.label = 1;
                        case 1:
                            _a.trys.push([1, 3, , 4]);
                            command = new PutRecordsCommand({
                                Records: records[streamName],
                                StreamName: streamName,
                            });
                            return [4 /*yield*/, this._kinesis.send(command)];
                        case 2:
                            _a.sent();
                            logger.debug('Upload records to stream', streamName);
                            return [3 /*break*/, 4];
                        case 3:
                            err_1 = _a.sent();
                            logger.debug('Failed to upload records to Kinesis', err_1);
                            return [3 /*break*/, 4];
                        case 4: return [2 /*return*/];
                    }
                });
            });
        });
    };
    /**
     * @private
     * Ensure a Kinesis client exists for the given credentials. The existing
     * client is reused when sessionToken and identityId are unchanged.
     * @param {Object} config - config captured when the event was recorded (region, endpoint)
     * @param {Object} credentials - credentials captured when the event was recorded
     * @returns {boolean} true once a client is available
     */
    AWSKinesisProvider.prototype._init = function (config, credentials) {
        logger.debug('init clients');
        if (this._kinesis &&
            this._config.credentials &&
            this._config.credentials.sessionToken === credentials.sessionToken &&
            this._config.credentials.identityId === credentials.identityId) {
            logger.debug('no change for analytics config, directly return from init');
            return true;
        }
        this._config.credentials = credentials;
        var region = config.region, endpoint = config.endpoint;
        return this._initKinesis(region, endpoint, credentials);
    };
    /**
     * @private
     * Create a new KinesisClient and store it on the instance.
     * @returns {boolean} always true
     */
    AWSKinesisProvider.prototype._initKinesis = function (region, endpoint, credentials) {
        logger.debug('initialize kinesis with credentials', credentials);
        this._kinesis = new KinesisClient({
            region: region,
            credentials: credentials,
            customUserAgent: getAmplifyUserAgent(),
            endpoint: endpoint,
        });
        return true;
    };
    /**
     * @private
     * check if current credentials exists
     * @returns {Promise<Object|null>} the result of Credentials.shear on the
     *   fetched credentials, or null when none are available or the lookup fails
     */
    AWSKinesisProvider.prototype._getCredentials = function () {
        var _this = this;
        return Credentials.get()
            .then(function (credentials) {
            if (!credentials)
                return null;
            logger.debug('set credentials for analytics', _this._config.credentials);
            return Credentials.shear(credentials);
        })
            .catch(function (err) {
            logger.debug('ensure credentials error', err);
            return null;
        });
    };
    return AWSKinesisProvider;
}());
273export { AWSKinesisProvider };
274/**
275 * @deprecated use named import
276 */
277export default AWSKinesisProvider;
278//# sourceMappingURL=AWSKinesisProvider.js.map
\No newline at end of file