1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 | import debugFactory from 'debug';
|
7 | import {EventEmitter} from 'events';
|
8 | import {iterator, multiple} from 'p-event';
|
9 | import {Context} from './context';
|
10 | import {ContextEvent, ContextEventListener} from './context-event';
|
11 | import {
|
12 | ContextEventObserver,
|
13 | ContextEventType,
|
14 | ContextObserver,
|
15 | } from './context-observer';
|
16 |
|
// Debug logger for this module, created under the
// `loopback:context:subscription` namespace.
const debug = debugFactory('loopback:context:subscription');
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
/**
 * Handle describing one observer subscription. It exposes whether the
 * subscription has been closed and a way to close it.
 */
export interface Subscription {
  /**
   * `true` once the subscription has been closed.
   */
  closed: boolean;

  /**
   * Close the subscription.
   */
  unsubscribe(): void;
}
|
33 |
|
34 |
|
35 |
|
36 |
|
/**
 * A context event bundled with the set of observers that should be notified
 * about it. Within this file, instances are emitted as `notification` events
 * by `ContextSubscriptionManager` and consumed by its notification loop.
 */
export interface Notification extends ContextEvent {
  /**
   * Observers to notify for this event.
   */
  observers: Set<ContextEventObserver>;
}
|
43 |
|
44 |
|
45 |
|
46 |
|
/**
 * `Subscription` implementation that ties one observer to one context.
 * Closing it delegates to `context.unsubscribe(observer)`.
 */
class ContextSubscription implements Subscription {
  /** Context the observer was subscribed to. */
  protected context: Context;

  /** Observer this subscription represents. */
  protected observer: ContextEventObserver;

  /** Backing flag for the `closed` accessor. */
  private _closed: boolean;

  /**
   * @param context - Context the observer is subscribed to
   * @param observer - The subscribed observer
   */
  constructor(context: Context, observer: ContextEventObserver) {
    this.context = context;
    this.observer = observer;
    this._closed = false;
  }

  /**
   * Whether `unsubscribe()` has been called on this subscription.
   */
  get closed(): boolean {
    return this._closed;
  }

  /**
   * Remove the observer from the context and mark this subscription closed.
   */
  unsubscribe(): void {
    this.context.unsubscribe(this.observer);
    this._closed = true;
  }
}
|
64 |
|
65 |
|
66 |
|
67 |
|
/**
 * Manages the observers subscribed to a context and delivers that context's
 * `bind` / `unbind` events to them.
 *
 * Events are not delivered synchronously: each event is turned into a
 * `notification` event on this manager, queued through an async iterator and
 * processed one at a time by a background loop. After a notification has been
 * delivered to all observers, an `observersNotified` event is emitted.
 */
export class ContextSubscriptionManager extends EventEmitter {
  /**
   * Listener registered on the parent context for `bind` / `unbind` events.
   * Kept so that `close()` can remove it again.
   */
  protected _parentContextEventListener?: ContextEventListener;

  /**
   * Currently subscribed observers. `undefined` until the first call to
   * `subscribe()` and again after `close()`.
   */
  protected _observers: Set<ContextEventObserver> | undefined;

  /**
   * Number of notifications that have been queued but not yet delivered
   * successfully.
   */
  private pendingNotifications = 0;

  /**
   * Async iterator over the `notification` events emitted by this manager.
   * `undefined` while the notification machinery is not set up.
   */
  private notificationQueue: AsyncIterableIterator<Notification> | undefined;

  /**
   * Listeners registered on the managed context by `setupNotification()`,
   * keyed by event type, so that `close()` can remove them.
   */
  private readonly contextListeners = new Map<
    ContextEventType,
    ContextEventListener
  >();

  /**
   * @param context - Context whose events and observers are managed
   */
  constructor(protected readonly context: Context) {
    super();
    this.setMaxListeners(Infinity);
  }

  /**
   * The listener currently registered on the parent context, if any.
   */
  get parentContextEventListener() {
    return this._parentContextEventListener;
  }

  /**
   * The set of subscribed observers, or `undefined` if there is none.
   */
  get observers() {
    return this._observers;
  }

  /**
   * Write a debug message prefixed with the name of the managed context.
   * Does nothing when the debug logger is disabled.
   *
   * @param args - Arguments for the debug logger. If the first one is a
   * string it is used as the format string; otherwise all arguments are
   * logged after the context-name prefix.
   */
  private _debug(...args: unknown[]) {
    if (!debug.enabled) return;
    const formatter = args.shift();
    if (typeof formatter === 'string') {
      debug(`[%s] ${formatter}`, this.context.name, ...args);
    } else {
      debug('[%s] ', this.context.name, formatter, ...args);
    }
  }

