1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerMqtt = void 0;
|
4 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
5 | const constants_1 = require("../constants");
|
6 | const mqtt_context_1 = require("../ctx-host/mqtt.context");
|
7 | const enums_1 = require("../enums");
|
8 | const mqtt_record_serializer_1 = require("../serializers/mqtt-record.serializer");
|
9 | const server_1 = require("./server");
|
10 | let mqttPackage = {};
|
11 | class ServerMqtt extends server_1.Server {
|
12 | constructor(options) {
|
13 | super();
|
14 | this.options = options;
|
15 | this.transportId = enums_1.Transport.MQTT;
|
16 | this.url = this.getOptionsProp(options, 'url') || constants_1.MQTT_DEFAULT_URL;
|
17 | mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () => require('mqtt'));
|
18 | this.initializeSerializer(options);
|
19 | this.initializeDeserializer(options);
|
20 | }
|
21 | async listen(callback) {
|
22 | try {
|
23 | this.mqttClient = this.createMqttClient();
|
24 | this.start(callback);
|
25 | }
|
26 | catch (err) {
|
27 | callback(err);
|
28 | }
|
29 | }
|
30 | start(callback) {
|
31 | this.handleError(this.mqttClient);
|
32 | this.bindEvents(this.mqttClient);
|
33 | this.mqttClient.on(constants_1.CONNECT_EVENT, () => callback());
|
34 | }
|
35 | bindEvents(mqttClient) {
|
36 | mqttClient.on(constants_1.MESSAGE_EVENT, this.getMessageHandler(mqttClient).bind(this));
|
37 | const registeredPatterns = [...this.messageHandlers.keys()];
|
38 | registeredPatterns.forEach(pattern => {
|
39 | const { isEventHandler } = this.messageHandlers.get(pattern);
|
40 | mqttClient.subscribe(isEventHandler ? pattern : this.getRequestPattern(pattern), this.getOptionsProp(this.options, 'subscribeOptions'));
|
41 | });
|
42 | }
|
43 | close() {
|
44 | this.mqttClient && this.mqttClient.end();
|
45 | }
|
46 | createMqttClient() {
|
47 | return mqttPackage.connect(this.url, this.options);
|
48 | }
|
49 | getMessageHandler(pub) {
|
50 | return async (channel, buffer, originalPacket) => this.handleMessage(channel, buffer, pub, originalPacket);
|
51 | }
|
52 | async handleMessage(channel, buffer, pub, originalPacket) {
|
53 | const rawPacket = this.parseMessage(buffer.toString());
|
54 | const packet = await this.deserializer.deserialize(rawPacket, { channel });
|
55 | const mqttContext = new mqtt_context_1.MqttContext([channel, originalPacket]);
|
56 | if ((0, shared_utils_1.isUndefined)(packet.id)) {
|
57 | return this.handleEvent(channel, packet, mqttContext);
|
58 | }
|
59 | const publish = this.getPublisher(pub, channel, packet.id);
|
60 | const handler = this.getHandlerByPattern(channel);
|
61 | if (!handler) {
|
62 | const status = 'error';
|
63 | const noHandlerPacket = {
|
64 | id: packet.id,
|
65 | status,
|
66 | err: constants_1.NO_MESSAGE_HANDLER,
|
67 | };
|
68 | return publish(noHandlerPacket);
|
69 | }
|
70 | const response$ = this.transformToObservable(await handler(packet.data, mqttContext));
|
71 | response$ && this.send(response$, publish);
|
72 | }
|
73 | getPublisher(client, pattern, id) {
|
74 | return (response) => {
|
75 | Object.assign(response, { id });
|
76 | const outgoingResponse = this.serializer.serialize(response);
|
77 | const options = outgoingResponse.options;
|
78 | delete outgoingResponse.options;
|
79 | return client.publish(this.getReplyPattern(pattern), JSON.stringify(outgoingResponse), options);
|
80 | };
|
81 | }
|
82 | parseMessage(content) {
|
83 | try {
|
84 | return JSON.parse(content);
|
85 | }
|
86 | catch (e) {
|
87 | return content;
|
88 | }
|
89 | }
|
90 | matchMqttPattern(pattern, topic) {
|
91 | const patternSegments = pattern.split(constants_1.MQTT_SEPARATOR);
|
92 | const topicSegments = topic.split(constants_1.MQTT_SEPARATOR);
|
93 | const patternSegmentsLength = patternSegments.length;
|
94 | const topicSegmentsLength = topicSegments.length;
|
95 | const lastIndex = patternSegmentsLength - 1;
|
96 | for (let i = 0; i < patternSegmentsLength; i++) {
|
97 | const currentPattern = patternSegments[i];
|
98 | const patternChar = currentPattern[0];
|
99 | const currentTopic = topicSegments[i];
|
100 | if (!currentTopic && !currentPattern) {
|
101 | continue;
|
102 | }
|
103 | if (!currentTopic && currentPattern !== constants_1.MQTT_WILDCARD_ALL) {
|
104 | return false;
|
105 | }
|
106 | if (patternChar === constants_1.MQTT_WILDCARD_ALL) {
|
107 | return i === lastIndex;
|
108 | }
|
109 | if (patternChar !== constants_1.MQTT_WILDCARD_SINGLE &&
|
110 | currentPattern !== currentTopic) {
|
111 | return false;
|
112 | }
|
113 | }
|
114 | return patternSegmentsLength === topicSegmentsLength;
|
115 | }
|
116 | getHandlerByPattern(pattern) {
|
117 | const route = this.getRouteFromPattern(pattern);
|
118 | if (this.messageHandlers.has(route)) {
|
119 | return this.messageHandlers.get(route) || null;
|
120 | }
|
121 | for (const [key, value] of this.messageHandlers) {
|
122 | const keyWithoutSharedPrefix = this.removeHandlerKeySharedPrefix(key);
|
123 | if (this.matchMqttPattern(keyWithoutSharedPrefix, route)) {
|
124 | return value;
|
125 | }
|
126 | }
|
127 | return null;
|
128 | }
|
129 | removeHandlerKeySharedPrefix(handlerKey) {
|
130 | return handlerKey && handlerKey.startsWith('$share')
|
131 | ? handlerKey.split('/').slice(2).join('/')
|
132 | : handlerKey;
|
133 | }
|
134 | getRequestPattern(pattern) {
|
135 | return pattern;
|
136 | }
|
137 | getReplyPattern(pattern) {
|
138 | return `${pattern}/reply`;
|
139 | }
|
140 | handleError(stream) {
|
141 | stream.on(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
|
142 | }
|
143 | initializeSerializer(options) {
|
144 | this.serializer = options?.serializer ?? new mqtt_record_serializer_1.MqttRecordSerializer();
|
145 | }
|
146 | }
|
147 | exports.ServerMqtt = ServerMqtt;
|