1 | import { ConsoleLogger as Logger } from '@aws-amplify/core';
|
2 |
|
3 | import {
|
4 | PutEventsResponse,
|
5 | EventBuffer,
|
6 | EventObject,
|
7 | EventMap,
|
8 | } from '../types';
|
9 | import {
|
10 | PutEventsCommand,
|
11 | PutEventsCommandOutput,
|
12 | } from '@aws-sdk/client-pinpoint';
|
13 | import { isAppInForeground } from '../utils/AppUtils';
|
14 |
|
// Logger used by everything in this module, tagged 'EventsBuffer'.
const logger = new Logger('EventsBuffer');

// Status codes for which events are put back into the buffer for another attempt.
const RETRYABLE_CODES: number[] = [429, 500];

// Per-event status codes for which the event's `resolve` handler is called.
const ACCEPTED_CODES: number[] = [202];
|
18 |
|
/**
 * Settings accepted by the `EventsBuffer` constructor.
 */
type EventsBufferConfig = {
  /** Number of queued events at which `push` starts rejecting new events. */
  bufferSize: number;
  /** Upper bound on the number of events removed from the buffer per batch. */
  flushSize: number;
  /** Delay, in milliseconds, handed to `setInterval` between batch attempts. */
  flushInterval: number;
  /**
   * NOTE(review): not read from the config by the buffer class in this file,
   * which decrements the `resendLimit` carried on each event's params instead.
   * Presumably the default retry budget applied by the caller - confirm.
   */
  resendLimit: number;
};
|
25 |
|
/**
 * In-memory queue of analytics events that is drained in batches.
 *
 * The constructor starts a timer that calls `_sendBatch` every
 * `config.flushInterval` milliseconds. Each tick removes up to
 * `config.flushSize` events from the front of the buffer and sends them with a
 * single `PutEventsCommand` through the supplied client. Events are settled
 * through the `handlers.resolve` / `handlers.reject` callbacks they carry.
 *
 * NOTE(review): events that are dropped after a non-retryable request failure,
 * or after their retry budget is used up, are only logged; their handlers are
 * never called in those paths.
 */
export default class EventsBuffer {
  private _config: EventsBufferConfig;
  private _client;
  private _interval;
  private _buffer: EventBuffer;
  private _pause = false;
  private _flush = false;

  /**
   * @param client - Object whose `send(command)` method is used to deliver batches.
   * @param config - Buffer limits and timer interval.
   */
  constructor(client, config: EventsBufferConfig) {
    logger.debug('Instantiating buffer with config:', config);
    this._buffer = [];
    this._client = client;
    this._config = config;

    // Bound once so it can be handed to setInterval without losing `this`.
    this._sendBatch = this._sendBatch.bind(this);

    this._startLoop();
  }

  /**
   * Appends an event to the end of the buffer.
   *
   * When the buffer already holds `config.bufferSize` events the event is not
   * queued and its `reject` handler is called instead.
   *
   * @param event - Event to queue; stored under its `params.event.eventId`.
   */
  public push(event: EventObject) {
    if (this._buffer.length >= this._config.bufferSize) {
      logger.debug('Exceeded analytics events buffer size');
      return event.handlers.reject(
        new Error('Exceeded the size of analytics events buffer')
      );
    }

    const { eventId } = event.params.event;
    this._buffer.push({ [eventId]: event });
  }

  /** Makes subsequent timer ticks skip sending until `resume` is called. */
  public pause() {
    this._pause = true;
  }

  /** Clears the pause flag so timer ticks send batches again. */
  public resume() {
    this._pause = false;
  }

  /**
   * Replaces the client used for subsequent requests.
   *
   * @param client - Object whose `send(command)` method is used to deliver batches.
   */
  public updateClient(client) {
    this._client = client;
  }

  /**
   * Sets the flush flag: the first timer tick that finds the buffer empty
   * clears the timer. Nothing in this class restarts it afterwards.
   */
  public flush() {
    this._flush = true;
  }

  /** (Re)starts the timer that calls `_sendBatch` every `flushInterval` ms. */
  private _startLoop() {
    if (this._interval) {
      clearInterval(this._interval);
    }

    const { flushInterval } = this._config;

    this._interval = setInterval(this._sendBatch, flushInterval);
  }

  /**
   * Timer callback: sends at most `flushSize` events from the front of the
   * buffer, unless the buffer is paused, empty, or the app is not in the
   * foreground.
   */
  private _sendBatch() {
    const bufferLength = this._buffer.length;

    // A flush was requested and there is nothing left to send: stop the timer.
    if (this._flush && !bufferLength) {
      clearInterval(this._interval);
    }

    if (this._pause || !bufferLength || !isAppInForeground()) {
      return;
    }

    const { flushSize } = this._config;

    const batchSize = Math.min(flushSize, bufferLength);
    const bufferSubset = this._buffer.splice(0, batchSize);

    // Intentionally not awaited; _putEvents routes its own errors to
    // _handlePutEventsFailure.
    void this._putEvents(bufferSubset);
  }

  /**
   * Sends one batch of events and dispatches the outcome.
   *
   * Request preparation happens inside the `try` so that an exception thrown
   * while building the request is passed to the failure handler rather than
   * escaping as an unhandled promise rejection from the un-awaited call in
   * `_sendBatch`.
   *
   * @param buffer - Events already removed from the main buffer.
   */
  private async _putEvents(buffer: EventBuffer) {
    let eventMap: EventMap = {};

    try {
      eventMap = this._bufferToMap(buffer);
      const batchEventParams = this._generateBatchEventParams(eventMap);
      const command: PutEventsCommand = new PutEventsCommand(batchEventParams);
      const data: PutEventsCommandOutput = await this._client.send(command);
      this._processPutEventsSuccessResponse(data, eventMap);
    } catch (err) {
      return this._handlePutEventsFailure(err, eventMap);
    }
  }

