UNPKG

12 kBPlain TextView Raw
1// Copyright IBM Corp. and LoopBack contributors 2019,2020. All Rights Reserved.
2// Node module: @loopback/context
3// This file is licensed under the MIT License.
4// License text available at https://opensource.org/licenses/MIT
5
6import debugFactory from 'debug';
7import {EventEmitter} from 'events';
8import {iterator, multiple} from 'p-event';
9import {Context} from './context';
10import {ContextEvent, ContextEventListener} from './context-event';
11import {
12 ContextEventObserver,
13 ContextEventType,
14 ContextObserver,
15} from './context-observer';
16
17const debug = debugFactory('loopback:context:subscription');
18
/**
 * Handle for a subscription to context events. The shape is modeled after
 * the TC39 Observable proposal (https://github.com/tc39/proposal-observable).
 */
export interface Subscription {
  /**
   * Whether the subscription has been closed.
   */
  closed: boolean;
  /**
   * Cancel the subscription.
   */
  unsubscribe(): void;
}
33
/**
 * Payload of the internal `notification` event: the original context event
 * plus the observers that should be told about it.
 */
export interface Notification extends ContextEvent {
  /** Snapshot of the observers taken when the original event was emitted. */
  observers: Set<ContextEventObserver>;
}
43
/**
 * `Subscription` implementation that pairs an observer with the context it
 * is registered on.
 */
class ContextSubscription implements Subscription {
  /** Becomes `true` after `unsubscribe()` has removed the observer. */
  private _closed = false;

  constructor(
    protected context: Context,
    protected observer: ContextEventObserver,
  ) {}

  /**
   * Whether `unsubscribe()` has been called on this subscription.
   */
  get closed(): boolean {
    return this._closed;
  }

  /**
   * Remove the observer from the context, then mark the subscription closed.
   */
  unsubscribe(): void {
    const {context, observer} = this;
    context.unsubscribe(observer);
    this._closed = true;
  }
}
64
/**
 * Manager for context observer subscriptions.
 *
 * It keeps the set of observers registered on a context, listens to the
 * context's `bind`/`unbind` events, and notifies the observers asynchronously
 * through an internal `notification` queue.
 */
export class ContextSubscriptionManager extends EventEmitter {
  /**
   * A listener to watch parent context events
   */
  protected _parentContextEventListener?: ContextEventListener;

  /**
   * A list of registered context observers. The Set will be created when the
   * first observer is added.
   */
  protected _observers: Set<ContextEventObserver> | undefined;

  /**
   * Internal counter for pending notification events which are yet to be
   * processed by observers.
   */
  private pendingNotifications = 0;

  /**
   * Queue for background notifications for observers
   */
  private notificationQueue: AsyncIterableIterator<Notification> | undefined;

  /**
   * Listeners registered on this manager's own context by
   * `setupNotification`, keyed by event type. They are tracked so that
   * `close()` can remove them and a later `subscribe()` does not end up with
   * duplicate listeners (which would deliver every event twice).
   */
  private readonly contextEventListeners = new Map<
    ContextEventType,
    ContextEventListener
  >();

  constructor(protected readonly context: Context) {
    super();
    this.setMaxListeners(Infinity);
  }

  /**
   * @internal
   */
  get parentContextEventListener() {
    return this._parentContextEventListener;
  }

  /**
   * @internal
   */
  get observers() {
    return this._observers;
  }

  /**
   * Wrap the debug statement so that it always prints out the context name
   * as the prefix
   * @param args - Arguments for the debug
   */
  private _debug(...args: unknown[]) {
    /* istanbul ignore if */
    if (!debug.enabled) return;
    const formatter = args.shift();
    if (typeof formatter === 'string') {
      debug(`[%s] ${formatter}`, this.context.name, ...args);
    } else {
      debug('[%s] ', this.context.name, formatter, ...args);
    }
  }

