1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include <assert.h>
|
31 | #include <stdio.h>
|
32 | #include <string.h>
|
33 |
|
34 | #include "mosquitto.h"
|
35 | #include "mosquitto_internal.h"
|
36 | #include "logging_mosq.h"
|
37 | #include "mqtt3_protocol.h"
|
38 | #include "memory_mosq.h"
|
39 | #include "net_mosq.h"
|
40 | #include "send_mosq.h"
|
41 | #include "time_mosq.h"
|
42 | #include "util_mosq.h"
|
43 |
|
44 | #ifdef WITH_BROKER
|
45 | #include "mosquitto_broker.h"
|
46 | # ifdef WITH_SYS_TREE
|
47 | extern uint64_t g_pub_bytes_sent;
|
48 | # endif
|
49 | #endif
|
50 |
|
51 | int _mosquitto_send_pingreq(struct mosquitto *mosq)
|
52 | {
|
53 | int rc;
|
54 | assert(mosq);
|
55 | #ifdef WITH_BROKER
|
56 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGREQ to %s", mosq->id);
|
57 | #else
|
58 | _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGREQ", mosq->id);
|
59 | #endif
|
60 | rc = _mosquitto_send_simple_command(mosq, PINGREQ);
|
61 | if(rc == MOSQ_ERR_SUCCESS){
|
62 | mosq->ping_t = mosquitto_time();
|
63 | }
|
64 | return rc;
|
65 | }
|
66 |
|
67 | int _mosquitto_send_pingresp(struct mosquitto *mosq)
|
68 | {
|
69 | #ifdef WITH_BROKER
|
70 | if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGRESP to %s", mosq->id);
|
71 | #else
|
72 | if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGRESP", mosq->id);
|
73 | #endif
|
74 | return _mosquitto_send_simple_command(mosq, PINGRESP);
|
75 | }
|
76 |
|
77 | int _mosquitto_send_puback(struct mosquitto *mosq, uint16_t mid)
|
78 | {
|
79 | #ifdef WITH_BROKER
|
80 | if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (Mid: %d)", mosq->id, mid);
|
81 | #else
|
82 | if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid);
|
83 | #endif
|
84 | return _mosquitto_send_command_with_mid(mosq, PUBACK, mid, false);
|
85 | }
|
86 |
|
87 | int _mosquitto_send_pubcomp(struct mosquitto *mosq, uint16_t mid)
|
88 | {
|
89 | #ifdef WITH_BROKER
|
90 | if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (Mid: %d)", mosq->id, mid);
|
91 | #else
|
92 | if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (Mid: %d)", mosq->id, mid);
|
93 | #endif
|
94 | return _mosquitto_send_command_with_mid(mosq, PUBCOMP, mid, false);
|
95 | }
|
96 |
|
97 | int _mosquitto_send_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
|
98 | {
|
99 | #ifdef WITH_BROKER
|
100 | size_t len;
|
101 | #ifdef WITH_BRIDGE
|
102 | int i;
|
103 | struct _mqtt3_bridge_topic *cur_topic;
|
104 | bool match;
|
105 | int rc;
|
106 | char *mapped_topic = NULL;
|
107 | char *topic_temp = NULL;
|
108 | #endif
|
109 | #endif
|
110 | assert(mosq);
|
111 | assert(topic);
|
112 |
|
113 | if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
|
114 | #ifdef WITH_BROKER
|
115 | if(mosq->listener && mosq->listener->mount_point){
|
116 | len = strlen(mosq->listener->mount_point);
|
117 | if(len < strlen(topic)){
|
118 | topic += len;
|
119 | }else{
|
120 |
|
121 | return MOSQ_ERR_SUCCESS;
|
122 | }
|
123 | }
|
124 | #ifdef WITH_BRIDGE
|
125 | if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
|
126 | for(i=0; i<mosq->bridge->topic_count; i++){
|
127 | cur_topic = &mosq->bridge->topics[i];
|
128 | if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
|
129 | && (cur_topic->remote_prefix || cur_topic->local_prefix)){
|
130 |
|
131 |
|
132 | rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
|
133 | if(rc){
|
134 | return rc;
|
135 | }
|
136 | if(match){
|
137 | mapped_topic = _mosquitto_strdup(topic);
|
138 | if(!mapped_topic) return MOSQ_ERR_NOMEM;
|
139 | if(cur_topic->local_prefix){
|
140 |
|
141 | if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
|
142 | topic_temp = _mosquitto_strdup(mapped_topic+strlen(cur_topic->local_prefix));
|
143 | _mosquitto_free(mapped_topic);
|
144 | if(!topic_temp){
|
145 | return MOSQ_ERR_NOMEM;
|
146 | }
|
147 | mapped_topic = topic_temp;
|
148 | }
|
149 | }
|
150 |
|
151 | if(cur_topic->remote_prefix){
|
152 |
|
153 | len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
|
154 | topic_temp = _mosquitto_calloc(len+1, sizeof(char));
|
155 | if(!topic_temp){
|
156 | _mosquitto_free(mapped_topic);
|
157 | return MOSQ_ERR_NOMEM;
|
158 | }
|
159 | snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
|
160 | _mosquitto_free(mapped_topic);
|
161 | mapped_topic = topic_temp;
|
162 | }
|
163 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
|
164 | #ifdef WITH_SYS_TREE
|
165 | g_pub_bytes_sent += payloadlen;
|
166 | #endif
|
167 | rc = _mosquitto_send_real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup);
|
168 | _mosquitto_free(mapped_topic);
|
169 | return rc;
|
170 | }
|
171 | }
|
172 | }
|
173 | }
|
174 | #endif
|
175 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
|
176 | # ifdef WITH_SYS_TREE
|
177 | g_pub_bytes_sent += payloadlen;
|
178 | # endif
|
179 | #else
|
180 | _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
|
181 | #endif
|
182 |
|
183 | return _mosquitto_send_real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup);
|
184 | }
|
185 |
|
186 | int _mosquitto_send_pubrec(struct mosquitto *mosq, uint16_t mid)
|
187 | {
|
188 | #ifdef WITH_BROKER
|
189 | if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (Mid: %d)", mosq->id, mid);
|
190 | #else
|
191 | if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid);
|
192 | #endif
|
193 | return _mosquitto_send_command_with_mid(mosq, PUBREC, mid, false);
|
194 | }
|
195 |
|
196 | int _mosquitto_send_pubrel(struct mosquitto *mosq, uint16_t mid, bool dup)
|
197 | {
|
198 | #ifdef WITH_BROKER
|
199 | if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (Mid: %d)", mosq->id, mid);
|
200 | #else
|
201 | if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (Mid: %d)", mosq->id, mid);
|
202 | #endif
|
203 | return _mosquitto_send_command_with_mid(mosq, PUBREL|2, mid, dup);
|
204 | }
|
205 |
|
206 |
|
207 | int _mosquitto_send_command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup)
|
208 | {
|
209 | struct _mosquitto_packet *packet = NULL;
|
210 | int rc;
|
211 |
|
212 | assert(mosq);
|
213 | packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
|
214 | if(!packet) return MOSQ_ERR_NOMEM;
|
215 |
|
216 | packet->command = command;
|
217 | if(dup){
|
218 | packet->command |= 8;
|
219 | }
|
220 | packet->remaining_length = 2;
|
221 | rc = _mosquitto_packet_alloc(packet);
|
222 | if(rc){
|
223 | _mosquitto_free(packet);
|
224 | return rc;
|
225 | }
|
226 |
|
227 | packet->payload[packet->pos+0] = MOSQ_MSB(mid);
|
228 | packet->payload[packet->pos+1] = MOSQ_LSB(mid);
|
229 |
|
230 | return _mosquitto_packet_queue(mosq, packet);
|
231 | }
|
232 |
|
233 |
|
234 | int _mosquitto_send_simple_command(struct mosquitto *mosq, uint8_t command)
|
235 | {
|
236 | struct _mosquitto_packet *packet = NULL;
|
237 | int rc;
|
238 |
|
239 | assert(mosq);
|
240 | packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
|
241 | if(!packet) return MOSQ_ERR_NOMEM;
|
242 |
|
243 | packet->command = command;
|
244 | packet->remaining_length = 0;
|
245 |
|
246 | rc = _mosquitto_packet_alloc(packet);
|
247 | if(rc){
|
248 | _mosquitto_free(packet);
|
249 | return rc;
|
250 | }
|
251 |
|
252 | return _mosquitto_packet_queue(mosq, packet);
|
253 | }
|
254 |
|
255 | int _mosquitto_send_real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
|
256 | {
|
257 | struct _mosquitto_packet *packet = NULL;
|
258 | int packetlen;
|
259 | int rc;
|
260 |
|
261 | assert(mosq);
|
262 | assert(topic);
|
263 |
|
264 | packetlen = 2+strlen(topic) + payloadlen;
|
265 | if(qos > 0) packetlen += 2;
|
266 | packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
|
267 | if(!packet) return MOSQ_ERR_NOMEM;
|
268 |
|
269 | packet->mid = mid;
|
270 | packet->command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain;
|
271 | packet->remaining_length = packetlen;
|
272 | rc = _mosquitto_packet_alloc(packet);
|
273 | if(rc){
|
274 | _mosquitto_free(packet);
|
275 | return rc;
|
276 | }
|
277 |
|
278 | _mosquitto_write_string(packet, topic, strlen(topic));
|
279 | if(qos > 0){
|
280 | _mosquitto_write_uint16(packet, mid);
|
281 | }
|
282 |
|
283 |
|
284 | if(payloadlen){
|
285 | _mosquitto_write_bytes(packet, payload, payloadlen);
|
286 | }
|
287 |
|
288 | return _mosquitto_packet_queue(mosq, packet);
|
289 | }
|