UNPKG

34.6 kBtext/x-cView Raw
1/*
2Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <errno.h>
32#include <signal.h>
33#include <stdio.h>
34#include <string.h>
35#ifndef WIN32
36#include <sys/select.h>
37#include <sys/time.h>
38#include <unistd.h>
39#else
40#include <winsock2.h>
41#include <windows.h>
42typedef int ssize_t;
43#endif
44
45#include "mosquitto.h"
46#include "mosquitto_internal.h"
47#include "logging_mosq.h"
48#include "messages_mosq.h"
49#include "memory_mosq.h"
50#include "mqtt3_protocol.h"
51#include "net_mosq.h"
52#include "read_handle.h"
53#include "send_mosq.h"
54#include "time_mosq.h"
55#include "tls_mosq.h"
56#include "util_mosq.h"
57#include "will_mosq.h"
58
59#if !defined(WIN32) && !defined(__SYMBIAN32__)
60#define HAVE_PSELECT
61#endif
62
63void _mosquitto_destroy(struct mosquitto *mosq);
64static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking);
65static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address);
66
67int mosquitto_lib_version(int *major, int *minor, int *revision)
68{
69 if(major) *major = LIBMOSQUITTO_MAJOR;
70 if(minor) *minor = LIBMOSQUITTO_MINOR;
71 if(revision) *revision = LIBMOSQUITTO_REVISION;
72 return LIBMOSQUITTO_VERSION_NUMBER;
73}
74
75int mosquitto_lib_init(void)
76{
77#ifdef WIN32
78 srand(GetTickCount());
79#else
80 struct timeval tv;
81
82 gettimeofday(&tv, NULL);
83 srand(tv.tv_sec*1000 + tv.tv_usec/1000);
84#endif
85
86 _mosquitto_net_init();
87
88 return MOSQ_ERR_SUCCESS;
89}
90
91int mosquitto_lib_cleanup(void)
92{
93 _mosquitto_net_cleanup();
94
95 return MOSQ_ERR_SUCCESS;
96}
97
98struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
99{
100 struct mosquitto *mosq = NULL;
101 int rc;
102
103 if(clean_session == false && id == NULL){
104 errno = EINVAL;
105 return NULL;
106 }
107
108#ifndef WIN32
109 signal(SIGPIPE, SIG_IGN);
110#endif
111
112 mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
113 if(mosq){
114 mosq->sock = INVALID_SOCKET;
115 mosq->sockpairR = INVALID_SOCKET;
116 mosq->sockpairW = INVALID_SOCKET;
117#ifdef WITH_THREADING
118 mosq->thread_id = pthread_self();
119#endif
120 rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
121 if(rc){
122 mosquitto_destroy(mosq);
123 if(rc == MOSQ_ERR_INVAL){
124 errno = EINVAL;
125 }else if(rc == MOSQ_ERR_NOMEM){
126 errno = ENOMEM;
127 }
128 return NULL;
129 }
130 }else{
131 errno = ENOMEM;
132 }
133 return mosq;
134}
135
136int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
137{
138 int i;
139
140 if(!mosq) return MOSQ_ERR_INVAL;
141
142 if(clean_session == false && id == NULL){
143 return MOSQ_ERR_INVAL;
144 }
145
146 _mosquitto_destroy(mosq);
147 memset(mosq, 0, sizeof(struct mosquitto));
148
149 if(userdata){
150 mosq->userdata = userdata;
151 }else{
152 mosq->userdata = mosq;
153 }
154 mosq->sock = INVALID_SOCKET;
155 mosq->sockpairR = INVALID_SOCKET;
156 mosq->sockpairW = INVALID_SOCKET;
157 mosq->keepalive = 60;
158 mosq->message_retry = 20;
159 mosq->last_retry_check = 0;
160 mosq->clean_session = clean_session;
161 if(id){
162 if(strlen(id) == 0){
163 return MOSQ_ERR_INVAL;
164 }
165 mosq->id = _mosquitto_strdup(id);
166 }else{
167 mosq->id = (char *)_mosquitto_calloc(24, sizeof(char));
168 if(!mosq->id){
169 return MOSQ_ERR_NOMEM;
170 }
171 mosq->id[0] = 'm';
172 mosq->id[1] = 'o';
173 mosq->id[2] = 's';
174 mosq->id[3] = 'q';
175 mosq->id[4] = '/';
176
177 for(i=5; i<23; i++){
178 mosq->id[i] = (rand()%73)+48;
179 }
180 }
181 mosq->in_packet.payload = NULL;
182 _mosquitto_packet_cleanup(&mosq->in_packet);
183 mosq->out_packet = NULL;
184 mosq->current_out_packet = NULL;
185 mosq->last_msg_in = mosquitto_time();
186 mosq->last_msg_out = mosquitto_time();
187 mosq->ping_t = 0;
188 mosq->last_mid = 0;
189 mosq->state = mosq_cs_new;
190 mosq->in_messages = NULL;
191 mosq->in_messages_last = NULL;
192 mosq->out_messages = NULL;
193 mosq->out_messages_last = NULL;
194 mosq->max_inflight_messages = 20;
195 mosq->will = NULL;
196 mosq->on_connect = NULL;
197 mosq->on_publish = NULL;
198 mosq->on_message = NULL;
199 mosq->on_subscribe = NULL;
200 mosq->on_unsubscribe = NULL;
201 mosq->host = NULL;
202 mosq->port = 1883;
203 mosq->in_callback = false;
204 mosq->in_queue_len = 0;
205 mosq->out_queue_len = 0;
206 mosq->reconnect_delay = 1;
207 mosq->reconnect_delay_max = 1;
208 mosq->reconnect_exponential_backoff = false;
209 mosq->threaded = false;
210#ifdef WITH_TLS
211 mosq->ssl = NULL;
212 mosq->tls_cert_reqs = SSL_VERIFY_PEER;
213 mosq->tls_insecure = false;
214#endif
215#ifdef WITH_THREADING
216 pthread_mutex_init(&mosq->callback_mutex, NULL);
217 pthread_mutex_init(&mosq->log_callback_mutex, NULL);
218 pthread_mutex_init(&mosq->state_mutex, NULL);
219 pthread_mutex_init(&mosq->out_packet_mutex, NULL);
220 pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
221 pthread_mutex_init(&mosq->msgtime_mutex, NULL);
222 pthread_mutex_init(&mosq->in_message_mutex, NULL);
223 pthread_mutex_init(&mosq->out_message_mutex, NULL);
224 mosq->thread_id = pthread_self();
225#endif
226
227 return MOSQ_ERR_SUCCESS;
228}
229
230int mosquitto_will_set(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
231{
232 if(!mosq) return MOSQ_ERR_INVAL;
233 return _mosquitto_will_set(mosq, topic, payloadlen, payload, qos, retain);
234}
235
236int mosquitto_will_clear(struct mosquitto *mosq)
237{
238 if(!mosq) return MOSQ_ERR_INVAL;
239 return _mosquitto_will_clear(mosq);
240}
241
242int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
243{
244 if(!mosq) return MOSQ_ERR_INVAL;
245
246 if(mosq->username){
247 _mosquitto_free(mosq->username);
248 mosq->username = NULL;
249 }
250 if(mosq->password){
251 _mosquitto_free(mosq->password);
252 mosq->password = NULL;
253 }
254
255 if(username){
256 mosq->username = _mosquitto_strdup(username);
257 if(!mosq->username) return MOSQ_ERR_NOMEM;
258 if(password){
259 mosq->password = _mosquitto_strdup(password);
260 if(!mosq->password){
261 _mosquitto_free(mosq->username);
262 mosq->username = NULL;
263 return MOSQ_ERR_NOMEM;
264 }
265 }
266 }
267 return MOSQ_ERR_SUCCESS;
268}
269
270int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigned int reconnect_delay, unsigned int reconnect_delay_max, bool reconnect_exponential_backoff)
271{
272 if(!mosq) return MOSQ_ERR_INVAL;
273
274 mosq->reconnect_delay = reconnect_delay;
275 mosq->reconnect_delay_max = reconnect_delay_max;
276 mosq->reconnect_exponential_backoff = reconnect_exponential_backoff;
277
278 return MOSQ_ERR_SUCCESS;
279
280}
281
282void _mosquitto_destroy(struct mosquitto *mosq)
283{
284 struct _mosquitto_packet *packet;
285 if(!mosq) return;
286
287#ifdef WITH_THREADING
288 if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
289 pthread_cancel(mosq->thread_id);
290 pthread_join(mosq->thread_id, NULL);
291 mosq->threaded = false;
292 }
293
294 if(mosq->id){
295 /* If mosq->id is not NULL then the client has already been initialised
296 * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
297 * haven't been initialised. */
298 pthread_mutex_destroy(&mosq->callback_mutex);
299 pthread_mutex_destroy(&mosq->log_callback_mutex);
300 pthread_mutex_destroy(&mosq->state_mutex);
301 pthread_mutex_destroy(&mosq->out_packet_mutex);
302 pthread_mutex_destroy(&mosq->current_out_packet_mutex);
303 pthread_mutex_destroy(&mosq->msgtime_mutex);
304 pthread_mutex_destroy(&mosq->in_message_mutex);
305 pthread_mutex_destroy(&mosq->out_message_mutex);
306 }
307#endif
308 if(mosq->sock != INVALID_SOCKET){
309 _mosquitto_socket_close(mosq);
310 }
311 _mosquitto_message_cleanup_all(mosq);
312 _mosquitto_will_clear(mosq);
313#ifdef WITH_TLS
314 if(mosq->ssl){
315 SSL_free(mosq->ssl);
316 }
317 if(mosq->ssl_ctx){
318 SSL_CTX_free(mosq->ssl_ctx);
319 }
320 if(mosq->tls_cafile) _mosquitto_free(mosq->tls_cafile);
321 if(mosq->tls_capath) _mosquitto_free(mosq->tls_capath);
322 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
323 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
324 if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
325 if(mosq->tls_version) _mosquitto_free(mosq->tls_version);
326 if(mosq->tls_ciphers) _mosquitto_free(mosq->tls_ciphers);
327 if(mosq->tls_psk) _mosquitto_free(mosq->tls_psk);
328 if(mosq->tls_psk_identity) _mosquitto_free(mosq->tls_psk_identity);
329#endif
330
331 if(mosq->address){
332 _mosquitto_free(mosq->address);
333 mosq->address = NULL;
334 }
335 if(mosq->id){
336 _mosquitto_free(mosq->id);
337 mosq->id = NULL;
338 }
339 if(mosq->username){
340 _mosquitto_free(mosq->username);
341 mosq->username = NULL;
342 }
343 if(mosq->password){
344 _mosquitto_free(mosq->password);
345 mosq->password = NULL;
346 }
347 if(mosq->host){
348 _mosquitto_free(mosq->host);
349 mosq->host = NULL;
350 }
351 if(mosq->bind_address){
352 _mosquitto_free(mosq->bind_address);
353 mosq->bind_address = NULL;
354 }
355
356 /* Out packet cleanup */
357 if(mosq->out_packet && !mosq->current_out_packet){
358 mosq->current_out_packet = mosq->out_packet;
359 mosq->out_packet = mosq->out_packet->next;
360 }
361 while(mosq->current_out_packet){
362 packet = mosq->current_out_packet;
363 /* Free data and reset values */
364 mosq->current_out_packet = mosq->out_packet;
365 if(mosq->out_packet){
366 mosq->out_packet = mosq->out_packet->next;
367 }
368
369 _mosquitto_packet_cleanup(packet);
370 _mosquitto_free(packet);
371 }
372
373 _mosquitto_packet_cleanup(&mosq->in_packet);
374 if(mosq->sockpairR != INVALID_SOCKET){
375 COMPAT_CLOSE(mosq->sockpairR);
376 mosq->sockpairR = INVALID_SOCKET;
377 }
378 if(mosq->sockpairW != INVALID_SOCKET){
379 COMPAT_CLOSE(mosq->sockpairW);
380 mosq->sockpairW = INVALID_SOCKET;
381 }
382}
383
/*
 * Release a client created by mosquitto_new(): tear down everything it owns
 * via _mosquitto_destroy(), then free the structure itself.
 *
 * A NULL mosq is ignored.
 */