  /**
   * Set up event listeners and the notification loop on first use. Does
   * nothing if the notification queue already exists. The same setup is then
   * triggered on every ancestor context.
   */
  private setupEventHandlersIfNeeded() {
    if (this.notificationQueue != null) return;

    const parent = this.context.parent;
    if (parent != null) {
      // Forward parent `bind` / `unbind` events to this context. The listener
      // is stored so that `close()` can remove it.
      const listener: ContextEventListener = event => {
        this.handleParentEvent(event);
      };
      this._parentContextEventListener = listener;

      parent.on('bind', listener);
      parent.on('unbind', listener);
    }

    // The returned promise belongs to a long-running background loop; it is
    // not awaited, only its rejection is handled.
    this.startNotificationTask().catch(err => {
      this.handleNotificationError(err);
    });

    let ctx = this.context.parent;
    while (ctx) {
      ctx.subscriptionManager.setupEventHandlersIfNeeded();
      ctx = ctx.parent;
    }
  }

  /**
   * Handle a `bind` / `unbind` event coming from the parent context. The
   * event is re-emitted on the managed context unless that context itself
   * contains a binding with the same key.
   *
   * @param event - Event received from the parent context
   */
  private handleParentEvent(event: ContextEvent) {
    const {binding, context, type} = event;

    if (this.context.contains(binding.key)) {
      this._debug(
        'Event %s %s is not re-emitted from %s to %s',
        type,
        binding.key,
        context.name,
        this.context.name,
      );
      return;
    }
    this._debug(
      'Re-emitting %s %s from %s to %s',
      type,
      binding.key,
      context.name,
      this.context.name,
    );
    this.context.emitEvent(type, event);
  }

  /**
   * Emit an event on this manager.
   *
   * @param type - Event type
   * @param event - Event payload
   */
  private emitEvent<T extends ContextEvent>(type: string, event: T) {
    this.emit(type, event);
  }

  /**
   * Emit an `error` event on this manager.
   *
   * @param err - Error to emit
   */
  private emitError(err: unknown) {
    this.emit('error', err);
  }

  /**
   * Register the `bind` / `unbind` listeners on the managed context, create
   * the notification queue and start processing it.
   *
   * @returns The promise of the notification processing loop
   */
  private startNotificationTask() {
    this.setupNotification('bind', 'unbind');

    // No rejection events are configured for the iterator, so an `error`
    // event on this manager is not passed to the iterator as a rejection.
    this.notificationQueue = iterator(this, 'notification', {
      rejectionEvents: [],
    });

    return this.processNotifications();
  }

  /**
   * Notify observers about a context event, one observer after another.
   * Function observers are always called; object observers are called only
   * if they have no `filter` or their `filter` accepts the binding.
   *
   * @param event - Context event to publish
   * @param observers - Observers to notify; defaults to the currently
   * subscribed observers
   */
  protected async notifyObservers(
    event: ContextEvent,
    observers = this._observers,
  ) {
    if (!observers || observers.size === 0) return;

    const {type, binding, context} = event;
    for (const observer of observers) {
      if (typeof observer === 'function') {
        await observer(type, binding, context);
      } else if (!observer.filter || observer.filter(binding)) {
        await observer.observe(type, binding, context);
      }
    }
  }

  /**
   * Background loop that takes notifications off the queue and delivers them
   * to their observers. Each successful delivery decrements the pending
   * counter and emits `observersNotified`.
   */
  private async processNotifications() {
    const events = this.notificationQueue;
    if (events == null) return;
    for await (const {type, binding, context, observers} of events) {
      try {
        await this.notifyObservers({type, binding, context}, observers);
        this.pendingNotifications--;
        this._debug(
          'Observers notified for %s of binding %s',
          type,
          binding.key,
        );
        this.emitEvent('observersNotified', {type, binding, context});
      } catch (err) {
        // NOTE(review): the pending counter is not decremented on failure;
        // this looks intentional so that a failed notification does not
        // count as completed for `waitUntilPendingNotificationsDone` -
        // confirm before changing.
        this._debug('Error caught from observers', err);
        if (this.listenerCount('error') > 0) {
          // Something is listening for errors on this manager.
          this.emitError(err);
        } else {
          // Otherwise hand the error over to the context chain.
          this.handleNotificationError(err);
        }
      }
    }
  }