  /**
   * Set up an internal listener to notify registered observers asynchronously
   * upon `bind` and `unbind` events. This method will be called lazily when
   * the first observer is added.
   */
  private setupEventHandlersIfNeeded() {
    // The queue doubles as the "already set up" marker
    if (this.notificationQueue != null) return;

    const parent = this.context.parent;
    if (parent != null) {
      /**
       * Add an event listener to its parent context so that this context will
       * be notified of parent events, such as `bind` or `unbind`.
       */
      const listener: ContextEventListener = event => {
        this.handleParentEvent(event);
      };
      this._parentContextEventListener = listener;

      // Listen on the parent context events
      parent.on('bind', listener);
      parent.on('unbind', listener);
    }

    // Start the long-running background notification task. The returned
    // promise is not awaited; a rejection is routed to the error handler.
    this.startNotificationTask().catch(err => {
      this.handleNotificationError(err);
    });

    // Make sure every ancestor is set up as well, so that events travel down
    // the whole chain of contexts
    let ctx = this.context.parent;
    while (ctx) {
      ctx.subscriptionManager.setupEventHandlersIfNeeded();
      ctx = ctx.parent;
    }
  }

  /**
   * Handle an event received from the parent context: re-emit it on this
   * context unless a local binding with the same key shadows the parent one.
   * @param event - Context event emitted by the parent context
   */
  private handleParentEvent(event: ContextEvent) {
    const {binding, context, type} = event;
    // Propagate the event to this context only if the binding key does not
    // exist in this context. The parent binding is shadowed if there is a
    // binding with the same key in this one.
    if (this.context.contains(binding.key)) {
      this._debug(
        'Event %s %s is not re-emitted from %s to %s',
        type,
        binding.key,
        context.name,
        this.context.name,
      );
      return;
    }
    this._debug(
      'Re-emitting %s %s from %s to %s',
      type,
      binding.key,
      context.name,
      this.context.name,
    );
    this.context.emitEvent(type, event);
  }

  /**
   * A strongly-typed method to emit context events
   * @param type - Event type
   * @param event - Context event
   */
  private emitEvent<T extends ContextEvent>(type: string, event: T) {
    this.emit(type, event);
  }

  /**
   * Emit an `error` event
   * @param err - Error
   */
  private emitError(err: unknown) {
    this.emit('error', err);
  }

  /**
   * Start a background task to listen on context events and notify observers
   */
  private startNotificationTask() {
    // Set up listeners on `bind` and `unbind` for notifications
    this.setupNotification('bind', 'unbind');

    // Create an async iterator for the `notification` event as a queue
    this.notificationQueue = iterator(this, 'notification', {
      // Do not end the iterator if an error event is emitted on the
      // subscription manager
      rejectionEvents: [],
    });

    return this.processNotifications();
  }

  /**
   * Publish an event to the registered observers. Please note the
   * notification is queued and performed asynchronously so that we allow fluent
   * APIs such as `ctx.bind('key').to(...).tag(...);` and give observers the
   * fully populated binding.
   *
   * Observers are invoked one after another; an object observer with a
   * `filter` is only called when the filter accepts the binding.
   *
   * @param event - Context event
   * @param observers - Current set of context observers
   */
  protected async notifyObservers(
    event: ContextEvent,
    observers = this._observers,
  ) {
    if (!observers || observers.size === 0) return;

    const {type, binding, context} = event;
    for (const observer of observers) {
      if (typeof observer === 'function') {
        await observer(type, binding, context);
      } else if (!observer.filter || observer.filter(binding)) {
        await observer.observe(type, binding, context);
      }
    }
  }

  /**
   * Process notification events as they arrive on the queue
   */
  private async processNotifications() {
    const events = this.notificationQueue;
    if (events == null) return;
    for await (const {type, binding, context, observers} of events) {
      // The loop will happen asynchronously upon events
      try {
        // The execution of observers happen in the Promise micro-task queue
        await this.notifyObservers({type, binding, context}, observers);
        this.pendingNotifications--;
        this._debug(
          'Observers notified for %s of binding %s',
          type,
          binding.key,
        );
        this.emitEvent('observersNotified', {type, binding, context});
      } catch (err) {
        // Do not reduce the pending notification count so that errors
        // can be captured by waitUntilPendingNotificationsDone
        this._debug('Error caught from observers', err);
        // Errors caught from observers.
        if (this.listenerCount('error') > 0) {
          // waitUntilPendingNotificationsDone may be called
          this.emitError(err);
        } else {
          // Emit it to the current context. If no error listeners are
          // registered, crash the process.
          this.handleNotificationError(err);
        }
      }
    }
  }

