UNPKG

6.2 kBJavaScriptView Raw
"use strict";
// Mark the exports object as a transpiled ES module for interop helpers.
Object.defineProperty(exports, "__esModule", { value: true });
// Pre-declare the export; the real value is assigned after the class definition.
exports.ServerMqtt = void 0;
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const mqtt_context_1 = require("../ctx-host/mqtt.context");
const enums_1 = require("../enums");
const mqtt_record_serializer_1 = require("../serializers/mqtt-record.serializer");
const server_1 = require("./server");
// Placeholder for the `mqtt` package; the ServerMqtt constructor replaces it
// with the result of `loadPackage('mqtt', ...)`, so the dependency is only
// required once a server is actually instantiated.
let mqttPackage = {};
/**
 * Microservice transport server backed by an MQTT client.
 *
 * Subscribes to one topic per registered message handler, dispatches incoming
 * messages either as events (packet without an `id`) or as request/response
 * exchanges (packet with an `id`), and publishes responses on
 * `<topic>/reply`.
 */
class ServerMqtt extends server_1.Server {
    /**
     * @param {object} options Transport options. `url`, `subscribeOptions`
     *   and `serializer` are read by this class; the whole object is also
     *   forwarded to `mqtt.connect`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.MQTT;
        this.url = this.getOptionsProp(options, 'url') || constants_1.MQTT_DEFAULT_URL;
        // Resolve the `mqtt` package through the base-class loader and keep it
        // in the module-level slot used by createMqttClient().
        mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () => require('mqtt'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Creates the MQTT client and wires up event handling.
     *
     * @param {Function} callback Invoked with no arguments on the client's
     *   connect event, or with the error if client creation/wiring throws
     *   synchronously.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        try {
            this.mqttClient = this.createMqttClient();
            this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Attaches error logging, message handling and subscriptions to the
     * current client, then registers `callback` on the connect event.
     *
     * @param {Function} callback Called (without arguments) whenever the
     *   client emits the connect event.
     */
    start(callback) {
        this.handleError(this.mqttClient);
        this.bindEvents(this.mqttClient);
        this.mqttClient.on(constants_1.CONNECT_EVENT, () => callback());
    }

    /**
     * Registers the message listener and subscribes to every registered
     * handler pattern.
     *
     * @param {object} mqttClient Client exposing `on` and `subscribe`.
     */
    bindEvents(mqttClient) {
        mqttClient.on(constants_1.MESSAGE_EVENT, this.getMessageHandler(mqttClient).bind(this));
        const subscribeOptions = this.getOptionsProp(this.options, 'subscribeOptions');
        for (const pattern of [...this.messageHandlers.keys()]) {
            const { isEventHandler } = this.messageHandlers.get(pattern);
            // Event handlers listen on the raw pattern; request handlers go
            // through getRequestPattern() so the mapping can be customised.
            const topic = isEventHandler ? pattern : this.getRequestPattern(pattern);
            mqttClient.subscribe(topic, subscribeOptions);
        }
    }

    /**
     * Ends the MQTT client if one has been created.
     */
    close() {
        if (this.mqttClient) {
            this.mqttClient.end();
        }
    }

    /**
     * @returns {object} A client created by `mqtt.connect(url, options)`.
     */
    createMqttClient() {
        return mqttPackage.connect(this.url, this.options);
    }

    /**
     * Builds the listener passed to the client's message event.
     *
     * @param {object} pub Client used to publish responses.
     * @returns {Function} Async `(channel, buffer, originalPacket)` listener
     *   delegating to handleMessage().
     */
    getMessageHandler(pub) {
        return async (channel, buffer, originalPacket) => this.handleMessage(channel, buffer, pub, originalPacket);
    }

