UNPKG

6.05 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientRedis = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const rxjs_1 = require("rxjs");
7const operators_1 = require("rxjs/operators");
8const constants_1 = require("../constants");
9const client_proxy_1 = require("./client-proxy");
10let redisPackage = {};
11class ClientRedis extends client_proxy_1.ClientProxy {
12 constructor(options) {
13 super();
14 this.options = options;
15 this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
16 this.subscriptionsCount = new Map();
17 this.isExplicitlyTerminated = false;
18 this.url =
19 this.getOptionsProp(options, 'url') ||
20 (!this.getOptionsProp(options, 'host') && constants_1.REDIS_DEFAULT_URL);
21 redisPackage = (0, load_package_util_1.loadPackage)('redis', ClientRedis.name, () => require('redis'));
22 this.initializeSerializer(options);
23 this.initializeDeserializer(options);
24 }
25 getRequestPattern(pattern) {
26 return pattern;
27 }
28 getReplyPattern(pattern) {
29 return `${pattern}.reply`;
30 }
31 close() {
32 this.pubClient && this.pubClient.quit();
33 this.subClient && this.subClient.quit();
34 this.pubClient = this.subClient = null;
35 this.isExplicitlyTerminated = true;
36 }
37 connect() {
38 if (this.pubClient && this.subClient) {
39 return this.connection;
40 }
41 const error$ = new rxjs_1.Subject();
42 this.pubClient = this.createClient(error$);
43 this.subClient = this.createClient(error$);
44 this.handleError(this.pubClient);
45 this.handleError(this.subClient);
46 const pubConnect$ = (0, rxjs_1.fromEvent)(this.pubClient, constants_1.CONNECT_EVENT);
47 const subClient$ = (0, rxjs_1.fromEvent)(this.subClient, constants_1.CONNECT_EVENT);
48 this.connection = (0, rxjs_1.lastValueFrom)((0, rxjs_1.merge)(error$, (0, rxjs_1.zip)(pubConnect$, subClient$)).pipe((0, operators_1.take)(1), (0, operators_1.tap)(() => this.subClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())), (0, operators_1.share)())).catch(err => {
49 if (err instanceof rxjs_1.EmptyError) {
50 return;
51 }
52 throw err;
53 });
54 return this.connection;
55 }
56 createClient(error$) {
57 return redisPackage.createClient(Object.assign(Object.assign({}, this.getClientOptions(error$)), { url: this.url }));
58 }
59 handleError(client) {
60 client.addListener(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
61 }
62 getClientOptions(error$) {
63 const retry_strategy = (options) => this.createRetryStrategy(options, error$);
64 return Object.assign(Object.assign({}, (this.options || {})), { retry_strategy });
65 }
66 createRetryStrategy(options, error$) {
67 if (options.error && options.error.code === constants_1.ECONNREFUSED) {
68 error$.error(options.error);
69 }
70 if (this.isExplicitlyTerminated) {
71 return undefined;
72 }
73 if (!this.getOptionsProp(this.options, 'retryAttempts') ||
74 options.attempt > this.getOptionsProp(this.options, 'retryAttempts')) {
75 return new Error('Retry time exhausted');
76 }
77 return this.getOptionsProp(this.options, 'retryDelay') || 0;
78 }
79 createResponseCallback() {
80 return async (channel, buffer) => {
81 const packet = JSON.parse(buffer);
82 const { err, response, isDisposed, id } = await this.deserializer.deserialize(packet);
83 const callback = this.routingMap.get(id);
84 if (!callback) {
85 return;
86 }
87 if (isDisposed || err) {
88 return callback({
89 err,
90 response,
91 isDisposed: true,
92 });
93 }
94 callback({
95 err,
96 response,
97 });
98 };
99 }
100 publish(partialPacket, callback) {
101 try {
102 const packet = this.assignPacketId(partialPacket);
103 const pattern = this.normalizePattern(partialPacket.pattern);
104 const serializedPacket = this.serializer.serialize(packet);
105 const responseChannel = this.getReplyPattern(pattern);
106 let subscriptionsCount = this.subscriptionsCount.get(responseChannel) || 0;
107 const publishPacket = () => {
108 subscriptionsCount = this.subscriptionsCount.get(responseChannel) || 0;
109 this.subscriptionsCount.set(responseChannel, subscriptionsCount + 1);
110 this.routingMap.set(packet.id, callback);
111 this.pubClient.publish(this.getRequestPattern(pattern), JSON.stringify(serializedPacket));
112 };
113 if (subscriptionsCount <= 0) {
114 this.subClient.subscribe(responseChannel, (err) => !err && publishPacket());
115 }
116 else {
117 publishPacket();
118 }
119 return () => {
120 this.unsubscribeFromChannel(responseChannel);
121 this.routingMap.delete(packet.id);
122 };
123 }
124 catch (err) {
125 callback({ err });
126 }
127 }
128 dispatchEvent(packet) {
129 const pattern = this.normalizePattern(packet.pattern);
130 const serializedPacket = this.serializer.serialize(packet);
131 return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(serializedPacket), err => err ? reject(err) : resolve()));
132 }
133 unsubscribeFromChannel(channel) {
134 const subscriptionCount = this.subscriptionsCount.get(channel);
135 this.subscriptionsCount.set(channel, subscriptionCount - 1);
136 if (subscriptionCount - 1 <= 0) {
137 this.subClient.unsubscribe(channel);
138 }
139 }
140}
141exports.ClientRedis = ClientRedis;