  /**
   * Listen on the managed context for the given event types and queue a
   * notification for each event. The observer set is copied at the time the
   * event is received. Event types that already have a listener are skipped.
   *
   * @param eventTypes - Context event types to listen for
   */
  private setupNotification(...eventTypes: ContextEventType[]) {
    for (const type of eventTypes) {
      // Never stack a second listener for the same event type.
      if (this.contextListeners.has(type)) continue;

      const listener: ContextEventListener = ({binding, context}) => {
        // Nothing to do if nobody is subscribed.
        if (!this._observers || this._observers.size === 0) return;

        this.pendingNotifications++;

        // Take a snapshot of the observers so that later subscribe /
        // unsubscribe calls do not affect this notification.
        this.emitEvent('notification', {
          type,
          binding,
          context,
          observers: new Set(this._observers),
        });
      };
      this.contextListeners.set(type, listener);
      this.context.on(type, listener);
    }
  }

  /**
   * Wait until the notifications pending at the time of the call have been
   * delivered, by waiting for that many `observersNotified` events. Returns
   * immediately if nothing is pending.
   *
   * @param timeout - Optional timeout passed through to `p-event`
   */
  async waitUntilPendingNotificationsDone(timeout?: number) {
    const count = this.pendingNotifications;
    // Uses the module logger directly, so this message carries no
    // context-name prefix.
    debug('Number of pending notifications: %d', count);
    if (count === 0) return;
    await multiple(this, 'observersNotified', {count, timeout});
  }

  /**
   * Add an observer and make sure the notification machinery is running.
   *
   * @param observer - Observer function or object to subscribe
   * @returns A subscription that can be used to unsubscribe the observer
   */
  subscribe(observer: ContextEventObserver): Subscription {
    this._observers = this._observers ?? new Set();
    this.setupEventHandlersIfNeeded();
    this._observers.add(observer);
    return new ContextSubscription(this.context, observer);
  }

  /**
   * Remove an observer.
   *
   * @param observer - Observer to remove
   * @returns `true` if the observer was subscribed, otherwise `false`
   */
  unsubscribe(observer: ContextEventObserver): boolean {
    if (!this._observers) return false;
    return this._observers.delete(observer);
  }

  /**
   * Check whether an observer is currently subscribed.
   *
   * @param observer - Observer to look up
   */
  isSubscribed(observer: ContextObserver) {
    if (!this._observers) return false;
    return this._observers.has(observer);
  }

  /**
   * Report an error from the notification machinery. The error is emitted on
   * the closest context in the chain (starting with the managed context)
   * that has an `error` listener. If no context has one, it is emitted on
   * the managed context anyway.
   *
   * @param err - Error to report
   */
  private handleNotificationError(err: unknown) {
    let ctx: Context | undefined = this.context;
    while (ctx) {
      if (ctx.listenerCount('error') === 0) {
        // No error listener here, try the parent context.
        ctx = ctx.parent;
        continue;
      }
      this._debug('Emitting error to context %s', ctx.name, err);
      ctx.emitError(err);
      return;
    }

    this._debug('No error handler is configured for the context chain', err);
    // NOTE(review): with no `error` listener anywhere in the chain this
    // presumably surfaces as an unhandled `error` event - confirm against
    // `Context.emitError`.
    this.context.emitError(err);
  }

  /**
   * Release the resources held by this manager: drop all observers, end the
   * notification queue and remove the listeners that were registered on the
   * managed context and on its parent.
   */
  close() {
    this._observers = undefined;
    if (this.notificationQueue != null) {
      // Ending the iterator is asynchronous; only its failure is handled.
      this.notificationQueue.return!(undefined).catch(err => {
        this.handleNotificationError(err);
      });
      this.notificationQueue = undefined;
    }
    // Remove our own `bind` / `unbind` listeners so that a later
    // `subscribe()` does not end up with duplicate listeners.
    for (const [type, listener] of this.contextListeners) {
      this.context.removeListener(type, listener);
    }
    this.contextListeners.clear();
    if (this.context.parent && this._parentContextEventListener) {
      this.context.parent.removeListener(
        'bind',
        this._parentContextEventListener,
      );
      this.context.parent.removeListener(
        'unbind',
        this._parentContextEventListener,
      );
      this._parentContextEventListener = undefined;
    }
  }
}
|