UNPKG

9.04 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ListenersController = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const context_id_factory_1 = require("@nestjs/core/helpers/context-id-factory");
6const execution_context_host_1 = require("@nestjs/core/helpers/execution-context-host");
7const constants_1 = require("@nestjs/core/injector/constants");
8const metadata_scanner_1 = require("@nestjs/core/metadata-scanner");
9const request_constants_1 = require("@nestjs/core/router/request/request-constants");
10const rxjs_1 = require("rxjs");
11const request_context_host_1 = require("./context/request-context-host");
12const rpc_metadata_constants_1 = require("./context/rpc-metadata-constants");
13const enums_1 = require("./enums");
14const listener_metadata_explorer_1 = require("./listener-metadata-explorer");
15const server_1 = require("./server");
16class ListenersController {
17 constructor(clientsContainer, contextCreator, container, injector, clientFactory, exceptionFiltersContext, graphInspector) {
18 this.clientsContainer = clientsContainer;
19 this.contextCreator = contextCreator;
20 this.container = container;
21 this.injector = injector;
22 this.clientFactory = clientFactory;
23 this.exceptionFiltersContext = exceptionFiltersContext;
24 this.graphInspector = graphInspector;
25 this.metadataExplorer = new listener_metadata_explorer_1.ListenerMetadataExplorer(new metadata_scanner_1.MetadataScanner());
26 this.exceptionFiltersCache = new WeakMap();
27 }
28 registerPatternHandlers(instanceWrapper, server, moduleKey) {
29 const { instance } = instanceWrapper;
30 const isStatic = instanceWrapper.isDependencyTreeStatic();
31 const patternHandlers = this.metadataExplorer.explore(instance);
32 const moduleRef = this.container.getModuleByKey(moduleKey);
33 const defaultCallMetadata = server instanceof server_1.ServerGrpc
34 ? rpc_metadata_constants_1.DEFAULT_GRPC_CALLBACK_METADATA
35 : rpc_metadata_constants_1.DEFAULT_CALLBACK_METADATA;
36 patternHandlers
37 .filter(({ transport }) => (0, shared_utils_1.isUndefined)(transport) ||
38 (0, shared_utils_1.isUndefined)(server.transportId) ||
39 transport === server.transportId)
40 .reduce((acc, handler) => {
41 handler.patterns.forEach(pattern => acc.push({ ...handler, patterns: [pattern] }));
42 return acc;
43 }, [])
44 .forEach((definition) => {
45 const { patterns: [pattern], targetCallback, methodKey, extras, isEventHandler, } = definition;
46 this.insertEntrypointDefinition(instanceWrapper, definition, server.transportId);
47 if (isStatic) {
48 const proxy = this.contextCreator.create(instance, targetCallback, moduleKey, methodKey, constants_1.STATIC_CONTEXT, undefined, defaultCallMetadata);
49 if (isEventHandler) {
50 const eventHandler = async (...args) => {
51 const originalArgs = args;
52 const [dataOrContextHost] = originalArgs;
53 if (dataOrContextHost instanceof request_context_host_1.RequestContextHost) {
54 args = args.slice(1, args.length);
55 }
56 const returnValue = proxy(...args);
57 return this.forkJoinHandlersIfAttached(returnValue, originalArgs, eventHandler);
58 };
59 return server.addHandler(pattern, eventHandler, isEventHandler, extras);
60 }
61 else {
62 return server.addHandler(pattern, proxy, isEventHandler, extras);
63 }
64 }
65 const asyncHandler = this.createRequestScopedHandler(instanceWrapper, pattern, moduleRef, moduleKey, methodKey, defaultCallMetadata, isEventHandler);
66 server.addHandler(pattern, asyncHandler, isEventHandler, extras);
67 });
68 }
69 insertEntrypointDefinition(instanceWrapper, definition, transportId) {
70 this.graphInspector.insertEntrypointDefinition({
71 type: 'microservice',
72 methodName: definition.methodKey,
73 className: instanceWrapper.metatype?.name,
74 classNodeId: instanceWrapper.id,
75 metadata: {
76 key: definition.patterns.toString(),
77 transportId: typeof transportId === 'number'
78 ? enums_1.Transport[transportId]
79 : transportId,
80 patterns: definition.patterns,
81 isEventHandler: definition.isEventHandler,
82 extras: definition.extras,
83 },
84 }, instanceWrapper.id);
85 }
86 forkJoinHandlersIfAttached(currentReturnValue, originalArgs, handlerRef) {
87 if (handlerRef.next) {
88 const returnedValueWrapper = handlerRef.next(...originalArgs);
89 return (0, rxjs_1.forkJoin)({
90 current: this.transformToObservable(currentReturnValue),
91 next: this.transformToObservable(returnedValueWrapper),
92 });
93 }
94 return currentReturnValue;
95 }
96 assignClientsToProperties(instance) {
97 for (const { property, metadata, } of this.metadataExplorer.scanForClientHooks(instance)) {
98 const client = this.clientFactory.create(metadata);
99 this.clientsContainer.addClient(client);
100 this.assignClientToInstance(instance, property, client);
101 }
102 }
103 assignClientToInstance(instance, property, client) {
104 Reflect.set(instance, property, client);
105 }
106 createRequestScopedHandler(wrapper, pattern, moduleRef, moduleKey, methodKey, defaultCallMetadata = rpc_metadata_constants_1.DEFAULT_CALLBACK_METADATA, isEventHandler = false) {
107 const collection = moduleRef.controllers;
108 const { instance } = wrapper;
109 const isTreeDurable = wrapper.isDependencyTreeDurable();
110 const requestScopedHandler = async (...args) => {
111 try {
112 let contextId;
113 let [dataOrContextHost] = args;
114 if (dataOrContextHost instanceof request_context_host_1.RequestContextHost) {
115 contextId = this.getContextId(dataOrContextHost, isTreeDurable);
116 args.shift();
117 }
118 else {
119 const [data, reqCtx] = args;
120 const request = request_context_host_1.RequestContextHost.create(pattern, data, reqCtx);
121 contextId = this.getContextId(request, isTreeDurable);
122 dataOrContextHost = request;
123 }
124 const contextInstance = await this.injector.loadPerContext(instance, moduleRef, collection, contextId);
125 const proxy = this.contextCreator.create(contextInstance, contextInstance[methodKey], moduleKey, methodKey, contextId, wrapper.id, defaultCallMetadata);
126 const returnValue = proxy(...args);
127 if (isEventHandler) {
128 return this.forkJoinHandlersIfAttached(returnValue, [dataOrContextHost, ...args], requestScopedHandler);
129 }
130 return returnValue;
131 }
132 catch (err) {
133 let exceptionFilter = this.exceptionFiltersCache.get(instance[methodKey]);
134 if (!exceptionFilter) {
135 exceptionFilter = this.exceptionFiltersContext.create(instance, instance[methodKey], moduleKey);
136 this.exceptionFiltersCache.set(instance[methodKey], exceptionFilter);
137 }
138 const host = new execution_context_host_1.ExecutionContextHost(args);
139 host.setType('rpc');
140 return exceptionFilter.handle(err, host);
141 }
142 };
143 return requestScopedHandler;
144 }
145 getContextId(request, isTreeDurable) {
146 const contextId = context_id_factory_1.ContextIdFactory.getByRequest(request);
147 if (!request[request_constants_1.REQUEST_CONTEXT_ID]) {
148 Object.defineProperty(request, request_constants_1.REQUEST_CONTEXT_ID, {
149 value: contextId,
150 enumerable: false,
151 writable: false,
152 configurable: false,
153 });
154 const requestProviderValue = isTreeDurable ? contextId.payload : request;
155 this.container.registerRequestProvider(requestProviderValue, contextId);
156 }
157 return contextId;
158 }
159 transformToObservable(resultOrDeferred) {
160 if (resultOrDeferred instanceof Promise) {
161 return (0, rxjs_1.from)(resultOrDeferred).pipe((0, rxjs_1.mergeMap)(val => ((0, rxjs_1.isObservable)(val) ? val : (0, rxjs_1.of)(val))));
162 }
163 if ((0, rxjs_1.isObservable)(resultOrDeferred)) {
164 return resultOrDeferred;
165 }
166 return (0, rxjs_1.of)(resultOrDeferred);
167 }
168}
169exports.ListenersController = ListenersController;