UNPKG

4.67 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRedis = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const server_1 = require("./server");
9let redisPackage = {};
/**
 * Microservice transport server that communicates over Redis pub/sub.
 *
 * Two Redis clients are created in `listen()`: one that subscribes to the
 * patterns of every registered message handler and one that publishes
 * replies on the `<pattern>.reply` channel.
 */
class ServerRedis extends server_1.Server {
    /**
     * @param options Transport options. `url` and `host` are read here;
     *   `retryAttempts` and `retryDelay` are read by `createRetryStrategy()`.
     *   The whole object is also forwarded to the Redis client factory.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.REDIS;
        this.isExplicitlyTerminated = false;
        // An explicit `url` always wins. Without one, the default URL is
        // used only when no `host` was supplied; when a `host` is present
        // the url is left as `false`.
        let resolvedUrl = this.getOptionsProp(options, 'url');
        if (!resolvedUrl) {
            resolvedUrl = this.getOptionsProp(options, 'host')
                ? false
                : constants_1.REDIS_DEFAULT_URL;
        }
        this.url = resolvedUrl;
        // The driver is resolved through loadPackage and stored in the
        // module-level `redisPackage` holder used by createRedisClient().
        redisPackage = this.loadPackage('redis', ServerRedis.name, () => require('redis'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Creates the subscriber and publisher clients, attaches error logging
     * to both and starts listening.
     *
     * @param callback Registered as the subscriber's connect listener.
     */
    listen(callback) {
        this.subClient = this.createRedisClient();
        this.pubClient = this.createRedisClient();
        for (const client of [this.pubClient, this.subClient]) {
            this.handleError(client);
        }
        this.start(callback);
    }

    /**
     * Binds message handling to the clients and registers `callback` on the
     * subscriber's connect event.
     *
     * @param callback Listener for the connect event.
     */
    start(callback) {
        const { subClient, pubClient } = this;
        this.bindEvents(subClient, pubClient);
        this.subClient.on(constants_1.CONNECT_EVENT, callback);
    }

    /**
     * Routes incoming messages to `handleMessage` and subscribes to the
     * channel of every registered handler.
     *
     * @param subClient Client used for subscriptions.
     * @param pubClient Client handed to the message handler for replies.
     */
    bindEvents(subClient, pubClient) {
        const onMessage = this.getMessageHandler(pubClient).bind(this);
        subClient.on(constants_1.MESSAGE_EVENT, onMessage);
        // Snapshot the keys before subscribing, as the original does.
        const registeredPatterns = [...this.messageHandlers.keys()];
        for (const pattern of registeredPatterns) {
            const { isEventHandler } = this.messageHandlers.get(pattern);
            // Event handlers subscribe to the raw pattern; request handlers
            // go through getRequestPattern().
            const channelName = isEventHandler
                ? pattern
                : this.getRequestPattern(pattern);
            subClient.subscribe(channelName);
        }
    }

    /**
     * Marks the server as explicitly terminated (which disables retries in
     * `createRetryStrategy`) and quits whichever clients exist.
     */
    close() {
        this.isExplicitlyTerminated = true;
        if (this.pubClient) {
            this.pubClient.quit();
        }
        if (this.subClient) {
            this.subClient.quit();
        }
    }

    /**
     * Builds a Redis client from the client options plus the resolved url.
     *
     * @returns Whatever `redisPackage.createClient` returns.
     */
    createRedisClient() {
        const clientConfig = Object.assign({}, this.getClientOptions(), { url: this.url });
        return redisPackage.createClient(clientConfig);
    }

    /**
     * @param pub Publisher client passed through to `handleMessage`.
     * @returns An async `(channel, buffer)` listener.
     */
    getMessageHandler(pub) {
        return async (channel, buffer) => {
            return this.handleMessage(channel, buffer, pub);
        };
    }

