1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include <assert.h>
|
31 | #include <stdlib.h>
|
32 | #include <string.h>
|
33 |
|
34 | #include "mosquitto_internal.h"
|
35 | #include "mosquitto.h"
|
36 | #include "memory_mosq.h"
|
37 | #include "messages_mosq.h"
|
38 | #include "send_mosq.h"
|
39 | #include "time_mosq.h"
|
40 |
|
41 | void _mosquitto_message_cleanup(struct mosquitto_message_all **message)
|
42 | {
|
43 | struct mosquitto_message_all *msg;
|
44 |
|
45 | if(!message || !*message) return;
|
46 |
|
47 | msg = *message;
|
48 |
|
49 | if(msg->msg.topic) _mosquitto_free(msg->msg.topic);
|
50 | if(msg->msg.payload) _mosquitto_free(msg->msg.payload);
|
51 | _mosquitto_free(msg);
|
52 | }
|
53 |
|
54 | void _mosquitto_message_cleanup_all(struct mosquitto *mosq)
|
55 | {
|
56 | struct mosquitto_message_all *tmp;
|
57 |
|
58 | assert(mosq);
|
59 |
|
60 | while(mosq->in_messages){
|
61 | tmp = mosq->in_messages->next;
|
62 | _mosquitto_message_cleanup(&mosq->in_messages);
|
63 | mosq->in_messages = tmp;
|
64 | }
|
65 | while(mosq->out_messages){
|
66 | tmp = mosq->out_messages->next;
|
67 | _mosquitto_message_cleanup(&mosq->out_messages);
|
68 | mosq->out_messages = tmp;
|
69 | }
|
70 | }
|
71 |
|
72 | int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto_message *src)
|
73 | {
|
74 | if(!dst || !src) return MOSQ_ERR_INVAL;
|
75 |
|
76 | dst->mid = src->mid;
|
77 | dst->topic = _mosquitto_strdup(src->topic);
|
78 | if(!dst->topic) return MOSQ_ERR_NOMEM;
|
79 | dst->qos = src->qos;
|
80 | dst->retain = src->retain;
|
81 | if(src->payloadlen){
|
82 | dst->payload = _mosquitto_malloc(src->payloadlen);
|
83 | if(!dst->payload){
|
84 | _mosquitto_free(dst->topic);
|
85 | return MOSQ_ERR_NOMEM;
|
86 | }
|
87 | memcpy(dst->payload, src->payload, src->payloadlen);
|
88 | dst->payloadlen = src->payloadlen;
|
89 | }else{
|
90 | dst->payloadlen = 0;
|
91 | dst->payload = NULL;
|
92 | }
|
93 | return MOSQ_ERR_SUCCESS;
|
94 | }
|
95 |
|
96 | int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
|
97 | {
|
98 | struct mosquitto_message_all *message;
|
99 | int rc;
|
100 | assert(mosq);
|
101 |
|
102 | rc = _mosquitto_message_remove(mosq, mid, dir, &message);
|
103 | if(rc == MOSQ_ERR_SUCCESS){
|
104 | _mosquitto_message_cleanup(&message);
|
105 | }
|
106 | return rc;
|
107 | }
|
108 |
|
109 | void mosquitto_message_free(struct mosquitto_message **message)
|
110 | {
|
111 | struct mosquitto_message *msg;
|
112 |
|
113 | if(!message || !*message) return;
|
114 |
|
115 | msg = *message;
|
116 |
|
117 | if(msg->topic) _mosquitto_free(msg->topic);
|
118 | if(msg->payload) _mosquitto_free(msg->payload);
|
119 | _mosquitto_free(msg);
|
120 | }
|
121 |
|
122 | void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
|
123 | {
|
124 |
|
125 | assert(mosq);
|
126 | assert(message);
|
127 |
|
128 | if(dir == mosq_md_out){
|
129 | mosq->out_queue_len++;
|
130 | message->next = NULL;
|
131 | if(mosq->out_messages_last){
|
132 | mosq->out_messages_last->next = message;
|
133 | }else{
|
134 | mosq->out_messages = message;
|
135 | }
|
136 | mosq->out_messages_last = message;
|
137 | }else{
|
138 | mosq->in_queue_len++;
|
139 | if(message->msg.qos > 0 && (mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages)){
|
140 | mosq->inflight_messages++;
|
141 | }
|
142 | message->next = NULL;
|
143 | if(mosq->in_messages_last){
|
144 | mosq->in_messages_last->next = message;
|
145 | }else{
|
146 | mosq->in_messages = message;
|
147 | }
|
148 | mosq->in_messages_last = message;
|
149 | }
|
150 | }
|
151 |
|
152 | void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq)
|
153 | {
|
154 | struct mosquitto_message_all *message;
|
155 | struct mosquitto_message_all *prev = NULL;
|
156 | assert(mosq);
|
157 |
|
158 | pthread_mutex_lock(&mosq->in_message_mutex);
|
159 | message = mosq->in_messages;
|
160 | mosq->in_queue_len = 0;
|
161 | while(message){
|
162 | mosq->in_queue_len++;
|
163 | message->timestamp = 0;
|
164 | if(message->msg.qos != 2){
|
165 | if(prev){
|
166 | prev->next = message->next;
|
167 | _mosquitto_message_cleanup(&message);
|
168 | message = prev;
|
169 | }else{
|
170 | mosq->in_messages = message->next;
|
171 | _mosquitto_message_cleanup(&message);
|
172 | message = mosq->in_messages;
|
173 | }
|
174 | }else{
|
175 | |
176 |
|
177 | }
|
178 | prev = message;
|
179 | message = message->next;
|
180 | }
|
181 | mosq->in_messages_last = prev;
|
182 | pthread_mutex_unlock(&mosq->in_message_mutex);
|
183 |
|
184 |
|
185 | pthread_mutex_lock(&mosq->out_message_mutex);
|
186 | mosq->inflight_messages = 0;
|
187 | message = mosq->out_messages;
|
188 | mosq->out_queue_len = 0;
|
189 | while(message){
|
190 | mosq->out_queue_len++;
|
191 | message->timestamp = 0;
|
192 |
|
193 | if(message->msg.qos > 0){
|
194 | mosq->inflight_messages++;
|
195 | }
|
196 | if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
|
197 | if(message->msg.qos == 1){
|
198 | message->state = mosq_ms_wait_for_puback;
|
199 | }else if(message->msg.qos == 2){
|
200 |
|
201 | }
|
202 | }else{
|
203 | message->state = mosq_ms_invalid;
|
204 | }
|
205 | prev = message;
|
206 | message = message->next;
|
207 | }
|
208 | mosq->out_messages_last = prev;
|
209 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
210 | }
|
211 |
|
212 | int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
|
213 | {
|
214 | struct mosquitto_message_all *cur, *prev = NULL;
|
215 | bool found = false;
|
216 | int rc;
|
217 | assert(mosq);
|
218 | assert(message);
|
219 |
|
220 | if(dir == mosq_md_out){
|
221 | pthread_mutex_lock(&mosq->out_message_mutex);
|
222 | cur = mosq->out_messages;
|
223 | while(cur){
|
224 | if(cur->msg.mid == mid){
|
225 | if(prev){
|
226 | prev->next = cur->next;
|
227 | }else{
|
228 | mosq->out_messages = cur->next;
|
229 | }
|
230 | *message = cur;
|
231 | mosq->out_queue_len--;
|
232 | if(cur->next == NULL){
|
233 | mosq->out_messages_last = prev;
|
234 | }else if(!mosq->out_messages){
|
235 | mosq->out_messages_last = NULL;
|
236 | }
|
237 | if(cur->msg.qos > 0){
|
238 | mosq->inflight_messages--;
|
239 | }
|
240 | found = true;
|
241 | break;
|
242 | }
|
243 | prev = cur;
|
244 | cur = cur->next;
|
245 | }
|
246 |
|
247 | if(found){
|
248 | cur = mosq->out_messages;
|
249 | while(cur){
|
250 | if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
|
251 | if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){
|
252 | mosq->inflight_messages++;
|
253 | if(cur->msg.qos == 1){
|
254 | cur->state = mosq_ms_wait_for_puback;
|
255 | }else if(cur->msg.qos == 2){
|
256 | cur->state = mosq_ms_wait_for_pubrec;
|
257 | }
|
258 | rc = _mosquitto_send_publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup);
|
259 | if(rc){
|
260 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
261 | return rc;
|
262 | }
|
263 | }
|
264 | }else{
|
265 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
266 | return MOSQ_ERR_SUCCESS;
|
267 | }
|
268 | cur = cur->next;
|
269 | }
|
270 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
271 | return MOSQ_ERR_SUCCESS;
|
272 | }else{
|
273 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
274 | return MOSQ_ERR_NOT_FOUND;
|
275 | }
|
276 | }else{
|
277 | pthread_mutex_lock(&mosq->in_message_mutex);
|
278 | cur = mosq->in_messages;
|
279 | while(cur){
|
280 | if(cur->msg.mid == mid){
|
281 | if(prev){
|
282 | prev->next = cur->next;
|
283 | }else{
|
284 | mosq->in_messages = cur->next;
|
285 | }
|
286 | *message = cur;
|
287 | mosq->in_queue_len--;
|
288 | if(cur->next == NULL){
|
289 | mosq->in_messages_last = prev;
|
290 | }else if(!mosq->in_messages){
|
291 | mosq->in_messages_last = NULL;
|
292 | }
|
293 | if(cur->msg.qos == 2){
|
294 | mosq->inflight_messages--;
|
295 | }
|
296 | found = true;
|
297 | break;
|
298 | }
|
299 | prev = cur;
|
300 | cur = cur->next;
|
301 | }
|
302 |
|
303 | pthread_mutex_unlock(&mosq->in_message_mutex);
|
304 | if(found){
|
305 | return MOSQ_ERR_SUCCESS;
|
306 | }else{
|
307 | return MOSQ_ERR_NOT_FOUND;
|
308 | }
|
309 | }
|
310 | }
|
311 |
|
312 | #ifdef WITH_THREADING
|
313 | void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages, pthread_mutex_t mutex)
|
314 | #else
|
315 | void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages)
|
316 | #endif
|
317 | {
|
318 | time_t now = mosquitto_time();
|
319 | assert(mosq);
|
320 |
|
321 | #ifdef WITH_THREADING
|
322 | pthread_mutex_lock(&mutex);
|
323 | #endif
|
324 |
|
325 | while(messages){
|
326 | if(messages->timestamp + mosq->message_retry < now){
|
327 | switch(messages->state){
|
328 | case mosq_ms_wait_for_puback:
|
329 | case mosq_ms_wait_for_pubrec:
|
330 | messages->timestamp = now;
|
331 | messages->dup = true;
|
332 | _mosquitto_send_publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup);
|
333 | break;
|
334 | case mosq_ms_wait_for_pubrel:
|
335 | messages->timestamp = now;
|
336 | messages->dup = true;
|
337 | _mosquitto_send_pubrec(mosq, messages->msg.mid);
|
338 | break;
|
339 | case mosq_ms_wait_for_pubcomp:
|
340 | messages->timestamp = now;
|
341 | messages->dup = true;
|
342 | _mosquitto_send_pubrel(mosq, messages->msg.mid, true);
|
343 | break;
|
344 | default:
|
345 | break;
|
346 | }
|
347 | }
|
348 | messages = messages->next;
|
349 | }
|
350 | #ifdef WITH_THREADING
|
351 | pthread_mutex_unlock(&mutex);
|
352 | #endif
|
353 | }
|
354 |
|
355 | void _mosquitto_message_retry_check(struct mosquitto *mosq)
|
356 | {
|
357 | #ifdef WITH_THREADING
|
358 | _mosquitto_message_retry_check_actual(mosq, mosq->out_messages, mosq->out_message_mutex);
|
359 | _mosquitto_message_retry_check_actual(mosq, mosq->in_messages, mosq->in_message_mutex);
|
360 | #else
|
361 | _mosquitto_message_retry_check_actual(mosq, mosq->out_messages);
|
362 | _mosquitto_message_retry_check_actual(mosq, mosq->in_messages);
|
363 | #endif
|
364 | }
|
365 |
|
366 | void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry)
|
367 | {
|
368 | assert(mosq);
|
369 | if(mosq) mosq->message_retry = message_retry;
|
370 | }
|
371 |
|
372 | int _mosquitto_message_out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
|
373 | {
|
374 | struct mosquitto_message_all *message;
|
375 | assert(mosq);
|
376 |
|
377 | pthread_mutex_lock(&mosq->out_message_mutex);
|
378 | message = mosq->out_messages;
|
379 | while(message){
|
380 | if(message->msg.mid == mid){
|
381 | message->state = state;
|
382 | message->timestamp = mosquitto_time();
|
383 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
384 | return MOSQ_ERR_SUCCESS;
|
385 | }
|
386 | message = message->next;
|
387 | }
|
388 | pthread_mutex_unlock(&mosq->out_message_mutex);
|
389 | return MOSQ_ERR_NOT_FOUND;
|
390 | }
|
391 |
|
392 | int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages)
|
393 | {
|
394 | if(!mosq) return MOSQ_ERR_INVAL;
|
395 |
|
396 | mosq->max_inflight_messages = max_inflight_messages;
|
397 |
|
398 | return MOSQ_ERR_SUCCESS;
|
399 | }
|
400 |
|