1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include <stdio.h>
|
31 | #include <string.h>
|
32 |
|
33 | #include <config.h>
|
34 |
|
35 | #include <mosquitto_broker.h>
|
36 | #include <mqtt3_protocol.h>
|
37 | #include <memory_mosq.h>
|
38 | #include <send_mosq.h>
|
39 | #include <time_mosq.h>
|
40 | #include <tls_mosq.h>
|
41 | #include <util_mosq.h>
|
42 |
|
43 | #ifdef WITH_SYS_TREE
|
44 | extern unsigned int g_connection_count;
|
45 | #endif
|
46 |
|
47 | int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
|
48 | {
|
49 | char *protocol_name = NULL;
|
50 | uint8_t protocol_version;
|
51 | uint8_t connect_flags;
|
52 | char *client_id = NULL;
|
53 | char *will_payload = NULL, *will_topic = NULL;
|
54 | uint16_t will_payloadlen;
|
55 | struct mosquitto_message *will_struct = NULL;
|
56 | uint8_t will, will_retain, will_qos, clean_session;
|
57 | uint8_t username_flag, password_flag;
|
58 | char *username = NULL, *password = NULL;
|
59 | int i;
|
60 | int rc;
|
61 | struct _mosquitto_acl_user *acl_tail;
|
62 | int slen;
|
63 | #ifdef WITH_TLS
|
64 | X509 *client_cert;
|
65 | X509_NAME *name;
|
66 | X509_NAME_ENTRY *name_entry;
|
67 | #endif
|
68 | struct _clientid_index_hash *find_cih;
|
69 | struct _clientid_index_hash *new_cih;
|
70 |
|
71 | #ifdef WITH_SYS_TREE
|
72 | g_connection_count++;
|
73 | #endif
|
74 |
|
75 |
|
76 | if(context->state != mosq_cs_new){
|
77 | mqtt3_context_disconnect(db, context);
|
78 | return MOSQ_ERR_PROTOCOL;
|
79 | }
|
80 |
|
81 | if(_mosquitto_read_string(&context->in_packet, &protocol_name)){
|
82 | mqtt3_context_disconnect(db, context);
|
83 | return 1;
|
84 | }
|
85 | if(!protocol_name){
|
86 | mqtt3_context_disconnect(db, context);
|
87 | return 3;
|
88 | }
|
89 | if(_mosquitto_read_byte(&context->in_packet, &protocol_version)){
|
90 | mqtt3_context_disconnect(db, context);
|
91 | return 1;
|
92 | }
|
93 | if(!strcmp(protocol_name, PROTOCOL_NAME_v31)){
|
94 | if((protocol_version&0x7F) != PROTOCOL_VERSION_v31){
|
95 | if(db->config->connection_messages == true){
|
96 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid protocol version %d in CONNECT from %s.",
|
97 | protocol_version, context->address);
|
98 | }
|
99 | _mosquitto_send_connack(context, CONNACK_REFUSED_PROTOCOL_VERSION);
|
100 | mqtt3_context_disconnect(db, context);
|
101 | _mosquitto_free(protocol_name);
|
102 | return MOSQ_ERR_PROTOCOL;
|
103 | }
|
104 | context->protocol = mosq_p_mqtt31;
|
105 | }else if(!strcmp(protocol_name, PROTOCOL_NAME_v311)){
|
106 | if((protocol_version&0x7F) != PROTOCOL_VERSION_v311){
|
107 | if(db->config->connection_messages == true){
|
108 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid protocol version %d in CONNECT from %s.",
|
109 | protocol_version, context->address);
|
110 | }
|
111 | _mosquitto_send_connack(context, CONNACK_REFUSED_PROTOCOL_VERSION);
|
112 | mqtt3_context_disconnect(db, context);
|
113 | _mosquitto_free(protocol_name);
|
114 | return MOSQ_ERR_PROTOCOL;
|
115 | }
|
116 | if((context->in_packet.command&0x0F) != 0x00){
|
117 |
|
118 | mqtt3_context_disconnect(db, context);
|
119 | _mosquitto_free(protocol_name);
|
120 | return MOSQ_ERR_PROTOCOL;
|
121 | }
|
122 | context->protocol = mosq_p_mqtt311;
|
123 | }else{
|
124 | if(db->config->connection_messages == true){
|
125 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid protocol \"%s\" in CONNECT from %s.",
|
126 | protocol_name, context->address);
|
127 | }
|
128 | _mosquitto_free(protocol_name);
|
129 | mqtt3_context_disconnect(db, context);
|
130 | return MOSQ_ERR_PROTOCOL;
|
131 | }
|
132 | _mosquitto_free(protocol_name);
|
133 |
|
134 | if(_mosquitto_read_byte(&context->in_packet, &connect_flags)){
|
135 | mqtt3_context_disconnect(db, context);
|
136 | return 1;
|
137 | }
|
138 | clean_session = connect_flags & 0x02;
|
139 | will = connect_flags & 0x04;
|
140 | will_qos = (connect_flags & 0x18) >> 3;
|
141 | if(will_qos == 3){
|
142 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid Will QoS in CONNECT from %s.",
|
143 | context->address);
|
144 | mqtt3_context_disconnect(db, context);
|
145 | return MOSQ_ERR_PROTOCOL;
|
146 | }
|
147 | will_retain = connect_flags & 0x20;
|
148 | password_flag = connect_flags & 0x40;
|
149 | username_flag = connect_flags & 0x80;
|
150 |
|
151 | if(_mosquitto_read_uint16(&context->in_packet, &(context->keepalive))){
|
152 | mqtt3_context_disconnect(db, context);
|
153 | return 1;
|
154 | }
|
155 |
|
156 | if(_mosquitto_read_string(&context->in_packet, &client_id)){
|
157 | mqtt3_context_disconnect(db, context);
|
158 | return 1;
|
159 | }
|
160 |
|
161 | slen = strlen(client_id);
|
162 | #ifdef WITH_STRICT_PROTOCOL
|
163 | if(slen > 23 || slen == 0){
|
164 | #else
|
165 | if(slen == 0){
|
166 | #endif
|
167 | if(context->protocol == mosq_p_mqtt31){
|
168 | _mosquitto_free(client_id);
|
169 | _mosquitto_send_connack(context, CONNACK_REFUSED_IDENTIFIER_REJECTED);
|
170 | mqtt3_context_disconnect(db, context);
|
171 | return MOSQ_ERR_PROTOCOL;
|
172 | }else{
|
173 | _mosquitto_free(client_id);
|
174 |
|
175 | if(clean_session == 0){
|
176 | _mosquitto_send_connack(context, CONNACK_REFUSED_IDENTIFIER_REJECTED);
|
177 | mqtt3_context_disconnect(db, context);
|
178 | return MOSQ_ERR_PROTOCOL;
|
179 | }
|
180 | if(db->config->allow_zero_length_clientid == true){
|
181 | client_id = (char *)_mosquitto_calloc(65 + db->config->auto_id_prefix_len, sizeof(char));
|
182 | if(!client_id){
|
183 | mqtt3_context_disconnect(db, context);
|
184 | return MOSQ_ERR_NOMEM;
|
185 | }
|
186 | memcpy(client_id, db->config->auto_id_prefix, db->config->auto_id_prefix_len);
|
187 | for(i=0; i<64; i++){
|
188 | client_id[i+db->config->auto_id_prefix_len] = (rand()%73)+48;
|
189 | }
|
190 | client_id[i] = '\0';
|
191 | }else{
|
192 | _mosquitto_send_connack(context, CONNACK_REFUSED_IDENTIFIER_REJECTED);
|
193 | mqtt3_context_disconnect(db, context);
|
194 | return MOSQ_ERR_PROTOCOL;
|
195 | }
|
196 | }
|
197 | }
|
198 |
|
199 |
|
200 | if(db->config->clientid_prefixes){
|
201 | if(strncmp(db->config->clientid_prefixes, client_id, strlen(db->config->clientid_prefixes))){
|
202 | _mosquitto_free(client_id);
|
203 | _mosquitto_send_connack(context, CONNACK_REFUSED_NOT_AUTHORIZED);
|
204 | mqtt3_context_disconnect(db, context);
|
205 | return MOSQ_ERR_SUCCESS;
|
206 | }
|
207 | }
|
208 |
|
209 | if(will){
|
210 | will_struct = _mosquitto_calloc(1, sizeof(struct mosquitto_message));
|
211 | if(!will_struct){
|
212 | mqtt3_context_disconnect(db, context);
|
213 | rc = MOSQ_ERR_NOMEM;
|
214 | goto handle_connect_error;
|
215 | }
|
216 | if(_mosquitto_read_string(&context->in_packet, &will_topic)){
|
217 | mqtt3_context_disconnect(db, context);
|
218 | rc = 1;
|
219 | goto handle_connect_error;
|
220 | }
|
221 | if(strlen(will_topic) == 0){
|
222 | mqtt3_context_disconnect(db, context);
|
223 | rc = 1;
|
224 | goto handle_connect_error;
|
225 | }
|
226 | if(_mosquitto_topic_wildcard_pos_check(will_topic)){
|
227 | mqtt3_context_disconnect(db, context);
|
228 | rc = 1;
|
229 | goto handle_connect_error;
|
230 | }
|
231 |
|
232 | if(_mosquitto_read_uint16(&context->in_packet, &will_payloadlen)){
|
233 | mqtt3_context_disconnect(db, context);
|
234 | rc = 1;
|
235 | goto handle_connect_error;
|
236 | }
|
237 | if(will_payloadlen > 0){
|
238 | will_payload = _mosquitto_malloc(will_payloadlen);
|
239 | if(!will_payload){
|
240 | mqtt3_context_disconnect(db, context);
|
241 | rc = 1;
|
242 | goto handle_connect_error;
|
243 | }
|
244 |
|
245 | rc = _mosquitto_read_bytes(&context->in_packet, will_payload, will_payloadlen);
|
246 | if(rc){
|
247 | mqtt3_context_disconnect(db, context);
|
248 | rc = 1;
|
249 | goto handle_connect_error;
|
250 | }
|
251 | }
|
252 | }else{
|
253 | if(context->protocol == mosq_p_mqtt311){
|
254 | if(will_qos != 0 || will_retain != 0){
|
255 | mqtt3_context_disconnect(db, context);
|
256 | rc = MOSQ_ERR_PROTOCOL;
|
257 | goto handle_connect_error;
|
258 | }
|
259 | }
|
260 | }
|
261 |
|
262 | if(username_flag){
|
263 | rc = _mosquitto_read_string(&context->in_packet, &username);
|
264 | if(rc == MOSQ_ERR_SUCCESS){
|
265 | if(password_flag){
|
266 | rc = _mosquitto_read_string(&context->in_packet, &password);
|
267 | if(rc == MOSQ_ERR_NOMEM){
|
268 | rc = MOSQ_ERR_NOMEM;
|
269 | goto handle_connect_error;
|
270 | }else if(rc == MOSQ_ERR_PROTOCOL){
|
271 | if(context->protocol == mosq_p_mqtt31){
|
272 |
|
273 | password_flag = 0;
|
274 | }else if(context->protocol == mosq_p_mqtt311){
|
275 | mqtt3_context_disconnect(db, context);
|
276 | rc = MOSQ_ERR_PROTOCOL;
|
277 | goto handle_connect_error;
|
278 | }
|
279 | }
|
280 | }
|
281 | }else if(rc == MOSQ_ERR_NOMEM){
|
282 | rc = MOSQ_ERR_NOMEM;
|
283 | goto handle_connect_error;
|
284 | }else{
|
285 | if(context->protocol == mosq_p_mqtt31){
|
286 |
|
287 | username_flag = 0;
|
288 | }else if(context->protocol == mosq_p_mqtt311){
|
289 | mqtt3_context_disconnect(db, context);
|
290 | rc = MOSQ_ERR_PROTOCOL;
|
291 | goto handle_connect_error;
|
292 | }
|
293 | }
|
294 | }else{
|
295 | if(context->protocol == mosq_p_mqtt311){
|
296 | if(password_flag){
|
297 |
|
298 | mqtt3_context_disconnect(db, context);
|
299 | rc = MOSQ_ERR_PROTOCOL;
|
300 | goto handle_connect_error;
|
301 | }
|
302 | }
|
303 | }
|
304 |
|
305 | #ifdef WITH_TLS
|
306 | if(context->listener->use_identity_as_username){
|
307 | if(!context->ssl){
|
308 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
309 | mqtt3_context_disconnect(db, context);
|
310 | rc = MOSQ_ERR_SUCCESS;
|
311 | goto handle_connect_error;
|
312 | }
|
313 | #ifdef REAL_WITH_TLS_PSK
|
314 | if(context->listener->psk_hint){
|
315 |
|
316 | if(!context->username){
|
317 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
318 | mqtt3_context_disconnect(db, context);
|
319 | rc = MOSQ_ERR_SUCCESS;
|
320 | goto handle_connect_error;
|
321 | }
|
322 | }else{
|
323 | #endif
|
324 | client_cert = SSL_get_peer_certificate(context->ssl);
|
325 | if(!client_cert){
|
326 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
327 | mqtt3_context_disconnect(db, context);
|
328 | rc = MOSQ_ERR_SUCCESS;
|
329 | goto handle_connect_error;
|
330 | }
|
331 | name = X509_get_subject_name(client_cert);
|
332 | if(!name){
|
333 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
334 | mqtt3_context_disconnect(db, context);
|
335 | rc = MOSQ_ERR_SUCCESS;
|
336 | goto handle_connect_error;
|
337 | }
|
338 |
|
339 | i = X509_NAME_get_index_by_NID(name, NID_commonName, -1);
|
340 | if(i == -1){
|
341 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
342 | mqtt3_context_disconnect(db, context);
|
343 | rc = MOSQ_ERR_SUCCESS;
|
344 | goto handle_connect_error;
|
345 | }
|
346 | name_entry = X509_NAME_get_entry(name, i);
|
347 | context->username = _mosquitto_strdup((char *)ASN1_STRING_data(name_entry->value));
|
348 | if(!context->username){
|
349 | rc = MOSQ_ERR_SUCCESS;
|
350 | goto handle_connect_error;
|
351 | }
|
352 | #ifdef REAL_WITH_TLS_PSK
|
353 | }
|
354 | #endif
|
355 | }else{
|
356 | #endif
|
357 | if(username_flag){
|
358 | rc = mosquitto_unpwd_check(db, username, password);
|
359 | if(rc == MOSQ_ERR_AUTH){
|
360 | _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
|
361 | mqtt3_context_disconnect(db, context);
|
362 | rc = MOSQ_ERR_SUCCESS;
|
363 | goto handle_connect_error;
|
364 | }else if(rc == MOSQ_ERR_INVAL){
|
365 | goto handle_connect_error;
|
366 | }
|
367 | context->username = username;
|
368 | context->password = password;
|
369 | username = NULL;
|
370 | password = NULL;
|
371 | }
|
372 |
|
373 | if(!username_flag && db->config->allow_anonymous == false){
|
374 | _mosquitto_send_connack(context, CONNACK_REFUSED_NOT_AUTHORIZED);
|
375 | mqtt3_context_disconnect(db, context);
|
376 | rc = MOSQ_ERR_SUCCESS;
|
377 | goto handle_connect_error;
|
378 | }
|
379 | #ifdef WITH_TLS
|
380 | }
|
381 | #endif
|
382 |
|
383 |
|
384 | HASH_FIND_STR(db->clientid_index_hash, client_id, find_cih);
|
385 | if(find_cih){
|
386 | i = find_cih->db_context_index;
|
387 |
|
388 | if(db->contexts[i]->sock == -1){
|
389 |
|
390 |
|
391 | }else{
|
392 |
|
393 | if(db->config->connection_messages == true){
|
394 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Client %s already connected, closing old connection.", client_id);
|
395 | }
|
396 | }
|
397 | db->contexts[i]->clean_session = clean_session;
|
398 | mqtt3_context_cleanup(db, db->contexts[i], false);
|
399 | db->contexts[i]->state = mosq_cs_connected;
|
400 | db->contexts[i]->address = _mosquitto_strdup(context->address);
|
401 | db->contexts[i]->sock = context->sock;
|
402 | db->contexts[i]->listener = context->listener;
|
403 | db->contexts[i]->last_msg_in = mosquitto_time();
|
404 | db->contexts[i]->last_msg_out = mosquitto_time();
|
405 | db->contexts[i]->keepalive = context->keepalive;
|
406 | db->contexts[i]->pollfd_index = context->pollfd_index;
|
407 | #ifdef WITH_TLS
|
408 | db->contexts[i]->ssl = context->ssl;
|
409 | #endif
|
410 | if(context->username){
|
411 | db->contexts[i]->username = _mosquitto_strdup(context->username);
|
412 | }
|
413 | context->sock = -1;
|
414 | #ifdef WITH_TLS
|
415 | context->ssl = NULL;
|
416 | #endif
|
417 | context->state = mosq_cs_disconnecting;
|
418 | context = db->contexts[i];
|
419 | if(context->msgs){
|
420 | mqtt3_db_message_reconnect_reset(context);
|
421 | }
|
422 | }
|
423 |
|
424 | context->id = client_id;
|
425 | client_id = NULL;
|
426 | context->clean_session = clean_session;
|
427 | context->ping_t = 0;
|
428 | context->is_dropping = false;
|
429 | if((protocol_version&0x80) == 0x80){
|
430 | context->is_bridge = true;
|
431 | }
|
432 |
|
433 |
|
434 | new_cih = _mosquitto_malloc(sizeof(struct _clientid_index_hash));
|
435 | if(!new_cih){
|
436 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
437 | mqtt3_context_disconnect(db, context);
|
438 | rc = MOSQ_ERR_NOMEM;
|
439 | goto handle_connect_error;
|
440 | }
|
441 | new_cih->id = context->id;
|
442 | new_cih->db_context_index = context->db_index;
|
443 | HASH_ADD_KEYPTR(hh, db->clientid_index_hash, context->id, strlen(context->id), new_cih);
|
444 |
|
445 | #ifdef WITH_PERSISTENCE
|
446 | if(!clean_session){
|
447 | db->persistence_changes++;
|
448 | }
|
449 | #endif
|
450 |
|
451 | if(db->acl_list){
|
452 | acl_tail = db->acl_list;
|
453 | while(acl_tail){
|
454 | if(context->username){
|
455 | if(acl_tail->username && !strcmp(context->username, acl_tail->username)){
|
456 | context->acl_list = acl_tail;
|
457 | break;
|
458 | }
|
459 | }else{
|
460 | if(acl_tail->username == NULL){
|
461 | context->acl_list = acl_tail;
|
462 | break;
|
463 | }
|
464 | }
|
465 | acl_tail = acl_tail->next;
|
466 | }
|
467 | }else{
|
468 | context->acl_list = NULL;
|
469 | }
|
470 |
|
471 | if(will_struct){
|
472 | if(mosquitto_acl_check(db, context, will_topic, MOSQ_ACL_WRITE) != MOSQ_ERR_SUCCESS){
|
473 | _mosquitto_send_connack(context, CONNACK_REFUSED_NOT_AUTHORIZED);
|
474 | mqtt3_context_disconnect(db, context);
|
475 | rc = MOSQ_ERR_SUCCESS;
|
476 | goto handle_connect_error;
|
477 | }
|
478 | context->will = will_struct;
|
479 | context->will->topic = will_topic;
|
480 | if(will_payload){
|
481 | context->will->payload = will_payload;
|
482 | context->will->payloadlen = will_payloadlen;
|
483 | }else{
|
484 | context->will->payload = NULL;
|
485 | context->will->payloadlen = 0;
|
486 | }
|
487 | context->will->qos = will_qos;
|
488 | context->will->retain = will_retain;
|
489 | }
|
490 |
|
491 | if(db->config->connection_messages == true){
|
492 | if(context->is_bridge){
|
493 | if(context->username){
|
494 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New bridge connected from %s as %s (c%d, k%d, u%s).", context->address, context->id, context->clean_session, context->keepalive, context->username);
|
495 | }else{
|
496 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New bridge connected from %s as %s (c%d, k%d).", context->address, context->id, context->clean_session, context->keepalive);
|
497 | }
|
498 | }else{
|
499 | if(context->username){
|
500 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New client connected from %s as %s (c%d, k%d, u%s).", context->address, context->id, context->clean_session, context->keepalive, context->username);
|
501 | }else{
|
502 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New client connected from %s as %s (c%d, k%d).", context->address, context->id, context->clean_session, context->keepalive);
|
503 | }
|
504 | }
|
505 | }
|
506 |
|
507 | context->state = mosq_cs_connected;
|
508 | return _mosquitto_send_connack(context, CONNACK_ACCEPTED);
|
509 |
|
510 | handle_connect_error:
|
511 | if(client_id) _mosquitto_free(client_id);
|
512 | if(username) _mosquitto_free(username);
|
513 | if(password) _mosquitto_free(password);
|
514 | if(will_payload) _mosquitto_free(will_payload);
|
515 | if(will_topic) _mosquitto_free(will_topic);
|
516 | if(will_struct) _mosquitto_free(will_struct);
|
517 | return rc;
|
518 | }
|
519 |
|
520 | int mqtt3_handle_disconnect(struct mosquitto_db *db, struct mosquitto *context)
|
521 | {
|
522 | if(!context){
|
523 | return MOSQ_ERR_INVAL;
|
524 | }
|
525 | if(context->in_packet.remaining_length != 0){
|
526 | return MOSQ_ERR_PROTOCOL;
|
527 | }
|
528 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received DISCONNECT from %s", context->id);
|
529 | if(context->protocol == mosq_p_mqtt311){
|
530 | if((context->in_packet.command&0x0F) != 0x00){
|
531 | mqtt3_context_disconnect(db, context);
|
532 | return MOSQ_ERR_PROTOCOL;
|
533 | }
|
534 | }
|
535 | context->state = mosq_cs_disconnecting;
|
536 | mqtt3_context_disconnect(db, context);
|
537 | return MOSQ_ERR_SUCCESS;
|
538 | }
|
539 |
|
540 |
|
541 | int mqtt3_handle_subscribe(struct mosquitto_db *db, struct mosquitto *context)
|
542 | {
|
543 | int rc = 0;
|
544 | int rc2;
|
545 | uint16_t mid;
|
546 | char *sub;
|
547 | uint8_t qos;
|
548 | uint8_t *payload = NULL, *tmp_payload;
|
549 | uint32_t payloadlen = 0;
|
550 | int len;
|
551 | char *sub_mount;
|
552 |
|
553 | if(!context) return MOSQ_ERR_INVAL;
|
554 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received SUBSCRIBE from %s", context->id);
|
555 |
|
556 |
|
557 | if(context->protocol == mosq_p_mqtt311){
|
558 | if((context->in_packet.command&0x0F) != 0x02){
|
559 | return MOSQ_ERR_PROTOCOL;
|
560 | }
|
561 | }
|
562 | if(_mosquitto_read_uint16(&context->in_packet, &mid)) return 1;
|
563 |
|
564 | while(context->in_packet.pos < context->in_packet.remaining_length){
|
565 | sub = NULL;
|
566 | if(_mosquitto_read_string(&context->in_packet, &sub)){
|
567 | if(payload) _mosquitto_free(payload);
|
568 | return 1;
|
569 | }
|
570 |
|
571 | if(sub){
|
572 | if(!strlen(sub)){
|
573 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Empty subscription string from %s, disconnecting.",
|
574 | context->address);
|
575 | _mosquitto_free(sub);
|
576 | if(payload) _mosquitto_free(payload);
|
577 | return 1;
|
578 | }
|
579 | if(_mosquitto_topic_wildcard_pos_check(sub)){
|
580 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid subscription string from %s, disconnecting.",
|
581 | context->address);
|
582 | _mosquitto_free(sub);
|
583 | if(payload) _mosquitto_free(payload);
|
584 | return 1;
|
585 | }
|
586 |
|
587 | if(_mosquitto_read_byte(&context->in_packet, &qos)){
|
588 | _mosquitto_free(sub);
|
589 | if(payload) _mosquitto_free(payload);
|
590 | return 1;
|
591 | }
|
592 | if(qos > 2){
|
593 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid QoS in subscription command from %s, disconnecting.",
|
594 | context->address);
|
595 | _mosquitto_free(sub);
|
596 | if(payload) _mosquitto_free(payload);
|
597 | return 1;
|
598 | }
|
599 | if(context->listener && context->listener->mount_point){
|
600 | len = strlen(context->listener->mount_point) + strlen(sub) + 1;
|
601 | sub_mount = _mosquitto_calloc(len, sizeof(char));
|
602 | if(!sub_mount){
|
603 | _mosquitto_free(sub);
|
604 | if(payload) _mosquitto_free(payload);
|
605 | return MOSQ_ERR_NOMEM;
|
606 | }
|
607 | snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub);
|
608 | _mosquitto_free(sub);
|
609 | sub = sub_mount;
|
610 |
|
611 | }
|
612 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub, qos);
|
613 |
|
614 | if(context->protocol == mosq_p_mqtt311){
|
615 | rc = mosquitto_acl_check(db, context, sub, MOSQ_ACL_READ);
|
616 | if(rc == MOSQ_ERR_ACL_DENIED){
|
617 | qos = 0x80;
|
618 | }
|
619 | }
|
620 |
|
621 | if(qos != 0x80){
|
622 | rc2 = mqtt3_sub_add(db, context, sub, qos, &db->subs);
|
623 | if(rc2 == MOSQ_ERR_SUCCESS){
|
624 | if(mqtt3_retain_queue(db, context, sub, qos)) rc = 1;
|
625 | }else if(rc2 != -1){
|
626 | rc = rc2;
|
627 | }
|
628 | _mosquitto_log_printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
|
629 | }
|
630 | _mosquitto_free(sub);
|
631 |
|
632 | tmp_payload = _mosquitto_realloc(payload, payloadlen + 1);
|
633 | if(tmp_payload){
|
634 | payload = tmp_payload;
|
635 | payload[payloadlen] = qos;
|
636 | payloadlen++;
|
637 | }else{
|
638 | if(payload) _mosquitto_free(payload);
|
639 |
|
640 | return MOSQ_ERR_NOMEM;
|
641 | }
|
642 | }
|
643 | }
|
644 |
|
645 | if(context->protocol == mosq_p_mqtt311){
|
646 | if(payloadlen == 0){
|
647 |
|
648 | return MOSQ_ERR_PROTOCOL;
|
649 | }
|
650 | }
|
651 | if(_mosquitto_send_suback(context, mid, payloadlen, payload)) rc = 1;
|
652 | _mosquitto_free(payload);
|
653 |
|
654 | #ifdef WITH_PERSISTENCE
|
655 | db->persistence_changes++;
|
656 | #endif
|
657 |
|
658 | return rc;
|
659 | }
|
660 |
|
661 | int mqtt3_handle_unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
|
662 | {
|
663 | uint16_t mid;
|
664 | char *sub;
|
665 |
|
666 | if(!context) return MOSQ_ERR_INVAL;
|
667 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBSCRIBE from %s", context->id);
|
668 |
|
669 | if(context->protocol == mosq_p_mqtt311){
|
670 | if((context->in_packet.command&0x0F) != 0x02){
|
671 | return MOSQ_ERR_PROTOCOL;
|
672 | }
|
673 | }
|
674 | if(_mosquitto_read_uint16(&context->in_packet, &mid)) return 1;
|
675 |
|
676 | while(context->in_packet.pos < context->in_packet.remaining_length){
|
677 | sub = NULL;
|
678 | if(_mosquitto_read_string(&context->in_packet, &sub)){
|
679 | return 1;
|
680 | }
|
681 |
|
682 | if(sub){
|
683 | if(!strlen(sub)){
|
684 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Empty unsubscription string from %s, disconnecting.",
|
685 | context->id);
|
686 | _mosquitto_free(sub);
|
687 | return 1;
|
688 | }
|
689 | if(_mosquitto_topic_wildcard_pos_check(sub)){
|
690 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Invalid unsubscription string from %s, disconnecting.",
|
691 | context->id);
|
692 | _mosquitto_free(sub);
|
693 | return 1;
|
694 | }
|
695 |
|
696 | _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub);
|
697 | mqtt3_sub_remove(db, context, sub, &db->subs);
|
698 | _mosquitto_log_printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub);
|
699 | _mosquitto_free(sub);
|
700 | }
|
701 | }
|
702 | #ifdef WITH_PERSISTENCE
|
703 | db->persistence_changes++;
|
704 | #endif
|
705 |
|
706 | return _mosquitto_send_command_with_mid(context, UNSUBACK, mid, false);
|
707 | }
|
708 |
|