1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include <assert.h>
|
31 | #include <stdio.h>
|
32 |
|
33 | #include <config.h>
|
34 |
|
35 | #include <mosquitto_broker.h>
|
36 | #include <memory_mosq.h>
|
37 | #include <send_mosq.h>
|
38 | #include <time_mosq.h>
|
39 |
|
40 | static int max_inflight = 20;
|
41 | static int max_queued = 100;
|
42 | #ifdef WITH_SYS_TREE
|
43 | extern unsigned long g_msgs_dropped;
|
44 | #endif
|
45 |
|
46 | int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
|
47 | {
|
48 | int rc = 0;
|
49 | struct _mosquitto_subhier *child;
|
50 |
|
51 | if(!config || !db) return MOSQ_ERR_INVAL;
|
52 |
|
53 | db->last_db_id = 0;
|
54 |
|
55 | db->context_count = 1;
|
56 | db->contexts = _mosquitto_malloc(sizeof(struct mosquitto*)*db->context_count);
|
57 | if(!db->contexts) return MOSQ_ERR_NOMEM;
|
58 | db->contexts[0] = NULL;
|
59 |
|
60 | db->clientid_index_hash = NULL;
|
61 |
|
62 | db->subs.next = NULL;
|
63 | db->subs.subs = NULL;
|
64 | db->subs.topic = "";
|
65 |
|
66 | child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
|
67 | if(!child){
|
68 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
69 | return MOSQ_ERR_NOMEM;
|
70 | }
|
71 | child->next = NULL;
|
72 | child->topic = _mosquitto_strdup("");
|
73 | if(!child->topic){
|
74 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
75 | return MOSQ_ERR_NOMEM;
|
76 | }
|
77 | child->subs = NULL;
|
78 | child->children = NULL;
|
79 | child->retained = NULL;
|
80 | db->subs.children = child;
|
81 |
|
82 | child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
|
83 | if(!child){
|
84 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
85 | return MOSQ_ERR_NOMEM;
|
86 | }
|
87 | child->next = NULL;
|
88 | child->topic = _mosquitto_strdup("$SYS");
|
89 | if(!child->topic){
|
90 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
91 | return MOSQ_ERR_NOMEM;
|
92 | }
|
93 | child->subs = NULL;
|
94 | child->children = NULL;
|
95 | child->retained = NULL;
|
96 | db->subs.children->next = child;
|
97 |
|
98 | db->unpwd = NULL;
|
99 |
|
100 | #ifdef WITH_PERSISTENCE
|
101 | if(config->persistence && config->persistence_filepath){
|
102 | if(mqtt3_db_restore(db)) return 1;
|
103 | }
|
104 | #endif
|
105 |
|
106 | return rc;
|
107 | }
|
108 |
|
109 | static void subhier_clean(struct _mosquitto_subhier *subhier)
|
110 | {
|
111 | struct _mosquitto_subhier *next;
|
112 | struct _mosquitto_subleaf *leaf, *nextleaf;
|
113 |
|
114 | while(subhier){
|
115 | next = subhier->next;
|
116 | leaf = subhier->subs;
|
117 | while(leaf){
|
118 | nextleaf = leaf->next;
|
119 | _mosquitto_free(leaf);
|
120 | leaf = nextleaf;
|
121 | }
|
122 | if(subhier->retained){
|
123 | subhier->retained->ref_count--;
|
124 | }
|
125 | subhier_clean(subhier->children);
|
126 | if(subhier->topic) _mosquitto_free(subhier->topic);
|
127 |
|
128 | _mosquitto_free(subhier);
|
129 | subhier = next;
|
130 | }
|
131 | }
|
132 |
|
133 | int mqtt3_db_close(struct mosquitto_db *db)
|
134 | {
|
135 | subhier_clean(db->subs.children);
|
136 | mqtt3_db_store_clean(db);
|
137 |
|
138 | return MOSQ_ERR_SUCCESS;
|
139 | }
|
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 | int mqtt3_db_client_count(struct mosquitto_db *db, unsigned int *count, unsigned int *inactive_count)
|
147 | {
|
148 | int i;
|
149 |
|
150 | if(!db || !count || !inactive_count) return MOSQ_ERR_INVAL;
|
151 |
|
152 | *count = 0;
|
153 | *inactive_count = 0;
|
154 | for(i=0; i<db->context_count; i++){
|
155 | if(db->contexts[i]){
|
156 | (*count)++;
|
157 | if(db->contexts[i]->sock == INVALID_SOCKET){
|
158 | (*inactive_count)++;
|
159 | }
|
160 | }
|
161 | }
|
162 |
|
163 | return MOSQ_ERR_SUCCESS;
|
164 | }
|
165 |
|
166 | static void _message_remove(struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
|
167 | {
|
168 | if(!context || !msg || !(*msg)){
|
169 | return;
|
170 | }
|
171 |
|
172 |
|
173 | (*msg)->store->ref_count--;
|
174 | if(last){
|
175 | last->next = (*msg)->next;
|
176 | if(!last->next){
|
177 | context->last_msg = last;
|
178 | }
|
179 | }else{
|
180 | context->msgs = (*msg)->next;
|
181 | if(!context->msgs){
|
182 | context->last_msg = NULL;
|
183 | }
|
184 | }
|
185 | context->msg_count--;
|
186 | if((*msg)->qos > 0){
|
187 | context->msg_count12--;
|
188 | }
|
189 | _mosquitto_free(*msg);
|
190 | if(last){
|
191 | *msg = last->next;
|
192 | }else{
|
193 | *msg = context->msgs;
|
194 | }
|
195 | }
|
196 |
|
197 | int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
198 | {
|
199 | struct mosquitto_client_msg *tail, *last = NULL;
|
200 | int msg_index = 0;
|
201 | bool deleted = false;
|
202 |
|
203 | if(!context) return MOSQ_ERR_INVAL;
|
204 |
|
205 | tail = context->msgs;
|
206 | while(tail){
|
207 | msg_index++;
|
208 | if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
209 | tail->timestamp = mosquitto_time();
|
210 | if(tail->direction == mosq_md_out){
|
211 | switch(tail->qos){
|
212 | case 0:
|
213 | tail->state = mosq_ms_publish_qos0;
|
214 | break;
|
215 | case 1:
|
216 | tail->state = mosq_ms_publish_qos1;
|
217 | break;
|
218 | case 2:
|
219 | tail->state = mosq_ms_publish_qos2;
|
220 | break;
|
221 | }
|
222 | }else{
|
223 | if(tail->qos == 2){
|
224 | tail->state = mosq_ms_wait_for_pubrel;
|
225 | }
|
226 | }
|
227 | }
|
228 | if(tail->mid == mid && tail->direction == dir){
|
229 | msg_index--;
|
230 | _message_remove(context, &tail, last);
|
231 | deleted = true;
|
232 | }else{
|
233 | last = tail;
|
234 | tail = tail->next;
|
235 | }
|
236 | if(msg_index > max_inflight && deleted){
|
237 | return MOSQ_ERR_SUCCESS;
|
238 | }
|
239 | }
|
240 |
|
241 | return MOSQ_ERR_SUCCESS;
|
242 | }
|
243 |
|
244 | int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored)
|
245 | {
|
246 | struct mosquitto_client_msg *msg;
|
247 | enum mosquitto_msg_state state = mosq_ms_invalid;
|
248 | int rc = 0;
|
249 | int i;
|
250 | char **dest_ids;
|
251 |
|
252 | assert(stored);
|
253 | if(!context) return MOSQ_ERR_INVAL;
|
254 |
|
255 | |
256 |
|
257 |
|
258 |
|
259 |
|
260 |
|
261 |
|
262 | if(db->config->allow_duplicate_messages == false
|
263 | && dir == mosq_md_out && retain == false && stored->dest_ids){
|
264 |
|
265 | for(i=0; i<stored->dest_id_count; i++){
|
266 | if(!strcmp(stored->dest_ids[i], context->id)){
|
267 |
|
268 | return MOSQ_ERR_SUCCESS;
|
269 | }
|
270 | }
|
271 | }
|
272 | if(context->sock == INVALID_SOCKET){
|
273 |
|
274 | if(qos == 0 && !db->config->queue_qos0_messages){
|
275 | if(!context->bridge){
|
276 | return 2;
|
277 | }else{
|
278 | if(context->bridge->start_type != bst_lazy){
|
279 | return 2;
|
280 | }
|
281 | }
|
282 | }
|
283 | }
|
284 |
|
285 | if(context->sock != INVALID_SOCKET){
|
286 | if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
|
287 | if(dir == mosq_md_out){
|
288 | switch(qos){
|
289 | case 0:
|
290 | state = mosq_ms_publish_qos0;
|
291 | break;
|
292 | case 1:
|
293 | state = mosq_ms_publish_qos1;
|
294 | break;
|
295 | case 2:
|
296 | state = mosq_ms_publish_qos2;
|
297 | break;
|
298 | }
|
299 | }else{
|
300 | if(qos == 2){
|
301 | state = mosq_ms_wait_for_pubrel;
|
302 | }else{
|
303 | return 1;
|
304 | }
|
305 | }
|
306 | }else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
|
307 | state = mosq_ms_queued;
|
308 | rc = 2;
|
309 | }else{
|
310 |
|
311 | if(context->is_dropping == false){
|
312 | context->is_dropping = true;
|
313 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
|
314 | "Outgoing messages are being dropped for client %s.",
|
315 | context->id);
|
316 | }
|
317 | #ifdef WITH_SYS_TREE
|
318 | g_msgs_dropped++;
|
319 | #endif
|
320 | return 2;
|
321 | }
|
322 | }else{
|
323 | if(max_queued > 0 && context->msg_count12 >= max_queued){
|
324 | #ifdef WITH_SYS_TREE
|
325 | g_msgs_dropped++;
|
326 | #endif
|
327 | if(context->is_dropping == false){
|
328 | context->is_dropping = true;
|
329 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
|
330 | "Outgoing messages are being dropped for client %s.",
|
331 | context->id);
|
332 | }
|
333 | return 2;
|
334 | }else{
|
335 | state = mosq_ms_queued;
|
336 | }
|
337 | }
|
338 | assert(state != mosq_ms_invalid);
|
339 |
|
340 | #ifdef WITH_PERSISTENCE
|
341 | if(state == mosq_ms_queued){
|
342 | db->persistence_changes++;
|
343 | }
|
344 | #endif
|
345 |
|
346 | msg = _mosquitto_malloc(sizeof(struct mosquitto_client_msg));
|
347 | if(!msg) return MOSQ_ERR_NOMEM;
|
348 | msg->next = NULL;
|
349 | msg->store = stored;
|
350 | msg->store->ref_count++;
|
351 | msg->mid = mid;
|
352 | msg->timestamp = mosquitto_time();
|
353 | msg->direction = dir;
|
354 | msg->state = state;
|
355 | msg->dup = false;
|
356 | msg->qos = qos;
|
357 | msg->retain = retain;
|
358 | if(context->last_msg){
|
359 | context->last_msg->next = msg;
|
360 | context->last_msg = msg;
|
361 | }else{
|
362 | context->msgs = msg;
|
363 | context->last_msg = msg;
|
364 | }
|
365 | context->msg_count++;
|
366 | if(qos > 0){
|
367 | context->msg_count12++;
|
368 | }
|
369 |
|
370 | if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
|
371 | |
372 |
|
373 |
|
374 |
|
375 |
|
376 |
|
377 |
|
378 | dest_ids = _mosquitto_realloc(stored->dest_ids, sizeof(char *)*(stored->dest_id_count+1));
|
379 | if(dest_ids){
|
380 | stored->dest_ids = dest_ids;
|
381 | stored->dest_id_count++;
|
382 | stored->dest_ids[stored->dest_id_count-1] = _mosquitto_strdup(context->id);
|
383 | if(!stored->dest_ids[stored->dest_id_count-1]){
|
384 | return MOSQ_ERR_NOMEM;
|
385 | }
|
386 | }else{
|
387 | return MOSQ_ERR_NOMEM;
|
388 | }
|
389 | }
|
390 | #ifdef WITH_BRIDGE
|
391 | if(context->bridge && context->bridge->start_type == bst_lazy
|
392 | && context->sock == INVALID_SOCKET
|
393 | && context->msg_count >= context->bridge->threshold){
|
394 |
|
395 | context->bridge->lazy_reconnect = true;
|
396 | }
|
397 | #endif
|
398 |
|
399 | return rc;
|
400 | }
|
401 |
|
402 | int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state)
|
403 | {
|
404 | struct mosquitto_client_msg *tail;
|
405 |
|
406 | tail = context->msgs;
|
407 | while(tail){
|
408 | if(tail->mid == mid && tail->direction == dir){
|
409 | tail->state = state;
|
410 | tail->timestamp = mosquitto_time();
|
411 | return MOSQ_ERR_SUCCESS;
|
412 | }
|
413 | tail = tail->next;
|
414 | }
|
415 | return 1;
|
416 | }
|
417 |
|
418 | int mqtt3_db_messages_delete(struct mosquitto *context)
|
419 | {
|
420 | struct mosquitto_client_msg *tail, *next;
|
421 |
|
422 | if(!context) return MOSQ_ERR_INVAL;
|
423 |
|
424 | tail = context->msgs;
|
425 | while(tail){
|
426 |
|
427 | tail->store->ref_count--;
|
428 | next = tail->next;
|
429 | _mosquitto_free(tail);
|
430 | tail = next;
|
431 | }
|
432 | context->msgs = NULL;
|
433 | context->last_msg = NULL;
|
434 | context->msg_count = 0;
|
435 | context->msg_count12 = 0;
|
436 |
|
437 | return MOSQ_ERR_SUCCESS;
|
438 | }
|
439 |
|
440 | int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain)
|
441 | {
|
442 | struct mosquitto_msg_store *stored;
|
443 | char *source_id;
|
444 |
|
445 | assert(db);
|
446 |
|
447 | if(!topic) return MOSQ_ERR_INVAL;
|
448 |
|
449 | if(context){
|
450 | source_id = context->id;
|
451 | }else{
|
452 | source_id = "";
|
453 | }
|
454 | if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
|
455 |
|
456 | return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, stored);
|
457 | }
|
458 |
|
459 | int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
|
460 | {
|
461 | struct mosquitto_msg_store *temp;
|
462 |
|
463 | assert(db);
|
464 | assert(stored);
|
465 |
|
466 | temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store));
|
467 | if(!temp) return MOSQ_ERR_NOMEM;
|
468 |
|
469 | temp->next = db->msg_store;
|
470 | temp->ref_count = 0;
|
471 | if(source){
|
472 | temp->source_id = _mosquitto_strdup(source);
|
473 | }else{
|
474 | temp->source_id = _mosquitto_strdup("");
|
475 | }
|
476 | if(!temp->source_id){
|
477 | _mosquitto_free(temp);
|
478 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
479 | return MOSQ_ERR_NOMEM;
|
480 | }
|
481 | temp->source_mid = source_mid;
|
482 | temp->msg.mid = 0;
|
483 | temp->msg.qos = qos;
|
484 | temp->msg.retain = retain;
|
485 | if(topic){
|
486 | temp->msg.topic = _mosquitto_strdup(topic);
|
487 | if(!temp->msg.topic){
|
488 | _mosquitto_free(temp->source_id);
|
489 | _mosquitto_free(temp);
|
490 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
491 | return MOSQ_ERR_NOMEM;
|
492 | }
|
493 | }else{
|
494 | temp->msg.topic = NULL;
|
495 | }
|
496 | temp->msg.payloadlen = payloadlen;
|
497 | if(payloadlen){
|
498 | temp->msg.payload = _mosquitto_malloc(sizeof(char)*payloadlen);
|
499 | if(!temp->msg.payload){
|
500 | if(temp->source_id) _mosquitto_free(temp->source_id);
|
501 | if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
|
502 | if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
|
503 | _mosquitto_free(temp);
|
504 | return MOSQ_ERR_NOMEM;
|
505 | }
|
506 | memcpy(temp->msg.payload, payload, sizeof(char)*payloadlen);
|
507 | }else{
|
508 | temp->msg.payload = NULL;
|
509 | }
|
510 |
|
511 | if(!temp->source_id || (payloadlen && !temp->msg.payload)){
|
512 | if(temp->source_id) _mosquitto_free(temp->source_id);
|
513 | if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
|
514 | if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
|
515 | _mosquitto_free(temp);
|
516 | return 1;
|
517 | }
|
518 | temp->dest_ids = NULL;
|
519 | temp->dest_id_count = 0;
|
520 | db->msg_store_count++;
|
521 | db->msg_store = temp;
|
522 | (*stored) = temp;
|
523 |
|
524 | if(!store_id){
|
525 | temp->db_id = ++db->last_db_id;
|
526 | }else{
|
527 | temp->db_id = store_id;
|
528 | }
|
529 |
|
530 | return MOSQ_ERR_SUCCESS;
|
531 | }
|
532 |
|
533 | int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
|
534 | {
|
535 | struct mosquitto_client_msg *tail;
|
536 |
|
537 | if(!context) return MOSQ_ERR_INVAL;
|
538 |
|
539 | *stored = NULL;
|
540 | tail = context->msgs;
|
541 | while(tail){
|
542 | if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
|
543 | *stored = tail->store;
|
544 | return MOSQ_ERR_SUCCESS;
|
545 | }
|
546 | tail = tail->next;
|
547 | }
|
548 |
|
549 | return 1;
|
550 | }
|
551 |
|
552 |
|
553 |
|
554 | int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
|
555 | {
|
556 | struct mosquitto_client_msg *msg;
|
557 | struct mosquitto_client_msg *prev = NULL;
|
558 | int count;
|
559 |
|
560 | msg = context->msgs;
|
561 | context->msg_count = 0;
|
562 | context->msg_count12 = 0;
|
563 | while(msg){
|
564 | context->last_msg = msg;
|
565 |
|
566 | context->msg_count++;
|
567 | if(msg->qos > 0){
|
568 | context->msg_count12++;
|
569 | }
|
570 |
|
571 | if(msg->direction == mosq_md_out){
|
572 | if(msg->state != mosq_ms_queued){
|
573 | switch(msg->qos){
|
574 | case 0:
|
575 | msg->state = mosq_ms_publish_qos0;
|
576 | break;
|
577 | case 1:
|
578 | msg->state = mosq_ms_publish_qos1;
|
579 | break;
|
580 | case 2:
|
581 | if(msg->state == mosq_ms_wait_for_pubcomp){
|
582 | msg->state = mosq_ms_resend_pubrel;
|
583 | }else{
|
584 | msg->state = mosq_ms_publish_qos2;
|
585 | }
|
586 | break;
|
587 | }
|
588 | }
|
589 | }else{
|
590 | if(msg->qos != 2){
|
591 | |
592 |
|
593 | _message_remove(context, &msg, prev);
|
594 | }else{
|
595 | |
596 |
|
597 | }
|
598 | }
|
599 | prev = msg;
|
600 | if(msg) msg = msg->next;
|
601 | }
|
602 | |
603 |
|
604 |
|
605 |
|
606 |
|
607 |
|
608 | if(context->msgs){
|
609 | count = 0;
|
610 | msg = context->msgs;
|
611 | while(msg && (max_inflight == 0 || count < max_inflight)){
|
612 | if(msg->state == mosq_ms_queued){
|
613 | switch(msg->qos){
|
614 | case 0:
|
615 | msg->state = mosq_ms_publish_qos0;
|
616 | break;
|
617 | case 1:
|
618 | msg->state = mosq_ms_publish_qos1;
|
619 | break;
|
620 | case 2:
|
621 | msg->state = mosq_ms_publish_qos2;
|
622 | break;
|
623 | }
|
624 | }
|
625 | msg = msg->next;
|
626 | count++;
|
627 | }
|
628 | }
|
629 |
|
630 | return MOSQ_ERR_SUCCESS;
|
631 | }
|
632 |
|
633 | int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
|
634 | {
|
635 | int i;
|
636 | time_t threshold;
|
637 | enum mosquitto_msg_state new_state = mosq_ms_invalid;
|
638 | struct mosquitto *context;
|
639 | struct mosquitto_client_msg *msg;
|
640 |
|
641 | threshold = mosquitto_time() - timeout;
|
642 |
|
643 | for(i=0; i<db->context_count; i++){
|
644 | context = db->contexts[i];
|
645 | if(!context) continue;
|
646 |
|
647 | msg = context->msgs;
|
648 | while(msg){
|
649 | if(msg->timestamp < threshold && msg->state != mosq_ms_queued){
|
650 | switch(msg->state){
|
651 | case mosq_ms_wait_for_puback:
|
652 | new_state = mosq_ms_publish_qos1;
|
653 | break;
|
654 | case mosq_ms_wait_for_pubrec:
|
655 | new_state = mosq_ms_publish_qos2;
|
656 | break;
|
657 | case mosq_ms_wait_for_pubrel:
|
658 | new_state = mosq_ms_send_pubrec;
|
659 | break;
|
660 | case mosq_ms_wait_for_pubcomp:
|
661 | new_state = mosq_ms_resend_pubrel;
|
662 | break;
|
663 | default:
|
664 | break;
|
665 | }
|
666 | if(new_state != mosq_ms_invalid){
|
667 | msg->timestamp = mosquitto_time();
|
668 | msg->state = new_state;
|
669 | msg->dup = true;
|
670 | }
|
671 | }
|
672 | msg = msg->next;
|
673 | }
|
674 | }
|
675 |
|
676 | return MOSQ_ERR_SUCCESS;
|
677 | }
|
678 |
|
679 | int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
680 | {
|
681 | struct mosquitto_client_msg *tail, *last = NULL;
|
682 | int qos;
|
683 | int retain;
|
684 | char *topic;
|
685 | char *source_id;
|
686 | int msg_index = 0;
|
687 | bool deleted = false;
|
688 |
|
689 | if(!context) return MOSQ_ERR_INVAL;
|
690 |
|
691 | tail = context->msgs;
|
692 | while(tail){
|
693 | msg_index++;
|
694 | if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
695 | tail->timestamp = mosquitto_time();
|
696 | if(tail->direction == mosq_md_out){
|
697 | switch(tail->qos){
|
698 | case 0:
|
699 | tail->state = mosq_ms_publish_qos0;
|
700 | break;
|
701 | case 1:
|
702 | tail->state = mosq_ms_publish_qos1;
|
703 | break;
|
704 | case 2:
|
705 | tail->state = mosq_ms_publish_qos2;
|
706 | break;
|
707 | }
|
708 | }else{
|
709 | if(tail->qos == 2){
|
710 | _mosquitto_send_pubrec(context, tail->mid);
|
711 | tail->state = mosq_ms_wait_for_pubrel;
|
712 | }
|
713 | }
|
714 | }
|
715 | if(tail->mid == mid && tail->direction == dir){
|
716 | qos = tail->store->msg.qos;
|
717 | topic = tail->store->msg.topic;
|
718 | retain = tail->retain;
|
719 | source_id = tail->store->source_id;
|
720 |
|
721 | |
722 |
|
723 |
|
724 |
|
725 | if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){
|
726 | _message_remove(context, &tail, last);
|
727 | deleted = true;
|
728 | }else{
|
729 | return 1;
|
730 | }
|
731 | }else{
|
732 | last = tail;
|
733 | tail = tail->next;
|
734 | }
|
735 | if(msg_index > max_inflight && deleted){
|
736 | return MOSQ_ERR_SUCCESS;
|
737 | }
|
738 | }
|
739 | if(deleted){
|
740 | return MOSQ_ERR_SUCCESS;
|
741 | }else{
|
742 | return 1;
|
743 | }
|
744 | }
|
745 |
|
746 | int mqtt3_db_message_write(struct mosquitto *context)
|
747 | {
|
748 | int rc;
|
749 | struct mosquitto_client_msg *tail, *last = NULL;
|
750 | uint16_t mid;
|
751 | int retries;
|
752 | int retain;
|
753 | const char *topic;
|
754 | int qos;
|
755 | uint32_t payloadlen;
|
756 | const void *payload;
|
757 | int msg_count = 0;
|
758 |
|
759 | if(!context || context->sock == -1
|
760 | || (context->state == mosq_cs_connected && !context->id)){
|
761 | return MOSQ_ERR_INVAL;
|
762 | }
|
763 |
|
764 | tail = context->msgs;
|
765 | while(tail){
|
766 | if(tail->direction == mosq_md_in){
|
767 | msg_count++;
|
768 | }
|
769 | if(tail->state != mosq_ms_queued){
|
770 | mid = tail->mid;
|
771 | retries = tail->dup;
|
772 | retain = tail->retain;
|
773 | topic = tail->store->msg.topic;
|
774 | qos = tail->qos;
|
775 | payloadlen = tail->store->msg.payloadlen;
|
776 | payload = tail->store->msg.payload;
|
777 |
|
778 | switch(tail->state){
|
779 | case mosq_ms_publish_qos0:
|
780 | rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
781 | if(!rc){
|
782 | _message_remove(context, &tail, last);
|
783 | }else{
|
784 | return rc;
|
785 | }
|
786 | break;
|
787 |
|
788 | case mosq_ms_publish_qos1:
|
789 | rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
790 | if(!rc){
|
791 | tail->timestamp = mosquitto_time();
|
792 | tail->dup = 1;
|
793 | tail->state = mosq_ms_wait_for_puback;
|
794 | }else{
|
795 | return rc;
|
796 | }
|
797 | last = tail;
|
798 | tail = tail->next;
|
799 | break;
|
800 |
|
801 | case mosq_ms_publish_qos2:
|
802 | rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
803 | if(!rc){
|
804 | tail->timestamp = mosquitto_time();
|
805 | tail->dup = 1;
|
806 | tail->state = mosq_ms_wait_for_pubrec;
|
807 | }else{
|
808 | return rc;
|
809 | }
|
810 | last = tail;
|
811 | tail = tail->next;
|
812 | break;
|
813 |
|
814 | case mosq_ms_send_pubrec:
|
815 | rc = _mosquitto_send_pubrec(context, mid);
|
816 | if(!rc){
|
817 | tail->state = mosq_ms_wait_for_pubrel;
|
818 | }else{
|
819 | return rc;
|
820 | }
|
821 | last = tail;
|
822 | tail = tail->next;
|
823 | break;
|
824 |
|
825 | case mosq_ms_resend_pubrel:
|
826 | rc = _mosquitto_send_pubrel(context, mid, true);
|
827 | if(!rc){
|
828 | tail->state = mosq_ms_wait_for_pubcomp;
|
829 | }else{
|
830 | return rc;
|
831 | }
|
832 | last = tail;
|
833 | tail = tail->next;
|
834 | break;
|
835 |
|
836 | case mosq_ms_resend_pubcomp:
|
837 | rc = _mosquitto_send_pubcomp(context, mid);
|
838 | if(!rc){
|
839 | tail->state = mosq_ms_wait_for_pubrel;
|
840 | }else{
|
841 | return rc;
|
842 | }
|
843 | last = tail;
|
844 | tail = tail->next;
|
845 | break;
|
846 |
|
847 | default:
|
848 | last = tail;
|
849 | tail = tail->next;
|
850 | break;
|
851 | }
|
852 | }else{
|
853 |
|
854 | if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
|
855 | if(tail->qos == 2){
|
856 | tail->state = mosq_ms_send_pubrec;
|
857 | }
|
858 | }else{
|
859 | last = tail;
|
860 | tail = tail->next;
|
861 | }
|
862 | }
|
863 | }
|
864 |
|
865 | return MOSQ_ERR_SUCCESS;
|
866 | }
|
867 |
|
868 | void mqtt3_db_store_clean(struct mosquitto_db *db)
|
869 | {
|
870 |
|
871 | struct mosquitto_msg_store *tail, *last = NULL;
|
872 | int i;
|
873 | assert(db);
|
874 |
|
875 | tail = db->msg_store;
|
876 | while(tail){
|
877 | if(tail->ref_count == 0){
|
878 | if(tail->source_id) _mosquitto_free(tail->source_id);
|
879 | if(tail->dest_ids){
|
880 | for(i=0; i<tail->dest_id_count; i++){
|
881 | if(tail->dest_ids[i]) _mosquitto_free(tail->dest_ids[i]);
|
882 | }
|
883 | _mosquitto_free(tail->dest_ids);
|
884 | }
|
885 | if(tail->msg.topic) _mosquitto_free(tail->msg.topic);
|
886 | if(tail->msg.payload) _mosquitto_free(tail->msg.payload);
|
887 | if(last){
|
888 | last->next = tail->next;
|
889 | _mosquitto_free(tail);
|
890 | tail = last->next;
|
891 | }else{
|
892 | db->msg_store = tail->next;
|
893 | _mosquitto_free(tail);
|
894 | tail = db->msg_store;
|
895 | }
|
896 | db->msg_store_count--;
|
897 | }else{
|
898 | last = tail;
|
899 | tail = tail->next;
|
900 | }
|
901 | }
|
902 | }
|
903 |
|
904 | void mqtt3_db_limits_set(int inflight, int queued)
|
905 | {
|
906 | max_inflight = inflight;
|
907 | max_queued = queued;
|
908 | }
|
909 |
|
910 | void mqtt3_db_vacuum(void)
|
911 | {
|
912 |
|
913 | }
|
914 |
|