// UNPKG
//
// 4.98 kB - JavaScript - View Raw
// CommonJS prologue: marks the module as a transpiled ES module, pre-declares
// the `Server` export, and loads the dependencies used by the class below.
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Server = void 0;
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const constants_1 = require("../constants");
const incoming_request_deserializer_1 = require("../deserializers/incoming-request.deserializer");
const identity_serializer_1 = require("../serializers/identity.serializer");
const utils_1 = require("../utils");
/**
 * Keeps a registry of message/event handlers keyed by their normalized
 * pattern and provides helpers for dispatching events, streaming responses
 * and reading transport options.
 */
class Server {
    constructor() {
        // Handlers keyed by the route string produced by `normalizePattern`.
        this.messageHandlers = new Map();
        this.logger = new logger_service_1.Logger(Server.name);
    }
    /**
     * Registers a handler for a pattern.
     *
     * Event handlers registered for an already-known pattern are appended to
     * the end of a linked list (via the `next` property on each handler).
     * In every other case the handler replaces whatever was stored before.
     *
     * @param {*} pattern - pattern the handler responds to
     * @param {Function} callback - handler; `isEventHandler` and `extras` are attached to it
     * @param {boolean} [isEventHandler=false] - whether the handler is an event handler
     * @param {object} [extras={}] - extra metadata stored on the handler
     */
    addHandler(pattern, callback, isEventHandler = false, extras = {}) {
        const normalizedPattern = this.normalizePattern(pattern);
        callback.isEventHandler = isEventHandler;
        callback.extras = extras;
        if (this.messageHandlers.has(normalizedPattern) && isEventHandler) {
            // Walk to the last handler of the chain and link the new one after it.
            let tailRef = this.messageHandlers.get(normalizedPattern);
            while (tailRef.next) {
                tailRef = tailRef.next;
            }
            tailRef.next = callback;
        }
        else {
            this.messageHandlers.set(normalizedPattern, callback);
        }
    }
    /**
     * @returns {Map} the live handler registry (not a copy)
     */
    getHandlers() {
        return this.messageHandlers;
    }
    /**
     * Looks up the handler registered for a pattern.
     *
     * @param {string} pattern - pattern, either raw or JSON-encoded
     * @returns {Function|null} the handler (head of the chain), or `null` when none is registered
     */
    getHandlerByPattern(pattern) {
        const route = this.getRouteFromPattern(pattern);
        return this.messageHandlers.has(route)
            ? this.messageHandlers.get(route)
            : null;
    }
    /**
     * Subscribes to `stream$` and forwards everything it produces to `respond`.
     *
     * Packets have one of the shapes `{ response }`, `{ err }` or
     * `{ isDisposed: true }`. They are queued and delivered strictly in order,
     * one `respond` call at a time, starting on the next tick. A disposal
     * signal is merged into the last packet that is still waiting in the
     * queue; if nothing is waiting it is delivered as its own packet.
     *
     * The queue outlives a single drain pass, so packets that arrive while
     * `respond` is still being awaited are delivered rather than dropped.
     *
     * @param {Observable} stream$ - source of responses
     * @param {Function} respond - called with each packet; may return a promise
     * @returns {Subscription} subscription to the piped stream
     */
    send(stream$, respond) {
        const pendingPackets = [];
        let isDraining = false;
        const drainQueue = async () => {
            try {
                while (pendingPackets.length > 0) {
                    await respond(pendingPackets.shift());
                }
            }
            finally {
                // Reset even when `respond` rejects so later packets can
                // schedule a new drain instead of being stuck forever.
                isDraining = false;
            }
        };
        const scheduleOnNextTick = (data) => {
            if (data.isDisposed && pendingPackets.length > 0) {
                // Piggyback the disposal flag on the last packet not yet sent.
                pendingPackets[pendingPackets.length - 1].isDisposed = data.isDisposed;
            }
            else {
                pendingPackets.push(data);
            }
            if (!isDraining) {
                isDraining = true;
                process.nextTick(drainQueue);
            }
        };
        return stream$
            .pipe((0, operators_1.catchError)((err) => {
            scheduleOnNextTick({ err });
            return rxjs_1.EMPTY;
        }), (0, operators_1.finalize)(() => scheduleOnNextTick({ isDisposed: true })))
            .subscribe((response) => scheduleOnNextTick({ response }));
    }
    /**
     * Invokes the event handler registered for `pattern`.
     *
     * Logs an error when no handler exists. When the handler returns an
     * observable, it is connected through a fresh Subject so that it executes
     * even though nothing subscribes to its output.
     *
     * @param {string} pattern - event pattern
     * @param {object} packet - incoming packet; its `data` is passed to the handler
     * @param {*} context - passed to the handler as second argument
     */
    async handleEvent(pattern, packet, context) {
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            return this.logger.error((0, constants_1.NO_EVENT_HANDLER) `${pattern}`);
        }
        const resultOrStream = await handler(packet.data, context);
        if ((0, rxjs_1.isObservable)(resultOrStream)) {
            const connectableSource = (0, rxjs_1.connectable)(resultOrStream, {
                connector: () => new rxjs_1.Subject(),
                resetOnDisconnect: false,
            });
            connectableSource.connect();
        }
    }
    /**
     * Normalizes a handler result to an observable: native promises are
     * wrapped with `from`, observables are returned untouched, and any other
     * value is wrapped with `of`.
     *
     * @param {*} resultOrDeferred - value, promise or observable
     * @returns {Observable}
     */
    transformToObservable(resultOrDeferred) {
        if (resultOrDeferred instanceof Promise) {
            return (0, rxjs_1.from)(resultOrDeferred);
        }
        if ((0, rxjs_1.isObservable)(resultOrDeferred)) {
            return resultOrDeferred;
        }
        return (0, rxjs_1.of)(resultOrDeferred);
    }
    /**
     * Reads `prop` from `obj`, falling back to `defaultValue` when `obj` is
     * falsy or does not contain the property (checked with `in`, so a
     * property explicitly set to `undefined` is still returned as-is).
     *
     * @param {object} obj - options object, may be null/undefined
     * @param {string} prop - property name
     * @param {*} [defaultValue=undefined] - fallback value
     * @returns {*}
     */
    getOptionsProp(obj, prop, defaultValue = undefined) {
        return obj && prop in obj ? obj[prop] : defaultValue;
    }
    /**
     * Logs the given error through the server logger.
     *
     * @param {*} error - error to log
     */
    handleError(error) {
        this.logger.error(error);
    }
    /**
     * Delegates to the shared `loadPackage` utility with the same arguments.
     */
    loadPackage(name, ctx, loader) {
        return (0, load_package_util_1.loadPackage)(name, ctx, loader);
    }
    /**
     * Sets `this.serializer` from `options.serializer`, defaulting to a new
     * `IdentitySerializer` when none is supplied.
     *
     * @param {object} [options]
     */
    initializeSerializer(options) {
        this.serializer =
            (options &&
                options.serializer) ||
                new identity_serializer_1.IdentitySerializer();
    }
    /**
     * Sets `this.deserializer` from `options.deserializer`, defaulting to a
     * new `IncomingRequestDeserializer` when none is supplied.
     *
     * @param {object} [options]
     */
    initializeDeserializer(options) {
        this.deserializer =
            (options &&
                options.deserializer) ||
                new incoming_request_deserializer_1.IncomingRequestDeserializer();
    }
    /**
     * Converts a server pattern into a valid form and returns its route.
     *
     * The pattern is first parsed as JSON; when that fails, the raw value is
     * used unchanged.
     *
     * @param {string} pattern - server pattern
     * @returns {string} the normalized route
     */
    getRouteFromPattern(pattern) {
        let validPattern;
        try {
            validPattern = JSON.parse(pattern);
        }
        catch (error) {
            // Not JSON: use the `pattern` value as-is, without any conversion.
            validPattern = pattern;
        }
        return this.normalizePattern(validPattern);
    }
    /**
     * Turns a pattern into its route string via `transformPatternToRoute`.
     *
     * @param {*} pattern
     * @returns {string}
     */
    normalizePattern(pattern) {
        return (0, utils_1.transformPatternToRoute)(pattern);
    }
}
// Publish the class on the CommonJS exports object.
exports.Server = Server;