    /**
     * Processes a single incoming message.
     *
     * Packets without an `id` are treated as events. Packets with an `id`
     * are requests: the matching handler is invoked and its result is sent
     * back through the reply publisher; if no handler matches, an error
     * packet carrying NO_MESSAGE_HANDLER is published instead.
     *
     * @param {string} channel Topic the message arrived on.
     * @param {object} buffer Payload; converted with `toString()` before parsing.
     * @param {object} pub Client used to publish the response.
     * @param {object} originalPacket Raw packet, exposed via the MqttContext.
     * @returns {Promise<*>}
     */
    async handleMessage(channel, buffer, pub, originalPacket) {
        const rawPacket = this.parseMessage(buffer.toString());
        const packet = await this.deserializer.deserialize(rawPacket, { channel });
        const mqttContext = new mqtt_context_1.MqttContext([channel, originalPacket]);
        if ((0, shared_utils_1.isUndefined)(packet.id)) {
            return this.handleEvent(channel, packet, mqttContext);
        }
        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                status,
                err: constants_1.NO_MESSAGE_HANDLER,
            };
            return publish(noHandlerPacket);
        }
        const response$ = this.transformToObservable(await handler(packet.data, mqttContext));
        if (response$) {
            this.send(response$, publish);
        }
    }

    /**
     * Creates the function used to publish a response for one request.
     *
     * @param {object} client Client exposing `publish`.
     * @param {string} pattern Topic the request arrived on.
     * @param {*} id Request id copied onto the response.
     * @returns {Function} `(response) => client.publish(...)`. Note that the
     *   passed response object is mutated (`id` is assigned onto it).
     */
    getPublisher(client, pattern, id) {
        return (response) => {
            Object.assign(response, { id });
            const outgoingResponse = this.serializer.serialize(response);
            // `options` are publish options, not payload: pull them off the
            // serialized object before it is stringified.
            const options = outgoingResponse.options;
            delete outgoingResponse.options;
            return client.publish(this.getReplyPattern(pattern), JSON.stringify(outgoingResponse), options);
        };
    }

    /**
     * Parses a payload as JSON on a best-effort basis.
     *
     * @param {string} content Raw payload text.
     * @returns {*} The parsed value, or `content` unchanged when it is not
     *   valid JSON (non-JSON payloads are deliberately passed through).
     */
    parseMessage(content) {
        try {
            return JSON.parse(content);
        }
        catch (e) {
            return content;
        }
    }

    /**
     * Tests whether a concrete topic matches a subscription pattern,
     * comparing segment by segment (split on MQTT_SEPARATOR).
     *
     * @param {string} pattern Subscription filter, possibly with wildcards.
     * @param {string} topic Concrete topic.
     * @returns {boolean}
     */
    matchMqttPattern(pattern, topic) {
        const patternSegments = pattern.split(constants_1.MQTT_SEPARATOR);
        const topicSegments = topic.split(constants_1.MQTT_SEPARATOR);
        const patternSegmentsLength = patternSegments.length;
        const topicSegmentsLength = topicSegments.length;
        const lastIndex = patternSegmentsLength - 1;
        for (let i = 0; i < patternSegmentsLength; i++) {
            const currentPattern = patternSegments[i];
            const patternChar = currentPattern[0];
            const currentTopic = topicSegments[i];
            // Both segments empty/missing: nothing to compare at this level.
            if (!currentTopic && !currentPattern) {
                continue;
            }
            // Topic ran out of segments and the pattern does not absorb the
            // remainder with the match-all wildcard.
            if (!currentTopic && currentPattern !== constants_1.MQTT_WILDCARD_ALL) {
                return false;
            }
            // The match-all wildcard is only accepted as the final segment.
            if (patternChar === constants_1.MQTT_WILDCARD_ALL) {
                return i === lastIndex;
            }
            if (patternChar !== constants_1.MQTT_WILDCARD_SINGLE &&
                currentPattern !== currentTopic) {
                return false;
            }
        }
        return patternSegmentsLength === topicSegmentsLength;
    }

    /**
     * Finds the handler registered for an incoming topic.
     *
     * An exact key match wins. Otherwise every handler key that contains a
     * wildcard, or that is a shared subscription (`$share...`), is matched
     * against the route after stripping the shared-subscription prefix.
     * Shared keys must be considered even without wildcards: messages are
     * delivered on the underlying topic, which never equals the
     * `$share/<group>/<topic>` key, so the exact lookup cannot find them.
     *
     * @param {string} pattern Topic the message arrived on.
     * @returns {Function|null} The handler, or `null` when none matches.
     */
    getHandlerByPattern(pattern) {
        const route = this.getRouteFromPattern(pattern);
        if (this.messageHandlers.has(route)) {
            return this.messageHandlers.get(route) || null;
        }
        for (const [key, value] of this.messageHandlers) {
            const hasWildcard = key.includes(constants_1.MQTT_WILDCARD_SINGLE) ||
                key.includes(constants_1.MQTT_WILDCARD_ALL);
            const isSharedSubscription = key.startsWith('$share');
            if (!hasWildcard && !isSharedSubscription) {
                // Plain literal keys were already covered by the exact lookup.
                continue;
            }
            const keyWithoutSharedPrefix = this.removeHandlerKeySharedPrefix(key);
            if (this.matchMqttPattern(keyWithoutSharedPrefix, route)) {
                return value;
            }
        }
        return null;
    }

    /**
     * Strips the first two `/`-separated segments from a key that starts
     * with `$share`, leaving the underlying topic filter.
     *
     * @param {string} handlerKey Registered handler key.
     * @returns {string} The key without its first two segments, or the key
     *   unchanged when it is falsy or does not start with `$share`.
     */
    removeHandlerKeySharedPrefix(handlerKey) {
        return handlerKey && handlerKey.startsWith('$share')
            ? handlerKey.split('/').slice(2).join('/')
            : handlerKey;
    }

    /**
     * @param {string} pattern Handler pattern.
     * @returns {string} Topic to subscribe to for requests (the pattern itself).
     */
    getRequestPattern(pattern) {
        return pattern;
    }

    /**
     * @param {string} pattern Request topic.
     * @returns {string} Topic on which the response is published.
     */
    getReplyPattern(pattern) {
        return `${pattern}/reply`;
    }

    /**
     * Logs every error event emitted by the given emitter.
     *
     * @param {object} stream Emitter exposing `on` (here: the MQTT client).
     */
    handleError(stream) {
        stream.on(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
    }

    /**
     * Uses `options.serializer` when it is neither null nor undefined,
     * otherwise falls back to a new MqttRecordSerializer.
     *
     * @param {object} [options] Transport options.
     */
    initializeSerializer(options) {
        var _a;
        this.serializer = (_a = options === null || options === void 0 ? void 0 : options.serializer) !== null && _a !== void 0 ? _a : new mqtt_record_serializer_1.MqttRecordSerializer();
    }
}
// Publish the class on the CommonJS exports object.
exports.ServerMqtt = ServerMqtt;