UNPKG

17.9 kBtext/x-cView Raw
1/*
2Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
/* A note on matching topic subscriptions.
 *
 * Topics can be up to 32767 characters in length. The / character is used as a
 * hierarchy delimiter. Messages are published to a particular topic.
 * Clients may subscribe to particular topics directly, but may also use
 * wildcards in subscriptions. The + and # characters are used as wildcards.
 * The # wildcard can be used at the end of a subscription only, and is a
 * wildcard for the level of hierarchy at which it is placed and all subsequent
 * levels.
 * The + wildcard may be used at any point within the subscription and is a
 * wildcard for only the level of hierarchy at which it is placed.
 * Neither wildcard may be used as part of a substring.
 *
 * The example subscriptions below are judged against a message published to
 * the topic a/b/c.
 *
 * Valid (and matching a/b/c):
 * 	a/b/+
 * 	a/+/c
 * 	a/#
 * 	a/b/#
 * 	#
 * 	+/b/c
 * 	+/+/+
 * Invalid:
 * 	a/#/c
 * 	a+/b/c
 * Valid but not matching a/b/c:
 * 	a/b
 * 	a/+
 * 	+/b
 * 	b/c/a
 * 	a/b/d
 */
60
61#include <config.h>
62
63#include <assert.h>
64#include <stdio.h>
65#include <string.h>
66
67#include <mosquitto_broker.h>
68#include <memory_mosq.h>
69#include <util_mosq.h>
70
/* One level of a topic or subscription string. The tokeniser in this file
 * builds a singly linked list of these, first hierarchy level first. */
