UNPKG

5.02 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRedis = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const server_1 = require("./server");
9let redisPackage = {};
10class ServerRedis extends server_1.Server {
11 constructor(options) {
12 super();
13 this.options = options;
14 this.transportId = enums_1.Transport.REDIS;
15 this.isExplicitlyTerminated = false;
16 redisPackage = this.loadPackage('ioredis', ServerRedis.name, () => require('ioredis'));
17 this.initializeSerializer(options);
18 this.initializeDeserializer(options);
19 }
20 listen(callback) {
21 try {
22 this.subClient = this.createRedisClient();
23 this.pubClient = this.createRedisClient();
24 this.handleError(this.pubClient);
25 this.handleError(this.subClient);
26 this.start(callback);
27 }
28 catch (err) {
29 callback(err);
30 }
31 }
32 start(callback) {
33 Promise.all([this.subClient.connect(), this.pubClient.connect()])
34 .then(() => {
35 this.bindEvents(this.subClient, this.pubClient);
36 callback();
37 })
38 .catch(callback);
39 }
40 bindEvents(subClient, pubClient) {
41 subClient.on(this.options?.wildcards ? 'pmessage' : constants_1.MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
42 const subscribePatterns = [...this.messageHandlers.keys()];
43 subscribePatterns.forEach(pattern => {
44 const { isEventHandler } = this.messageHandlers.get(pattern);
45 const channel = isEventHandler
46 ? pattern
47 : this.getRequestPattern(pattern);
48 if (this.options?.wildcards) {
49 subClient.psubscribe(channel);
50 }
51 else {
52 subClient.subscribe(channel);
53 }
54 });
55 }
56 close() {
57 this.isExplicitlyTerminated = true;
58 this.pubClient && this.pubClient.quit();
59 this.subClient && this.subClient.quit();
60 }
61 createRedisClient() {
62 return new redisPackage({
63 port: constants_1.REDIS_DEFAULT_PORT,
64 host: constants_1.REDIS_DEFAULT_HOST,
65 ...this.getClientOptions(),
66 lazyConnect: true,
67 });
68 }
69 getMessageHandler(pub) {
70 return this.options?.wildcards
71 ? (channel, pattern, buffer) => this.handleMessage(channel, buffer, pub, pattern)
72 : (channel, buffer) => this.handleMessage(channel, buffer, pub, channel);
73 }
74 async handleMessage(channel, buffer, pub, pattern) {
75 const rawMessage = this.parseMessage(buffer);
76 const packet = await this.deserializer.deserialize(rawMessage, { channel });
77 const redisCtx = new ctx_host_1.RedisContext([pattern]);
78 if ((0, shared_utils_1.isUndefined)(packet.id)) {
79 return this.handleEvent(channel, packet, redisCtx);
80 }
81 const publish = this.getPublisher(pub, channel, packet.id);
82 const handler = this.getHandlerByPattern(channel);
83 if (!handler) {
84 const status = 'error';
85 const noHandlerPacket = {
86 id: packet.id,
87 status,
88 err: constants_1.NO_MESSAGE_HANDLER,
89 };
90 return publish(noHandlerPacket);
91 }
92 const response$ = this.transformToObservable(await handler(packet.data, redisCtx));
93 response$ && this.send(response$, publish);
94 }
95 getPublisher(pub, pattern, id) {
96 return (response) => {
97 Object.assign(response, { id });
98 const outgoingResponse = this.serializer.serialize(response);
99 return pub.publish(this.getReplyPattern(pattern), JSON.stringify(outgoingResponse));
100 };
101 }
102 parseMessage(content) {
103 try {
104 return JSON.parse(content);
105 }
106 catch (e) {
107 return content;
108 }
109 }
110 getRequestPattern(pattern) {
111 return pattern;
112 }
113 getReplyPattern(pattern) {
114 return `${pattern}.reply`;
115 }
116 handleError(stream) {
117 stream.on(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
118 }
119 getClientOptions() {
120 const retryStrategy = (times) => this.createRetryStrategy(times);
121 return {
122 ...(this.options || {}),
123 retryStrategy,
124 };
125 }
126 createRetryStrategy(times) {
127 if (this.isExplicitlyTerminated) {
128 return undefined;
129 }
130 if (!this.getOptionsProp(this.options, 'retryAttempts') ||
131 times > this.getOptionsProp(this.options, 'retryAttempts')) {
132 this.logger.error(`Retry time exhausted`);
133 return;
134 }
135 return this.getOptionsProp(this.options, 'retryDelay') || 0;
136 }
137}
138exports.ServerRedis = ServerRedis;