void mosquitto_destroy(struct mosquitto *mosq)
{
    if(mosq != NULL){
        _mosquitto_destroy(mosq);
        _mosquitto_free(mosq);
    }
}
391
392int mosquitto_socket(struct mosquitto *mosq)
393{
394 if(!mosq) return MOSQ_ERR_INVAL;
395 return mosq->sock;
396}
397
398static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
399{
400 if(!mosq) return MOSQ_ERR_INVAL;
401 if(!host || port <= 0) return MOSQ_ERR_INVAL;
402
403 if(mosq->host) _mosquitto_free(mosq->host);
404 mosq->host = _mosquitto_strdup(host);
405 if(!mosq->host) return MOSQ_ERR_NOMEM;
406 mosq->port = port;
407
408 if(mosq->bind_address) _mosquitto_free(mosq->bind_address);
409 if(bind_address){
410 mosq->bind_address = _mosquitto_strdup(bind_address);
411 if(!mosq->bind_address) return MOSQ_ERR_NOMEM;
412 }
413
414 mosq->keepalive = keepalive;
415
416 if(_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW)){
417 _mosquitto_log_printf(mosq, MOSQ_LOG_WARNING,
418 "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
419 }
420
421 return MOSQ_ERR_SUCCESS;
422}
423
/*
 * Blocking connect without a local bind address: equivalent to calling
 * mosquitto_connect_bind() with bind_address == NULL.
 */
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
    const char *no_bind_address = NULL;

    return mosquitto_connect_bind(mosq, host, port, keepalive, no_bind_address);
}
428
429int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
430{
431 int rc;
432 rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
433 if(rc) return rc;
434
435 pthread_mutex_lock(&mosq->state_mutex);
436 mosq->state = mosq_cs_new;
437 pthread_mutex_unlock(&mosq->state_mutex);
438
439 return _mosquitto_reconnect(mosq, true);
440}
441
/*
 * Non-blocking connect without a local bind address: equivalent to calling
 * mosquitto_connect_bind_async() with bind_address == NULL.
 */
