1 | ;
|
2 | // Copyright IBM Corp. and LoopBack contributors 2019,2020. All Rights Reserved.
|
3 | // Node module: @loopback/context
|
4 | // This file is licensed under the MIT License.
|
5 | // License text available at https://opensource.org/licenses/MIT
|
6 | Object.defineProperty(exports, "__esModule", { value: true });
|
7 | exports.ContextSubscriptionManager = void 0;
|
8 | const tslib_1 = require("tslib");
|
9 | const debug_1 = tslib_1.__importDefault(require("debug"));
|
10 | const events_1 = require("events");
|
11 | const p_event_1 = require("p-event");
|
12 | const debug = (0, debug_1.default)('loopback:context:subscription');
|
/**
 * `Subscription` implementation handed back to callers that register a
 * context observer. It remembers the context/observer pair so the observer
 * can later be detached, and records whether that has been requested.
 */
class ContextSubscription {
    /**
     * @param context - Object exposing `unsubscribe(observer)`
     * @param observer - The observer this subscription represents
     */
    constructor(context, observer) {
        // Property order (context, observer, _closed) matches plain assignment
        Object.assign(this, { context, observer, _closed: false });
    }
    /**
     * Detach the observer from the context and mark this subscription closed.
     * The closed flag is set regardless of what the context returns.
     */
    unsubscribe() {
        const { context, observer } = this;
        context.unsubscribe(observer);
        this._closed = true;
    }
    /**
     * `true` once `unsubscribe()` has been called on this subscription.
     */
    get closed() {
        return this._closed;
    }
}
|
/**
 * Manager for context observer subscriptions.
 *
 * It keeps the set of observers registered for a context, turns the context's
 * `bind`/`unbind` events into queued `notification` events, and drains that
 * queue in the background to call the observers. It is itself an
 * `EventEmitter` and emits `notification`, `observersNotified` and `error`.
 */