  /**
   * Listen on given event types and emit `notification` event. This method
   * merges multiple event types into one for notification.
   *
   * Each listener is remembered so that `close()` can remove it; a type that
   * already has a listener is skipped.
   *
   * @param eventTypes - Context event types
   */
  private setupNotification(...eventTypes: ContextEventType[]) {
    for (const type of eventTypes) {
      // Never register a second listener for the same event type
      if (this.contextEventListeners.has(type)) continue;

      const listener: ContextEventListener = ({binding, context}) => {
        // No need to schedule notifications if no observers are present
        if (!this._observers || this._observers.size === 0) return;
        // Track pending events
        this.pendingNotifications++;
        // Take a snapshot of current observers to ensure notifications of this
        // event will only be sent to current ones. Emit a new event to notify
        // current context observers.
        this.emitEvent('notification', {
          type,
          binding,
          context,
          observers: new Set(this._observers),
        });
      };
      this.contextEventListeners.set(type, listener);
      this.context.on(type, listener);
    }
  }

  /**
   * Wait until observers are notified for all of currently pending notification
   * events.
   *
   * This method is for test only to perform assertions after observers are
   * notified for relevant events.
   *
   * @param timeout - Optional timeout passed through to the underlying wait
   */
  async waitUntilPendingNotificationsDone(timeout?: number) {
    const count = this.pendingNotifications;
    // Use the wrapper so the message carries the context name like all others
    this._debug('Number of pending notifications: %d', count);
    if (count === 0) return;
    await multiple(this, 'observersNotified', {count, timeout});
  }

  /**
   * Add a context event observer to the context
   * @param observer - Context observer instance or function
   * @returns A subscription that can be used to remove the observer
   */
  subscribe(observer: ContextEventObserver): Subscription {
    this._observers = this._observers ?? new Set();
    this.setupEventHandlersIfNeeded();
    this._observers.add(observer);
    return new ContextSubscription(this.context, observer);
  }

  /**
   * Remove the context event observer from the context
   * @param observer - Context event observer
   * @returns `true` if the observer was registered and has been removed
   */
  unsubscribe(observer: ContextEventObserver): boolean {
    if (!this._observers) return false;
    return this._observers.delete(observer);
  }

  /**
   * Check if an observer is subscribed to this context
   * @param observer - Context observer
   */
  isSubscribed(observer: ContextObserver) {
    if (!this._observers) return false;
    return this._observers.has(observer);
  }

  /**
   * Handle errors caught during the notification of observers
   * @param err - Error
   */
  private handleNotificationError(err: unknown) {
    // Bubbling up the error event over the context chain
    // until we find an error listener
    let ctx: Context | undefined = this.context;
    while (ctx) {
      if (ctx.listenerCount('error') === 0) {
        // No error listener found, try its parent
        ctx = ctx.parent;
        continue;
      }
      this._debug('Emitting error to context %s', ctx.name, err);
      ctx.emitError(err);
      return;
    }
    // No context with error listeners found
    this._debug('No error handler is configured for the context chain', err);
    // Let it crash now by emitting an error event
    this.context.emitError(err);
  }

  /**
   * Close the context: clear observers, stop notifications, and remove event
   * listeners from this context and its parent context.
   *
   * @remarks
   * This method MUST be called to avoid memory leaks once a context object is
   * no longer needed and should be recycled. An example is the `RequestContext`,
   * which is created per request.
   */
  close() {
    this._observers = undefined;
    if (this.notificationQueue != null) {
      // Cancel the notification iterator
      this.notificationQueue.return!(undefined).catch(err => {
        this.handleNotificationError(err);
      });
      this.notificationQueue = undefined;
    }
    // Drop the `bind`/`unbind` listeners added by `setupNotification`.
    // Otherwise a `subscribe()` after `close()` would register a second set
    // and every event would be queued twice.
    for (const [type, listener] of this.contextEventListeners) {
      this.context.removeListener(type, listener);
    }
    this.contextEventListeners.clear();
    if (this.context.parent && this._parentContextEventListener) {
      this.context.parent.removeListener(
        'bind',
        this._parentContextEventListener,
      );
      this.context.parent.removeListener(
        'unbind',
        this._parentContextEventListener,
      );
      this._parentContextEventListener = undefined;
    }
  }
}