UNPKG

4.98 kBPlain TextView Raw
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3import {
4 Amplify,
5 browserOrNode,
6 Category,
7 ConsoleLogger as Logger,
8 CustomUserAgentDetails,
9 INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER,
10 PubSubAction,
11} from '@aws-amplify/core';
12import { PubSubProvider, PubSubOptions, ProviderOptions } from '../types';
13import { AWSAppSyncRealTimeProvider } from '../Providers';
14import { PubSubContent } from '../types/PubSub';
15import Observable from 'zen-observable-ts';
16
17const { isNode } = browserOrNode();
18const logger = new Logger('PubSub');
19
20type PubSubObservable = {
21 provider: PubSubProvider;
22 value: string | Record<string, unknown>;
23};
24
25export class InternalPubSubClass {
26 private _options: PubSubOptions;
27
28 private _pluggables: PubSubProvider[];
29
30 /**
31 * Internal instance of AWSAppSyncRealTimeProvider used by the API category to subscribe to AppSync
32 */
33 private _awsAppSyncRealTimeProvider?: AWSAppSyncRealTimeProvider;
34
35 /**
36 * Lazy instantiate AWSAppSyncRealTimeProvider when it is required by the API category
37 */
38 private get awsAppSyncRealTimeProvider() {
39 if (!this._awsAppSyncRealTimeProvider) {
40 this._awsAppSyncRealTimeProvider = new AWSAppSyncRealTimeProvider(
41 this._options
42 );
43 }
44 return this._awsAppSyncRealTimeProvider;
45 }
46
47 /**
48 * Initialize PubSub with AWS configurations
49 *
50 * @param {PubSubOptions} options - Configuration object for PubSub
51 */
52 constructor(options?: PubSubOptions) {
53 this._options = options ?? {};
54 logger.debug('PubSub Options', this._options);
55 this._pluggables = [];
56 this.subscribe = this.subscribe.bind(this);
57 }
58
59 public getModuleName() {
60 return 'InternalPubSub';
61 }
62
63 /**
64 * Configure PubSub part with configurations
65 *
66 * @param {PubSubOptions} config - Configuration for PubSub
67 * @return {Object} - The current configuration
68 */
69 configure(options: PubSubOptions) {
70 const opt: Record<string, unknown> = options
71 ? options.PubSub || options
72 : {};
73 logger.debug('configure PubSub', { opt });
74
75 this._options = Object.assign({}, this._options, opt);
76
77 this._pluggables.map(pluggable => pluggable.configure(this._options));
78
79 return this._options;
80 }
81
82 /**
83 * add plugin into Analytics category
84 * @param {Object} pluggable - an instance of the plugin
85 */
86 public async addPluggable(pluggable: PubSubProvider) {
87 if (pluggable && pluggable.getCategory() === 'PubSub') {
88 this._pluggables.push(pluggable);
89
90 const config = pluggable.configure(this._options);
91
92 return config;
93 }
94 }
95
96 /**
97 * remove plugin from PubSub category
98 * @param providerName - the name of the plugin
99 */
100 removePluggable(providerName: string): void {
101 this._pluggables = this._pluggables.filter(
102 pluggable => pluggable.getProviderName() !== providerName
103 );
104 }
105
106 private getProviderByName(providerName: string | symbol) {
107 if (providerName === INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER) {
108 return this.awsAppSyncRealTimeProvider;
109 }
110
111 return this._pluggables.find(
112 pluggable => pluggable.getProviderName() === providerName
113 );
114 }
115
116 private getProviders(options: ProviderOptions = {}) {
117 const providerName = options.provider;
118 if (!providerName) {
119 return this._pluggables;
120 }
121
122 const provider = this.getProviderByName(providerName);
123 if (!provider) {
124 throw new Error(`Could not find provider named ${String(providerName)}`);
125 }
126
127 return [provider];
128 }
129
130 async publish(
131 topics: string[] | string,
132 msg: PubSubContent,
133 options?: ProviderOptions
134 ) {
135 return Promise.all(
136 this.getProviders(options).map(provider =>
137 provider.publish(topics, msg, options)
138 )
139 );
140 }
141
142 subscribe(
143 topics: string[] | string,
144 options?: ProviderOptions,
145 customUserAgentDetails?: CustomUserAgentDetails
146 ): Observable<PubSubObservable> {
147 if (isNode && this._options && this._options.ssr) {
148 throw new Error(
149 'Subscriptions are not supported for Server-Side Rendering (SSR)'
150 );
151 }
152
153 logger.debug('subscribe options', options);
154
155 const providers = this.getProviders(options);
156
157 const pubSubUserAgentDetails: CustomUserAgentDetails = {
158 category: Category.PubSub,
159 action: PubSubAction.Subscribe,
160 ...customUserAgentDetails,
161 };
162
163 return new Observable<PubSubObservable>(observer => {
164 const observables = providers.map(provider => ({
165 provider,
166 observable: provider.subscribe(topics, options, pubSubUserAgentDetails),
167 }));
168
169 const subscriptions = observables.map(({ provider, observable }) =>
170 observable.subscribe({
171 start: console.error,
172 next: (value: PubSubContent) => observer.next({ provider, value }),
173 error: (error: unknown) => observer.error({ provider, error }),
174 // complete: observer.complete, // TODO: when all completed, complete the outer one
175 })
176 );
177
178 return () =>
179 subscriptions.forEach(subscription => subscription.unsubscribe());
180 });
181 }
182}
183
184export const InternalPubSub = new InternalPubSubClass();
185Amplify.register(InternalPubSub);