UNPKG

11.1 kBtext/x-cView Raw
1/*
2Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <stdlib.h>
32#include <string.h>
33
34#include "mosquitto_internal.h"
35#include "mosquitto.h"
36#include "memory_mosq.h"
37#include "messages_mosq.h"
38#include "send_mosq.h"
39#include "time_mosq.h"
40
41void _mosquitto_message_cleanup(struct mosquitto_message_all **message)
42{
43 struct mosquitto_message_all *msg;
44
45 if(!message || !*message) return;
46
47 msg = *message;
48
49 if(msg->msg.topic) _mosquitto_free(msg->msg.topic);
50 if(msg->msg.payload) _mosquitto_free(msg->msg.payload);
51 _mosquitto_free(msg);
52}
53
54void _mosquitto_message_cleanup_all(struct mosquitto *mosq)
55{
56 struct mosquitto_message_all *tmp;
57
58 assert(mosq);
59
60 while(mosq->in_messages){
61 tmp = mosq->in_messages->next;
62 _mosquitto_message_cleanup(&mosq->in_messages);
63 mosq->in_messages = tmp;
64 }
65 while(mosq->out_messages){
66 tmp = mosq->out_messages->next;
67 _mosquitto_message_cleanup(&mosq->out_messages);
68 mosq->out_messages = tmp;
69 }
70}
71
72int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto_message *src)
73{
74 if(!dst || !src) return MOSQ_ERR_INVAL;
75
76 dst->mid = src->mid;
77 dst->topic = _mosquitto_strdup(src->topic);
78 if(!dst->topic) return MOSQ_ERR_NOMEM;
79 dst->qos = src->qos;
80 dst->retain = src->retain;
81 if(src->payloadlen){
82 dst->payload = _mosquitto_malloc(src->payloadlen);
83 if(!dst->payload){
84 _mosquitto_free(dst->topic);
85 return MOSQ_ERR_NOMEM;
86 }
87 memcpy(dst->payload, src->payload, src->payloadlen);
88 dst->payloadlen = src->payloadlen;
89 }else{
90 dst->payloadlen = 0;
91 dst->payload = NULL;
92 }
93 return MOSQ_ERR_SUCCESS;
94}
95
96int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
97{
98 struct mosquitto_message_all *message;
99 int rc;
100 assert(mosq);
101
102 rc = _mosquitto_message_remove(mosq, mid, dir, &message);
103 if(rc == MOSQ_ERR_SUCCESS){
104 _mosquitto_message_cleanup(&message);
105 }
106 return rc;
107}
108
109void mosquitto_message_free(struct mosquitto_message **message)
110{
111 struct mosquitto_message *msg;
112
113 if(!message || !*message) return;
114
115 msg = *message;
116
117 if(msg->topic) _mosquitto_free(msg->topic);
118 if(msg->payload) _mosquitto_free(msg->payload);
119 _mosquitto_free(msg);
120}
121
122void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
123{
124 /* mosq->*_message_mutex should be locked before entering this function */
125 assert(mosq);
126 assert(message);
127
128 if(dir == mosq_md_out){
129 mosq->out_queue_len++;
130 message->next = NULL;
131 if(mosq->out_messages_last){
132 mosq->out_messages_last->next = message;
133 }else{
134 mosq->out_messages = message;
135 }
136 mosq->out_messages_last = message;
137 }else{
138 mosq->in_queue_len++;
139 if(message->msg.qos > 0 && (mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages)){
140 mosq->inflight_messages++;
141 }
142 message->next = NULL;
143 if(mosq->in_messages_last){
144 mosq->in_messages_last->next = message;
145 }else{
146 mosq->in_messages = message;
147 }
148 mosq->in_messages_last = message;
149 }
150}
151
152void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq)
153{
154 struct mosquitto_message_all *message;
155 struct mosquitto_message_all *prev = NULL;
156 assert(mosq);
157
158 pthread_mutex_lock(&mosq->in_message_mutex);
159 message = mosq->in_messages;
160 mosq->in_queue_len = 0;
161 while(message){
162 mosq->in_queue_len++;
163 message->timestamp = 0;
164 if(message->msg.qos != 2){
165 if(prev){
166 prev->next = message->next;
167 _mosquitto_message_cleanup(&message);
168 message = prev;
169 }else{
170 mosq->in_messages = message->next;
171 _mosquitto_message_cleanup(&message);
172 message = mosq->in_messages;
173 }
174 }else{
175 /* Message state can be preserved here because it should match
176 * whatever the client has got. */
177 }
178 prev = message;
179 message = message->next;
180 }
181 mosq->in_messages_last = prev;
182 pthread_mutex_unlock(&mosq->in_message_mutex);
183
184
185 pthread_mutex_lock(&mosq->out_message_mutex);
186 mosq->inflight_messages = 0;
187 message = mosq->out_messages;
188 mosq->out_queue_len = 0;
189 while(message){
190 mosq->out_queue_len++;
191 message->timestamp = 0;
192
193 if(message->msg.qos > 0){
194 mosq->inflight_messages++;
195 }
196 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
197 if(message->msg.qos == 1){
198 message->state = mosq_ms_wait_for_puback;
199 }else if(message->msg.qos == 2){
200 /* Should be able to preserve state. */
201 }
202 }else{
203 message->state = mosq_ms_invalid;
204 }
205 prev = message;
206 message = message->next;
207 }
208 mosq->out_messages_last = prev;
209 pthread_mutex_unlock(&mosq->out_message_mutex);
210}
211
212int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
213{
214 struct mosquitto_message_all *cur, *prev = NULL;
215 bool found = false;
216 int rc;
217 assert(mosq);
218 assert(message);
219
220 if(dir == mosq_md_out){
221 pthread_mutex_lock(&mosq->out_message_mutex);
222 cur = mosq->out_messages;
223 while(cur){
224 if(cur->msg.mid == mid){
225 if(prev){
226 prev->next = cur->next;
227 }else{
228 mosq->out_messages = cur->next;
229 }
230 *message = cur;
231 mosq->out_queue_len--;
232 if(cur->next == NULL){
233 mosq->out_messages_last = prev;
234 }else if(!mosq->out_messages){
235 mosq->out_messages_last = NULL;
236 }
237 if(cur->msg.qos > 0){
238 mosq->inflight_messages--;
239 }
240 found = true;
241 break;
242 }
243 prev = cur;
244 cur = cur->next;
245 }
246
247 if(found){
248 cur = mosq->out_messages;
249 while(cur){
250 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
251 if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){
252 mosq->inflight_messages++;
253 if(cur->msg.qos == 1){
254 cur->state = mosq_ms_wait_for_puback;
255 }else if(cur->msg.qos == 2){
256 cur->state = mosq_ms_wait_for_pubrec;
257 }
258 rc = _mosquitto_send_publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup);
259 if(rc){
260 pthread_mutex_unlock(&mosq->out_message_mutex);
261 return rc;
262 }
263 }
264 }else{
265 pthread_mutex_unlock(&mosq->out_message_mutex);
266 return MOSQ_ERR_SUCCESS;
267 }
268 cur = cur->next;
269 }
270 pthread_mutex_unlock(&mosq->out_message_mutex);
271 return MOSQ_ERR_SUCCESS;
272 }else{
273 pthread_mutex_unlock(&mosq->out_message_mutex);
274 return MOSQ_ERR_NOT_FOUND;
275 }
276 }else{
277 pthread_mutex_lock(&mosq->in_message_mutex);
278 cur = mosq->in_messages;
279 while(cur){
280 if(cur->msg.mid == mid){
281 if(prev){
282 prev->next = cur->next;
283 }else{
284 mosq->in_messages = cur->next;
285 }
286 *message = cur;
287 mosq->in_queue_len--;
288 if(cur->next == NULL){
289 mosq->in_messages_last = prev;
290 }else if(!mosq->in_messages){
291 mosq->in_messages_last = NULL;
292 }
293 if(cur->msg.qos == 2){
294 mosq->inflight_messages--;
295 }
296 found = true;
297 break;
298 }
299 prev = cur;
300 cur = cur->next;
301 }
302
303 pthread_mutex_unlock(&mosq->in_message_mutex);
304 if(found){
305 return MOSQ_ERR_SUCCESS;
306 }else{
307 return MOSQ_ERR_NOT_FOUND;
308 }
309 }
310}
311
312#ifdef WITH_THREADING
313void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages, pthread_mutex_t mutex)
314#else
315void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages)
316#endif
317{
318 time_t now = mosquitto_time();
319 assert(mosq);
320
321#ifdef WITH_THREADING
322 pthread_mutex_lock(&mutex);
323#endif
324
325 while(messages){
326 if(messages->timestamp + mosq->message_retry < now){
327 switch(messages->state){
328 case mosq_ms_wait_for_puback:
329 case mosq_ms_wait_for_pubrec:
330 messages->timestamp = now;
331 messages->dup = true;
332 _mosquitto_send_publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup);
333 break;
334 case mosq_ms_wait_for_pubrel:
335 messages->timestamp = now;
336 messages->dup = true;
337 _mosquitto_send_pubrec(mosq, messages->msg.mid);
338 break;
339 case mosq_ms_wait_for_pubcomp:
340 messages->timestamp = now;
341 messages->dup = true;
342 _mosquitto_send_pubrel(mosq, messages->msg.mid, true);
343 break;
344 default:
345 break;
346 }
347 }
348 messages = messages->next;
349 }
350#ifdef WITH_THREADING
351 pthread_mutex_unlock(&mutex);
352#endif
353}
354
355void _mosquitto_message_retry_check(struct mosquitto *mosq)
356{
357#ifdef WITH_THREADING
358 _mosquitto_message_retry_check_actual(mosq, mosq->out_messages, mosq->out_message_mutex);
359 _mosquitto_message_retry_check_actual(mosq, mosq->in_messages, mosq->in_message_mutex);
360#else
361 _mosquitto_message_retry_check_actual(mosq, mosq->out_messages);
362 _mosquitto_message_retry_check_actual(mosq, mosq->in_messages);
363#endif
364}
365
366void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry)
367{
368 assert(mosq);
369 if(mosq) mosq->message_retry = message_retry;
370}
371
372int _mosquitto_message_out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
373{
374 struct mosquitto_message_all *message;
375 assert(mosq);
376
377 pthread_mutex_lock(&mosq->out_message_mutex);
378 message = mosq->out_messages;
379 while(message){
380 if(message->msg.mid == mid){
381 message->state = state;
382 message->timestamp = mosquitto_time();
383 pthread_mutex_unlock(&mosq->out_message_mutex);
384 return MOSQ_ERR_SUCCESS;
385 }
386 message = message->next;
387 }
388 pthread_mutex_unlock(&mosq->out_message_mutex);
389 return MOSQ_ERR_NOT_FOUND;
390}
391
392int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages)
393{
394 if(!mosq) return MOSQ_ERR_INVAL;
395
396 mosq->max_inflight_messages = max_inflight_messages;
397
398 return MOSQ_ERR_SUCCESS;
399}
400