UNPKG

12 kBJavaScriptView Raw
1"use strict";
2// Copyright IBM Corp. and LoopBack contributors 2019,2020. All Rights Reserved.
3// Node module: @loopback/context
4// This file is licensed under the MIT License.
5// License text available at https://opensource.org/licenses/MIT
6Object.defineProperty(exports, "__esModule", { value: true });
7exports.ContextSubscriptionManager = void 0;
8const tslib_1 = require("tslib");
9const debug_1 = tslib_1.__importDefault(require("debug"));
10const events_1 = require("events");
11const p_event_1 = require("p-event");
12const debug = (0, debug_1.default)('loopback:context:subscription');
13/**
14 * An implementation of `Subscription` interface for context events
15 */
16class ContextSubscription {
17 constructor(context, observer) {
18 this.context = context;
19 this.observer = observer;
20 this._closed = false;
21 }
22 unsubscribe() {
23 this.context.unsubscribe(this.observer);
24 this._closed = true;
25 }
26 get closed() {
27 return this._closed;
28 }
29}
30/**
31 * Manager for context observer subscriptions
32 */
33class ContextSubscriptionManager extends events_1.EventEmitter {
34 constructor(context) {
35 super();
36 this.context = context;
37 /**
38 * Internal counter for pending notification events which are yet to be
39 * processed by observers.
40 */
41 this.pendingNotifications = 0;
42 this.setMaxListeners(Infinity);
43 }
44 /**
45 * @internal
46 */
47 get parentContextEventListener() {
48 return this._parentContextEventListener;
49 }
50 /**
51 * @internal
52 */
53 get observers() {
54 return this._observers;
55 }
56 /**
57 * Wrap the debug statement so that it always print out the context name
58 * as the prefix
59 * @param args - Arguments for the debug
60 */
61 _debug(...args) {
62 /* istanbul ignore if */
63 if (!debug.enabled)
64 return;
65 const formatter = args.shift();
66 if (typeof formatter === 'string') {
67 debug(`[%s] ${formatter}`, this.context.name, ...args);
68 }
69 else {
70 debug('[%s] ', this.context.name, formatter, ...args);
71 }
72 }
73 /**
74 * Set up an internal listener to notify registered observers asynchronously
75 * upon `bind` and `unbind` events. This method will be called lazily when
76 * the first observer is added.
77 */
78 setupEventHandlersIfNeeded() {
79 if (this.notificationQueue != null)
80 return;
81 if (this.context.parent != null) {
82 /**
83 * Add an event listener to its parent context so that this context will
84 * be notified of parent events, such as `bind` or `unbind`.
85 */
86 this._parentContextEventListener = event => {
87 this.handleParentEvent(event);
88 };
89 // Listen on the parent context events
90 this.context.parent.on('bind', this._parentContextEventListener);
91 this.context.parent.on('unbind', this._parentContextEventListener);
92 }
93 // The following are two async functions. Returned promises are ignored as
94 // they are long-running background tasks.
95 this.startNotificationTask().catch(err => {
96 this.handleNotificationError(err);
97 });
98 let ctx = this.context.parent;
99 while (ctx) {
100 ctx.subscriptionManager.setupEventHandlersIfNeeded();
101 ctx = ctx.parent;
102 }
103 }
104 handleParentEvent(event) {
105 const { binding, context, type } = event;
106 // Propagate the event to this context only if the binding key does not
107 // exist in this context. The parent binding is shadowed if there is a
108 // binding with the same key in this one.
109 if (this.context.contains(binding.key)) {
110 this._debug('Event %s %s is not re-emitted from %s to %s', type, binding.key, context.name, this.context.name);
111 return;
112 }
113 this._debug('Re-emitting %s %s from %s to %s', type, binding.key, context.name, this.context.name);
114 this.context.emitEvent(type, event);
115 }
116 /**
117 * A strongly-typed method to emit context events
118 * @param type Event type
119 * @param event Context event
120 */
121 emitEvent(type, event) {
122 this.emit(type, event);
123 }
124 /**
125 * Emit an `error` event
126 * @param err Error
127 */
128 emitError(err) {
129 this.emit('error', err);
130 }
131 /**
132 * Start a background task to listen on context events and notify observers
133 */
134 startNotificationTask() {
135 // Set up listeners on `bind` and `unbind` for notifications
136 this.setupNotification('bind', 'unbind');
137 // Create an async iterator for the `notification` event as a queue
138 this.notificationQueue = (0, p_event_1.iterator)(this, 'notification', {
139 // Do not end the iterator if an error event is emitted on the
140 // subscription manager
141 rejectionEvents: [],
142 });
143 return this.processNotifications();
144 }
145 /**
146 * Publish an event to the registered observers. Please note the
147 * notification is queued and performed asynchronously so that we allow fluent
148 * APIs such as `ctx.bind('key').to(...).tag(...);` and give observers the
149 * fully populated binding.
150 *
151 * @param event - Context event
152 * @param observers - Current set of context observers
153 */
154 async notifyObservers(event, observers = this._observers) {
155 if (!observers || observers.size === 0)
156 return;
157 const { type, binding, context } = event;
158 for (const observer of observers) {
159 if (typeof observer === 'function') {
160 await observer(type, binding, context);
161 }
162 else if (!observer.filter || observer.filter(binding)) {
163 await observer.observe(type, binding, context);
164 }
165 }
166 }
167 /**
168 * Process notification events as they arrive on the queue
169 */
170 async processNotifications() {
171 const events = this.notificationQueue;
172 if (events == null)
173 return;
174 for await (const { type, binding, context, observers } of events) {
175 // The loop will happen asynchronously upon events
176 try {
177 // The execution of observers happen in the Promise micro-task queue
178 await this.notifyObservers({ type, binding, context }, observers);
179 this.pendingNotifications--;
180 this._debug('Observers notified for %s of binding %s', type, binding.key);
181 this.emitEvent('observersNotified', { type, binding, context });
182 }
183 catch (err) {
184 // Do not reduce the pending notification count so that errors
185 // can be captured by waitUntilPendingNotificationsDone
186 this._debug('Error caught from observers', err);
187 // Errors caught from observers.
188 if (this.listenerCount('error') > 0) {
189 // waitUntilPendingNotificationsDone may be called
190 this.emitError(err);
191 }
192 else {
193 // Emit it to the current context. If no error listeners are
194 // registered, crash the process.
195 this.handleNotificationError(err);
196 }
197 }
198 }
199 }
200 /**
201 * Listen on given event types and emit `notification` event. This method
202 * merge multiple event types into one for notification.
203 * @param eventTypes - Context event types
204 */
205 setupNotification(...eventTypes) {
206 for (const type of eventTypes) {
207 this.context.on(type, ({ binding, context }) => {
208 // No need to schedule notifications if no observers are present
209 if (!this._observers || this._observers.size === 0)
210 return;
211 // Track pending events
212 this.pendingNotifications++;
213 // Take a snapshot of current observers to ensure notifications of this
214 // event will only be sent to current ones. Emit a new event to notify
215 // current context observers.
216 this.emitEvent('notification', {
217 type,
218 binding,
219 context,
220 observers: new Set(this._observers),
221 });
222 });
223 }
224 }
225 /**
226 * Wait until observers are notified for all of currently pending notification
227 * events.
228 *
229 * This method is for test only to perform assertions after observers are
230 * notified for relevant events.
231 */
232 async waitUntilPendingNotificationsDone(timeout) {
233 const count = this.pendingNotifications;
234 debug('Number of pending notifications: %d', count);
235 if (count === 0)
236 return;
237 await (0, p_event_1.multiple)(this, 'observersNotified', { count, timeout });
238 }
239 /**
240 * Add a context event observer to the context
241 * @param observer - Context observer instance or function
242 */
243 subscribe(observer) {
244 var _a;
245 this._observers = (_a = this._observers) !== null && _a !== void 0 ? _a : new Set();
246 this.setupEventHandlersIfNeeded();
247 this._observers.add(observer);
248 return new ContextSubscription(this.context, observer);
249 }
250 /**
251 * Remove the context event observer from the context
252 * @param observer - Context event observer
253 */
254 unsubscribe(observer) {
255 if (!this._observers)
256 return false;
257 return this._observers.delete(observer);
258 }
259 /**
260 * Check if an observer is subscribed to this context
261 * @param observer - Context observer
262 */
263 isSubscribed(observer) {
264 if (!this._observers)
265 return false;
266 return this._observers.has(observer);
267 }
268 /**
269 * Handle errors caught during the notification of observers
270 * @param err - Error
271 */
272 handleNotificationError(err) {
273 // Bubbling up the error event over the context chain
274 // until we find an error listener
275 let ctx = this.context;
276 while (ctx) {
277 if (ctx.listenerCount('error') === 0) {
278 // No error listener found, try its parent
279 ctx = ctx.parent;
280 continue;
281 }
282 this._debug('Emitting error to context %s', ctx.name, err);
283 ctx.emitError(err);
284 return;
285 }
286 // No context with error listeners found
287 this._debug('No error handler is configured for the context chain', err);
288 // Let it crash now by emitting an error event
289 this.context.emitError(err);
290 }
291 /**
292 * Close the context: clear observers, stop notifications, and remove event
293 * listeners from its parent context.
294 *
295 * @remarks
296 * This method MUST be called to avoid memory leaks once a context object is
297 * no longer needed and should be recycled. An example is the `RequestContext`,
298 * which is created per request.
299 */
300 close() {
301 this._observers = undefined;
302 if (this.notificationQueue != null) {
303 // Cancel the notification iterator
304 this.notificationQueue.return(undefined).catch(err => {
305 this.handleNotificationError(err);
306 });
307 this.notificationQueue = undefined;
308 }
309 if (this.context.parent && this._parentContextEventListener) {
310 this.context.parent.removeListener('bind', this._parentContextEventListener);
311 this.context.parent.removeListener('unbind', this._parentContextEventListener);
312 this._parentContextEventListener = undefined;
313 }
314 }
315}
316exports.ContextSubscriptionManager = ContextSubscriptionManager;
317//# sourceMappingURL=context-subscription.js.map
\No newline at end of file