struct _sub_token {
	struct _sub_token *next; /* following level, or NULL for the last one */
	char *topic; /* heap-allocated text of this level; may be "" */
};
75
76static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
77{
78 int rc = 0;
79 int rc2;
80 int client_qos, msg_qos;
81 uint16_t mid;
82 struct _mosquitto_subleaf *leaf;
83 bool client_retain;
84
85 leaf = hier->subs;
86
87 if(retain && set_retain){
88#ifdef WITH_PERSISTENCE
89 if(strncmp(topic, "$SYS", 4)){
90 /* Retained messages count as a persistence change, but only if
91 * they aren't for $SYS. */
92 db->persistence_changes++;
93 }
94#endif
95 if(hier->retained){
96 hier->retained->ref_count--;
97 /* FIXME - it would be nice to be able to remove the message from the store at this point if ref_count == 0 */
98 db->retained_count--;
99 }
100 if(stored->msg.payloadlen){
101 hier->retained = stored;
102 hier->retained->ref_count++;
103 db->retained_count++;
104 }else{
105 hier->retained = NULL;
106 }
107 }
108 while(source_id && leaf){
109 if(leaf->context->is_bridge && !strcmp(leaf->context->id, source_id)){
110 leaf = leaf->next;
111 continue;
112 }
113 /* Check for ACL topic access. */
114 rc2 = mosquitto_acl_check(db, leaf->context, topic, MOSQ_ACL_READ);
115 if(rc2 == MOSQ_ERR_ACL_DENIED){
116 leaf = leaf->next;
117 continue;
118 }else if(rc2 == MOSQ_ERR_SUCCESS){
119 client_qos = leaf->qos;
120
121 if(db->config->upgrade_outgoing_qos){
122 msg_qos = client_qos;
123 }else{
124 if(qos > client_qos){
125 msg_qos = client_qos;
126 }else{
127 msg_qos = qos;
128 }
129 }
130 if(msg_qos){
131 mid = _mosquitto_mid_generate(leaf->context);
132 }else{
133 mid = 0;
134 }
135 if(leaf->context->is_bridge){
136 /* If we know the client is a bridge then we should set retain
137 * even if the message is fresh. If we don't do this, retained
138 * messages won't be propagated. */
139 client_retain = retain;
140 }else{
141 /* Client is not a bridge and this isn't a stale message so
142 * retain should be false. */
143 client_retain = false;
144 }
145 if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1;
146 }else{
147 rc = 1;
148 }
149 leaf = leaf->next;
150 }
151 return rc;
152}
153
154static int _sub_topic_tokenise(const char *subtopic, struct _sub_token **topics)
155{
156 struct _sub_token *new_topic, *tail = NULL;
157 int len;
158 int start, stop, tlen;
159 int i;
160
161 assert(subtopic);
162 assert(topics);
163
164 if(subtopic[0] != '$'){
165 new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
166 if(!new_topic) goto cleanup;
167 new_topic->next = NULL;
168 new_topic->topic = _mosquitto_strdup("");
169 if(!new_topic->topic) goto cleanup;
170
171 *topics = new_topic;
172 tail = new_topic;
173 }
174
175 len = strlen(subtopic);
176
177 if(subtopic[0] == '/'){
178 new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
179 if(!new_topic) goto cleanup;
180 new_topic->next = NULL;
181 new_topic->topic = _mosquitto_strdup("");
182 if(!new_topic->topic) goto cleanup;
183
184 *topics = new_topic;
185 tail = new_topic;
186
187 start = 1;
188 }else{
189 start = 0;
190 }
191
192 stop = 0;
193 for(i=start; i<len+1; i++){
194 if(subtopic[i] == '/' || subtopic[i] == '\0'){
195 stop = i;
196 new_topic = _mosquitto_malloc(sizeof(struct _sub_token));
197 if(!new_topic) goto cleanup;
198 new_topic->next = NULL;
199
200 if(start != stop){
201 tlen = stop-start + 1;
202
203 new_topic->topic = _mosquitto_calloc(tlen, sizeof(char));
204 if(!new_topic->topic) goto cleanup;
205 memcpy(new_topic->topic, &subtopic[start], tlen-1);
206 }else{
207 new_topic->topic = _mosquitto_strdup("");
208 if(!new_topic->topic) goto cleanup;
209 }
210 if(tail){
211 tail->next = new_topic;
212 tail = tail->next;
213 }else{
214 tail = new_topic;
215 *topics = tail;
216 }
217 start = i+1;
218 }
219 }
220
221 return MOSQ_ERR_SUCCESS;
222
223cleanup:
224 tail = *topics;
225 *topics = NULL;
226 while(tail){
227 if(tail->topic) _mosquitto_free(tail->topic);
228 new_topic = tail->next;
229 _mosquitto_free(tail);
230 tail = new_topic;
231 }
232 return 1;
233}
234
235static int _sub_add(struct mosquitto_db *db, struct mosquitto *context, int qos, struct _mosquitto_subhier *subhier, struct _sub_token *tokens)
236{
237 struct _mosquitto_subhier *branch, *last = NULL;
238 struct _mosquitto_subleaf *leaf, *last_leaf;
239
240 if(!tokens){
241 if(context){
242 leaf = subhier->subs;
243 last_leaf = NULL;
244 while(leaf){
245 if(!strcmp(leaf->context->id, context->id)){
246 /* Client making a second subscription to same topic. Only
247 * need to update QoS. Return -1 to indicate this to the
248 * calling function. */
249 leaf->qos = qos;
250 return -1;
251 }
252 last_leaf = leaf;
253 leaf = leaf->next;
254 }
255 leaf = _mosquitto_malloc(sizeof(struct _mosquitto_subleaf));
256 if(!leaf) return MOSQ_ERR_NOMEM;
257 leaf->next = NULL;
258 leaf->context = context;
259 leaf->qos = qos;
260 if(last_leaf){
261 last_leaf->next = leaf;
262 leaf->prev = last_leaf;
263 }else{
264 subhier->subs = leaf;
265 leaf->prev = NULL;
266 }
267 db->subscription_count++;
268 }
269 return MOSQ_ERR_SUCCESS;
270 }
271
272 branch = subhier->children;
273 while(branch){
274 if(!strcmp(branch->topic, tokens->topic)){
275 return _sub_add(db, context, qos, branch, tokens->next);
276 }
277 last = branch;
278 branch = branch->next;
279 }
280 /* Not found */
281 branch = _mosquitto_calloc(1, sizeof(struct _mosquitto_subhier));
282 if(!branch) return MOSQ_ERR_NOMEM;
283 branch->topic = _mosquitto_strdup(tokens->topic);
284 if(!branch->topic){
285 _mosquitto_free(branch);
286 return MOSQ_ERR_NOMEM;
287 }
288 if(!last){
289 subhier->children = branch;
290 }else{
291 last->next = branch;
292 }
293 return _sub_add(db, context, qos, branch, tokens->next);
294}
295
296static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *subhier, struct _sub_token *tokens)
297{
298 struct _mosquitto_subhier *branch, *last = NULL;
299 struct _mosquitto_subleaf *leaf;
300
301 if(!tokens){
302 leaf = subhier->subs;
303 while(leaf){
304 if(leaf->context==context){
305 db->subscription_count--;
306 if(leaf->prev){
307 leaf->prev->next = leaf->next;
308 }else{
309 subhier->subs = leaf->next;
310 }
311 if(leaf->next){
312 leaf->next->prev = leaf->prev;
313 }
314 _mosquitto_free(leaf);
315 return MOSQ_ERR_SUCCESS;
316 }
317 leaf = leaf->next;
318 }
319 return MOSQ_ERR_SUCCESS;
320 }
321
322 branch = subhier->children;
323 while(branch){
324 if(!strcmp(branch->topic, tokens->topic)){
325 _sub_remove(db, context, branch, tokens->next);
326 if(!branch->children && !branch->subs && !branch->retained){
327 if(last){
328 last->next = branch->next;
329 }else{
330 subhier->children = branch->next;
331 }
332 _mosquitto_free(branch->topic);
333 _mosquitto_free(branch);
334 }
335 return MOSQ_ERR_SUCCESS;
336 }
337 last = branch;
338 branch = branch->next;
339 }
340 return MOSQ_ERR_SUCCESS;
341}
342
343static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
344{
345 /* FIXME - need to take into account source_id if the client is a bridge */
346 struct _mosquitto_subhier *branch;
347 bool sr;
348
349 branch = subhier->children;
350 while(branch){
351 sr = set_retain;
352
353 if(tokens && tokens->topic && (!strcmp(branch->topic, tokens->topic) || !strcmp(branch->topic, "+"))){
354 /* The topic matches this subscription.
355 * Doesn't include # wildcards */
356 if(!strcmp(branch->topic, "+")){
357 /* Don't set a retained message where + is in the hierarchy. */
358 sr = false;
359 }
360 _sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr);
361 if(!tokens->next){
362 _subs_process(db, branch, source_id, topic, qos, retain, stored, sr);
363 }
364 }else if(!strcmp(branch->topic, "#") && !branch->children){
365 /* The topic matches due to a # wildcard - process the
366 * subscriptions but *don't* return. Although this branch has ended
367 * there may still be other subscriptions to deal with.
368 */
369 _subs_process(db, branch, source_id, topic, qos, retain, stored, false);
370 }
371 branch = branch->next;
372 }
373}
374
375int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct _mosquitto_subhier *root)
376{
377 int rc = 0;
378 struct _mosquitto_subhier *subhier, *child;
379 struct _sub_token *tokens = NULL, *tail;
380
381 assert(root);
382 assert(sub);
383
384 if(_sub_topic_tokenise(sub, &tokens)) return 1;
385
386 subhier = root->children;
387 while(subhier){
388 if(!strcmp(subhier->topic, tokens->topic)){
389 rc = _sub_add(db, context, qos, subhier, tokens);
390 break;
391 }
392 subhier = subhier->next;
393 }
394 if(!subhier){
395 child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
396 if(!child){
397 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
398 return MOSQ_ERR_NOMEM;
399 }
400 child->next = NULL;
401 child->topic = _mosquitto_strdup(tokens->topic);
402 if(!child->topic){
403 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
404 return MOSQ_ERR_NOMEM;
405 }
406 child->subs = NULL;
407 child->children = NULL;
408 child->retained = NULL;
409 db->subs.children = child;
410
411 rc = _sub_add(db, context, qos, child, tokens);
412 }
413
414 while(tokens){
415 tail = tokens->next;
416 _mosquitto_free(tokens->topic);
417 _mosquitto_free(tokens);
418 tokens = tail;
419 }
420 /* We aren't worried about -1 (already subscribed) return codes. */
421 if(rc == -1) rc = MOSQ_ERR_SUCCESS;
422 return rc;
423}
424
425int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct _mosquitto_subhier *root)
426{
427 int rc = 0;
428 struct _mosquitto_subhier *subhier;
429 struct _sub_token *tokens = NULL, *tail;
430
431 assert(root);
432 assert(sub);
433
434 if(_sub_topic_tokenise(sub, &tokens)) return 1;
435
436 subhier = root->children;
437 while(subhier){
438 if(!strcmp(subhier->topic, tokens->topic)){
439 rc = _sub_remove(db, context, subhier, tokens);
440 break;
441 }
442 subhier = subhier->next;
443 }
444
445 while(tokens){
446 tail = tokens->next;
447 _mosquitto_free(tokens->topic);
448 _mosquitto_free(tokens);
449 tokens = tail;
450 }
451
452 return rc;
453}
454
455int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored)
456{
457 int rc = 0;
458 struct _mosquitto_subhier *subhier;
459 struct _sub_token *tokens = NULL, *tail;
460
461 assert(db);
462 assert(topic);
463
464 if(_sub_topic_tokenise(topic, &tokens)) return 1;
465
466 subhier = db->subs.children;
467 while(subhier){
468 if(!strcmp(subhier->topic, tokens->topic)){
469 if(retain){
470 /* We have a message that needs to be retained, so ensure that the subscription
471 * tree for its topic exists.
472 */
473 _sub_add(db, NULL, 0, subhier, tokens);
474 }
475 _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true);
476 }
477 subhier = subhier->next;
478 }
479 while(tokens){
480 tail = tokens->next;
481 _mosquitto_free(tokens->topic);
482 _mosquitto_free(tokens);
483 tokens = tail;
484 }
485
486 return rc;
487}
488
489static int _subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root)
490{
491 int rc = 0;
492 struct _mosquitto_subhier *child, *last = NULL;
493 struct _mosquitto_subleaf *leaf, *next;
494
495 if(!root) return MOSQ_ERR_SUCCESS;
496
497 leaf = root->subs;
498 while(leaf){
499 if(leaf->context == context){
500 db->subscription_count--;
501 if(leaf->prev){
502 leaf->prev->next = leaf->next;
503 }else{
504 root->subs = leaf->next;
505 }
506 if(leaf->next){
507 leaf->next->prev = leaf->prev;
508 }
509 next = leaf->next;
510 _mosquitto_free(leaf);
511 leaf = next;
512 }else{
513 leaf = leaf->next;
514 }
515 }
516
517 child = root->children;
518 while(child){
519 _subs_clean_session(db, context, child);
520 if(!child->children && !child->subs && !child->retained){
521 if(last){
522 last->next = child->next;
523 }else{
524 root->children = child->next;
525 }
526 _mosquitto_free(child->topic);
527 _mosquitto_free(child);
528 if(last){
529 child = last->next;
530 }else{
531 child = root->children;
532 }
533 }else{
534 last = child;
535 child = child->next;
536 }
537 }
538 return rc;
539}
540
541/* Remove all subscriptions for a client.
542 */
543int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root)
544{
545 struct _mosquitto_subhier *child;
546
547 child = root->children;
548 while(child){
549 _subs_clean_session(db, context, child);
550 child = child->next;
551 }
552
553 return MOSQ_ERR_SUCCESS;
554}
555
556void mqtt3_sub_tree_print(struct _mosquitto_subhier *root, int level)
557{
558 int i;
559 struct _mosquitto_subhier *branch;
560 struct _mosquitto_subleaf *leaf;
561
562 for(i=0; i<level*2; i++){
563 printf(" ");
564 }
565 printf("%s", root->topic);
566 leaf = root->subs;
567 while(leaf){
568 if(leaf->context){
569 printf(" (%s, %d)", leaf->context->id, leaf->qos);
570 }else{
571 printf(" (%s, %d)", "", leaf->qos);
572 }
573 leaf = leaf->next;
574 }
575 if(root->retained){
576 printf(" (r)");
577 }
578 printf("\n");
579
580 branch = root->children;
581 while(branch){
582 mqtt3_sub_tree_print(branch, level+1);
583 branch = branch->next;
584 }
585}
586
587static int _retain_process(struct mosquitto_db *db, struct mosquitto_msg_store *retained, struct mosquitto *context, const char *sub, int sub_qos)
588{
589 int rc = 0;
590 int qos;
591 uint16_t mid;
592
593 rc = mosquitto_acl_check(db, context, retained->msg.topic, MOSQ_ACL_READ);
594 if(rc == MOSQ_ERR_ACL_DENIED){
595 return MOSQ_ERR_SUCCESS;
596 }else if(rc != MOSQ_ERR_SUCCESS){
597 return rc;
598 }
599
600 qos = retained->msg.qos;
601
602 if(qos > sub_qos) qos = sub_qos;
603 if(qos > 0){
604 mid = _mosquitto_mid_generate(context);
605 }else{
606 mid = 0;
607 }
608 return mqtt3_db_message_insert(db, context, mid, mosq_md_out, qos, true, retained);
609}
610
611static int _retain_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, struct mosquitto *context, const char *sub, int sub_qos, int level)
612{
613 struct _mosquitto_subhier *branch;
614 int flag = 0;
615
616 branch = subhier->children;
617 while(branch){
618 /* Subscriptions with wildcards in aren't really valid topics to publish to
619 * so they can't have retained messages.
620 */
621 if(!strcmp(tokens->topic, "#") && !tokens->next){
622 /* Set flag to indicate that we should check for retained messages
623 * on "foo" when we are subscribing to e.g. "foo/#" and then exit
624 * this function and return to an earlier _retain_search().
625 */
626 flag = -1;
627 if(branch->retained){
628 _retain_process(db, branch->retained, context, sub, sub_qos);
629 }
630 if(branch->children){
631 _retain_search(db, branch, tokens, context, sub, sub_qos, level+1);
632 }
633 }else if(strcmp(branch->topic, "+") && (!strcmp(branch->topic, tokens->topic) || !strcmp(tokens->topic, "+"))){
634 if(tokens->next){
635 if(_retain_search(db, branch, tokens->next, context, sub, sub_qos, level+1) == -1
636 || (!branch->next && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){
637
638 if(branch->retained){
639 _retain_process(db, branch->retained, context, sub, sub_qos);
640 }
641 }
642 }else{
643 if(branch->retained){
644 _retain_process(db, branch->retained, context, sub, sub_qos);
645 }
646 }
647 }
648
649 branch = branch->next;
650 }
651 return flag;
652}
653
654int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos)
655{
656 struct _mosquitto_subhier *subhier;
657 struct _sub_token *tokens = NULL, *tail;
658
659 assert(db);
660 assert(context);
661 assert(sub);
662
663 if(_sub_topic_tokenise(sub, &tokens)) return 1;
664
665 subhier = db->subs.children;
666 while(subhier){
667 if(!strcmp(subhier->topic, tokens->topic)){
668 _retain_search(db, subhier, tokens, context, sub, sub_qos, 0);
669 break;
670 }
671 subhier = subhier->next;
672 }
673 while(tokens){
674 tail = tokens->next;
675 _mosquitto_free(tokens->topic);
676 _mosquitto_free(tokens);
677 tokens = tail;
678 }
679
680 return MOSQ_ERR_SUCCESS;
681}
682