1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
59 |
|
60 |
|
61 | #include <config.h>
|
62 |
|
63 | #include <assert.h>
|
64 | #include <stdio.h>
|
65 | #include <string.h>
|
66 |
|
67 | #include <mosquitto_broker.h>
|
68 | #include <memory_mosq.h>
|
69 | #include <util_mosq.h>
|
70 |
|
/* One level of a '/'-separated topic string, kept in a singly linked list
 * ordered from the first level to the last. */
struct _sub_token {
	/* Following level, or NULL when this is the last one. */
	struct _sub_token *next;
	/* Heap-allocated, NUL-terminated text of this level (may be ""). */
	char *topic;
};
|
75 |
|
76 | static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
77 | {
|
78 | int rc = 0;
|
79 | int rc2;
|
80 | int client_qos, msg_qos;
|
81 | uint16_t mid;
|
82 | struct _mosquitto_subleaf *leaf;
|
83 | bool client_retain;
|
84 |
|
85 | leaf = hier->subs;
|
86 |
|
87 | if(retain && set_retain){
|
88 | #ifdef WITH_PERSISTENCE
|
89 | if(strncmp(topic, "$SYS", 4)){
|
90 | |
91 |
|
92 | db->persistence_changes++;
|
93 | }
|
94 | #endif
|
95 | if(hier->retained){
|
96 | hier->retained->ref_count--;
|
97 |
|
98 | db->retained_count--;
|
99 | }
|
100 | if(stored->msg.payloadlen){
|
101 | hier->retained = stored;
|
102 | hier->retained->ref_count++;
|
103 | db->retained_count++;
|
104 | }else{
|
105 | hier->retained = NULL;
|
106 | }
|
107 | }
|
108 | while(source_id && leaf){
|
109 | if(leaf->context->is_bridge && !strcmp(leaf->context->id, source_id)){
|
110 | leaf = leaf->next;
|
111 | continue;
|
112 | }
|
113 |
|
114 | rc2 = mosquitto_acl_check(db, leaf->context, topic, MOSQ_ACL_READ);
|
115 | if(rc2 == MOSQ_ERR_ACL_DENIED){
|
116 | leaf = leaf->next;
|
117 | continue;
|
118 | }else if(rc2 == MOSQ_ERR_SUCCESS){
|
119 | client_qos = leaf->qos;
|
120 |
|
121 | if(db->config->upgrade_outgoing_qos){
|
122 | msg_qos = client_qos;
|
123 | }else{
|
124 | if(qos > client_qos){
|
125 | msg_qos = client_qos;
|
126 | }else{
|
127 | msg_qos = qos;
|
128 | }
|
129 | }
|
130 | if(msg_qos){
|
131 | mid = _mosquitto_mid_generate(leaf->context);
|
132 | }else{
|
133 | mid = 0;
|
134 | }
|
135 | if(leaf->context->is_bridge){
|
136 | |
137 |
|
138 |
|
139 | client_retain = retain;
|
140 | }else{
|
141 | |
142 |
|
143 | client_retain = false;
|
144 | }
|
145 | if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1;
|
146 | }else{
|
147 | rc = 1;
|
148 | }
|
149 | leaf = leaf->next;
|
150 | }
|
151 | return rc;
|
152 | }
|
153 |
|
154 | static int _sub_topic_tokenise(const char *subtopic, struct _sub_token **topics)
|
155 | {
|
156 | struct _sub_token *new_topic, *tail = NULL;
|
157 | int len;
|
158 | int start, stop, tlen;
|
159 | int i;
|
160 |
|
161 | assert(subtopic);
|
162 | assert(topics);
|
163 |
|
164 | if(subtopic[0] != '$'){
|
165 | new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
|
166 | if(!new_topic) goto cleanup;
|
167 | new_topic->next = NULL;
|
168 | new_topic->topic = _mosquitto_strdup("");
|
169 | if(!new_topic->topic) goto cleanup;
|
170 |
|
171 | *topics = new_topic;
|
172 | tail = new_topic;
|
173 | }
|
174 |
|
175 | len = strlen(subtopic);
|
176 |
|
177 | if(subtopic[0] == '/'){
|
178 | new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
|
179 | if(!new_topic) goto cleanup;
|
180 | new_topic->next = NULL;
|
181 | new_topic->topic = _mosquitto_strdup("");
|
182 | if(!new_topic->topic) goto cleanup;
|
183 |
|
184 | *topics = new_topic;
|
185 | tail = new_topic;
|
186 |
|
187 | start = 1;
|
188 | }else{
|
189 | start = 0;
|
190 | }
|
191 |
|
192 | stop = 0;
|
193 | for(i=start; i<len+1; i++){
|
194 | if(subtopic[i] == '/' || subtopic[i] == '\0'){
|
195 | stop = i;
|
196 | new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
|
197 | if(!new_topic) goto cleanup;
|
198 | new_topic->next = NULL;
|
199 |
|
200 | if(start != stop){
|
201 | tlen = stop-start + 1;
|
202 |
|
203 | new_topic->topic = _mosquitto_calloc(tlen, sizeof(char));
|
204 | if(!new_topic->topic) goto cleanup;
|
205 | memcpy(new_topic->topic, &subtopic[start], tlen-1);
|
206 | }else{
|
207 | new_topic->topic = _mosquitto_strdup("");
|
208 | if(!new_topic->topic) goto cleanup;
|
209 | }
|
210 | if(tail){
|
211 | tail->next = new_topic;
|
212 | tail = tail->next;
|
213 | }else{
|
214 | tail = new_topic;
|
215 | *topics = tail;
|
216 | }
|
217 | start = i+1;
|
218 | }
|
219 | }
|
220 |
|
221 | return MOSQ_ERR_SUCCESS;
|
222 |
|
223 | cleanup:
|
224 | tail = *topics;
|
225 | *topics = NULL;
|
226 | while(tail){
|
227 | if(tail->topic) _mosquitto_free(tail->topic);
|
228 | new_topic = tail->next;
|
229 | _mosquitto_free(tail);
|
230 | tail = new_topic;
|
231 | }
|
232 | return 1;
|
233 | }
|
234 |
|
235 | static int _sub_add(struct mosquitto_db *db, struct mosquitto *context, int qos, struct _mosquitto_subhier *subhier, struct _sub_token *tokens)
|
236 | {
|
237 | struct _mosquitto_subhier *branch, *last = NULL;
|
238 | struct _mosquitto_subleaf *leaf, *last_leaf;
|
239 |
|
240 | if(!tokens){
|
241 | if(context){
|
242 | leaf = subhier->subs;
|
243 | last_leaf = NULL;
|
244 | while(leaf){
|
245 | if(!strcmp(leaf->context->id, context->id)){
|
246 | |
247 |
|
248 |
|
249 | leaf->qos = qos;
|
250 | return -1;
|
251 | }
|
252 | last_leaf = leaf;
|
253 | leaf = leaf->next;
|
254 | }
|
255 | leaf = _mosquitto_malloc(sizeof(struct _mosquitto_subleaf));
|
256 | if(!leaf) return MOSQ_ERR_NOMEM;
|
257 | leaf->next = NULL;
|
258 | leaf->context = context;
|
259 | leaf->qos = qos;
|
260 | if(last_leaf){
|
261 | last_leaf->next = leaf;
|
262 | leaf->prev = last_leaf;
|
263 | }else{
|
264 | subhier->subs = leaf;
|
265 | leaf->prev = NULL;
|
266 | }
|
267 | db->subscription_count++;
|
268 | }
|
269 | return MOSQ_ERR_SUCCESS;
|
270 | }
|
271 |
|
272 | branch = subhier->children;
|
273 | while(branch){
|
274 | if(!strcmp(branch->topic, tokens->topic)){
|
275 | return _sub_add(db, context, qos, branch, tokens->next);
|
276 | }
|
277 | last = branch;
|
278 | branch = branch->next;
|
279 | }
|
280 |
|
281 | branch = _mosquitto_calloc(1, sizeof(struct _mosquitto_subhier));
|
282 | if(!branch) return MOSQ_ERR_NOMEM;
|
283 | branch->topic = _mosquitto_strdup(tokens->topic);
|
284 | if(!branch->topic){
|
285 | _mosquitto_free(branch);
|
286 | return MOSQ_ERR_NOMEM;
|
287 | }
|
288 | if(!last){
|
289 | subhier->children = branch;
|
290 | }else{
|
291 | last->next = branch;
|
292 | }
|
293 | return _sub_add(db, context, qos, branch, tokens->next);
|
294 | }
|
295 |
|
296 | static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *subhier, struct _sub_token *tokens)
|
297 | {
|
298 | struct _mosquitto_subhier *branch, *last = NULL;
|
299 | struct _mosquitto_subleaf *leaf;
|
300 |
|
301 | if(!tokens){
|
302 | leaf = subhier->subs;
|
303 | while(leaf){
|
304 | if(leaf->context==context){
|
305 | db->subscription_count--;
|
306 | if(leaf->prev){
|
307 | leaf->prev->next = leaf->next;
|
308 | }else{
|
309 | subhier->subs = leaf->next;
|
310 | }
|
311 | if(leaf->next){
|
312 | leaf->next->prev = leaf->prev;
|
313 | }
|
314 | _mosquitto_free(leaf);
|
315 | return MOSQ_ERR_SUCCESS;
|
316 | }
|
317 | leaf = leaf->next;
|
318 | }
|
319 | return MOSQ_ERR_SUCCESS;
|
320 | }
|
321 |
|
322 | branch = subhier->children;
|
323 | while(branch){
|
324 | if(!strcmp(branch->topic, tokens->topic)){
|
325 | _sub_remove(db, context, branch, tokens->next);
|
326 | if(!branch->children && !branch->subs && !branch->retained){
|
327 | if(last){
|
328 | last->next = branch->next;
|
329 | }else{
|
330 | subhier->children = branch->next;
|
331 | }
|
332 | _mosquitto_free(branch->topic);
|
333 | _mosquitto_free(branch);
|
334 | }
|
335 | return MOSQ_ERR_SUCCESS;
|
336 | }
|
337 | last = branch;
|
338 | branch = branch->next;
|
339 | }
|
340 | return MOSQ_ERR_SUCCESS;
|
341 | }
|
342 |
|
343 | static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
344 | {
|
345 |
|
346 | struct _mosquitto_subhier *branch;
|
347 | bool sr;
|
348 |
|
349 | branch = subhier->children;
|
350 | while(branch){
|
351 | sr = set_retain;
|
352 |
|
353 | if(tokens && tokens->topic && (!strcmp(branch->topic, tokens->topic) || !strcmp(branch->topic, "+"))){
|
354 | |
355 |
|
356 | if(!strcmp(branch->topic, "+")){
|
357 |
|
358 | sr = false;
|
359 | }
|
360 | _sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr);
|
361 | if(!tokens->next){
|
362 | _subs_process(db, branch, source_id, topic, qos, retain, stored, sr);
|
363 | }
|
364 | }else if(!strcmp(branch->topic, "#") && !branch->children){
|
365 | |
366 |
|
367 |
|
368 |
|
369 | _subs_process(db, branch, source_id, topic, qos, retain, stored, false);
|
370 | }
|
371 | branch = branch->next;
|
372 | }
|
373 | }
|
374 |
|
375 | int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct _mosquitto_subhier *root)
|
376 | {
|
377 | int rc = 0;
|
378 | struct _mosquitto_subhier *subhier, *child;
|
379 | struct _sub_token *tokens = NULL, *tail;
|
380 |
|
381 | assert(root);
|
382 | assert(sub);
|
383 |
|
384 | if(_sub_topic_tokenise(sub, &tokens)) return 1;
|
385 |
|
386 | subhier = root->children;
|
387 | while(subhier){
|
388 | if(!strcmp(subhier->topic, tokens->topic)){
|
389 | rc = _sub_add(db, context, qos, subhier, tokens);
|
390 | break;
|
391 | }
|
392 | subhier = subhier->next;
|
393 | }
|
394 | if(!subhier){
|
395 | child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
|
396 | if(!child){
|
397 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
398 | return MOSQ_ERR_NOMEM;
|
399 | }
|
400 | child->next = NULL;
|
401 | child->topic = _mosquitto_strdup(tokens->topic);
|
402 | if(!child->topic){
|
403 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
404 | return MOSQ_ERR_NOMEM;
|
405 | }
|
406 | child->subs = NULL;
|
407 | child->children = NULL;
|
408 | child->retained = NULL;
|
409 | db->subs.children = child;
|
410 |
|
411 | rc = _sub_add(db, context, qos, child, tokens);
|
412 | }
|
413 |
|
414 | while(tokens){
|
415 | tail = tokens->next;
|
416 | _mosquitto_free(tokens->topic);
|
417 | _mosquitto_free(tokens);
|
418 | tokens = tail;
|
419 | }
|
420 |
|
421 | if(rc == -1) rc = MOSQ_ERR_SUCCESS;
|
422 | return rc;
|
423 | }
|
424 |
|
425 | int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct _mosquitto_subhier *root)
|
426 | {
|
427 | int rc = 0;
|
428 | struct _mosquitto_subhier *subhier;
|
429 | struct _sub_token *tokens = NULL, *tail;
|
430 |
|
431 | assert(root);
|
432 | assert(sub);
|
433 |
|
434 | if(_sub_topic_tokenise(sub, &tokens)) return 1;
|
435 |
|
436 | subhier = root->children;
|
437 | while(subhier){
|
438 | if(!strcmp(subhier->topic, tokens->topic)){
|
439 | rc = _sub_remove(db, context, subhier, tokens);
|
440 | break;
|
441 | }
|
442 | subhier = subhier->next;
|
443 | }
|
444 |
|
445 | while(tokens){
|
446 | tail = tokens->next;
|
447 | _mosquitto_free(tokens->topic);
|
448 | _mosquitto_free(tokens);
|
449 | tokens = tail;
|
450 | }
|
451 |
|
452 | return rc;
|
453 | }
|
454 |
|
455 | int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored)
|
456 | {
|
457 | int rc = 0;
|
458 | struct _mosquitto_subhier *subhier;
|
459 | struct _sub_token *tokens = NULL, *tail;
|
460 |
|
461 | assert(db);
|
462 | assert(topic);
|
463 |
|
464 | if(_sub_topic_tokenise(topic, &tokens)) return 1;
|
465 |
|
466 | subhier = db->subs.children;
|
467 | while(subhier){
|
468 | if(!strcmp(subhier->topic, tokens->topic)){
|
469 | if(retain){
|
470 | |
471 |
|
472 |
|
473 | _sub_add(db, NULL, 0, subhier, tokens);
|
474 | }
|
475 | _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true);
|
476 | }
|
477 | subhier = subhier->next;
|
478 | }
|
479 | while(tokens){
|
480 | tail = tokens->next;
|
481 | _mosquitto_free(tokens->topic);
|
482 | _mosquitto_free(tokens);
|
483 | tokens = tail;
|
484 | }
|
485 |
|
486 | return rc;
|
487 | }
|
488 |
|
489 | static int _subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root)
|
490 | {
|
491 | int rc = 0;
|
492 | struct _mosquitto_subhier *child, *last = NULL;
|
493 | struct _mosquitto_subleaf *leaf, *next;
|
494 |
|
495 | if(!root) return MOSQ_ERR_SUCCESS;
|
496 |
|
497 | leaf = root->subs;
|
498 | while(leaf){
|
499 | if(leaf->context == context){
|
500 | db->subscription_count--;
|
501 | if(leaf->prev){
|
502 | leaf->prev->next = leaf->next;
|
503 | }else{
|
504 | root->subs = leaf->next;
|
505 | }
|
506 | if(leaf->next){
|
507 | leaf->next->prev = leaf->prev;
|
508 | }
|
509 | next = leaf->next;
|
510 | _mosquitto_free(leaf);
|
511 | leaf = next;
|
512 | }else{
|
513 | leaf = leaf->next;
|
514 | }
|
515 | }
|
516 |
|
517 | child = root->children;
|
518 | while(child){
|
519 | _subs_clean_session(db, context, child);
|
520 | if(!child->children && !child->subs && !child->retained){
|
521 | if(last){
|
522 | last->next = child->next;
|
523 | }else{
|
524 | root->children = child->next;
|
525 | }
|
526 | _mosquitto_free(child->topic);
|
527 | _mosquitto_free(child);
|
528 | if(last){
|
529 | child = last->next;
|
530 | }else{
|
531 | child = root->children;
|
532 | }
|
533 | }else{
|
534 | last = child;
|
535 | child = child->next;
|
536 | }
|
537 | }
|
538 | return rc;
|
539 | }
|
540 |
|
541 |
|
542 |
|
543 | int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root)
|
544 | {
|
545 | struct _mosquitto_subhier *child;
|
546 |
|
547 | child = root->children;
|
548 | while(child){
|
549 | _subs_clean_session(db, context, child);
|
550 | child = child->next;
|
551 | }
|
552 |
|
553 | return MOSQ_ERR_SUCCESS;
|
554 | }
|
555 |
|
556 | void mqtt3_sub_tree_print(struct _mosquitto_subhier *root, int level)
|
557 | {
|
558 | int i;
|
559 | struct _mosquitto_subhier *branch;
|
560 | struct _mosquitto_subleaf *leaf;
|
561 |
|
562 | for(i=0; i<level*2; i++){
|
563 | printf(" ");
|
564 | }
|
565 | printf("%s", root->topic);
|
566 | leaf = root->subs;
|
567 | while(leaf){
|
568 | if(leaf->context){
|
569 | printf(" (%s, %d)", leaf->context->id, leaf->qos);
|
570 | }else{
|
571 | printf(" (%s, %d)", "", leaf->qos);
|
572 | }
|
573 | leaf = leaf->next;
|
574 | }
|
575 | if(root->retained){
|
576 | printf(" (r)");
|
577 | }
|
578 | printf("\n");
|
579 |
|
580 | branch = root->children;
|
581 | while(branch){
|
582 | mqtt3_sub_tree_print(branch, level+1);
|
583 | branch = branch->next;
|
584 | }
|
585 | }
|
586 |
|
587 | static int _retain_process(struct mosquitto_db *db, struct mosquitto_msg_store *retained, struct mosquitto *context, const char *sub, int sub_qos)
|
588 | {
|
589 | int rc = 0;
|
590 | int qos;
|
591 | uint16_t mid;
|
592 |
|
593 | rc = mosquitto_acl_check(db, context, retained->msg.topic, MOSQ_ACL_READ);
|
594 | if(rc == MOSQ_ERR_ACL_DENIED){
|
595 | return MOSQ_ERR_SUCCESS;
|
596 | }else if(rc != MOSQ_ERR_SUCCESS){
|
597 | return rc;
|
598 | }
|
599 |
|
600 | qos = retained->msg.qos;
|
601 |
|
602 | if(qos > sub_qos) qos = sub_qos;
|
603 | if(qos > 0){
|
604 | mid = _mosquitto_mid_generate(context);
|
605 | }else{
|
606 | mid = 0;
|
607 | }
|
608 | return mqtt3_db_message_insert(db, context, mid, mosq_md_out, qos, true, retained);
|
609 | }
|
610 |
|
611 | static int _retain_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, struct mosquitto *context, const char *sub, int sub_qos, int level)
|
612 | {
|
613 | struct _mosquitto_subhier *branch;
|
614 | int flag = 0;
|
615 |
|
616 | branch = subhier->children;
|
617 | while(branch){
|
618 | |
619 |
|
620 |
|
621 | if(!strcmp(tokens->topic, "#") && !tokens->next){
|
622 | |
623 |
|
624 |
|
625 |
|
626 | flag = -1;
|
627 | if(branch->retained){
|
628 | _retain_process(db, branch->retained, context, sub, sub_qos);
|
629 | }
|
630 | if(branch->children){
|
631 | _retain_search(db, branch, tokens, context, sub, sub_qos, level+1);
|
632 | }
|
633 | }else if(strcmp(branch->topic, "+") && (!strcmp(branch->topic, tokens->topic) || !strcmp(tokens->topic, "+"))){
|
634 | if(tokens->next){
|
635 | if(_retain_search(db, branch, tokens->next, context, sub, sub_qos, level+1) == -1
|
636 | || (!branch->next && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){
|
637 |
|
638 | if(branch->retained){
|
639 | _retain_process(db, branch->retained, context, sub, sub_qos);
|
640 | }
|
641 | }
|
642 | }else{
|
643 | if(branch->retained){
|
644 | _retain_process(db, branch->retained, context, sub, sub_qos);
|
645 | }
|
646 | }
|
647 | }
|
648 |
|
649 | branch = branch->next;
|
650 | }
|
651 | return flag;
|
652 | }
|
653 |
|
654 | int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos)
|
655 | {
|
656 | struct _mosquitto_subhier *subhier;
|
657 | struct _sub_token *tokens = NULL, *tail;
|
658 |
|
659 | assert(db);
|
660 | assert(context);
|
661 | assert(sub);
|
662 |
|
663 | if(_sub_topic_tokenise(sub, &tokens)) return 1;
|
664 |
|
665 | subhier = db->subs.children;
|
666 | while(subhier){
|
667 | if(!strcmp(subhier->topic, tokens->topic)){
|
668 | _retain_search(db, subhier, tokens, context, sub, sub_qos, 0);
|
669 | break;
|
670 | }
|
671 | subhier = subhier->next;
|
672 | }
|
673 | while(tokens){
|
674 | tail = tokens->next;
|
675 | _mosquitto_free(tokens->topic);
|
676 | _mosquitto_free(tokens);
|
677 | tokens = tail;
|
678 | }
|
679 |
|
680 | return MOSQ_ERR_SUCCESS;
|
681 | }
|
682 |
|