class ContextSubscriptionManager extends events_1.EventEmitter {
    /**
     * @param context - The context whose events are observed
     */
    constructor(context) {
        super();
        this.context = context;
        /**
         * Internal counter for pending notification events which are yet to be
         * processed by observers.
         */
        this.pendingNotifications = 0;
        /**
         * `{type, listener}` pairs registered on the owning context by
         * `setupNotification`, remembered so that `close` can remove them.
         */
        this._notificationListeners = [];
        this.setMaxListeners(Infinity);
    }
    /**
     * The listener installed on the parent context, if any.
     * @internal
     */
    get parentContextEventListener() {
        return this._parentContextEventListener;
    }
    /**
     * The current set of observers, or `undefined` if none was ever added or
     * the manager has been closed.
     * @internal
     */
    get observers() {
        return this._observers;
    }
    /**
     * Wrap the debug statement so that it always prints out the context name
     * as the prefix
     * @param args - Arguments for the debug
     */
    _debug(...args) {
        /* istanbul ignore if */
        if (!debug.enabled)
            return;
        const formatter = args.shift();
        if (typeof formatter === 'string') {
            debug(`[%s] ${formatter}`, this.context.name, ...args);
        }
        else {
            debug('[%s] ', this.context.name, formatter, ...args);
        }
    }
    /**
     * Set up an internal listener to notify registered observers asynchronously
     * upon `bind` and `unbind` events. This method will be called lazily when
     * the first observer is added. It is a no-op while a notification queue
     * already exists.
     */
    setupEventHandlersIfNeeded() {
        if (this.notificationQueue != null)
            return;
        if (this.context.parent != null) {
            /**
             * Add an event listener to its parent context so that this context will
             * be notified of parent events, such as `bind` or `unbind`.
             */
            this._parentContextEventListener = event => {
                this.handleParentEvent(event);
            };
            // Listen on the parent context events
            this.context.parent.on('bind', this._parentContextEventListener);
            this.context.parent.on('unbind', this._parentContextEventListener);
        }
        // The returned promise belongs to a long-running background loop, so it
        // is deliberately not awaited; only its rejection is handled.
        this.startNotificationTask().catch(err => {
            this.handleNotificationError(err);
        });
        // Make sure every ancestor re-emits its own parent's events as well
        let ctx = this.context.parent;
        while (ctx) {
            ctx.subscriptionManager.setupEventHandlersIfNeeded();
            ctx = ctx.parent;
        }
    }
    /**
     * Re-emit an event received from the parent context on this context unless
     * the binding key is shadowed locally.
     * @param event - Context event emitted by the parent
     */
    handleParentEvent(event) {
        const { binding, context, type } = event;
        // Propagate the event to this context only if the binding key does not
        // exist in this context. The parent binding is shadowed if there is a
        // binding with the same key in this one.
        if (this.context.contains(binding.key)) {
            this._debug('Event %s %s is not re-emitted from %s to %s', type, binding.key, context.name, this.context.name);
            return;
        }
        this._debug('Re-emitting %s %s from %s to %s', type, binding.key, context.name, this.context.name);
        this.context.emitEvent(type, event);
    }
    /**
     * A strongly-typed method to emit context events
     * @param type Event type
     * @param event Context event
     */
    emitEvent(type, event) {
        this.emit(type, event);
    }
    /**
     * Emit an `error` event
     * @param err Error
     */
    emitError(err) {
        this.emit('error', err);
    }
    /**
     * Start a background task to listen on context events and notify observers
     * @returns The promise of the notification-processing loop
     */
    startNotificationTask() {
        // Set up listeners on `bind` and `unbind` for notifications
        this.setupNotification('bind', 'unbind');
        // Create an async iterator for the `notification` event as a queue
        this.notificationQueue = (0, p_event_1.iterator)(this, 'notification', {
            // Do not end the iterator if an error event is emitted on the
            // subscription manager
            rejectionEvents: [],
        });
        return this.processNotifications();
    }
    /**
     * Publish an event to the registered observers. Please note the
     * notification is queued and performed asynchronously so that we allow fluent
     * APIs such as `ctx.bind('key').to(...).tag(...);` and give observers the
     * fully populated binding.
     *
     * Observers are invoked one after another, each awaited in turn. An object
     * observer with a `filter` is only called when the filter accepts the
     * binding.
     *
     * @param event - Context event
     * @param observers - Current set of context observers
     */
    async notifyObservers(event, observers = this._observers) {
        if (!observers || observers.size === 0)
            return;
        const { type, binding, context } = event;
        for (const observer of observers) {
            if (typeof observer === 'function') {
                await observer(type, binding, context);
            }
            else if (!observer.filter || observer.filter(binding)) {
                await observer.observe(type, binding, context);
            }
        }
    }
    /**
     * Process notification events as they arrive on the queue
     */
    async processNotifications() {
        const events = this.notificationQueue;
        if (events == null)
            return;
        for await (const { type, binding, context, observers } of events) {
            // The loop will happen asynchronously upon events
            try {
                // The execution of observers happen in the Promise micro-task queue
                await this.notifyObservers({ type, binding, context }, observers);
                this.pendingNotifications--;
                this._debug('Observers notified for %s of binding %s', type, binding.key);
                this.emitEvent('observersNotified', { type, binding, context });
            }
            catch (err) {
                // Do not reduce the pending notification count so that errors
                // can be captured by waitUntilPendingNotificationsDone
                this._debug('Error caught from observers', err);
                // Errors caught from observers.
                if (this.listenerCount('error') > 0) {
                    // waitUntilPendingNotificationsDone may be called
                    this.emitError(err);
                }
                else {
                    // Emit it to the current context. If no error listeners are
                    // registered, crash the process.
                    this.handleNotificationError(err);
                }
            }
        }
    }
    /**
     * Listen on given event types and emit `notification` event. This method
     * merges multiple event types into one for notification. Every listener
     * added here is recorded so that `close` can detach it again.
     * @param eventTypes - Context event types
     */
    setupNotification(...eventTypes) {
        for (const type of eventTypes) {
            const listener = ({ binding, context }) => {
                // No need to schedule notifications if no observers are present
                if (!this._observers || this._observers.size === 0)
                    return;
                // Track pending events
                this.pendingNotifications++;
                // Take a snapshot of current observers to ensure notifications of this
                // event will only be sent to current ones. Emit a new event to notify
                // current context observers.
                this.emitEvent('notification', {
                    type,
                    binding,
                    context,
                    observers: new Set(this._observers),
                });
            };
            this._notificationListeners.push({ type, listener });
            this.context.on(type, listener);
        }
    }
    /**
     * Wait until observers are notified for all of currently pending notification
     * events.
     *
     * This method is for test only to perform assertions after observers are
     * notified for relevant events.
     *
     * @param timeout - Passed through to `p-event`'s `multiple` as its
     * `timeout` option
     */
    async waitUntilPendingNotificationsDone(timeout) {
        const count = this.pendingNotifications;
        // Use the prefixed logger like every other method of this class
        this._debug('Number of pending notifications: %d', count);
        if (count === 0)
            return;
        await (0, p_event_1.multiple)(this, 'observersNotified', { count, timeout });
    }
    /**
     * Add a context event observer to the context
     * @param observer - Context observer instance or function
     * @returns A subscription that can be used to remove the observer
     */
    subscribe(observer) {
        if (this._observers == null) {
            this._observers = new Set();
        }
        this.setupEventHandlersIfNeeded();
        this._observers.add(observer);
        return new ContextSubscription(this.context, observer);
    }
    /**
     * Remove the context event observer from the context
     * @param observer - Context event observer
     * @returns `true` if the observer was registered and has been removed
     */
    unsubscribe(observer) {
        if (!this._observers)
            return false;
        return this._observers.delete(observer);
    }
    /**
     * Check if an observer is subscribed to this context
     * @param observer - Context observer
     */
    isSubscribed(observer) {
        if (!this._observers)
            return false;
        return this._observers.has(observer);
    }
    /**
     * Handle errors caught during the notification of observers
     * @param err - Error
     */
    handleNotificationError(err) {
        // Bubbling up the error event over the context chain
        // until we find an error listener
        let ctx = this.context;
        while (ctx) {
            if (ctx.listenerCount('error') === 0) {
                // No error listener found, try its parent
                ctx = ctx.parent;
                continue;
            }
            this._debug('Emitting error to context %s', ctx.name, err);
            ctx.emitError(err);
            return;
        }
        // No context with error listeners found
        this._debug('No error handler is configured for the context chain', err);
        // Let it crash now by emitting an error event
        this.context.emitError(err);
    }
    /**
     * Close the context: clear observers, stop notifications, and remove event
     * listeners from this context and from its parent context.
     *
     * @remarks
     * This method MUST be called to avoid memory leaks once a context object is
     * no longer needed and should be recycled. An example is the `RequestContext`,
     * which is created per request.
     */
    close() {
        this._observers = undefined;
        if (this.notificationQueue != null) {
            // Cancel the notification iterator
            this.notificationQueue.return(undefined).catch(err => {
                this.handleNotificationError(err);
            });
            this.notificationQueue = undefined;
        }
        // Detach the `bind`/`unbind` listeners added by `setupNotification`.
        // Otherwise a later `subscribe()` would register a second set and every
        // event would be queued (and counted) twice.
        for (const { type, listener } of this._notificationListeners) {
            this.context.removeListener(type, listener);
        }
        this._notificationListeners = [];
        if (this.context.parent && this._parentContextEventListener) {
            this.context.parent.removeListener('bind', this._parentContextEventListener);
            this.context.parent.removeListener('unbind', this._parentContextEventListener);
            this._parentContextEventListener = undefined;
        }
    }
}
|
316 | exports.ContextSubscriptionManager = ContextSubscriptionManager;
|
317 | //# sourceMappingURL=context-subscription.js.map |
\ | No newline at end of file |