UNPKG

6.35 kB | JavaScript | View Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientMqtt = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const rxjs_1 = require("rxjs");
7const operators_1 = require("rxjs/operators");
8const constants_1 = require("../constants");
9const mqtt_record_serializer_1 = require("../serializers/mqtt-record.serializer");
10const client_proxy_1 = require("./client-proxy");
// Module-level holder for the `mqtt` package. It starts as an empty object and is
// reassigned with the result of loadPackage('mqtt', ...) each time a ClientMqtt is constructed.
11let mqttPackage = {};
/**
 * Client proxy that sends requests and events over MQTT.
 *
 * A request is published on its normalized pattern topic and the reply is awaited on
 * `<pattern>/reply`. `subscriptionsCount` tracks, per reply topic, how many in-flight
 * requests rely on the subscription, so the topic is unsubscribed once none remain.
 */
class ClientMqtt extends client_proxy_1.ClientProxy {
    /**
     * @param {object} options Options passed to `mqtt.connect`. This class also reads
     *   `url`, `serializer` and `userProperties` from it.
     */
    constructor(options) {
        super();
        this.options = options;
        // The logger context is the base ClientProxy class name (kept as in the original).
        this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
        // reply topic -> number of in-flight requests that rely on the subscription
        this.subscriptionsCount = new Map();
        this.url = this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL;
        mqttPackage = (0, load_package_util_1.loadPackage)('mqtt', ClientMqtt.name, () => require('mqtt'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * @param {string} pattern Normalized message pattern.
     * @returns {string} Topic on which the request is published (the pattern itself).
     */
    getRequestPattern(pattern) {
        return pattern;
    }

    /**
     * @param {string} pattern Normalized message pattern.
     * @returns {string} Topic on which the reply is expected.
     */
    getResponsePattern(pattern) {
        return `${pattern}/reply`;
    }

    /** Ends the underlying MQTT client (if any) and forgets the cached connection. */
    close() {
        if (this.mqttClient) {
            this.mqttClient.end();
        }
        this.mqttClient = null;
        this.connection = null;
    }

    /**
     * Creates the MQTT client on first use and returns a promise that settles once the
     * client has connected (or closed). Later calls return the cached promise.
     *
     * @returns {Promise<*>} The cached connection promise.
     */
    connect() {
        if (this.mqttClient) {
            return this.connection;
        }
        this.mqttClient = this.createClient();
        this.handleError(this.mqttClient);
        const connect$ = this.connect$(this.mqttClient);
        const ready$ = this.mergeCloseEvent(this.mqttClient, connect$).pipe(
            // Start routing incoming messages as soon as the connection is up.
            (0, operators_1.tap)(() => this.mqttClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())),
            (0, operators_1.share)());
        this.connection = (0, rxjs_1.lastValueFrom)(ready$).catch(err => {
            // An EmptyError is swallowed so the promise resolves with undefined.
            if (err instanceof rxjs_1.EmptyError) {
                return;
            }
            throw err;
        });
        return this.connection;
    }

    /**
     * Combines `source$` with the instance's close event; whichever fires first wins.
     * A close event is rethrown, so it surfaces as an error on the returned stream.
     *
     * @param {object} instance Event emitter (the MQTT client).
     * @param {Observable} source$ Stream to race against the close event.
     * @returns {Observable} Stream of the first value from `source$`, or an error on close.
     */
    mergeCloseEvent(instance, source$) {
        const close$ = (0, rxjs_1.fromEvent)(instance, constants_1.CLOSE_EVENT).pipe((0, operators_1.map)((err) => {
            throw err;
        }));
        return (0, rxjs_1.merge)(source$, close$).pipe((0, operators_1.first)());
    }

    /** @returns {object} A new MQTT client created from `this.url` and `this.options`. */
    createClient() {
        return mqttPackage.connect(this.url, this.options);
    }

    /**
     * Logs every client error except connection-refused errors.
     *
     * @param {object} client MQTT client to attach the error listener to.
     */
    handleError(client) {
        client.addListener(constants_1.ERROR_EVENT, (err) => {
            if (err.code !== constants_1.ECONNREFUSED) {
                this.logger.error(err);
            }
        });
    }

    /**
     * Builds the listener for incoming MQTT messages. It parses and deserializes the
     * payload and forwards it to the callback registered for the packet id.
     *
     * @returns {function(string, Buffer): Promise<void>} Message listener.
     */
    createResponseCallback() {
        return async (channel, buffer) => {
            let err;
            let response;
            let isDisposed;
            let id;
            try {
                const packet = JSON.parse(buffer.toString());
                ({ err, response, isDisposed, id } = await this.deserializer.deserialize(packet));
            }
            catch (parseError) {
                // A malformed message cannot be matched to a request. Log it instead of
                // letting the async listener produce an unhandled promise rejection.
                this.logger.error(parseError);
                return undefined;
            }
            const callback = this.routingMap.get(id);
            if (!callback) {
                return undefined;
            }
            if (isDisposed || err) {
                return callback({
                    err,
                    response,
                    isDisposed: true,
                });
            }
            callback({
                err,
                response,
            });
        };
    }

    /**
     * Publishes a request packet, subscribing to its reply topic first when needed.
     *
     * @param {object} partialPacket Packet without an id (`pattern`, `data`).
     * @param {function(object): void} callback Receives `{ err, response, isDisposed }`.
     * @returns {function(): void|undefined} Teardown releasing this request's routing
     *   entry and its share of the reply subscription; undefined if setup threw.
     */
    publish(partialPacket, callback) {
        try {
            const packet = this.assignPacketId(partialPacket);
            const pattern = this.normalizePattern(partialPacket.pattern);
            const serializedPacket = this.serializer.serialize(packet);
            const responseChannel = this.getResponsePattern(pattern);
            // Whether this request incremented the reply-topic counter.
            let isRegistered = false;
            // Whether the teardown has already run.
            let isDisposed = false;
            const publishPacket = () => {
                if (isDisposed) {
                    // Torn down before the subscription was acknowledged: nothing to send.
                    return;
                }
                const activeCount = this.subscriptionsCount.get(responseChannel) || 0;
                this.subscriptionsCount.set(responseChannel, activeCount + 1);
                this.routingMap.set(packet.id, callback);
                isRegistered = true;
                const options = serializedPacket.options;
                delete serializedPacket.options;
                this.mqttClient.publish(this.getRequestPattern(pattern), JSON.stringify(serializedPacket), this.mergePacketOptions(options));
            };
            const hasSubscription = (this.subscriptionsCount.get(responseChannel) || 0) > 0;
            if (hasSubscription) {
                publishPacket();
            }
            else {
                this.mqttClient.subscribe(responseChannel, (subscribeError) => {
                    if (isDisposed) {
                        return;
                    }
                    if (subscribeError) {
                        // Report the failure; otherwise the caller would wait forever.
                        callback({ err: subscribeError });
                        return;
                    }
                    try {
                        publishPacket();
                    }
                    catch (publishError) {
                        // This runs outside the outer try/catch, so report errors here.
                        callback({ err: publishError });
                    }
                });
            }
            return () => {
                isDisposed = true;
                if (isRegistered) {
                    // Release only what this request acquired, and only once.
                    isRegistered = false;
                    this.unsubscribeFromChannel(responseChannel);
                }
                this.routingMap.delete(packet.id);
            };
        }
        catch (err) {
            callback({ err });
        }
    }

    /**
     * Publishes an event packet (no reply expected).
     *
     * @param {object} packet Event packet (`pattern`, `data`).
     * @returns {Promise<void>} Resolves when the publish callback reports success and
     *   rejects with the error it reports otherwise.
     */
    dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const serializedPacket = this.serializer.serialize(packet);
        const options = serializedPacket.options;
        delete serializedPacket.options;
        return new Promise((resolve, reject) => this.mqttClient.publish(pattern, JSON.stringify(serializedPacket), this.mergePacketOptions(options), (err) => (err ? reject(err) : resolve())));
    }

    /**
     * Decrements the usage counter of a reply topic and unsubscribes from it when no
     * in-flight request relies on it any more.
     *
     * @param {string} channel Reply topic.
     */
    unsubscribeFromChannel(channel) {
        const activeCount = this.subscriptionsCount.get(channel) || 0;
        // Clamp at zero so a missing entry never turns into NaN or a negative count.
        const remaining = Math.max(activeCount - 1, 0);
        this.subscriptionsCount.set(channel, remaining);
        if (remaining <= 0 && this.mqttClient) {
            // The client is null after close(); there is nothing to unsubscribe then.
            this.mqttClient.unsubscribe(channel);
        }
    }

    /**
     * Uses `options.serializer` when it is provided (not null/undefined) and falls back
     * to a new MqttRecordSerializer otherwise.
     *
     * @param {object} [options] Client options.
     */
    initializeSerializer(options) {
        const customSerializer = options == null ? undefined : options.serializer;
        this.serializer = customSerializer != null
            ? customSerializer
            : new mqtt_record_serializer_1.MqttRecordSerializer();
    }

    /**
     * Merges per-request publish options with the client-level `userProperties`.
     * Request-level user properties override client-level ones with the same key.
     *
     * @param {object} [requestOptions] Publish options attached to the packet.
     * @returns {object|undefined} Merged options, or undefined when there are neither
     *   request options nor client-level user properties.
     */
    mergePacketOptions(requestOptions) {
        const clientUserProperties = this.options == null ? undefined : this.options.userProperties;
        if (!requestOptions && !clientUserProperties) {
            return undefined;
        }
        const requestProperties = requestOptions == null ? undefined : requestOptions.properties;
        const requestUserProperties = requestProperties == null ? undefined : requestProperties.userProperties;
        return Object.assign({}, requestOptions, {
            properties: Object.assign({}, requestProperties, {
                userProperties: Object.assign({}, clientUserProperties, requestUserProperties),
            }),
        });
    }
}
138exports.ClientMqtt = ClientMqtt;