    /**
     * Parses and deserializes one incoming message and dispatches it.
     *
     * Packets without an `id` are passed to `handleEvent`. Packets with an
     * `id` are treated as requests: the handler registered for the channel
     * is invoked and its result is sent through a publisher bound to the
     * reply channel. When no handler exists, an error packet is published.
     *
     * @param channel Channel the message arrived on.
     * @param buffer Raw message payload.
     * @param pub Publisher client used for the reply.
     */
    async handleMessage(channel, buffer, pub) {
        const parsed = this.parseMessage(buffer);
        const packet = this.deserializer.deserialize(parsed, { channel });
        const context = new ctx_host_1.RedisContext([channel]);
        if (shared_utils_1.isUndefined(packet.id)) {
            return this.handleEvent(channel, packet, context);
        }
        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            return publish({
                id: packet.id,
                status: 'error',
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const handlerResult = await handler(packet.data, context);
        const response$ = this.transformToObservable(handlerResult);
        if (response$) {
            this.send(response$, publish);
        }
    }

    /**
     * Creates a function that publishes a response for a given request.
     *
     * @param pub Publisher client.
     * @param pattern Request pattern; the reply goes to `getReplyPattern(pattern)`.
     * @param id Request id assigned onto the response object before it is serialized.
     * @returns A `(response)` function returning the result of `pub.publish`.
     */
    getPublisher(pub, pattern, id) {
        return (response) => {
            // The id is assigned onto the response object that was passed in.
            Object.assign(response, { id });
            const serialized = this.serializer.serialize(response);
            const replyChannel = this.getReplyPattern(pattern);
            return pub.publish(replyChannel, JSON.stringify(serialized));
        };
    }

    /**
     * Attempts to JSON-parse the payload.
     *
     * @param content Raw payload.
     * @returns The parsed value, or `content` unchanged when parsing throws.
     */
    parseMessage(content) {
        let parsed;
        try {
            parsed = JSON.parse(content);
        }
        catch (e) {
            parsed = content;
        }
        return parsed;
    }

    /**
     * @param pattern Handler pattern.
     * @returns The channel used for requests; the pattern itself.
     */
    getRequestPattern(pattern) {
        return pattern;
    }

    /**
     * @param pattern Handler pattern.
     * @returns The channel used for replies: the pattern with `.reply` appended.
     */
    getReplyPattern(pattern) {
        return `${pattern}.reply`;
    }

    /**
     * Logs every error event emitted by `stream` through the server logger.
     *
     * @param stream Emitter to attach the error listener to.
     */
    handleError(stream) {
        const logFailure = (err) => this.logger.error(err);
        stream.on(constants_1.ERROR_EVENT, logFailure);
    }

    /**
     * @returns A copy of the transport options with `retry_strategy` set to
     *   a function delegating to `createRetryStrategy`.
     */
    getClientOptions() {
        const retry_strategy = (retryInfo) => this.createRetryStrategy(retryInfo);
        return Object.assign({}, this.options || {}, { retry_strategy });
    }

    /**
     * Decides what to hand back to the Redis client after a failed connection.
     *
     * @param options Retry information; `error` and `attempt` are read.
     * @returns `undefined` once `close()` has been called; otherwise the
     *   configured `retryDelay`, or `0` when none is set.
     * @throws {Error} When `retryAttempts` is not configured (falsy) or
     *   `options.attempt` exceeds it.
     */
    createRetryStrategy(options) {
        const connectionRefused = options.error && options.error.code === 'ECONNREFUSED';
        if (connectionRefused) {
            this.logger.error(`Error ECONNREFUSED: ${this.url}`);
        }
        if (this.isExplicitlyTerminated) {
            return undefined;
        }
        const attemptsExhausted = !this.getOptionsProp(this.options, 'retryAttempts') ||
            options.attempt > this.getOptionsProp(this.options, 'retryAttempts');
        if (attemptsExhausted) {
            this.logger.error(`Retry time exhausted: ${this.url}`);
            throw new Error('Retry time exhausted');
        }
        return this.getOptionsProp(this.options, 'retryDelay') || 0;
    }
}
117exports.ServerRedis = ServerRedis;