int mosquitto_connect_async(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
    const char *no_bind_address = NULL;

    return mosquitto_connect_bind_async(mosq, host, port, keepalive, no_bind_address);
}
446
447int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
448{
449 int rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
450 if(rc) return rc;
451
452 pthread_mutex_lock(&mosq->state_mutex);
453 mosq->state = mosq_cs_connect_async;
454 pthread_mutex_unlock(&mosq->state_mutex);
455
456 return _mosquitto_reconnect(mosq, false);
457}
458
/*
 * Reconnect using the parameters already stored on the client, with the
 * socket connect performed in non-blocking mode.
 */
int mosquitto_reconnect_async(struct mosquitto *mosq)
{
    const bool blocking = false;

    return _mosquitto_reconnect(mosq, blocking);
}
463
/*
 * Reconnect using the parameters already stored on the client, with the
 * socket connect performed in blocking mode.
 */
int mosquitto_reconnect(struct mosquitto *mosq)
{
    const bool blocking = true;

    return _mosquitto_reconnect(mosq, blocking);
}
468
469static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
470{
471 int rc;
472 struct _mosquitto_packet *packet;
473 if(!mosq) return MOSQ_ERR_INVAL;
474 if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
475
476 pthread_mutex_lock(&mosq->state_mutex);
477 mosq->state = mosq_cs_new;
478 pthread_mutex_unlock(&mosq->state_mutex);
479
480 pthread_mutex_lock(&mosq->msgtime_mutex);
481 mosq->last_msg_in = mosquitto_time();
482 mosq->last_msg_out = mosquitto_time();
483 pthread_mutex_unlock(&mosq->msgtime_mutex);
484
485 mosq->ping_t = 0;
486
487 _mosquitto_packet_cleanup(&mosq->in_packet);
488
489 pthread_mutex_lock(&mosq->current_out_packet_mutex);
490 pthread_mutex_lock(&mosq->out_packet_mutex);
491
492 if(mosq->out_packet && !mosq->current_out_packet){
493 mosq->current_out_packet = mosq->out_packet;
494 mosq->out_packet = mosq->out_packet->next;
495 }
496
497 while(mosq->current_out_packet){
498 packet = mosq->current_out_packet;
499 /* Free data and reset values */
500 mosq->current_out_packet = mosq->out_packet;
501 if(mosq->out_packet){
502 mosq->out_packet = mosq->out_packet->next;
503 }
504
505 _mosquitto_packet_cleanup(packet);
506 _mosquitto_free(packet);
507 }
508 pthread_mutex_unlock(&mosq->out_packet_mutex);
509 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
510
511 _mosquitto_messages_reconnect_reset(mosq);
512
513 rc = _mosquitto_socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
514 if(rc){
515 return rc;
516 }
517
518 return _mosquitto_send_connect(mosq, mosq->keepalive, mosq->clean_session);
519}
520
521int mosquitto_disconnect(struct mosquitto *mosq)
522{
523 if(!mosq) return MOSQ_ERR_INVAL;
524
525 pthread_mutex_lock(&mosq->state_mutex);
526 mosq->state = mosq_cs_disconnecting;
527 pthread_mutex_unlock(&mosq->state_mutex);
528
529 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
530 return _mosquitto_send_disconnect(mosq);
531}
532
533int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
534{
535 struct mosquitto_message_all *message;
536 uint16_t local_mid;
537
538 if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
539 if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
540 if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
541
542 if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
543 return MOSQ_ERR_INVAL;
544 }
545
546 local_mid = _mosquitto_mid_generate(mosq);
547 if(mid){
548 *mid = local_mid;
549 }
550
551 if(qos == 0){
552 return _mosquitto_send_publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
553 }else{
554 message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
555 if(!message) return MOSQ_ERR_NOMEM;
556
557 message->next = NULL;
558 message->timestamp = mosquitto_time();
559 message->msg.mid = local_mid;
560 message->msg.topic = _mosquitto_strdup(topic);
561 if(!message->msg.topic){
562 _mosquitto_message_cleanup(&message);
563 return MOSQ_ERR_NOMEM;
564 }
565 if(payloadlen){
566 message->msg.payloadlen = payloadlen;
567 message->msg.payload = _mosquitto_malloc(payloadlen*sizeof(uint8_t));
568 if(!message->msg.payload){
569 _mosquitto_message_cleanup(&message);
570 return MOSQ_ERR_NOMEM;
571 }
572 memcpy(message->msg.payload, payload, payloadlen*sizeof(uint8_t));
573 }else{
574 message->msg.payloadlen = 0;
575 message->msg.payload = NULL;
576 }
577 message->msg.qos = qos;
578 message->msg.retain = retain;
579 message->dup = false;
580
581 pthread_mutex_lock(&mosq->out_message_mutex);
582 _mosquitto_message_queue(mosq, message, mosq_md_out);
583 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
584 mosq->inflight_messages++;
585 if(qos == 1){
586 message->state = mosq_ms_wait_for_puback;
587 }else if(qos == 2){
588 message->state = mosq_ms_wait_for_pubrec;
589 }
590 pthread_mutex_unlock(&mosq->out_message_mutex);
591 return _mosquitto_send_publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
592 }else{
593 message->state = mosq_ms_invalid;
594 pthread_mutex_unlock(&mosq->out_message_mutex);
595 return MOSQ_ERR_SUCCESS;
596 }
597 }
598}
599
600int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
601{
602 if(!mosq) return MOSQ_ERR_INVAL;
603 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
604
605 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
606
607 return _mosquitto_send_subscribe(mosq, mid, false, sub, qos);
608}
609
610int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
611{
612 if(!mosq) return MOSQ_ERR_INVAL;
613 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
614
615 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
616
617 return _mosquitto_send_unsubscribe(mosq, mid, false, sub);
618}
619
620int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
621{
622#ifdef WITH_TLS
623 FILE *fptr;
624
625 if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
626
627 if(cafile){
628 fptr = _mosquitto_fopen(cafile, "rt");
629 if(fptr){
630 fclose(fptr);
631 }else{
632 return MOSQ_ERR_INVAL;
633 }
634 mosq->tls_cafile = _mosquitto_strdup(cafile);
635
636 if(!mosq->tls_cafile){
637 return MOSQ_ERR_NOMEM;
638 }
639 }else if(mosq->tls_cafile){
640 _mosquitto_free(mosq->tls_cafile);
641 mosq->tls_cafile = NULL;
642 }
643
644 if(capath){
645 mosq->tls_capath = _mosquitto_strdup(capath);
646 if(!mosq->tls_capath){
647 return MOSQ_ERR_NOMEM;
648 }
649 }else if(mosq->tls_capath){
650 _mosquitto_free(mosq->tls_capath);
651 mosq->tls_capath = NULL;
652 }
653
654 if(certfile){
655 fptr = _mosquitto_fopen(certfile, "rt");
656 if(fptr){
657 fclose(fptr);
658 }else{
659 if(mosq->tls_cafile){
660 _mosquitto_free(mosq->tls_cafile);
661 mosq->tls_cafile = NULL;
662 }
663 if(mosq->tls_capath){
664 _mosquitto_free(mosq->tls_capath);
665 mosq->tls_capath = NULL;
666 }
667 return MOSQ_ERR_INVAL;
668 }
669 mosq->tls_certfile = _mosquitto_strdup(certfile);
670 if(!mosq->tls_certfile){
671 return MOSQ_ERR_NOMEM;
672 }
673 }else{
674 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
675 mosq->tls_certfile = NULL;
676 }
677
678 if(keyfile){
679 fptr = _mosquitto_fopen(keyfile, "rt");
680 if(fptr){
681 fclose(fptr);
682 }else{
683 if(mosq->tls_cafile){
684 _mosquitto_free(mosq->tls_cafile);
685 mosq->tls_cafile = NULL;
686 }
687 if(mosq->tls_capath){
688 _mosquitto_free(mosq->tls_capath);
689 mosq->tls_capath = NULL;
690 }
691 if(mosq->tls_certfile){
692 _mosquitto_free(mosq->tls_certfile);
693 mosq->tls_certfile = NULL;
694 }
695 return MOSQ_ERR_INVAL;
696 }
697 mosq->tls_keyfile = _mosquitto_strdup(keyfile);
698 if(!mosq->tls_keyfile){
699 return MOSQ_ERR_NOMEM;
700 }
701 }else{
702 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
703 mosq->tls_keyfile = NULL;
704 }
705
706 mosq->tls_pw_callback = pw_callback;
707
708
709 return MOSQ_ERR_SUCCESS;
710#else
711 return MOSQ_ERR_NOT_SUPPORTED;
712
713#endif
714}
715
716int mosquitto_tls_opts_set(struct mosquitto *mosq, int cert_reqs, const char *tls_version, const char *ciphers)
717{
718#ifdef WITH_TLS
719 if(!mosq) return MOSQ_ERR_INVAL;
720
721 mosq->tls_cert_reqs = cert_reqs;
722 if(tls_version){
723#if OPENSSL_VERSION_NUMBER >= 0x10001000L
724 if(!strcasecmp(tls_version, "tlsv1.2")
725 || !strcasecmp(tls_version, "tlsv1.1")
726 || !strcasecmp(tls_version, "tlsv1")){
727
728 mosq->tls_version = _mosquitto_strdup(tls_version);
729 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
730 }else{
731 return MOSQ_ERR_INVAL;
732 }
733#else
734 if(!strcasecmp(tls_version, "tlsv1")){
735 mosq->tls_version = _mosquitto_strdup(tls_version);
736 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
737 }else{
738 return MOSQ_ERR_INVAL;
739 }
740#endif
741 }else{
742#if OPENSSL_VERSION_NUMBER >= 0x10001000L
743 mosq->tls_version = _mosquitto_strdup("tlsv1.2");
744#else
745 mosq->tls_version = _mosquitto_strdup("tlsv1");
746#endif
747 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
748 }
749 if(ciphers){
750 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
751 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
752 }else{
753 mosq->tls_ciphers = NULL;
754 }
755
756
757 return MOSQ_ERR_SUCCESS;
758#else
759 return MOSQ_ERR_NOT_SUPPORTED;
760
761#endif
762}
763
764
765int mosquitto_tls_insecure_set(struct mosquitto *mosq, bool value)
766{
767#ifdef WITH_TLS
768 if(!mosq) return MOSQ_ERR_INVAL;
769 mosq->tls_insecure = value;
770 return MOSQ_ERR_SUCCESS;
771#else
772 return MOSQ_ERR_NOT_SUPPORTED;
773#endif
774}
775
776
777int mosquitto_tls_psk_set(struct mosquitto *mosq, const char *psk, const char *identity, const char *ciphers)
778{
779#ifdef REAL_WITH_TLS_PSK
780 if(!mosq || !psk || !identity) return MOSQ_ERR_INVAL;
781
782 /* Check for hex only digits */
783 if(strspn(psk, "0123456789abcdefABCDEF") < strlen(psk)){
784 return MOSQ_ERR_INVAL;
785 }
786 mosq->tls_psk = _mosquitto_strdup(psk);
787 if(!mosq->tls_psk) return MOSQ_ERR_NOMEM;
788
789 mosq->tls_psk_identity = _mosquitto_strdup(identity);
790 if(!mosq->tls_psk_identity){
791 _mosquitto_free(mosq->tls_psk);
792 return MOSQ_ERR_NOMEM;
793 }
794 if(ciphers){
795 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
796 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
797 }else{
798 mosq->tls_ciphers = NULL;
799 }
800
801 return MOSQ_ERR_SUCCESS;
802#else
803 return MOSQ_ERR_NOT_SUPPORTED;
804#endif
805}
806
807
808int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
809{
810#ifdef HAVE_PSELECT
811 struct timespec local_timeout;
812#else
813 struct timeval local_timeout;
814#endif
815 fd_set readfds, writefds;
816 int fdcount;
817 int rc;
818 char pairbuf;
819 int maxfd = 0;
820
821 if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
822
823 FD_ZERO(&readfds);
824 FD_ZERO(&writefds);
825 if(mosq->sock != INVALID_SOCKET){
826 maxfd = mosq->sock;
827 FD_SET(mosq->sock, &readfds);
828 pthread_mutex_lock(&mosq->current_out_packet_mutex);
829 pthread_mutex_lock(&mosq->out_packet_mutex);
830 if(mosq->out_packet || mosq->current_out_packet){
831 FD_SET(mosq->sock, &writefds);
832#ifdef WITH_TLS
833 }else if(mosq->ssl && mosq->want_write){
834 FD_SET(mosq->sock, &writefds);
835#endif
836 }
837 pthread_mutex_unlock(&mosq->out_packet_mutex);
838 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
839 }else{
840#ifdef WITH_SRV
841 if(mosq->achan){
842 pthread_mutex_lock(&mosq->state_mutex);
843 if(mosq->state == mosq_cs_connect_srv){
844 rc = ares_fds(mosq->achan, &readfds, &writefds);
845 if(rc > maxfd){
846 maxfd = rc;
847 }
848 }else{
849 return MOSQ_ERR_NO_CONN;
850 }
851 pthread_mutex_unlock(&mosq->state_mutex);
852 }
853#else
854 return MOSQ_ERR_NO_CONN;
855#endif
856 }
857 if(mosq->sockpairR != INVALID_SOCKET){
858 /* sockpairR is used to break out of select() before the timeout, on a
859 * call to publish() etc. */
860 FD_SET(mosq->sockpairR, &readfds);
861 if(mosq->sockpairR > maxfd){
862 maxfd = mosq->sockpairR;
863 }
864 }
865
866 if(timeout >= 0){
867 local_timeout.tv_sec = timeout/1000;
868#ifdef HAVE_PSELECT
869 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
870#else
871 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
872#endif
873 }else{
874 local_timeout.tv_sec = 1;
875#ifdef HAVE_PSELECT
876 local_timeout.tv_nsec = 0;
877#else
878 local_timeout.tv_usec = 0;
879#endif
880 }
881
882#ifdef HAVE_PSELECT
883 fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
884#else
885 fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
886#endif
887 if(fdcount == -1){
888#ifdef WIN32
889 errno = WSAGetLastError();
890#endif
891 if(errno == EINTR){
892 return MOSQ_ERR_SUCCESS;
893 }else{
894 return MOSQ_ERR_ERRNO;
895 }
896 }else{
897 if(mosq->sock != INVALID_SOCKET){
898 if(FD_ISSET(mosq->sock, &readfds)){
899 rc = mosquitto_loop_read(mosq, max_packets);
900 if(rc || mosq->sock == INVALID_SOCKET){
901 return rc;
902 }
903 }
904 if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
905#ifndef WIN32
906 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
907 }
908#else
909 recv(mosq->sockpairR, &pairbuf, 1, 0);
910#endif
911 /* Fake write possible, to stimulate output write even though
912 * we didn't ask for it, because at that point the publish or
913 * other command wasn't present. */
914 FD_SET(mosq->sock, &writefds);
915 }
916 if(FD_ISSET(mosq->sock, &writefds)){
917 rc = mosquitto_loop_write(mosq, max_packets);
918 if(rc || mosq->sock == INVALID_SOCKET){
919 return rc;
920 }
921 }
922 }
923#ifdef WITH_SRV
924 if(mosq->achan){
925 ares_process(mosq->achan, &readfds, &writefds);
926 }
927#endif
928 }
929 return mosquitto_loop_misc(mosq);
930}
931
932int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
933{
934 int run = 1;
935 int rc;
936 unsigned int reconnects = 0;
937 unsigned long reconnect_delay;
938
939 if(!mosq) return MOSQ_ERR_INVAL;
940
941 if(mosq->state == mosq_cs_connect_async){
942 mosquitto_reconnect(mosq);
943 }
944
945 while(run){
946 do{
947 rc = mosquitto_loop(mosq, timeout, max_packets);
948 if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
949 reconnects = 0;
950 }
951 }while(rc == MOSQ_ERR_SUCCESS);
952 if(errno == EPROTO){
953 return rc;
954 }
955 pthread_mutex_lock(&mosq->state_mutex);
956 if(mosq->state == mosq_cs_disconnecting){
957 run = 0;
958 pthread_mutex_unlock(&mosq->state_mutex);
959 }else{
960 pthread_mutex_unlock(&mosq->state_mutex);
961
962 if(mosq->reconnect_delay > 0 && mosq->reconnect_exponential_backoff){
963 reconnect_delay = mosq->reconnect_delay*reconnects*reconnects;
964 }else{
965 reconnect_delay = mosq->reconnect_delay;
966 }
967
968 if(reconnect_delay > mosq->reconnect_delay_max){
969 reconnect_delay = mosq->reconnect_delay_max;
970 }else{
971 reconnects++;
972 }
973
974#ifdef WIN32
975 Sleep(reconnect_delay*1000);
976#else
977 sleep(reconnect_delay);
978#endif
979
980 pthread_mutex_lock(&mosq->state_mutex);
981 if(mosq->state == mosq_cs_disconnecting){
982 run = 0;
983 pthread_mutex_unlock(&mosq->state_mutex);
984 }else{
985 pthread_mutex_unlock(&mosq->state_mutex);
986 mosquitto_reconnect(mosq);
987 }
988 }
989 }
990 return rc;
991}
992
993int mosquitto_loop_misc(struct mosquitto *mosq)
994{
995 time_t now;
996 int rc;
997
998 if(!mosq) return MOSQ_ERR_INVAL;
999 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
1000
1001 now = mosquitto_time();
1002
1003 _mosquitto_check_keepalive(mosq);
1004 if(mosq->last_retry_check+1 < now){
1005 _mosquitto_message_retry_check(mosq);
1006 mosq->last_retry_check = now;
1007 }
1008 if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
1009 /* mosq->ping_t != 0 means we are waiting for a pingresp.
1010 * This hasn't happened in the keepalive time so we should disconnect.
1011 */
1012 _mosquitto_socket_close(mosq);
1013 pthread_mutex_lock(&mosq->state_mutex);
1014 if(mosq->state == mosq_cs_disconnecting){
1015 rc = MOSQ_ERR_SUCCESS;
1016 }else{
1017 rc = 1;
1018 }
1019 pthread_mutex_unlock(&mosq->state_mutex);
1020 pthread_mutex_lock(&mosq->callback_mutex);
1021 if(mosq->on_disconnect){
1022 mosq->in_callback = true;
1023 mosq->on_disconnect(mosq, mosq->userdata, rc);
1024 mosq->in_callback = false;
1025 }
1026 pthread_mutex_unlock(&mosq->callback_mutex);
1027 return MOSQ_ERR_CONN_LOST;
1028 }
1029 return MOSQ_ERR_SUCCESS;
1030}
1031
1032static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
1033{
1034 if(rc){
1035 _mosquitto_socket_close(mosq);
1036 pthread_mutex_lock(&mosq->state_mutex);
1037 if(mosq->state == mosq_cs_disconnecting){
1038 rc = MOSQ_ERR_SUCCESS;
1039 }
1040 pthread_mutex_unlock(&mosq->state_mutex);
1041 pthread_mutex_lock(&mosq->callback_mutex);
1042 if(mosq->on_disconnect){
1043 mosq->in_callback = true;
1044 mosq->on_disconnect(mosq, mosq->userdata, rc);
1045 mosq->in_callback = false;
1046 }
1047 pthread_mutex_unlock(&mosq->callback_mutex);
1048 return rc;
1049 }
1050 return rc;
1051}
1052
1053int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
1054{
1055 int rc;
1056 int i;
1057 if(max_packets < 1) return MOSQ_ERR_INVAL;
1058
1059 pthread_mutex_lock(&mosq->out_message_mutex);
1060 max_packets = mosq->out_queue_len;
1061 pthread_mutex_unlock(&mosq->out_message_mutex);
1062
1063 pthread_mutex_lock(&mosq->in_message_mutex);
1064 max_packets += mosq->in_queue_len;
1065 pthread_mutex_unlock(&mosq->in_message_mutex);
1066
1067 if(max_packets < 1) max_packets = 1;
1068 /* Queue len here tells us how many messages are awaiting processing and
1069 * have QoS > 0. We should try to deal with that many in this loop in order
1070 * to keep up. */
1071 for(i=0; i<max_packets; i++){
1072 rc = _mosquitto_packet_read(mosq);
1073 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1074 return _mosquitto_loop_rc_handle(mosq, rc);
1075 }
1076 }
1077 return rc;
1078}
1079
1080int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
1081{
1082 int rc;
1083 int i;
1084 if(max_packets < 1) return MOSQ_ERR_INVAL;
1085
1086 pthread_mutex_lock(&mosq->out_message_mutex);
1087 max_packets = mosq->out_queue_len;
1088 pthread_mutex_unlock(&mosq->out_message_mutex);
1089
1090 pthread_mutex_lock(&mosq->in_message_mutex);
1091 max_packets += mosq->in_queue_len;
1092 pthread_mutex_unlock(&mosq->in_message_mutex);
1093
1094 if(max_packets < 1) max_packets = 1;
1095 /* Queue len here tells us how many messages are awaiting processing and
1096 * have QoS > 0. We should try to deal with that many in this loop in order
1097 * to keep up. */
1098 for(i=0; i<max_packets; i++){
1099 rc = _mosquitto_packet_write(mosq);
1100 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1101 return _mosquitto_loop_rc_handle(mosq, rc);
1102 }
1103 }
1104 return rc;
1105}
1106
1107bool mosquitto_want_write(struct mosquitto *mosq)
1108{
1109 if(mosq->out_packet || mosq->current_out_packet){
1110 return true;
1111#ifdef WITH_TLS
1112 }else if(mosq->ssl && mosq->want_write){
1113 return true;
1114#endif
1115 }else{
1116 return false;
1117 }
1118}
1119
1120void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
1121{
1122 pthread_mutex_lock(&mosq->callback_mutex);
1123 mosq->on_connect = on_connect;
1124 pthread_mutex_unlock(&mosq->callback_mutex);
1125}
1126
1127void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
1128{
1129 pthread_mutex_lock(&mosq->callback_mutex);
1130 mosq->on_disconnect = on_disconnect;
1131 pthread_mutex_unlock(&mosq->callback_mutex);
1132}
1133
1134void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
1135{
1136 pthread_mutex_lock(&mosq->callback_mutex);
1137 mosq->on_publish = on_publish;
1138 pthread_mutex_unlock(&mosq->callback_mutex);
1139}
1140
1141void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
1142{
1143 pthread_mutex_lock(&mosq->callback_mutex);
1144 mosq->on_message = on_message;
1145 pthread_mutex_unlock(&mosq->callback_mutex);
1146}
1147
1148void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
1149{
1150 pthread_mutex_lock(&mosq->callback_mutex);
1151 mosq->on_subscribe = on_subscribe;
1152 pthread_mutex_unlock(&mosq->callback_mutex);
1153}
1154
1155void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
1156{
1157 pthread_mutex_lock(&mosq->callback_mutex);
1158 mosq->on_unsubscribe = on_unsubscribe;
1159 pthread_mutex_unlock(&mosq->callback_mutex);
1160}
1161
1162void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
1163{
1164 pthread_mutex_lock(&mosq->log_callback_mutex);
1165 mosq->on_log = on_log;
1166 pthread_mutex_unlock(&mosq->log_callback_mutex);
1167}
1168
1169void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
1170{
1171 if(mosq){
1172 mosq->userdata = userdata;
1173 }
1174}
1175
1176const char *mosquitto_strerror(int mosq_errno)
1177{
1178 switch(mosq_errno){
1179 case MOSQ_ERR_SUCCESS:
1180 return "No error.";
1181 case MOSQ_ERR_NOMEM:
1182 return "Out of memory.";
1183 case MOSQ_ERR_PROTOCOL:
1184 return "A network protocol error occurred when communicating with the broker.";
1185 case MOSQ_ERR_INVAL:
1186 return "Invalid function arguments provided.";
1187 case MOSQ_ERR_NO_CONN:
1188 return "The client is not currently connected.";
1189 case MOSQ_ERR_CONN_REFUSED:
1190 return "The connection was refused.";
1191 case MOSQ_ERR_NOT_FOUND:
1192 return "Message not found (internal error).";
1193 case MOSQ_ERR_CONN_LOST:
1194 return "The connection was lost.";
1195 case MOSQ_ERR_TLS:
1196 return "A TLS error occurred.";
1197 case MOSQ_ERR_PAYLOAD_SIZE:
1198 return "Payload too large.";
1199 case MOSQ_ERR_NOT_SUPPORTED:
1200 return "This feature is not supported.";
1201 case MOSQ_ERR_AUTH:
1202 return "Authorisation failed.";
1203 case MOSQ_ERR_ACL_DENIED:
1204 return "Access denied by ACL.";
1205 case MOSQ_ERR_UNKNOWN:
1206 return "Unknown error.";
1207 case MOSQ_ERR_ERRNO:
1208 return strerror(errno);
1209 default:
1210 return "Unknown error.";
1211 }
1212}
1213
/* Translate a CONNACK result code into a human readable message.
 *
 * Codes 0 to 5 each have their own message. Any other value, including
 * negative ones, yields the generic "unknown reason" text. The returned
 * pointer refers to a string literal and must not be freed.
 */
