UNPKG

5 kBJavaScriptView Raw
1"use strict";
// Flag the exports object with `__esModule: true`.
2Object.defineProperty(exports, "__esModule", { value: true });
// Pre-declare the ClientMqtt export as undefined; the real value is assigned
// after the class definition at the bottom of the file.
3exports.ClientMqtt = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const rxjs_1 = require("rxjs");
7const operators_1 = require("rxjs/operators");
8const constants_1 = require("../constants");
9const client_proxy_1 = require("./client-proxy");
// Module-level holder for the `mqtt` package. It starts as an empty object and
// is reassigned with the result of loadPackage('mqtt', ...) each time a
// ClientMqtt instance is constructed; createClient() calls `connect` on it.
10let mqttPackage = {};
/**
 * MQTT client proxy.
 *
 * Requests are published on the (normalized) pattern topic and replies are
 * expected on `<pattern>/reply`. A per-channel counter in
 * `subscriptionsCount` decides when the reply channel must be subscribed to
 * and when it can be unsubscribed from.
 */
class ClientMqtt extends client_proxy_1.ClientProxy {
    /**
     * @param options Client options. `url` is read from them (falling back to
     * `MQTT_DEFAULT_URL` when falsy) and the whole object is later handed to
     * the mqtt package's `connect`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
        // reply channel -> number of requests currently counted on it
        this.subscriptionsCount = new Map();
        this.url = this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL;
        mqttPackage = load_package_util_1.loadPackage('mqtt', ClientMqtt.name, () => require('mqtt'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * @param pattern Normalized message pattern.
     * @returns The topic requests are published on (the pattern itself).
     */
    getRequestPattern(pattern) {
        return pattern;
    }
    /**
     * @param pattern Normalized message pattern.
     * @returns The topic replies are expected on: `<pattern>/reply`.
     */
    getResponsePattern(pattern) {
        return `${pattern}/reply`;
    }
    /**
     * Ends the underlying mqtt client (if any) and forgets both the client
     * and the cached connection promise, so the next `connect()` starts over.
     */
    close() {
        this.mqttClient && this.mqttClient.end();
        this.mqttClient = null;
        this.connection = null;
    }
    /**
     * Creates the mqtt client on first use and returns a promise for the
     * connection. Subsequent calls return the cached promise while a client
     * exists.
     *
     * @returns Promise that settles with the first notification of the
     * connect/close race; an `EmptyError` is swallowed (resolves to
     * undefined), any other error is rethrown.
     */
    connect() {
        if (this.mqttClient) {
            return this.connection;
        }
        this.mqttClient = this.createClient();
        this.handleError(this.mqttClient);
        const connect$ = this.connect$(this.mqttClient);
        this.connection = rxjs_1.lastValueFrom(this.mergeCloseEvent(this.mqttClient, connect$).pipe(
        // Once connected, start routing incoming messages to callbacks.
        operators_1.tap(() => this.mqttClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())), operators_1.share())).catch(err => {
            if (err instanceof rxjs_1.EmptyError) {
                return;
            }
            throw err;
        });
        return this.connection;
    }
    /**
     * Races `source$` against the instance's close event.
     *
     * @param instance Event emitter to listen on for `CLOSE_EVENT`.
     * @param source$ Observable to race against the close event.
     * @returns Observable emitting the first value of `source$`, or erroring
     * with whatever the close event carried if that fires first.
     */
    mergeCloseEvent(instance, source$) {
        const close$ = rxjs_1.fromEvent(instance, constants_1.CLOSE_EVENT).pipe(operators_1.map((err) => {
            throw err;
        }));
        return rxjs_1.merge(source$, close$).pipe(operators_1.first());
    }
    /**
     * @returns A new mqtt client connected to `this.url` with `this.options`.
     */
    createClient() {
        return mqttPackage.connect(this.url, this.options);
    }
    /**
     * Logs every client error except those whose code is `ECONNREFUSED`.
     *
     * @param client mqtt client to attach the error listener to.
     */
    handleError(client) {
        client.addListener(constants_1.ERROR_EVENT, (err) => err.code !== constants_1.ECONNREFUSED && this.logger.error(err));
    }
    /**
     * Builds the listener for incoming messages. The listener parses the
     * payload as JSON, deserializes it, and forwards it to the callback
     * registered in `routingMap` under the packet id. Messages with an
     * unknown id are ignored.
     *
     * @returns Async `(channel, buffer)` message handler.
     */
    createResponseCallback() {
        return async (channel, buffer) => {
            let message;
            try {
                message = await this.deserializer.deserialize(JSON.parse(buffer.toString()));
            }
            catch (error) {
                // This runs as an event listener, so nobody awaits the returned
                // promise: a malformed payload would otherwise surface as an
                // unhandled promise rejection. Log it and drop the message.
                this.logger.error(error);
                return undefined;
            }
            const { err, response, isDisposed, id } = message;
            const callback = this.routingMap.get(id);
            if (!callback) {
                return undefined;
            }
            if (isDisposed || err) {
                return callback({
                    err,
                    response,
                    isDisposed: true,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
    /**
     * Sends a request packet and registers `callback` for its reply.
     *
     * If no request is currently counted on the reply channel, the channel is
     * subscribed to first and the packet is published once the subscription
     * is acknowledged; otherwise it is published immediately.
     *
     * @param partialPacket Packet without an id; `pattern` is read from it.
     * @param callback Invoked with `{ err, response, isDisposed }` style
     * objects; also invoked with `{ err }` when publishing or subscribing
     * fails.
     * @returns Teardown function that releases this request's share of the
     * reply channel and removes its routing entry (undefined if setup threw).
     */
    publish(partialPacket, callback) {
        try {
            const packet = this.assignPacketId(partialPacket);
            const pattern = this.normalizePattern(partialPacket.pattern);
            const serializedPacket = this.serializer.serialize(packet);
            const responseChannel = this.getResponsePattern(pattern);
            // Per-request state so teardown and a late subscribe ack cannot
            // corrupt the shared counter or leak the routing entry.
            let isCounted = false;
            let isCancelled = false;
            const publishPacket = () => {
                if (isCancelled) {
                    // Torn down before the subscribe ack arrived: do nothing.
                    return;
                }
                const activeCount = this.subscriptionsCount.get(responseChannel) || 0;
                this.subscriptionsCount.set(responseChannel, activeCount + 1);
                isCounted = true;
                this.routingMap.set(packet.id, callback);
                this.mqttClient.publish(this.getRequestPattern(pattern), JSON.stringify(serializedPacket));
            };
            const currentCount = this.subscriptionsCount.get(responseChannel) || 0;
            if (currentCount <= 0) {
                this.mqttClient.subscribe(responseChannel, (err) => {
                    if (isCancelled) {
                        return;
                    }
                    if (err) {
                        // Report the failure instead of leaving the caller
                        // waiting forever for a reply that cannot arrive.
                        callback({ err });
                        return;
                    }
                    publishPacket();
                });
            }
            else {
                publishPacket();
            }
            return () => {
                isCancelled = true;
                if (isCounted) {
                    // Only give back what this request actually took.
                    isCounted = false;
                    this.unsubscribeFromChannel(responseChannel);
                }
                this.routingMap.delete(packet.id);
            };
        }
        catch (err) {
            callback({ err });
        }
    }
    /**
     * Publishes an event packet (no reply expected).
     *
     * @param packet Event packet; `pattern` is read from it.
     * @returns Promise resolved when the mqtt client reports the publish as
     * done, rejected with the error it reports otherwise.
     */
    dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const serializedPacket = this.serializer.serialize(packet);
        return new Promise((resolve, reject) => this.mqttClient.publish(pattern, JSON.stringify(serializedPacket), err => err ? reject(err) : resolve()));
    }
    /**
     * Decrements the request counter of `channel` and unsubscribes from it
     * once the counter reaches zero.
     *
     * @param channel Reply channel name.
     */
    unsubscribeFromChannel(channel) {
        // A missing entry counts as 0 (instead of producing NaN) and the
        // stored value never drops below 0.
        const currentCount = this.subscriptionsCount.get(channel) || 0;
        const remaining = Math.max(currentCount - 1, 0);
        this.subscriptionsCount.set(channel, remaining);
        // The client is null after close(); a late teardown must not throw.
        if (remaining === 0 && this.mqttClient) {
            this.mqttClient.unsubscribe(channel);
        }
    }
}
// Publish the class as the module's `ClientMqtt` export.
122exports.ClientMqtt = ClientMqtt;