  /**
   * Builds the `PutEventsCommand` input for a batch.
   *
   * Events are grouped by `config.endpointId` and keyed by `eventId`.
   * `ApplicationId` is the first truthy `config.appId` found in the batch.
   *
   * @param eventMap - Events of the batch, keyed by event id.
   * @returns The request parameters object.
   */
  private _generateBatchEventParams(eventMap: EventMap) {
    const batchEventParams = {
      ApplicationId: '',
      EventsRequest: {
        BatchItem: {},
      },
    };

    Object.values(eventMap).forEach(item => {
      const { params } = item;
      const { event, timestamp, config } = params;
      const { name, attributes, metrics, eventId, session } = event;
      const { appId, endpointId } = config;

      const batchItem = batchEventParams.EventsRequest.BatchItem;

      batchEventParams.ApplicationId = batchEventParams.ApplicationId || appId;

      if (!batchItem[endpointId]) {
        batchItem[endpointId] = {
          Endpoint: {},
          Events: {},
        };
      }

      batchItem[endpointId].Events[eventId] = {
        EventType: name,
        Timestamp: new Date(timestamp).toISOString(),
        Attributes: attributes,
        Metrics: metrics,
        Session: session,
      };
    });

    return batchEventParams;
  }

  /**
   * Handles a failed request: when the error carries a retryable HTTP status
   * code, every event of the batch is handed to `_retry`; otherwise the
   * failure is only logged.
   *
   * @param err - Value the request was rejected with (may lack `$metadata`).
   * @param eventMap - Events of the failed batch, keyed by event id.
   */
  private _handlePutEventsFailure(err, eventMap: EventMap) {
    logger.debug('_putEvents Failed: ', err);
    // `err` may be any thrown value, so guard before reading `$metadata`.
    const statusCode = err && err.$metadata && err.$metadata.httpStatusCode;

    if (RETRYABLE_CODES.includes(statusCode)) {
      this._retry(Object.values(eventMap));
    }
  }

  /**
   * Settles or re-queues each event according to its per-event status code.
   *
   * - accepted code: `resolve` is called with a single-event response
   * - retryable code: the event is handed to `_retry`
   * - anything else: the error is logged and `reject` is called
   *
   * Response entries without an item map, or for an event id that is not part
   * of this batch, are skipped so that one malformed entry cannot prevent the
   * remaining events from being processed.
   *
   * @param data - Response returned by the client.
   * @param eventMap - Events of the batch, keyed by event id.
   */
  private _processPutEventsSuccessResponse(
    data: PutEventsResponse,
    eventMap: EventMap
  ) {
    const results = data && data.EventsResponse && data.EventsResponse.Results;

    if (!results) {
      logger.debug('_putEvents response contained no results: ', data);
      return;
    }

    const retryableEvents: EventObject[] = [];

    Object.entries(results).forEach(([endpointId, endpointValues]) => {
      const responses = endpointValues && endpointValues.EventsItemResponse;

      if (!responses) {
        return;
      }

      Object.entries(responses).forEach(
        ([eventId, { StatusCode, Message }]) => {
          const eventObject = eventMap[eventId];

          if (!eventObject) {
            logger.debug(`received a response for unknown event ${eventId}`);
            return;
          }

          // Rebuild a response holding only this event, so the handlers
          // receive the same shape as a full response.
          const response = {
            EventsResponse: {
              Results: {
                [endpointId]: {
                  EventsItemResponse: {
                    [eventId]: { StatusCode, Message },
                  },
                },
              },
            },
          };

          if (ACCEPTED_CODES.includes(StatusCode)) {
            eventObject.handlers.resolve(response);
            return;
          }

          if (RETRYABLE_CODES.includes(StatusCode)) {
            retryableEvents.push(eventObject);
            return;
          }

          const { name } = eventObject.params.event;

          logger.error(
            `event ${eventId} : ${name} failed with error: ${Message}`
          );
          return eventObject.handlers.reject(response);
        }
      );
    });

    if (retryableEvents.length) {
      this._retry(retryableEvents);
    }
  }

  /**
   * Puts events that still have retry attempts left back at the front of the
   * buffer, decrementing each event's `params.resendLimit`. Events with no
   * attempts left are logged and not re-queued.
   *
   * @param retryableEvents - Events whose delivery failed with a retryable code.
   */
  private _retry(retryableEvents: EventObject[]) {
    // Events that have not yet used up their resendLimit.
    const eligibleEvents: EventBuffer = [];

    retryableEvents.forEach((event: EventObject) => {
      const { params } = event;
      const { eventId, name } = params.event;

      if (params.resendLimit-- > 0) {
        logger.debug(
          `resending event ${eventId} : ${name} with ${params.resendLimit} retry attempts remaining`
        );
        eligibleEvents.push({ [eventId]: event });
        return;
      }

      logger.debug(
        `no retry attempts remaining for event ${eventId} : ${name}`
      );
    });

    // Retried events go ahead of newer ones.
    this._buffer.unshift(...eligibleEvents);
  }

  /**
   * Flattens buffer elements (single-key `{ [eventId]: event }` objects) into
   * one map keyed by event id. Elements without any key are ignored.
   *
   * @param buffer - Buffer elements to merge.
   * @returns Map from event id to event.
   */
  private _bufferToMap(buffer: EventBuffer): EventMap {
    const eventMap: EventMap = {};

    buffer.forEach(element => {
      Object.entries(element).forEach(([eventId, event]) => {
        eventMap[eventId] = event;
      });
    });

    return eventMap;
  }
}
|