UNPKG

19.6 kBPlain TextView Raw
1/*
2 * Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
5 * the License. A copy of the License is located at
6 *
7 * http://aws.amazon.com/apache2.0/
8 *
9 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
11 * and limitations under the License.
12 */
13
14import {
15 ConsoleLogger as Logger,
16 ClientDevice,
17 Credentials,
18 Signer,
19 JS,
20 Hub,
21 getAmplifyUserAgent,
22} from '@aws-amplify/core';
23import {
24 PinpointClient,
25 PutEventsCommand,
26 PutEventsCommandInput,
27 UpdateEndpointCommand,
28} from '@aws-sdk/client-pinpoint';
29import { EventsBatch } from '@aws-sdk/client-pinpoint/models';
30import Cache from '@aws-amplify/cache';
31
32import {
33 AnalyticsProvider,
34 PromiseHandlers,
35 EndpointBuffer,
36 EventParams,
37 EventObject,
38 EndpointFailureData,
39} from '../types';
40import { v1 as uuid } from 'uuid';
41import EventsBuffer from './EventBuffer';
42
43const AMPLIFY_SYMBOL = (typeof Symbol !== 'undefined' &&
44typeof Symbol.for === 'function'
45 ? Symbol.for('amplify_default')
46 : '@@amplify_default') as Symbol;
47
48const dispatchAnalyticsEvent = (event, data) => {
49 Hub.dispatch('analytics', { event, data }, 'Analytics', AMPLIFY_SYMBOL);
50};
51
52const logger = new Logger('AWSPinpointProvider');
53const RETRYABLE_CODES = [429, 500];
54const ACCEPTED_CODES = [202];
55const FORBIDDEN_CODE = 403;
56const MOBILE_SERVICE_NAME = 'mobiletargeting';
57const EXPIRED_TOKEN_CODE = 'ExpiredTokenException';
58const UPDATE_ENDPOINT = '_update_endpoint';
59const SESSION_START = '_session.start';
60const SESSION_STOP = '_session.stop';
61
62const BEACON_SUPPORTED =
63 typeof navigator !== 'undefined' &&
64 navigator &&
65 typeof navigator.sendBeacon === 'function';
66
67// events buffer
68const BUFFER_SIZE = 1000;
69const FLUSH_SIZE = 100;
70const FLUSH_INTERVAL = 5 * 1000; // 5s
71const RESEND_LIMIT = 5;
72
73// params: { event: {name: , .... }, timeStamp, config, resendLimits }
74export class AWSPinpointProvider implements AnalyticsProvider {
75 static category = 'Analytics';
76 static providerName = 'AWSPinpoint';
77
78 private _config;
79 private pinpointClient;
80 private _sessionId;
81 private _sessionStartTimestamp;
82 private _buffer: EventsBuffer;
83 private _endpointBuffer: EndpointBuffer;
84 private _clientInfo;
85 private _endpointGenerating = true;
86 private _endpointUpdateInProgress = false;
87
88 constructor(config?) {
89 this._buffer = null;
90 this._endpointBuffer = [];
91 this._config = config ? config : {};
92 this._config.bufferSize = this._config.bufferSize || BUFFER_SIZE;
93 this._config.flushSize = this._config.flushSize || FLUSH_SIZE;
94 this._config.flushInterval = this._config.flushInterval || FLUSH_INTERVAL;
95 this._config.resendLimit = this._config.resendLimit || RESEND_LIMIT;
96 this._clientInfo = ClientDevice.clientInfo();
97 }
98
99 /**
100 * get the category of the plugin
101 */
102 getCategory(): string {
103 return AWSPinpointProvider.category;
104 }
105
106 /**
107 * get provider name of the plugin
108 */
109 getProviderName(): string {
110 return AWSPinpointProvider.providerName;
111 }
112
113 /**
114 * configure the plugin
115 * @param {Object} config - configuration
116 */
117 public configure(config): object {
118 logger.debug('configure Analytics', config);
119 const conf = config || {};
120 this._config = Object.assign({}, this._config, conf);
121
122 // If autoSessionRecord is enabled, we need to wait for the endpoint to be
123 // updated before sending any events. See `sendEvents` in `Analytics.ts`
124 this._endpointGenerating = !!config['autoSessionRecord'];
125
126 if (this._config.appId && !this._config.disabled) {
127 if (!this._config.endpointId) {
128 const cacheKey = this.getProviderName() + '_' + this._config.appId;
129 this._getEndpointId(cacheKey)
130 .then(endpointId => {
131 logger.debug('setting endpoint id from the cache', endpointId);
132 this._config.endpointId = endpointId;
133 dispatchAnalyticsEvent('pinpointProvider_configured', null);
134 })
135 .catch(err => {
136 logger.debug('Failed to generate endpointId', err);
137 });
138 } else {
139 dispatchAnalyticsEvent('pinpointProvider_configured', null);
140 }
141 } else {
142 this._flushBuffer();
143 }
144 return this._config;
145 }
146
147 /**
148 * record an event
149 * @param {Object} params - the params of an event
150 */
151 public async record(params: EventParams, handlers: PromiseHandlers) {
152 logger.debug('_public record', params);
153 const credentials = await this._getCredentials();
154 if (!credentials || !this._config.appId || !this._config.region) {
155 logger.debug(
156 'cannot send events without credentials, applicationId or region'
157 );
158 return handlers.reject(
159 new Error('No credentials, applicationId or region')
160 );
161 }
162
163 this._initClients(credentials);
164
165 const timestamp = new Date().getTime();
166 // attach the session and eventId
167 this._generateSession(params);
168 params.event.eventId = uuid();
169
170 Object.assign(params, { timestamp, config: this._config });
171
172 if (params.event.immediate) {
173 return this._send(params, handlers);
174 } else {
175 this._putToBuffer(params, handlers);
176 }
177 }
178
179 private async _sendEndpointUpdate(endpointObject: EventObject) {
180 if (this._endpointUpdateInProgress) {
181 this._endpointBuffer.push(endpointObject);
182 return;
183 }
184
185 this._endpointUpdateInProgress = true;
186 await this._updateEndpoint(endpointObject);
187
188 const next = this._endpointBuffer.shift();
189 this._endpointUpdateInProgress = false;
190
191 next && this._sendEndpointUpdate(next);
192 }
193
194 /**
195 * @private
196 * @param params - params for event recording
197 * Put events into buffer
198 */
199 private _putToBuffer(params, handlers) {
200 if (params.event.name === UPDATE_ENDPOINT) {
201 this._sendEndpointUpdate({ params, handlers });
202 return;
203 }
204
205 this._buffer && this._buffer.push({ params, handlers });
206 }
207
208 private _generateSession(params) {
209 this._sessionId = this._sessionId || uuid();
210 const { event } = params;
211
212 switch (event.name) {
213 case SESSION_START:
214 // refresh the session id and session start time
215 this._sessionStartTimestamp = new Date().getTime();
216 this._sessionId = uuid();
217 event.session = {
218 Id: this._sessionId,
219 StartTimestamp: new Date(this._sessionStartTimestamp).toISOString(),
220 };
221 break;
222 case SESSION_STOP:
223 const stopTimestamp = new Date().getTime();
224 this._sessionStartTimestamp =
225 this._sessionStartTimestamp || new Date().getTime();
226 this._sessionId = this._sessionId || uuid();
227 event.session = {
228 Id: this._sessionId,
229 Duration: stopTimestamp - this._sessionStartTimestamp,
230 StartTimestamp: new Date(this._sessionStartTimestamp).toISOString(),
231 StopTimestamp: new Date(stopTimestamp).toISOString(),
232 };
233 this._sessionId = undefined;
234 this._sessionStartTimestamp = undefined;
235 break;
236 default:
237 this._sessionStartTimestamp =
238 this._sessionStartTimestamp || new Date().getTime();
239 this._sessionId = this._sessionId || uuid();
240 event.session = {
241 Id: this._sessionId,
242 StartTimestamp: new Date(this._sessionStartTimestamp).toISOString(),
243 };
244 }
245 }
246
247 private async _send(params, handlers) {
248 const { event } = params;
249
250 switch (event.name) {
251 case UPDATE_ENDPOINT:
252 return this._updateEndpoint({ params, handlers });
253 case SESSION_STOP:
254 return this._pinpointSendStopSession(params, handlers);
255 default:
256 return this._pinpointPutEvents(params, handlers);
257 }
258 }
259
260 private _generateBatchItemContext(params) {
261 const { event, timestamp, config } = params;
262 const { name, attributes, metrics, eventId, session } = event;
263 const { appId, endpointId } = config;
264
265 const endpointContext = {};
266
267 const eventParams: PutEventsCommandInput = {
268 ApplicationId: appId,
269 EventsRequest: {
270 BatchItem: {},
271 },
272 };
273
274 const endpointObj: EventsBatch = {} as EventsBatch;
275 endpointObj.Endpoint = endpointContext;
276 endpointObj.Events = {
277 [eventId]: {
278 EventType: name,
279 Timestamp: new Date(timestamp).toISOString(),
280 Attributes: attributes,
281 Metrics: metrics,
282 Session: session,
283 },
284 };
285 eventParams.EventsRequest.BatchItem[endpointId] = endpointObj;
286
287 return eventParams;
288 }
289
290 private async _pinpointPutEvents(params, handlers) {
291 const {
292 event: { eventId },
293 config: { endpointId },
294 } = params;
295 const eventParams = this._generateBatchItemContext(params);
296 const command: PutEventsCommand = new PutEventsCommand(eventParams);
297
298 try {
299 const data = await this.pinpointClient.send(command);
300 const {
301 EventsResponse: {
302 Results: {
303 [endpointId]: {
304 EventsItemResponse: {
305 [eventId]: { StatusCode, Message },
306 },
307 },
308 },
309 },
310 } = data;
311 if (ACCEPTED_CODES.includes(StatusCode)) {
312 logger.debug('record event success. ', data);
313 return handlers.resolve(data);
314 } else {
315 if (RETRYABLE_CODES.includes(StatusCode)) {
316 this._retry(params, handlers);
317 } else {
318 logger.error(
319 `Event ${eventId} is not accepted, the error is ${Message}`
320 );
321 return handlers.reject(data);
322 }
323 }
324 } catch (err) {
325 this._eventError(err);
326 return handlers.reject(err);
327 }
328 }
329
330 private _pinpointSendStopSession(params, handlers): Promise<string> {
331 if (!BEACON_SUPPORTED) {
332 this._pinpointPutEvents(params, handlers);
333 return;
334 }
335
336 const eventParams = this._generateBatchItemContext(params);
337
338 const { region } = this._config;
339 const { ApplicationId, EventsRequest } = eventParams;
340
341 const accessInfo = {
342 secret_key: this._config.credentials.secretAccessKey,
343 access_key: this._config.credentials.accessKeyId,
344 session_token: this._config.credentials.sessionToken,
345 };
346
347 const url = `https://pinpoint.${region}.amazonaws.com/v1/apps/${ApplicationId}/events/legacy`;
348 const body = JSON.stringify(EventsRequest);
349 const method = 'POST';
350
351 const request = {
352 url,
353 body,
354 method,
355 };
356
357 const serviceInfo = { region, service: MOBILE_SERVICE_NAME };
358
359 const requestUrl: string = Signer.signUrl(
360 request,
361 accessInfo,
362 serviceInfo,
363 null
364 );
365
366 const success: boolean = navigator.sendBeacon(requestUrl, body);
367
368 if (success) {
369 return handlers.resolve('sendBeacon success');
370 }
371 return handlers.reject('sendBeacon failure');
372 }
373
374 private _retry(params, handlers) {
375 const {
376 config: { resendLimit },
377 } = params;
378 // For backward compatibility
379 params.resendLimit =
380 typeof params.resendLimit === 'number' ? params.resendLimit : resendLimit;
381 if (params.resendLimit-- > 0) {
382 logger.debug(
383 `resending event ${params.eventName} with ${params.resendLimit} retry times left`
384 );
385 this._pinpointPutEvents(params, handlers);
386 } else {
387 logger.debug(`retry times used up for event ${params.eventName}`);
388 }
389 }
390
391 private async _updateEndpoint(endpointObject: EventObject) {
392 const { params, handlers } = endpointObject;
393 const { config, event } = params;
394 const { appId, endpointId } = config;
395
396 const request = this._endpointRequest(
397 config,
398 JS.transferKeyToLowerCase(
399 event,
400 [],
401 ['attributes', 'userAttributes', 'Attributes', 'UserAttributes']
402 )
403 );
404 const update_params = {
405 ApplicationId: appId,
406 EndpointId: endpointId,
407 EndpointRequest: request,
408 };
409
410 try {
411 const command: UpdateEndpointCommand = new UpdateEndpointCommand(
412 update_params
413 );
414 const data = await this.pinpointClient.send(command);
415
416 logger.debug('updateEndpoint success', data);
417 this._endpointGenerating = false;
418 this._resumeBuffer();
419
420 handlers.resolve(data);
421 return;
422 } catch (err) {
423 const failureData: EndpointFailureData = {
424 err,
425 update_params,
426 endpointObject,
427 };
428
429 return this._handleEndpointUpdateFailure(failureData);
430 }
431 }
432
433 private async _handleEndpointUpdateFailure(failureData: EndpointFailureData) {
434 const { err, endpointObject } = failureData;
435 const statusCode = err.$metadata && err.$metadata.httpStatusCode;
436
437 logger.debug('updateEndpoint error', err);
438
439 switch (statusCode) {
440 case FORBIDDEN_CODE:
441 return this._handleEndpointUpdateForbidden(failureData);
442 default:
443 if (RETRYABLE_CODES.includes(statusCode)) {
444 // Server error. Attempt exponential retry
445 const exponential = true;
446 return this._retryEndpointUpdate(endpointObject, exponential);
447 }
448 logger.error('updateEndpoint failed', err);
449 endpointObject.handlers.reject(err);
450 }
451 }
452
453 private _handleEndpointUpdateForbidden(failureData: EndpointFailureData) {
454 const { err, endpointObject } = failureData;
455
456 const { code, retryable } = err;
457
458 if (code !== EXPIRED_TOKEN_CODE && !retryable) {
459 return endpointObject.handlers.reject(err);
460 }
461
462 this._retryEndpointUpdate(endpointObject);
463 }
464
465 private _retryEndpointUpdate(
466 endpointObject: EventObject,
467 exponential: boolean = false
468 ) {
469 logger.debug('_retryEndpointUpdate', endpointObject);
470 const { params } = endpointObject;
471
472 // TODO: implement retry with exp back off once exp function is available
473 const {
474 config: { resendLimit },
475 } = params;
476
477 params.resendLimit =
478 typeof params.resendLimit === 'number' ? params.resendLimit : resendLimit;
479
480 if (params.resendLimit-- > 0) {
481 logger.debug(
482 `resending endpoint update ${params.event.eventId} with ${params.resendLimit} retry attempts remaining`
483 );
484 // insert at the front of endpointBuffer
485 this._endpointBuffer.length
486 ? this._endpointBuffer.unshift(endpointObject)
487 : this._updateEndpoint(endpointObject);
488 return;
489 }
490
491 logger.warn(
492 `resending endpoint update ${params.event.eventId} failed after ${params.config.resendLimit} attempts`
493 );
494
495 if (this._endpointGenerating) {
496 logger.error('Initial endpoint update failed. ');
497 }
498 }
499
500 /**
501 * @private
502 * @param config
503 * Init the clients
504 */
505 private async _initClients(credentials) {
506 logger.debug('init clients');
507
508 if (
509 this.pinpointClient &&
510 this._config.credentials &&
511 this._config.credentials.sessionToken === credentials.sessionToken &&
512 this._config.credentials.identityId === credentials.identityId
513 ) {
514 logger.debug('no change for aws credentials, directly return from init');
515 return;
516 }
517
518 const identityId = this._config.credentials
519 ? this._config.credentials.identityId
520 : null;
521
522 this._config.credentials = credentials;
523 const { region } = this._config;
524 logger.debug('init clients with credentials', credentials);
525 this.pinpointClient = new PinpointClient({
526 region,
527 credentials,
528 customUserAgent: getAmplifyUserAgent(),
529 });
530
531 // TODO: remove this middleware once a long term fix is implemented by aws-sdk-js team.
532 this.pinpointClient.middlewareStack.addRelativeTo(
533 next => args => {
534 delete args.request.headers['amz-sdk-invocation-id'];
535 delete args.request.headers['amz-sdk-request'];
536 return next(args);
537 },
538 {
539 step: 'finalizeRequest',
540 relation: 'after',
541 toMiddleware: 'retryMiddleware',
542 }
543 );
544
545 if (this._bufferExists() && identityId === credentials.identityId) {
546 // if the identity has remained the same, pass the updated client to the buffer
547 this._updateBufferClient();
548 } else {
549 // otherwise flush the buffer and instantiate a new one
550 // this will cause the old buffer to send any remaining events
551 // with the old credentials and then stop looping and shortly thereafter get picked up by GC
552 this._initBuffer();
553 }
554
555 this._customizePinpointClientReq();
556 }
557
558 private _bufferExists() {
559 return this._buffer && this._buffer instanceof EventsBuffer;
560 }
561
562 private _initBuffer() {
563 if (this._bufferExists()) {
564 this._flushBuffer();
565 }
566
567 this._buffer = new EventsBuffer(this.pinpointClient, this._config);
568
569 // if the first endpoint update hasn't yet resolved pause the buffer to
570 // prevent race conditions. It will be resumed as soon as that request succeeds
571 if (this._endpointGenerating) {
572 this._buffer.pause();
573 }
574 }
575
576 private _updateBufferClient() {
577 if (this._bufferExists()) {
578 this._buffer.updateClient(this.pinpointClient);
579 }
580 }
581
582 private _flushBuffer() {
583 if (this._bufferExists()) {
584 this._buffer.flush();
585 this._buffer = null;
586 }
587 }
588
589 private _resumeBuffer() {
590 if (this._bufferExists()) {
591 this._buffer.resume();
592 }
593 }
594
595 private _customizePinpointClientReq() {
596 // TODO FIXME: Find a middleware to do this with AWS V3 SDK
597 // if (Platform.isReactNative) {
598 // this.pinpointClient.customizeRequests(request => {
599 // request.on('build', req => {
600 // req.httpRequest.headers['user-agent'] = Platform.userAgent;
601 // });
602 // });
603 // }
604 }
605
606 private async _getEndpointId(cacheKey) {
607 // try to get from cache
608 let endpointId = await Cache.getItem(cacheKey);
609 logger.debug(
610 'endpointId from cache',
611 endpointId,
612 'type',
613 typeof endpointId
614 );
615 if (!endpointId) {
616 endpointId = uuid();
617 Cache.setItem(cacheKey, endpointId);
618 }
619 return endpointId;
620 }
621
622 /**
623 * EndPoint request
624 * @return {Object} - The request of updating endpoint
625 */
626 private _endpointRequest(config, event) {
627 const { credentials } = config;
628 const clientInfo = this._clientInfo || {};
629 const clientContext = config.clientContext || {};
630 // for now we have three different ways for default endpoint configurations
631 // clientInfo
632 // clientContext (deprecated)
633 // config.endpoint
634 const defaultEndpointConfig = config.endpoint || {};
635 const demographicByClientInfo = {
636 appVersion: clientInfo.appVersion,
637 make: clientInfo.make,
638 model: clientInfo.model,
639 modelVersion: clientInfo.version,
640 platform: clientInfo.platform,
641 };
642 // for backward compatibility
643 const {
644 clientId,
645 appTitle,
646 appVersionName,
647 appVersionCode,
648 appPackageName,
649 ...demographicByClientContext
650 } = clientContext;
651 const channelType = event.address
652 ? clientInfo.platform === 'android'
653 ? 'GCM'
654 : 'APNS'
655 : undefined;
656 const tmp = {
657 channelType,
658 requestId: uuid(),
659 effectiveDate: new Date().toISOString(),
660 ...defaultEndpointConfig,
661 ...event,
662 attributes: {
663 ...defaultEndpointConfig.attributes,
664 ...event.attributes,
665 },
666 demographic: {
667 ...demographicByClientInfo,
668 ...demographicByClientContext,
669 ...defaultEndpointConfig.demographic,
670 ...event.demographic,
671 },
672 location: {
673 ...defaultEndpointConfig.location,
674 ...event.location,
675 },
676 metrics: {
677 ...defaultEndpointConfig.metrics,
678 ...event.metrics,
679 },
680 user: {
681 userId:
682 event.userId ||
683 defaultEndpointConfig.userId ||
684 credentials.identityId,
685 userAttributes: {
686 ...defaultEndpointConfig.userAttributes,
687 ...event.userAttributes,
688 },
689 },
690 };
691
692 // eliminate unnecessary params
693 const {
694 userId,
695 userAttributes,
696 name,
697 session,
698 eventId,
699 immediate,
700 ...ret
701 } = tmp;
702 return JS.transferKeyToUpperCase(
703 ret,
704 [],
705 ['metrics', 'userAttributes', 'attributes']
706 );
707 }
708
709 private _eventError(err: any) {
710 logger.error('record event failed.', err);
711 logger.warn(
712 `Please ensure you have updated your Pinpoint IAM Policy ` +
713 `with the Action: "mobiletargeting:PutEvents" ` +
714 `in order to record events`
715 );
716 }
717
718 private async _getCredentials() {
719 try {
720 const credentials = await Credentials.get();
721 if (!credentials) return null;
722
723 logger.debug('set credentials for analytics', credentials);
724 return Credentials.shear(credentials);
725 } catch (err) {
726 logger.debug('ensure credentials error', err);
727 return null;
728 }
729 }
730}
731
732/**
733 * @deprecated use named import
734 */
735export default AWSPinpointProvider;