UNPKG

6.41 kBPlain TextView Raw
1import { ConsoleLogger as Logger } from '@aws-amplify/core';
2
3import {
4 PutEventsResponse,
5 EventBuffer,
6 EventObject,
7 EventMap,
8} from '../types';
9import {
10 PutEventsCommand,
11 PutEventsCommandOutput,
12} from '@aws-sdk/client-pinpoint';
13import { isAppInForeground } from '../utils/AppUtils';
14
15const logger = new Logger('EventsBuffer');
16const RETRYABLE_CODES = [429, 500];
17const ACCEPTED_CODES = [202];
18
19type EventsBufferConfig = {
20 bufferSize: number;
21 flushSize: number;
22 flushInterval: number;
23 resendLimit: number;
24};
25
26export default class EventsBuffer {
27 private _config;
28 private _client;
29 private _interval;
30 private _buffer: EventBuffer;
31 private _pause = false;
32 private _flush = false;
33
34 constructor(client, config: EventsBufferConfig) {
35 logger.debug('Instantiating buffer with config:', config);
36 this._buffer = [];
37 this._client = client;
38 this._config = config;
39
40 this._sendBatch = this._sendBatch.bind(this);
41
42 this._startLoop();
43 }
44
45 public push(event: EventObject) {
46 // if the buffer is currently at the configured limit, pushing would exceed it
47 if (this._buffer.length >= this._config.bufferSize) {
48 logger.debug('Exceeded analytics events buffer size');
49 return event.handlers.reject(
50 new Error('Exceeded the size of analytics events buffer')
51 );
52 }
53
54 const { eventId } = event.params.event;
55 const bufferElement = { [eventId]: event };
56 this._buffer.push(bufferElement);
57 }
58
59 public pause() {
60 this._pause = true;
61 }
62
63 public resume() {
64 this._pause = false;
65 }
66
67 public updateClient(client) {
68 this._client = client;
69 }
70
71 public flush() {
72 this._flush = true;
73 }
74
75 private _startLoop() {
76 if (this._interval) {
77 clearInterval(this._interval);
78 }
79
80 const { flushInterval } = this._config;
81
82 this._interval = setInterval(this._sendBatch, flushInterval);
83 }
84
85 private _sendBatch() {
86 const bufferLength = this._buffer.length;
87
88 if (this._flush && !bufferLength) {
89 clearInterval(this._interval);
90 }
91
92 // Do not send the batch of events if
93 // the Buffer is paused or is empty or the App is not in the foreground
94 // Apps should be in the foreground since
95 // the OS may restrict access to the network in the background
96 if (this._pause || !bufferLength || !isAppInForeground()) {
97 return;
98 }
99
100 const { flushSize } = this._config;
101
102 const batchSize = Math.min(flushSize, bufferLength);
103 const bufferSubset = this._buffer.splice(0, batchSize);
104
105 this._putEvents(bufferSubset);
106 }
107
108 private async _putEvents(buffer: EventBuffer) {
109 const eventMap: EventMap = this._bufferToMap(buffer);
110 const batchEventParams = this._generateBatchEventParams(eventMap);
111
112 try {
113 const command: PutEventsCommand = new PutEventsCommand(batchEventParams);
114 const data: PutEventsCommandOutput = await this._client.send(command);
115 this._processPutEventsSuccessResponse(data, eventMap);
116 } catch (err) {
117 return this._handlePutEventsFailure(err, eventMap);
118 }
119 }
120
121 private _generateBatchEventParams(eventMap: EventMap) {
122 const batchEventParams = {
123 ApplicationId: '',
124 EventsRequest: {
125 BatchItem: {},
126 },
127 };
128
129 Object.values(eventMap).forEach(item => {
130 const { params } = item;
131 const { event, timestamp, config } = params;
132 const { name, attributes, metrics, eventId, session } = event;
133 const { appId, endpointId } = config;
134
135 const batchItem = batchEventParams.EventsRequest.BatchItem;
136
137 batchEventParams.ApplicationId = batchEventParams.ApplicationId || appId;
138
139 if (!batchItem[endpointId]) {
140 batchItem[endpointId] = {
141 Endpoint: {},
142 Events: {},
143 };
144 }
145
146 batchItem[endpointId].Events[eventId] = {
147 EventType: name,
148 Timestamp: new Date(timestamp).toISOString(),
149 Attributes: attributes,
150 Metrics: metrics,
151 Session: session,
152 };
153 });
154
155 return batchEventParams;
156 }
157
158 private _handlePutEventsFailure(err, eventMap: EventMap) {
159 logger.debug('_putEvents Failed: ', err);
160 const statusCode = err.$metadata && err.$metadata.httpStatusCode;
161
162 if (RETRYABLE_CODES.includes(statusCode)) {
163 const retryableEvents = Object.values(eventMap);
164 this._retry(retryableEvents);
165 return;
166 }
167 }
168
169 private _processPutEventsSuccessResponse(
170 data: PutEventsResponse,
171 eventMap: EventMap
172 ) {
173 const { Results } = data.EventsResponse;
174 const retryableEvents: EventObject[] = [];
175
176 Object.entries(Results).forEach(([endpointId, endpointValues]) => {
177 const responses = endpointValues.EventsItemResponse;
178
179 Object.entries(responses).forEach(
180 ([eventId, { StatusCode, Message }]) => {
181 const eventObject = eventMap[eventId];
182
183 // manually crafting handlers response to keep API consistant
184 const response = {
185 EventsResponse: {
186 Results: {
187 [endpointId]: {
188 EventsItemResponse: {
189 [eventId]: { StatusCode, Message },
190 },
191 },
192 },
193 },
194 };
195
196 if (ACCEPTED_CODES.includes(StatusCode)) {
197 eventObject.handlers.resolve(response);
198 return;
199 }
200
201 if (RETRYABLE_CODES.includes(StatusCode)) {
202 retryableEvents.push(eventObject);
203 return;
204 }
205
206 const { name } = eventObject.params.event;
207
208 logger.error(
209 `event ${eventId} : ${name} failed with error: ${Message}`
210 );
211 return eventObject.handlers.reject(response);
212 }
213 );
214 });
215
216 if (retryableEvents.length) {
217 this._retry(retryableEvents);
218 }
219 }
220
221 private _retry(retryableEvents: EventObject[]) {
222 // retryable events that haven't reached the resendLimit
223 const eligibleEvents: EventBuffer = [];
224
225 retryableEvents.forEach((event: EventObject) => {
226 const { params } = event;
227 const { eventId, name } = params.event;
228
229 if (params.resendLimit-- > 0) {
230 logger.debug(
231 `resending event ${eventId} : ${name} with ${params.resendLimit} retry attempts remaining`
232 );
233 eligibleEvents.push({ [eventId]: event });
234 return;
235 }
236
237 logger.debug(
238 `no retry attempts remaining for event ${eventId} : ${name}`
239 );
240 });
241
242 // add the events to the front of the buffer
243 this._buffer.unshift(...eligibleEvents);
244 }
245
246 // convert buffer to map, i.e. { eventId1: { params, handler }, eventId2: { params, handlers } }
247 // this allows us to easily access the handlers after receiving a batch response
248 private _bufferToMap(buffer: EventBuffer) {
249 return buffer.reduce((acc, curVal) => {
250 const [[key, value]] = Object.entries(curVal);
251 acc[key] = value;
252 return acc;
253 }, {});
254 }
255}