1 |
|
2 |
|
3 | import {
|
4 | Amplify,
|
5 | browserOrNode,
|
6 | Category,
|
7 | ConsoleLogger as Logger,
|
8 | CustomUserAgentDetails,
|
9 | INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER,
|
10 | PubSubAction,
|
11 | } from '@aws-amplify/core';
|
12 | import { PubSubProvider, PubSubOptions, ProviderOptions } from '../types';
|
13 | import { AWSAppSyncRealTimeProvider } from '../Providers';
|
14 | import { PubSubContent } from '../types/PubSub';
|
15 | import Observable from 'zen-observable-ts';
|
16 |
|
17 | const { isNode } = browserOrNode();
|
18 | const logger = new Logger('PubSub');
|
19 |
|
20 | type PubSubObservable = {
|
21 | provider: PubSubProvider;
|
22 | value: string | Record<string, unknown>;
|
23 | };
|
24 |
|
25 | export class InternalPubSubClass {
|
26 | private _options: PubSubOptions;
|
27 |
|
28 | private _pluggables: PubSubProvider[];
|
29 |
|
30 | |
31 |
|
32 |
|
33 | private _awsAppSyncRealTimeProvider?: AWSAppSyncRealTimeProvider;
|
34 |
|
35 | |
36 |
|
37 |
|
38 | private get awsAppSyncRealTimeProvider() {
|
39 | if (!this._awsAppSyncRealTimeProvider) {
|
40 | this._awsAppSyncRealTimeProvider = new AWSAppSyncRealTimeProvider(
|
41 | this._options
|
42 | );
|
43 | }
|
44 | return this._awsAppSyncRealTimeProvider;
|
45 | }
|
46 |
|
47 | |
48 |
|
49 |
|
50 |
|
51 |
|
52 | constructor(options?: PubSubOptions) {
|
53 | this._options = options ?? {};
|
54 | logger.debug('PubSub Options', this._options);
|
55 | this._pluggables = [];
|
56 | this.subscribe = this.subscribe.bind(this);
|
57 | }
|
58 |
|
59 | public getModuleName() {
|
60 | return 'InternalPubSub';
|
61 | }
|
62 |
|
63 | |
64 |
|
65 |
|
66 |
|
67 |
|
68 |
|
69 | configure(options: PubSubOptions) {
|
70 | const opt: Record<string, unknown> = options
|
71 | ? options.PubSub || options
|
72 | : {};
|
73 | logger.debug('configure PubSub', { opt });
|
74 |
|
75 | this._options = Object.assign({}, this._options, opt);
|
76 |
|
77 | this._pluggables.map(pluggable => pluggable.configure(this._options));
|
78 |
|
79 | return this._options;
|
80 | }
|
81 |
|
82 | |
83 |
|
84 |
|
85 |
|
86 | public async addPluggable(pluggable: PubSubProvider) {
|
87 | if (pluggable && pluggable.getCategory() === 'PubSub') {
|
88 | this._pluggables.push(pluggable);
|
89 |
|
90 | const config = pluggable.configure(this._options);
|
91 |
|
92 | return config;
|
93 | }
|
94 | }
|
95 |
|
96 | |
97 |
|
98 |
|
99 |
|
100 | removePluggable(providerName: string): void {
|
101 | this._pluggables = this._pluggables.filter(
|
102 | pluggable => pluggable.getProviderName() !== providerName
|
103 | );
|
104 | }
|
105 |
|
106 | private getProviderByName(providerName: string | symbol) {
|
107 | if (providerName === INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER) {
|
108 | return this.awsAppSyncRealTimeProvider;
|
109 | }
|
110 |
|
111 | return this._pluggables.find(
|
112 | pluggable => pluggable.getProviderName() === providerName
|
113 | );
|
114 | }
|
115 |
|
116 | private getProviders(options: ProviderOptions = {}) {
|
117 | const providerName = options.provider;
|
118 | if (!providerName) {
|
119 | return this._pluggables;
|
120 | }
|
121 |
|
122 | const provider = this.getProviderByName(providerName);
|
123 | if (!provider) {
|
124 | throw new Error(`Could not find provider named ${String(providerName)}`);
|
125 | }
|
126 |
|
127 | return [provider];
|
128 | }
|
129 |
|
130 | async publish(
|
131 | topics: string[] | string,
|
132 | msg: PubSubContent,
|
133 | options?: ProviderOptions
|
134 | ) {
|
135 | return Promise.all(
|
136 | this.getProviders(options).map(provider =>
|
137 | provider.publish(topics, msg, options)
|
138 | )
|
139 | );
|
140 | }
|
141 |
|
142 | subscribe(
|
143 | topics: string[] | string,
|
144 | options?: ProviderOptions,
|
145 | customUserAgentDetails?: CustomUserAgentDetails
|
146 | ): Observable<PubSubObservable> {
|
147 | if (isNode && this._options && this._options.ssr) {
|
148 | throw new Error(
|
149 | 'Subscriptions are not supported for Server-Side Rendering (SSR)'
|
150 | );
|
151 | }
|
152 |
|
153 | logger.debug('subscribe options', options);
|
154 |
|
155 | const providers = this.getProviders(options);
|
156 |
|
157 | const pubSubUserAgentDetails: CustomUserAgentDetails = {
|
158 | category: Category.PubSub,
|
159 | action: PubSubAction.Subscribe,
|
160 | ...customUserAgentDetails,
|
161 | };
|
162 |
|
163 | return new Observable<PubSubObservable>(observer => {
|
164 | const observables = providers.map(provider => ({
|
165 | provider,
|
166 | observable: provider.subscribe(topics, options, pubSubUserAgentDetails),
|
167 | }));
|
168 |
|
169 | const subscriptions = observables.map(({ provider, observable }) =>
|
170 | observable.subscribe({
|
171 | start: console.error,
|
172 | next: (value: PubSubContent) => observer.next({ provider, value }),
|
173 | error: (error: unknown) => observer.error({ provider, error }),
|
174 |
|
175 | })
|
176 | );
|
177 |
|
178 | return () =>
|
179 | subscriptions.forEach(subscription => subscription.unsubscribe());
|
180 | });
|
181 | }
|
182 | }
|
183 |
|
184 | export const InternalPubSub = new InternalPubSubClass();
|
185 | Amplify.register(InternalPubSub);
|