UNPKG

4.79 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRedis = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const server_1 = require("./server");
9let redisPackage = {};
/**
 * Microservice transport server that exchanges messages over Redis
 * publish/subscribe channels.
 *
 * `listen()` creates two clients: `subClient` receives messages on the
 * subscribed channels, and `pubClient` publishes replies on the channel
 * returned by `getReplyPattern()`.
 */
class ServerRedis extends server_1.Server {
    /**
     * @param options Transport options. `url`, `host`, `retryAttempts` and
     *   `retryDelay` are read by this class; the whole object is also
     *   forwarded to the redis client factory (see `getClientOptions`).
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.REDIS;
        this.isExplicitlyTerminated = false;

        // An explicit url wins. Otherwise the default url is used only when
        // no host was supplied; with a host and no url, `url` ends up `false`.
        const configuredUrl = this.getOptionsProp(options, 'url');
        if (configuredUrl) {
            this.url = configuredUrl;
        }
        else {
            const configuredHost = this.getOptionsProp(options, 'host');
            this.url = configuredHost ? false : constants_1.REDIS_DEFAULT_URL;
        }

        // The redis package is resolved lazily and kept in the module-level holder.
        redisPackage = this.loadPackage('redis', ServerRedis.name, () => require('redis'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Creates the subscriber and publisher clients, attaches error logging to
     * both and starts listening.
     *
     * @param callback Registered as the subscriber's connect listener; also
     *   invoked with the error if set-up throws synchronously.
     */
    listen(callback) {
        try {
            this.subClient = this.createRedisClient();
            this.pubClient = this.createRedisClient();
            for (const client of [this.pubClient, this.subClient]) {
                this.handleError(client);
            }
            this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Binds message handling and subscriptions, then registers `callback`
     * for the subscriber's connect event.
     */
    start(callback) {
        const { subClient, pubClient } = this;
        this.bindEvents(subClient, pubClient);
        this.subClient.on(constants_1.CONNECT_EVENT, callback);
    }

    /**
     * Installs the incoming-message listener on `subClient` and subscribes it
     * to one channel per registered handler pattern.
     *
     * @param subClient Client used to receive messages.
     * @param pubClient Client handed to the message handler for replies.
     */
    bindEvents(subClient, pubClient) {
        const onMessage = this.getMessageHandler(pubClient).bind(this);
        subClient.on(constants_1.MESSAGE_EVENT, onMessage);

        // Snapshot the keys first, then look each handler up by pattern.
        const registeredPatterns = Array.from(this.messageHandlers.keys());
        for (const pattern of registeredPatterns) {
            const registered = this.messageHandlers.get(pattern);
            const channel = registered.isEventHandler
                ? pattern
                : this.getRequestPattern(pattern);
            subClient.subscribe(channel);
        }
    }

    /**
     * Marks the server as deliberately stopped (so the retry strategy stops
     * reconnecting) and quits whichever clients exist.
     */
    close() {
        this.isExplicitlyTerminated = true;
        if (this.pubClient) {
            this.pubClient.quit();
        }
        if (this.subClient) {
            this.subClient.quit();
        }
    }

    /**
     * Builds a redis client from the merged client options plus `url`.
     *
     * @returns Whatever `redisPackage.createClient` returns.
     */
    createRedisClient() {
        const clientOptions = Object.assign({}, this.getClientOptions(), { url: this.url });
        return redisPackage.createClient(clientOptions);
    }

    /**
     * @param pub Client used to publish replies.
     * @returns An async `(channel, buffer)` listener delegating to `handleMessage`.
     */
    getMessageHandler(pub) {
        return async (channel, buffer) => {
            return this.handleMessage(channel, buffer, pub);
        };
    }

    /**
     * Processes one incoming message: parses and deserializes it, then either
     * dispatches it as an event (packet without `id`) or runs the matching
     * request handler and publishes its response.
     *
     * @param channel Channel the message arrived on.
     * @param buffer Raw message payload.
     * @param pub Client used to publish the reply.
     */
    async handleMessage(channel, buffer, pub) {
        const parsed = this.parseMessage(buffer);
        const packet = await this.deserializer.deserialize(parsed, { channel });
        const redisCtx = new ctx_host_1.RedisContext([channel]);

        // A packet with no id is treated as an event rather than a request.
        if ((0, shared_utils_1.isUndefined)(packet.id)) {
            return this.handleEvent(channel, packet, redisCtx);
        }

        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            // No handler registered for this channel: reply with an error packet.
            return publish({
                id: packet.id,
                status: 'error',
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }

        const handlerResult = await handler(packet.data, redisCtx);
        const response$ = this.transformToObservable(handlerResult);
        if (response$) {
            this.send(response$, publish);
        }
    }

    /**
     * Creates the function used to publish a response for one request.
     *
     * @param pub Client used to publish.
     * @param pattern Request pattern; the reply goes to `getReplyPattern(pattern)`.
     * @param id Request id stamped onto every response (the response object
     *   is mutated in place).
     * @returns `(response) => pub.publish(...)`.
     */
    getPublisher(pub, pattern, id) {
        return (response) => {
            Object.assign(response, { id });
            const serialized = this.serializer.serialize(response);
            const replyChannel = this.getReplyPattern(pattern);
            const payload = JSON.stringify(serialized);
            return pub.publish(replyChannel, payload);
        };
    }

    /**
     * Attempts to JSON-decode `content`.
     *
     * @returns The decoded value, or `content` unchanged when decoding fails.
     */
    parseMessage(content) {
        let decoded;
        try {
            decoded = JSON.parse(content);
        }
        catch (e) {
            // Not valid JSON: hand the raw content through untouched.
            return content;
        }
        return decoded;
    }

    /**
     * @returns The channel name to subscribe to for `pattern` (the pattern itself).
     */
    getRequestPattern(pattern) {
        return pattern;
    }

    /**
     * @returns The channel name replies for `pattern` are published on.
     */
    getReplyPattern(pattern) {
        return `${pattern}.reply`;
    }

    /**
     * Logs every error event emitted by `stream` through the server logger.
     */
    handleError(stream) {
        const logError = (err) => this.logger.error(err);
        stream.on(constants_1.ERROR_EVENT, logError);
    }

    /**
     * @returns A shallow copy of the user options (or an empty object) with
     *   `retry_strategy` set to this server's retry strategy.
     */
    getClientOptions() {
        const userOptions = this.options || {};
        return Object.assign({}, userOptions, {
            retry_strategy: (options) => this.createRetryStrategy(options),
        });
    }

    /**
     * Decides what to do after a failed connection attempt.
     *
     * @param options Retry info from the client; `error` and `attempt` are read.
     * @returns `undefined` once the server was closed explicitly, otherwise
     *   the configured `retryDelay` (or `0`).
     * @throws {Error} When `retryAttempts` is unset/falsy or has been exceeded.
     */
    createRetryStrategy(options) {
        const failure = options.error;
        if (failure && failure.code === 'ECONNREFUSED') {
            this.logger.error(`Error ECONNREFUSED: ${this.url}`);
        }
        if (this.isExplicitlyTerminated) {
            return undefined;
        }

        const maxAttempts = this.getOptionsProp(this.options, 'retryAttempts');
        const attemptsExhausted = !maxAttempts || options.attempt > maxAttempts;
        if (attemptsExhausted) {
            this.logger.error(`Retry time exhausted: ${this.url}`);
            throw new Error('Retry time exhausted');
        }

        return this.getOptionsProp(this.options, 'retryDelay') || 0;
    }
}
122exports.ServerRedis = ServerRedis;