const char *mosquitto_connack_string(int connack_code)
{
	/* Indexed directly by the CONNACK code. */
	static const char *const messages[] = {
		"Connection Accepted.",
		"Connection Refused: unacceptable protocol version.",
		"Connection Refused: identifier rejected.",
		"Connection Refused: broker unavailable.",
		"Connection Refused: bad user name or password.",
		"Connection Refused: not authorised."
	};
	const int known = (int)(sizeof(messages) / sizeof(messages[0]));

	if(connack_code >= 0 && connack_code < known){
		return messages[connack_code];
	}
	return "Connection Refused: unknown reason.";
}
1233
1234int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
1235{
1236 int len;
1237 int hier_count = 1;
1238 int start, stop;
1239 int hier;
1240 int tlen;
1241 int i, j;
1242
1243 if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
1244
1245 len = strlen(subtopic);
1246
1247 for(i=0; i<len; i++){
1248 if(subtopic[i] == '/'){
1249 if(i > len-1){
1250 /* Separator at end of line */
1251 }else{
1252 hier_count++;
1253 }
1254 }
1255 }
1256
1257 (*topics) = _mosquitto_calloc(hier_count, sizeof(char *));
1258 if(!(*topics)) return MOSQ_ERR_NOMEM;
1259
1260 start = 0;
1261 stop = 0;
1262 hier = 0;
1263
1264 for(i=0; i<len+1; i++){
1265 if(subtopic[i] == '/' || subtopic[i] == '\0'){
1266 stop = i;
1267 if(start != stop){
1268 tlen = stop-start + 1;
1269 (*topics)[hier] = _mosquitto_calloc(tlen, sizeof(char));
1270 if(!(*topics)[hier]){
1271 for(i=0; i<hier_count; i++){
1272 if((*topics)[hier]){
1273 _mosquitto_free((*topics)[hier]);
1274 }
1275 }
1276 _mosquitto_free((*topics));
1277 return MOSQ_ERR_NOMEM;
1278 }
1279 for(j=start; j<stop; j++){
1280 (*topics)[hier][j-start] = subtopic[j];
1281 }
1282 }
1283 start = i+1;
1284 hier++;
1285 }
1286 }
1287
1288 *count = hier_count;
1289
1290 return MOSQ_ERR_SUCCESS;
1291}
1292
1293int mosquitto_sub_topic_tokens_free(char ***topics, int count)
1294{
1295 int i;
1296
1297 if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
1298
1299 for(i=0; i<count; i++){
1300 if((*topics)[i]) _mosquitto_free((*topics)[i]);
1301 }
1302 _mosquitto_free(*topics);
1303
1304 return MOSQ_ERR_SUCCESS;
1305}
1306