UNPKG

4.6 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRedis = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const server_1 = require("./server");
9let redisPackage = {};
/**
 * Microservice server that uses Redis pub/sub as its transport.
 *
 * Two clients are created: `subClient` receives incoming messages and
 * `pubClient` publishes replies on `<pattern>.reply`.
 *
 * The client constructor is stored in the module-level `redisPackage`
 * variable, which is assigned when an instance is constructed.
 */
class ServerRedis extends server_1.Server {
    /**
     * @param options Transport options. They are forwarded to the Redis client
     *   constructor (see getClientOptions) and are also read for
     *   `retryAttempts` / `retryDelay`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.REDIS;
        // Set by close(); stops the retry strategy from scheduling reconnects.
        this.isExplicitlyTerminated = false;
        // The driver is loaded through loadPackage with a lazy require callback.
        redisPackage = this.loadPackage('ioredis', ServerRedis.name, () => require('ioredis'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Creates both Redis clients, attaches error logging to them and starts
     * connecting.
     *
     * @param callback Called with the error if anything in the setup throws
     *   synchronously; otherwise handed over to start().
     */
    listen(callback) {
        try {
            this.subClient = this.createRedisClient();
            this.pubClient = this.createRedisClient();
            [this.pubClient, this.subClient].forEach(client => this.handleError(client));
            this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Connects the subscriber and the publisher, then binds the message
     * handlers.
     *
     * @param callback Called without arguments once both clients are connected
     *   and events are bound. It is called with the error if a connection
     *   rejects or the post-connect step throws.
     */
    start(callback) {
        const pendingConnections = [this.subClient, this.pubClient].map(client => client.connect());
        const onConnected = () => {
            this.bindEvents(this.subClient, this.pubClient);
            callback();
        };
        Promise.all(pendingConnections)
            .then(onConnected)
            .catch(callback);
    }

    /**
     * Registers the message listener on the subscriber and subscribes to one
     * channel per registered handler pattern.
     *
     * @param subClient Client used for subscriptions.
     * @param pubClient Client handed to the message handler for replies.
     */
    bindEvents(subClient, pubClient) {
        const onMessage = this.getMessageHandler(pubClient).bind(this);
        subClient.on(constants_1.MESSAGE_EVENT, onMessage);
        // Iterate over a snapshot of the keys, as the original forEach did.
        for (const pattern of [...this.messageHandlers.keys()]) {
            const { isEventHandler } = this.messageHandlers.get(pattern);
            // Event handlers subscribe to the raw pattern; request handlers go
            // through getRequestPattern.
            const channel = isEventHandler ? pattern : this.getRequestPattern(pattern);
            subClient.subscribe(channel);
        }
    }

    /**
     * Marks the server as deliberately terminated and asks both clients (when
     * they exist) to quit. The quit calls are not awaited.
     */
    close() {
        this.isExplicitlyTerminated = true;
        if (this.pubClient) {
            this.pubClient.quit();
        }
        if (this.subClient) {
            this.subClient.quit();
        }
    }

    /**
     * Builds a new Redis client.
     *
     * Default host and port are applied first, then the user options, and
     * finally `lazyConnect: true` is forced; start() calls connect()
     * explicitly.
     *
     * @returns A new instance of the loaded Redis client class.
     */
    createRedisClient() {
        const defaults = {
            port: constants_1.REDIS_DEFAULT_PORT,
            host: constants_1.REDIS_DEFAULT_HOST,
        };
        const clientOptions = Object.assign(defaults, this.getClientOptions(), { lazyConnect: true });
        return new redisPackage(clientOptions);
    }

    /**
     * @param pub Client used to publish replies.
     * @returns An async listener `(channel, buffer)` that delegates to
     *   handleMessage.
     */
    getMessageHandler(pub) {
        return async (channel, buffer) => {
            return this.handleMessage(channel, buffer, pub);
        };
    }

    /**
     * Processes one incoming message.
     *
     * A packet without an `id` is dispatched through handleEvent. A packet
     * with an `id` is treated as a request: the handler registered for the
     * channel is invoked and its result is sent back through the publisher.
     * If no handler is registered, an error packet is published instead.
     *
     * @param channel Channel the message arrived on.
     * @param buffer Raw message payload.
     * @param pub Client used to publish the reply.
     */
    async handleMessage(channel, buffer, pub) {
        const rawMessage = this.parseMessage(buffer);
        const packet = await this.deserializer.deserialize(rawMessage, { channel });
        const redisCtx = new ctx_host_1.RedisContext([channel]);

        const isEventPacket = (0, shared_utils_1.isUndefined)(packet.id);
        if (isEventPacket) {
            return this.handleEvent(channel, packet, redisCtx);
        }

        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            return publish({
                id: packet.id,
                status: 'error',
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }

        const handlerResult = await handler(packet.data, redisCtx);
        const response$ = this.transformToObservable(handlerResult);
        if (response$) {
            this.send(response$, publish);
        }
    }

    /**
     * Creates the function used to publish a response for one request.
     *
     * @param pub Client used to publish.
     * @param pattern Request pattern; the reply goes to its reply pattern.
     * @param id Request id, written onto every response object (the response
     *   object is mutated in place).
     * @returns A function that takes a response, serializes it and publishes
     *   it as a JSON string.
     */
    getPublisher(pub, pattern, id) {
        return (response) => {
            Object.assign(response, { id });
            const outgoingResponse = this.serializer.serialize(response);
            const replyChannel = this.getReplyPattern(pattern);
            return pub.publish(replyChannel, JSON.stringify(outgoingResponse));
        };
    }

    /**
     * Tries to parse the payload as JSON.
     *
     * @param content Raw payload.
     * @returns The parsed value, or `content` unchanged when parsing fails.
     */
    parseMessage(content) {
        let parsed;
        try {
            parsed = JSON.parse(content);
        }
        catch (parseError) {
            // Not valid JSON: hand the payload on untouched.
            return content;
        }
        return parsed;
    }

    /**
     * @param pattern Handler pattern.
     * @returns The channel to subscribe to for requests; currently the pattern
     *   itself.
     */
    getRequestPattern(pattern) {
        return pattern;
    }

    /**
     * @param pattern Handler pattern.
     * @returns The channel replies are published to: the pattern with a
     *   `.reply` suffix.
     */
    getReplyPattern(pattern) {
        return `${pattern}.reply`;
    }

    /**
     * Logs every error event emitted by the given client.
     *
     * @param stream Object exposing `on(event, listener)`.
     */
    handleError(stream) {
        const logError = (err) => this.logger.error(err);
        stream.on(constants_1.ERROR_EVENT, logError);
    }

    /**
     * @returns A shallow copy of the user options (or an empty object when
     *   none were given) with `retryStrategy` bound to createRetryStrategy.
     */
    getClientOptions() {
        const userOptions = this.options || {};
        return Object.assign({}, userOptions, {
            retryStrategy: (times) => this.createRetryStrategy(times),
        });
    }

    /**
     * Decides whether, and after what delay, the client should reconnect.
     *
     * @param times Attempt counter supplied by the client.
     * @returns The configured `retryDelay` (or 0), or `undefined` when the
     *   server was closed explicitly, no `retryAttempts` is configured, or
     *   the attempts are used up.
     *   NOTE(review): assumes the client stops retrying on a non-number
     *   return value; confirm against the ioredis `retryStrategy` docs.
     */
    createRetryStrategy(times) {
        if (this.isExplicitlyTerminated) {
            return undefined;
        }
        const hasRetryBudget = Boolean(this.getOptionsProp(this.options, 'retryAttempts'));
        const isExhausted = !hasRetryBudget ||
            times > this.getOptionsProp(this.options, 'retryAttempts');
        if (isExhausted) {
            this.logger.error(`Retry time exhausted`);
            return undefined;
        }
        const configuredDelay = this.getOptionsProp(this.options, 'retryDelay');
        return configuredDelay || 0;
    }
}
120exports.ServerRedis = ServerRedis;