UNPKG

5.92 kBJavaScriptView Raw
"use strict";
// CommonJS module preamble (TypeScript compiler output): mark the module as
// an ES-module interop target and pre-declare the export that is assigned
// once the class has been defined.
Object.defineProperty(exports, "__esModule", { value: true });
exports.ClientRedis = void 0;
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const constants_1 = require("../constants");
const client_proxy_1 = require("./client-proxy");
const constants_2 = require("./constants");
// Placeholder for the `redis` package; the ClientRedis constructor replaces
// it with the module returned by `loadPackage('redis', ...)`.
let redisPackage = {};
/**
 * Client proxy that exchanges packets over Redis pub/sub.
 *
 * Two Redis clients are used: `pubClient` publishes requests and events,
 * `subClient` listens on the `<pattern>.reply` channels for responses.
 * `subscriptionsCount` tracks, per reply channel, how many in-flight requests
 * still need the subscription so the channel can be released when the last
 * one is torn down.
 */
class ClientRedis extends client_proxy_1.ClientProxy {
    /**
     * @param {object} options Transport options. `url`, `host`,
     *   `retryAttempts` and `retryDelay` are read here; the whole object is
     *   also forwarded to `redis.createClient`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
        this.subscriptionsCount = new Map();
        this.isExplicitlyTerminated = false;
        // An explicit `url` wins. Otherwise the default URL is used only when
        // no `host` option was supplied (the expression is `false` if it was).
        this.url =
            this.getOptionsProp(options, 'url') ||
                (!this.getOptionsProp(options, 'host') && constants_1.REDIS_DEFAULT_URL);
        redisPackage = load_package_util_1.loadPackage('redis', ClientRedis.name, () => require('redis'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * @param {string} pattern Normalized message pattern.
     * @returns {string} Channel on which requests are published (the pattern itself).
     */
    getRequestPattern(pattern) {
        return pattern;
    }
    /**
     * @param {string} pattern Normalized message pattern.
     * @returns {string} Channel on which replies are expected: `<pattern>.reply`.
     */
    getReplyPattern(pattern) {
        return `${pattern}.reply`;
    }
    /**
     * Quits both Redis clients (when present), drops the references and marks
     * the client as explicitly terminated so the retry strategy stops retrying.
     */
    close() {
        this.pubClient && this.pubClient.quit();
        this.subClient && this.subClient.quit();
        this.pubClient = this.subClient = null;
        this.isExplicitlyTerminated = true;
    }
    /**
     * Creates the publisher/subscriber clients on first use and returns a
     * promise that settles once both have emitted the connect event, or an
     * error was pushed by the retry strategy.
     *
     * @returns {Promise<any>} The cached connection promise.
     */
    connect() {
        if (this.pubClient && this.subClient) {
            return this.connection;
        }
        // New clients are about to be created (first connect, or a reconnect
        // after close()). Re-enable the retry strategy, which close() turned
        // off; otherwise the new clients would never retry.
        this.isExplicitlyTerminated = false;
        const error$ = new rxjs_1.Subject();
        this.pubClient = this.createClient(error$);
        this.subClient = this.createClient(error$);
        this.handleError(this.pubClient);
        this.handleError(this.subClient);
        const pubConnect$ = rxjs_1.fromEvent(this.pubClient, constants_1.CONNECT_EVENT);
        const subConnect$ = rxjs_1.fromEvent(this.subClient, constants_1.CONNECT_EVENT);
        this.connection = rxjs_1.merge(error$, rxjs_1.zip(pubConnect$, subConnect$))
            .pipe(operators_1.take(1), 
        // Start routing replies only once both clients are connected.
        operators_1.tap(() => this.subClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())), operators_1.share())
            .toPromise();
        return this.connection;
    }
    /**
     * @param {Subject} error$ Subject that receives connection errors from the retry strategy.
     * @returns {object} A new Redis client built from the client options plus `url`.
     */
    createClient(error$) {
        return redisPackage.createClient(Object.assign({}, this.getClientOptions(error$), { url: this.url }));
    }
    /**
     * Logs every error event emitted by the given Redis client.
     *
     * @param {object} client Redis client (event emitter).
     */
    handleError(client) {
        client.addListener(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
    }
    /**
     * @param {Subject} error$ Subject forwarded to the retry strategy.
     * @returns {object} A copy of the transport options with `retry_strategy` attached.
     */
    getClientOptions(error$) {
        const retry_strategy = (options) => this.createRetryStrategy(options, error$);
        return Object.assign({}, this.options || {}, { retry_strategy });
    }
    /**
     * Retry strategy handed to the Redis client.
     *
     * @param {object} options Retry info from the Redis client (`error`, `attempt` are read).
     * @param {Subject} error$ Receives the error when the connection was refused.
     * @returns {undefined|Error|number} `undefined` after an explicit close,
     *   an `Error` when `retryAttempts` is unset or exceeded, otherwise the
     *   configured `retryDelay` (or 0).
     */
    createRetryStrategy(options, error$) {
        if (options.error && options.error.code === constants_2.ECONNREFUSED) {
            error$.error(options.error);
        }
        if (this.isExplicitlyTerminated) {
            return undefined;
        }
        const retryAttempts = this.getOptionsProp(this.options, 'retryAttempts');
        if (!retryAttempts || options.attempt > retryAttempts) {
            return new Error('Retry time exhausted');
        }
        return this.getOptionsProp(this.options, 'retryDelay') || 0;
    }
    /**
     * Builds the listener for messages arriving on the subscriber client. It
     * parses and deserializes the payload and hands it to the callback
     * registered in `routingMap` under the packet id.
     *
     * @returns {(channel: string, buffer: string) => void}
     */
    createResponseCallback() {
        return (channel, buffer) => {
            let message;
            try {
                message = this.deserializer.deserialize(JSON.parse(buffer));
            }
            catch (parseError) {
                // A malformed payload must not escape the event listener as
                // an uncaught exception; log it and drop the message.
                this.logger.error(parseError);
                return;
            }
            const { err, response, isDisposed, id } = message;
            const callback = this.routingMap.get(id);
            if (!callback) {
                return;
            }
            if (isDisposed || err) {
                // An error always terminates the response stream.
                return callback({
                    err,
                    response,
                    isDisposed: true,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
    /**
     * Sends a request packet and registers `callback` for its replies. The
     * reply channel is subscribed first when no other in-flight request is
     * currently using it.
     *
     * @param {object} partialPacket Packet without an id (`pattern` is read).
     * @param {Function} callback Receives `{ err, response, isDisposed }` objects.
     * @returns {Function|undefined} Teardown that releases the reply channel
     *   and routing entry; `undefined` when preparing the packet threw (the
     *   error is delivered through `callback`).
     */
    publish(partialPacket, callback) {
        try {
            const packet = this.assignPacketId(partialPacket);
            const pattern = this.normalizePattern(partialPacket.pattern);
            const serializedPacket = this.serializer.serialize(packet);
            const responseChannel = this.getReplyPattern(pattern);
            // Set when the subscribe call failed: the request was never
            // counted, so the teardown must not release the channel for it.
            let hasSubscribeFailed = false;
            const publishPacket = () => {
                // Re-read the counter: it may have changed while the
                // subscription was being acknowledged.
                const activeCount = this.subscriptionsCount.get(responseChannel) || 0;
                this.subscriptionsCount.set(responseChannel, activeCount + 1);
                this.routingMap.set(packet.id, callback);
                this.pubClient.publish(this.getRequestPattern(pattern), JSON.stringify(serializedPacket));
            };
            const subscriptionsCount = this.subscriptionsCount.get(responseChannel) || 0;
            if (subscriptionsCount <= 0) {
                this.subClient.subscribe(responseChannel, (err) => {
                    if (err) {
                        // Report the failure instead of dropping it, so the
                        // caller is not left waiting for a reply forever.
                        hasSubscribeFailed = true;
                        return callback({ err });
                    }
                    publishPacket();
                });
            }
            else {
                publishPacket();
            }
            return () => {
                if (!hasSubscribeFailed) {
                    this.unsubscribeFromChannel(responseChannel);
                }
                this.routingMap.delete(packet.id);
            };
        }
        catch (err) {
            callback({ err });
        }
    }
    /**
     * Publishes a fire-and-forget event packet on its pattern channel.
     *
     * @param {object} packet Event packet (`pattern` is read).
     * @returns {Promise<void>} Resolves when the publish callback reports no
     *   error, rejects with the error otherwise.
     */
    dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const serializedPacket = this.serializer.serialize(packet);
        return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(serializedPacket), err => err ? reject(err) : resolve()));
    }
    /**
     * Decrements the usage counter of a reply channel and unsubscribes from
     * it once no request needs it any more.
     *
     * @param {string} channel Reply channel name.
     */
    unsubscribeFromChannel(channel) {
        // A channel without an entry counts as 0. `undefined - 1` would store
        // NaN, which skips the unsubscribe below and leaves the counter one
        // too high after the next increment.
        const subscriptionCount = this.subscriptionsCount.get(channel) || 0;
        const remaining = subscriptionCount - 1;
        this.subscriptionsCount.set(channel, remaining);
        // `subClient` is null after close(); a late teardown must not throw.
        if (remaining <= 0 && this.subClient) {
            this.subClient.unsubscribe(channel);
        }
    }
}
exports.ClientRedis = ClientRedis;