UNPKG

23.2 kBtext/x-cView Raw
1/*
2Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <stdio.h>
32
33#include <config.h>
34
35#include <mosquitto_broker.h>
36#include <memory_mosq.h>
37#include <send_mosq.h>
38#include <time_mosq.h>
39
/* Per-client limit on in-flight messages. The code in this file treats a
 * value of 0 as "no limit". Changed at runtime by mqtt3_db_limits_set(). */
40static int max_inflight = 20;
/* Per-client limit on messages queued beyond the in-flight window. The code
 * in this file treats a value of 0 as "no limit". Changed at runtime by
 * mqtt3_db_limits_set(). */
41static int max_queued = 100;
42#ifdef WITH_SYS_TREE
/* Counter defined in another translation unit; this file increments it each
 * time a message is discarded because a client's queue is full. */
43extern unsigned long g_msgs_dropped;
44#endif
45
46int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
47{
48 int rc = 0;
49 struct _mosquitto_subhier *child;
50
51 if(!config || !db) return MOSQ_ERR_INVAL;
52
53 db->last_db_id = 0;
54
55 db->context_count = 1;
56 db->contexts = _mosquitto_malloc(sizeof(struct mosquitto*)*db->context_count);
57 if(!db->contexts) return MOSQ_ERR_NOMEM;
58 db->contexts[0] = NULL;
59 // Initialize the hashtable
60 db->clientid_index_hash = NULL;
61
62 db->subs.next = NULL;
63 db->subs.subs = NULL;
64 db->subs.topic = "";
65
66 child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
67 if(!child){
68 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
69 return MOSQ_ERR_NOMEM;
70 }
71 child->next = NULL;
72 child->topic = _mosquitto_strdup("");
73 if(!child->topic){
74 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
75 return MOSQ_ERR_NOMEM;
76 }
77 child->subs = NULL;
78 child->children = NULL;
79 child->retained = NULL;
80 db->subs.children = child;
81
82 child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
83 if(!child){
84 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
85 return MOSQ_ERR_NOMEM;
86 }
87 child->next = NULL;
88 child->topic = _mosquitto_strdup("$SYS");
89 if(!child->topic){
90 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
91 return MOSQ_ERR_NOMEM;
92 }
93 child->subs = NULL;
94 child->children = NULL;
95 child->retained = NULL;
96 db->subs.children->next = child;
97
98 db->unpwd = NULL;
99
100#ifdef WITH_PERSISTENCE
101 if(config->persistence && config->persistence_filepath){
102 if(mqtt3_db_restore(db)) return 1;
103 }
104#endif
105
106 return rc;
107}
108
109static void subhier_clean(struct _mosquitto_subhier *subhier)
110{
111 struct _mosquitto_subhier *next;
112 struct _mosquitto_subleaf *leaf, *nextleaf;
113
114 while(subhier){
115 next = subhier->next;
116 leaf = subhier->subs;
117 while(leaf){
118 nextleaf = leaf->next;
119 _mosquitto_free(leaf);
120 leaf = nextleaf;
121 }
122 if(subhier->retained){
123 subhier->retained->ref_count--;
124 }
125 subhier_clean(subhier->children);
126 if(subhier->topic) _mosquitto_free(subhier->topic);
127
128 _mosquitto_free(subhier);
129 subhier = next;
130 }
131}
132
133int mqtt3_db_close(struct mosquitto_db *db)
134{
135 subhier_clean(db->subs.children);
136 mqtt3_db_store_clean(db);
137
138 return MOSQ_ERR_SUCCESS;
139}
140
141/* Returns the number of client currently in the database.
142 * This includes inactive clients.
143 * Returns 1 on failure (count is NULL)
144 * Returns 0 on success.
145 */
146int mqtt3_db_client_count(struct mosquitto_db *db, unsigned int *count, unsigned int *inactive_count)
147{
148 int i;
149
150 if(!db || !count || !inactive_count) return MOSQ_ERR_INVAL;
151
152 *count = 0;
153 *inactive_count = 0;
154 for(i=0; i<db->context_count; i++){
155 if(db->contexts[i]){
156 (*count)++;
157 if(db->contexts[i]->sock == INVALID_SOCKET){
158 (*inactive_count)++;
159 }
160 }
161 }
162
163 return MOSQ_ERR_SUCCESS;
164}
165
166static void _message_remove(struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
167{
168 if(!context || !msg || !(*msg)){
169 return;
170 }
171
172 /* FIXME - it would be nice to be able to remove the stored message here if ref_count==0 */
173 (*msg)->store->ref_count--;
174 if(last){
175 last->next = (*msg)->next;
176 if(!last->next){
177 context->last_msg = last;
178 }
179 }else{
180 context->msgs = (*msg)->next;
181 if(!context->msgs){
182 context->last_msg = NULL;
183 }
184 }
185 context->msg_count--;
186 if((*msg)->qos > 0){
187 context->msg_count12--;
188 }
189 _mosquitto_free(*msg);
190 if(last){
191 *msg = last->next;
192 }else{
193 *msg = context->msgs;
194 }
195}
196
197int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
198{
199 struct mosquitto_client_msg *tail, *last = NULL;
200 int msg_index = 0;
201 bool deleted = false;
202
203 if(!context) return MOSQ_ERR_INVAL;
204
205 tail = context->msgs;
206 while(tail){
207 msg_index++;
208 if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
209 tail->timestamp = mosquitto_time();
210 if(tail->direction == mosq_md_out){
211 switch(tail->qos){
212 case 0:
213 tail->state = mosq_ms_publish_qos0;
214 break;
215 case 1:
216 tail->state = mosq_ms_publish_qos1;
217 break;
218 case 2:
219 tail->state = mosq_ms_publish_qos2;
220 break;
221 }
222 }else{
223 if(tail->qos == 2){
224 tail->state = mosq_ms_wait_for_pubrel;
225 }
226 }
227 }
228 if(tail->mid == mid && tail->direction == dir){
229 msg_index--;
230 _message_remove(context, &tail, last);
231 deleted = true;
232 }else{
233 last = tail;
234 tail = tail->next;
235 }
236 if(msg_index > max_inflight && deleted){
237 return MOSQ_ERR_SUCCESS;
238 }
239 }
240
241 return MOSQ_ERR_SUCCESS;
242}
243
244int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored)
245{
246 struct mosquitto_client_msg *msg;
247 enum mosquitto_msg_state state = mosq_ms_invalid;
248 int rc = 0;
249 int i;
250 char **dest_ids;
251
252 assert(stored);
253 if(!context) return MOSQ_ERR_INVAL;
254
255 /* Check whether we've already sent this message to this client
256 * for outgoing messages only.
257 * If retain==true then this is a stale retained message and so should be
258 * sent regardless. FIXME - this does mean retained messages will received
259 * multiple times for overlapping subscriptions, although this is only the
260 * case for SUBSCRIPTION with multiple subs in so is a minor concern.
261 */
262 if(db->config->allow_duplicate_messages == false
263 && dir == mosq_md_out && retain == false && stored->dest_ids){
264
265 for(i=0; i<stored->dest_id_count; i++){
266 if(!strcmp(stored->dest_ids[i], context->id)){
267 /* We have already sent this message to this client. */
268 return MOSQ_ERR_SUCCESS;
269 }
270 }
271 }
272 if(context->sock == INVALID_SOCKET){
273 /* Client is not connected only queue messages with QoS>0. */
274 if(qos == 0 && !db->config->queue_qos0_messages){
275 if(!context->bridge){
276 return 2;
277 }else{
278 if(context->bridge->start_type != bst_lazy){
279 return 2;
280 }
281 }
282 }
283 }
284
285 if(context->sock != INVALID_SOCKET){
286 if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
287 if(dir == mosq_md_out){
288 switch(qos){
289 case 0:
290 state = mosq_ms_publish_qos0;
291 break;
292 case 1:
293 state = mosq_ms_publish_qos1;
294 break;
295 case 2:
296 state = mosq_ms_publish_qos2;
297 break;
298 }
299 }else{
300 if(qos == 2){
301 state = mosq_ms_wait_for_pubrel;
302 }else{
303 return 1;
304 }
305 }
306 }else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
307 state = mosq_ms_queued;
308 rc = 2;
309 }else{
310 /* Dropping message due to full queue. */
311 if(context->is_dropping == false){
312 context->is_dropping = true;
313 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
314 "Outgoing messages are being dropped for client %s.",
315 context->id);
316 }
317#ifdef WITH_SYS_TREE
318 g_msgs_dropped++;
319#endif
320 return 2;
321 }
322 }else{
323 if(max_queued > 0 && context->msg_count12 >= max_queued){
324#ifdef WITH_SYS_TREE
325 g_msgs_dropped++;
326#endif
327 if(context->is_dropping == false){
328 context->is_dropping = true;
329 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
330 "Outgoing messages are being dropped for client %s.",
331 context->id);
332 }
333 return 2;
334 }else{
335 state = mosq_ms_queued;
336 }
337 }
338 assert(state != mosq_ms_invalid);
339
340#ifdef WITH_PERSISTENCE
341 if(state == mosq_ms_queued){
342 db->persistence_changes++;
343 }
344#endif
345
346 msg = _mosquitto_malloc(sizeof(struct mosquitto_client_msg));
347 if(!msg) return MOSQ_ERR_NOMEM;
348 msg->next = NULL;
349 msg->store = stored;
350 msg->store->ref_count++;
351 msg->mid = mid;
352 msg->timestamp = mosquitto_time();
353 msg->direction = dir;
354 msg->state = state;
355 msg->dup = false;
356 msg->qos = qos;
357 msg->retain = retain;
358 if(context->last_msg){
359 context->last_msg->next = msg;
360 context->last_msg = msg;
361 }else{
362 context->msgs = msg;
363 context->last_msg = msg;
364 }
365 context->msg_count++;
366 if(qos > 0){
367 context->msg_count12++;
368 }
369
370 if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
371 /* Record which client ids this message has been sent to so we can avoid duplicates.
372 * Outgoing messages only.
373 * If retain==true then this is a stale retained message and so should be
374 * sent regardless. FIXME - this does mean retained messages will received
375 * multiple times for overlapping subscriptions, although this is only the
376 * case for SUBSCRIPTION with multiple subs in so is a minor concern.
377 */
378 dest_ids = _mosquitto_realloc(stored->dest_ids, sizeof(char *)*(stored->dest_id_count+1));
379 if(dest_ids){
380 stored->dest_ids = dest_ids;
381 stored->dest_id_count++;
382 stored->dest_ids[stored->dest_id_count-1] = _mosquitto_strdup(context->id);
383 if(!stored->dest_ids[stored->dest_id_count-1]){
384 return MOSQ_ERR_NOMEM;
385 }
386 }else{
387 return MOSQ_ERR_NOMEM;
388 }
389 }
390#ifdef WITH_BRIDGE
391 if(context->bridge && context->bridge->start_type == bst_lazy
392 && context->sock == INVALID_SOCKET
393 && context->msg_count >= context->bridge->threshold){
394
395 context->bridge->lazy_reconnect = true;
396 }
397#endif
398
399 return rc;
400}
401
402int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state)
403{
404 struct mosquitto_client_msg *tail;
405
406 tail = context->msgs;
407 while(tail){
408 if(tail->mid == mid && tail->direction == dir){
409 tail->state = state;
410 tail->timestamp = mosquitto_time();
411 return MOSQ_ERR_SUCCESS;
412 }
413 tail = tail->next;
414 }
415 return 1;
416}
417
418int mqtt3_db_messages_delete(struct mosquitto *context)
419{
420 struct mosquitto_client_msg *tail, *next;
421
422 if(!context) return MOSQ_ERR_INVAL;
423
424 tail = context->msgs;
425 while(tail){
426 /* FIXME - it would be nice to be able to remove the stored message here if rec_count==0 */
427 tail->store->ref_count--;
428 next = tail->next;
429 _mosquitto_free(tail);
430 tail = next;
431 }
432 context->msgs = NULL;
433 context->last_msg = NULL;
434 context->msg_count = 0;
435 context->msg_count12 = 0;
436
437 return MOSQ_ERR_SUCCESS;
438}
439
440int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain)
441{
442 struct mosquitto_msg_store *stored;
443 char *source_id;
444
445 assert(db);
446
447 if(!topic) return MOSQ_ERR_INVAL;
448
449 if(context){
450 source_id = context->id;
451 }else{
452 source_id = "";
453 }
454 if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
455
456 return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, stored);
457}
458
459int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
460{
461 struct mosquitto_msg_store *temp;
462
463 assert(db);
464 assert(stored);
465
466 temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store));
467 if(!temp) return MOSQ_ERR_NOMEM;
468
469 temp->next = db->msg_store;
470 temp->ref_count = 0;
471 if(source){
472 temp->source_id = _mosquitto_strdup(source);
473 }else{
474 temp->source_id = _mosquitto_strdup("");
475 }
476 if(!temp->source_id){
477 _mosquitto_free(temp);
478 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
479 return MOSQ_ERR_NOMEM;
480 }
481 temp->source_mid = source_mid;
482 temp->msg.mid = 0;
483 temp->msg.qos = qos;
484 temp->msg.retain = retain;
485 if(topic){
486 temp->msg.topic = _mosquitto_strdup(topic);
487 if(!temp->msg.topic){
488 _mosquitto_free(temp->source_id);
489 _mosquitto_free(temp);
490 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
491 return MOSQ_ERR_NOMEM;
492 }
493 }else{
494 temp->msg.topic = NULL;
495 }
496 temp->msg.payloadlen = payloadlen;
497 if(payloadlen){
498 temp->msg.payload = _mosquitto_malloc(sizeof(char)*payloadlen);
499 if(!temp->msg.payload){
500 if(temp->source_id) _mosquitto_free(temp->source_id);
501 if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
502 if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
503 _mosquitto_free(temp);
504 return MOSQ_ERR_NOMEM;
505 }
506 memcpy(temp->msg.payload, payload, sizeof(char)*payloadlen);
507 }else{
508 temp->msg.payload = NULL;
509 }
510
511 if(!temp->source_id || (payloadlen && !temp->msg.payload)){
512 if(temp->source_id) _mosquitto_free(temp->source_id);
513 if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
514 if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
515 _mosquitto_free(temp);
516 return 1;
517 }
518 temp->dest_ids = NULL;
519 temp->dest_id_count = 0;
520 db->msg_store_count++;
521 db->msg_store = temp;
522 (*stored) = temp;
523
524 if(!store_id){
525 temp->db_id = ++db->last_db_id;
526 }else{
527 temp->db_id = store_id;
528 }
529
530 return MOSQ_ERR_SUCCESS;
531}
532
533int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
534{
535 struct mosquitto_client_msg *tail;
536
537 if(!context) return MOSQ_ERR_INVAL;
538
539 *stored = NULL;
540 tail = context->msgs;
541 while(tail){
542 if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
543 *stored = tail->store;
544 return MOSQ_ERR_SUCCESS;
545 }
546 tail = tail->next;
547 }
548
549 return 1;
550}
551
552/* Called on reconnect to set outgoing messages to a sensible state and force a
553 * retry, and to set incoming messages to expect an appropriate retry. */
554int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
555{
556 struct mosquitto_client_msg *msg;
557 struct mosquitto_client_msg *prev = NULL;
558 int count;
559
560 msg = context->msgs;
561 context->msg_count = 0;
562 context->msg_count12 = 0;
563 while(msg){
564 context->last_msg = msg;
565
566 context->msg_count++;
567 if(msg->qos > 0){
568 context->msg_count12++;
569 }
570
571 if(msg->direction == mosq_md_out){
572 if(msg->state != mosq_ms_queued){
573 switch(msg->qos){
574 case 0:
575 msg->state = mosq_ms_publish_qos0;
576 break;
577 case 1:
578 msg->state = mosq_ms_publish_qos1;
579 break;
580 case 2:
581 if(msg->state == mosq_ms_wait_for_pubcomp){
582 msg->state = mosq_ms_resend_pubrel;
583 }else{
584 msg->state = mosq_ms_publish_qos2;
585 }
586 break;
587 }
588 }
589 }else{
590 if(msg->qos != 2){
591 /* Anything <QoS 2 can be completely retried by the client at
592 * no harm. */
593 _message_remove(context, &msg, prev);
594 }else{
595 /* Message state can be preserved here because it should match
596 * whatever the client has got. */
597 }
598 }
599 prev = msg;
600 if(msg) msg = msg->next;
601 }
602 /* Messages received when the client was disconnected are put
603 * in the mosq_ms_queued state. If we don't change them to the
604 * appropriate "publish" state, then the queued messages won't
605 * get sent until the client next receives a message - and they
606 * will be sent out of order.
607 */
608 if(context->msgs){
609 count = 0;
610 msg = context->msgs;
611 while(msg && (max_inflight == 0 || count < max_inflight)){
612 if(msg->state == mosq_ms_queued){
613 switch(msg->qos){
614 case 0:
615 msg->state = mosq_ms_publish_qos0;
616 break;
617 case 1:
618 msg->state = mosq_ms_publish_qos1;
619 break;
620 case 2:
621 msg->state = mosq_ms_publish_qos2;
622 break;
623 }
624 }
625 msg = msg->next;
626 count++;
627 }
628 }
629
630 return MOSQ_ERR_SUCCESS;
631}
632
633int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
634{
635 int i;
636 time_t threshold;
637 enum mosquitto_msg_state new_state = mosq_ms_invalid;
638 struct mosquitto *context;
639 struct mosquitto_client_msg *msg;
640
641 threshold = mosquitto_time() - timeout;
642
643 for(i=0; i<db->context_count; i++){
644 context = db->contexts[i];
645 if(!context) continue;
646
647 msg = context->msgs;
648 while(msg){
649 if(msg->timestamp < threshold && msg->state != mosq_ms_queued){
650 switch(msg->state){
651 case mosq_ms_wait_for_puback:
652 new_state = mosq_ms_publish_qos1;
653 break;
654 case mosq_ms_wait_for_pubrec:
655 new_state = mosq_ms_publish_qos2;
656 break;
657 case mosq_ms_wait_for_pubrel:
658 new_state = mosq_ms_send_pubrec;
659 break;
660 case mosq_ms_wait_for_pubcomp:
661 new_state = mosq_ms_resend_pubrel;
662 break;
663 default:
664 break;
665 }
666 if(new_state != mosq_ms_invalid){
667 msg->timestamp = mosquitto_time();
668 msg->state = new_state;
669 msg->dup = true;
670 }
671 }
672 msg = msg->next;
673 }
674 }
675
676 return MOSQ_ERR_SUCCESS;
677}
678
679int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
680{
681 struct mosquitto_client_msg *tail, *last = NULL;
682 int qos;
683 int retain;
684 char *topic;
685 char *source_id;
686 int msg_index = 0;
687 bool deleted = false;
688
689 if(!context) return MOSQ_ERR_INVAL;
690
691 tail = context->msgs;
692 while(tail){
693 msg_index++;
694 if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
695 tail->timestamp = mosquitto_time();
696 if(tail->direction == mosq_md_out){
697 switch(tail->qos){
698 case 0:
699 tail->state = mosq_ms_publish_qos0;
700 break;
701 case 1:
702 tail->state = mosq_ms_publish_qos1;
703 break;
704 case 2:
705 tail->state = mosq_ms_publish_qos2;
706 break;
707 }
708 }else{
709 if(tail->qos == 2){
710 _mosquitto_send_pubrec(context, tail->mid);
711 tail->state = mosq_ms_wait_for_pubrel;
712 }
713 }
714 }
715 if(tail->mid == mid && tail->direction == dir){
716 qos = tail->store->msg.qos;
717 topic = tail->store->msg.topic;
718 retain = tail->retain;
719 source_id = tail->store->source_id;
720
721 /* topic==NULL should be a QoS 2 message that was
722 * denied/dropped and is being processed so the client doesn't
723 * keep resending it. That means we don't send it to other
724 * clients. */
725 if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){
726 _message_remove(context, &tail, last);
727 deleted = true;
728 }else{
729 return 1;
730 }
731 }else{
732 last = tail;
733 tail = tail->next;
734 }
735 if(msg_index > max_inflight && deleted){
736 return MOSQ_ERR_SUCCESS;
737 }
738 }
739 if(deleted){
740 return MOSQ_ERR_SUCCESS;
741 }else{
742 return 1;
743 }
744}
745
746int mqtt3_db_message_write(struct mosquitto *context)
747{
748 int rc;
749 struct mosquitto_client_msg *tail, *last = NULL;
750 uint16_t mid;
751 int retries;
752 int retain;
753 const char *topic;
754 int qos;
755 uint32_t payloadlen;
756 const void *payload;
757 int msg_count = 0;
758
759 if(!context || context->sock == -1
760 || (context->state == mosq_cs_connected && !context->id)){
761 return MOSQ_ERR_INVAL;
762 }
763
764 tail = context->msgs;
765 while(tail){
766 if(tail->direction == mosq_md_in){
767 msg_count++;
768 }
769 if(tail->state != mosq_ms_queued){
770 mid = tail->mid;
771 retries = tail->dup;
772 retain = tail->retain;
773 topic = tail->store->msg.topic;
774 qos = tail->qos;
775 payloadlen = tail->store->msg.payloadlen;
776 payload = tail->store->msg.payload;
777
778 switch(tail->state){
779 case mosq_ms_publish_qos0:
780 rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
781 if(!rc){
782 _message_remove(context, &tail, last);
783 }else{
784 return rc;
785 }
786 break;
787
788 case mosq_ms_publish_qos1:
789 rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
790 if(!rc){
791 tail->timestamp = mosquitto_time();
792 tail->dup = 1; /* Any retry attempts are a duplicate. */
793 tail->state = mosq_ms_wait_for_puback;
794 }else{
795 return rc;
796 }
797 last = tail;
798 tail = tail->next;
799 break;
800
801 case mosq_ms_publish_qos2:
802 rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
803 if(!rc){
804 tail->timestamp = mosquitto_time();
805 tail->dup = 1; /* Any retry attempts are a duplicate. */
806 tail->state = mosq_ms_wait_for_pubrec;
807 }else{
808 return rc;
809 }
810 last = tail;
811 tail = tail->next;
812 break;
813
814 case mosq_ms_send_pubrec:
815 rc = _mosquitto_send_pubrec(context, mid);
816 if(!rc){
817 tail->state = mosq_ms_wait_for_pubrel;
818 }else{
819 return rc;
820 }
821 last = tail;
822 tail = tail->next;
823 break;
824
825 case mosq_ms_resend_pubrel:
826 rc = _mosquitto_send_pubrel(context, mid, true);
827 if(!rc){
828 tail->state = mosq_ms_wait_for_pubcomp;
829 }else{
830 return rc;
831 }
832 last = tail;
833 tail = tail->next;
834 break;
835
836 case mosq_ms_resend_pubcomp:
837 rc = _mosquitto_send_pubcomp(context, mid);
838 if(!rc){
839 tail->state = mosq_ms_wait_for_pubrel;
840 }else{
841 return rc;
842 }
843 last = tail;
844 tail = tail->next;
845 break;
846
847 default:
848 last = tail;
849 tail = tail->next;
850 break;
851 }
852 }else{
853 /* state == mosq_ms_queued */
854 if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
855 if(tail->qos == 2){
856 tail->state = mosq_ms_send_pubrec;
857 }
858 }else{
859 last = tail;
860 tail = tail->next;
861 }
862 }
863 }
864
865 return MOSQ_ERR_SUCCESS;
866}
867
868void mqtt3_db_store_clean(struct mosquitto_db *db)
869{
870 /* FIXME - this may not be necessary if checks are made when messages are removed. */
871 struct mosquitto_msg_store *tail, *last = NULL;
872 int i;
873 assert(db);
874
875 tail = db->msg_store;
876 while(tail){
877 if(tail->ref_count == 0){
878 if(tail->source_id) _mosquitto_free(tail->source_id);
879 if(tail->dest_ids){
880 for(i=0; i<tail->dest_id_count; i++){
881 if(tail->dest_ids[i]) _mosquitto_free(tail->dest_ids[i]);
882 }
883 _mosquitto_free(tail->dest_ids);
884 }
885 if(tail->msg.topic) _mosquitto_free(tail->msg.topic);
886 if(tail->msg.payload) _mosquitto_free(tail->msg.payload);
887 if(last){
888 last->next = tail->next;
889 _mosquitto_free(tail);
890 tail = last->next;
891 }else{
892 db->msg_store = tail->next;
893 _mosquitto_free(tail);
894 tail = db->msg_store;
895 }
896 db->msg_store_count--;
897 }else{
898 last = tail;
899 tail = tail->next;
900 }
901 }
902}
903
904void mqtt3_db_limits_set(int inflight, int queued)
905{
906 max_inflight = inflight;
907 max_queued = queued;
908}
909
/* Placeholder kept so that callers continue to link; it currently performs
 * no work. */
void mqtt3_db_vacuum(void)
{
	/* FIXME - reimplement? */
	return;
}
914