UNPKG

29.5 kBtext/x-cView Raw
1/*
2Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <string.h>
35#ifndef WIN32
36#include <netdb.h>
37#include <sys/socket.h>
38#include <unistd.h>
39#else
40#include <winsock2.h>
41#include <ws2tcpip.h>
42#endif
43
44#ifdef __ANDROID__
45#include <linux/in.h>
46#include <linux/in6.h>
47#include <sys/endian.h>
48#endif
49
50#ifdef __FreeBSD__
51# include <netinet/in.h>
52#endif
53
54#ifdef __SYMBIAN32__
55#include <netinet/in.h>
56#endif
57
58#ifdef __QNX__
59#ifndef AI_ADDRCONFIG
60#define AI_ADDRCONFIG 0
61#endif
62#include <net/netbyte.h>
63#include <netinet/in.h>
64#endif
65
66#ifdef WITH_TLS
67#include <openssl/err.h>
68#include <tls_mosq.h>
69#endif
70
71#ifdef WITH_BROKER
72# include <mosquitto_broker.h>
73# ifdef WITH_SYS_TREE
74 extern uint64_t g_bytes_received;
75 extern uint64_t g_bytes_sent;
76 extern unsigned long g_msgs_received;
77 extern unsigned long g_msgs_sent;
78 extern unsigned long g_pub_msgs_received;
79 extern unsigned long g_pub_msgs_sent;
80# endif
81#else
82# include <read_handle.h>
83#endif
84
85#include "logging_mosq.h"
86#include "memory_mosq.h"
87#include "mqtt3_protocol.h"
88#include "net_mosq.h"
89#include "time_mosq.h"
90#include "util_mosq.h"
91
92#ifdef WITH_TLS
93int tls_ex_index_mosq = -1;
94#endif
95
96void _mosquitto_net_init(void)
97{
98#ifdef WIN32
99 WSADATA wsaData;
100 WSAStartup(MAKEWORD(2,2), &wsaData);
101#endif
102
103#ifdef WITH_SRV
104 ares_library_init(ARES_LIB_INIT_ALL);
105#endif
106
107#ifdef WITH_TLS
108 SSL_load_error_strings();
109 SSL_library_init();
110 OpenSSL_add_all_algorithms();
111 if(tls_ex_index_mosq == -1){
112 tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
113 }
114#endif
115}
116
117void _mosquitto_net_cleanup(void)
118{
119#ifdef WITH_TLS
120 ERR_free_strings();
121 EVP_cleanup();
122 CRYPTO_cleanup_all_ex_data();
123#endif
124
125#ifdef WITH_SRV
126 ares_library_cleanup();
127#endif
128
129#ifdef WIN32
130 WSACleanup();
131#endif
132}
133
134void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
135{
136 if(!packet) return;
137
138 /* Free data and reset values */
139 packet->command = 0;
140 packet->have_remaining = 0;
141 packet->remaining_count = 0;
142 packet->remaining_mult = 1;
143 packet->remaining_length = 0;
144 if(packet->payload) _mosquitto_free(packet->payload);
145 packet->payload = NULL;
146 packet->to_process = 0;
147 packet->pos = 0;
148}
149
150int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
151{
152#ifndef WITH_BROKER
153 char sockpair_data = 0;
154#endif
155 assert(mosq);
156 assert(packet);
157
158 packet->pos = 0;
159 packet->to_process = packet->packet_length;
160
161 packet->next = NULL;
162 pthread_mutex_lock(&mosq->out_packet_mutex);
163 if(mosq->out_packet){
164 mosq->out_packet_last->next = packet;
165 }else{
166 mosq->out_packet = packet;
167 }
168 mosq->out_packet_last = packet;
169 pthread_mutex_unlock(&mosq->out_packet_mutex);
170#ifdef WITH_BROKER
171 return _mosquitto_packet_write(mosq);
172#else
173
174 /* Write a single byte to sockpairW (connected to sockpairR) to break out
175 * of select() if in threaded mode. */
176 if(mosq->sockpairW != INVALID_SOCKET){
177#ifndef WIN32
178 if(write(mosq->sockpairW, &sockpair_data, 1)){
179 }
180#else
181 send(mosq->sockpairW, &sockpair_data, 1, 0);
182#endif
183 }
184
185 if(mosq->in_callback == false && mosq->threaded == false){
186 return _mosquitto_packet_write(mosq);
187 }else{
188 return MOSQ_ERR_SUCCESS;
189 }
190#endif
191}
192
193/* Close a socket associated with a context and set it to -1.
194 * Returns 1 on failure (context is NULL)
195 * Returns 0 on success.
196 */
197int _mosquitto_socket_close(struct mosquitto *mosq)
198{
199 int rc = 0;
200
201 assert(mosq);
202#ifdef WITH_TLS
203 if(mosq->ssl){
204 SSL_shutdown(mosq->ssl);
205 SSL_free(mosq->ssl);
206 mosq->ssl = NULL;
207 }
208 if(mosq->ssl_ctx){
209 SSL_CTX_free(mosq->ssl_ctx);
210 mosq->ssl_ctx = NULL;
211 }
212#endif
213
214 if(mosq->sock != INVALID_SOCKET){
215 rc = COMPAT_CLOSE(mosq->sock);
216 mosq->sock = INVALID_SOCKET;
217 }
218
219 return rc;
220}
221
222#ifdef REAL_WITH_TLS_PSK
223static unsigned int psk_client_callback(SSL *ssl, const char *hint,
224 char *identity, unsigned int max_identity_len,
225 unsigned char *psk, unsigned int max_psk_len)
226{
227 struct mosquitto *mosq;
228 int len;
229
230 mosq = SSL_get_ex_data(ssl, tls_ex_index_mosq);
231 if(!mosq) return 0;
232
233 snprintf(identity, max_identity_len, "%s", mosq->tls_psk_identity);
234
235 len = _mosquitto_hex2bin(mosq->tls_psk, psk, max_psk_len);
236 if (len < 0) return 0;
237 return len;
238}
239#endif
240
241int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
242{
243 struct addrinfo hints;
244 struct addrinfo *ainfo, *rp;
245 struct addrinfo *ainfo_bind, *rp_bind;
246 int s;
247 int rc;
248#ifdef WIN32
249 uint32_t val = 1;
250#endif
251
252 *sock = INVALID_SOCKET;
253 memset(&hints, 0, sizeof(struct addrinfo));
254 hints.ai_family = PF_UNSPEC;
255 hints.ai_flags = AI_ADDRCONFIG;
256 hints.ai_socktype = SOCK_STREAM;
257
258 s = getaddrinfo(host, NULL, &hints, &ainfo);
259 if(s){
260 errno = s;
261 return MOSQ_ERR_EAI;
262 }
263
264 if(bind_address){
265 s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
266 if(s){
267 freeaddrinfo(ainfo);
268 errno = s;
269 return MOSQ_ERR_EAI;
270 }
271 }
272
273 for(rp = ainfo; rp != NULL; rp = rp->ai_next){
274 *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
275 if(*sock == INVALID_SOCKET) continue;
276
277 if(rp->ai_family == PF_INET){
278 ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
279 }else if(rp->ai_family == PF_INET6){
280 ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
281 }else{
282 continue;
283 }
284
285 if(bind_address){
286 for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
287 if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
288 break;
289 }
290 }
291 if(!rp_bind){
292 COMPAT_CLOSE(*sock);
293 continue;
294 }
295 }
296
297 if(!blocking){
298 /* Set non-blocking */
299 if(_mosquitto_socket_nonblock(*sock)){
300 COMPAT_CLOSE(*sock);
301 continue;
302 }
303 }
304
305 rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
306#ifdef WIN32
307 errno = WSAGetLastError();
308#endif
309 if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
310 if(blocking){
311 /* Set non-blocking */
312 if(_mosquitto_socket_nonblock(*sock)){
313 COMPAT_CLOSE(*sock);
314 continue;
315 }
316 }
317 break;
318 }
319
320 COMPAT_CLOSE(*sock);
321 *sock = INVALID_SOCKET;
322 }
323 freeaddrinfo(ainfo);
324 if(bind_address){
325 freeaddrinfo(ainfo_bind);
326 }
327 if(!rp){
328 return MOSQ_ERR_ERRNO;
329 }
330 return MOSQ_ERR_SUCCESS;
331}
332
333/* Create a socket and connect it to 'ip' on port 'port'.
334 * Returns -1 on failure (ip is NULL, socket creation/connection error)
335 * Returns sock number on success.
336 */
337int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
338{
339 int sock = INVALID_SOCKET;
340 int rc;
341#ifdef WITH_TLS
342 int ret;
343 BIO *bio;
344#endif
345
346 if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
347
348#ifdef WITH_TLS
349 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
350 blocking = true;
351 }
352#endif
353
354 rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
355 if(rc != MOSQ_ERR_SUCCESS) return rc;
356
357#ifdef WITH_TLS
358 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
359#if OPENSSL_VERSION_NUMBER >= 0x10001000L
360 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
361 mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
362 }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
363 mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
364 }else if(!strcmp(mosq->tls_version, "tlsv1")){
365 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
366 }else{
367 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
368 COMPAT_CLOSE(sock);
369 return MOSQ_ERR_INVAL;
370 }
371#else
372 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
373 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
374 }else{
375 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
376 COMPAT_CLOSE(sock);
377 return MOSQ_ERR_INVAL;
378 }
379#endif
380 if(!mosq->ssl_ctx){
381 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
382 COMPAT_CLOSE(sock);
383 return MOSQ_ERR_TLS;
384 }
385
386#if OPENSSL_VERSION_NUMBER >= 0x10000000
387 /* Disable compression */
388 SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
389#endif
390#ifdef SSL_MODE_RELEASE_BUFFERS
391 /* Use even less memory per SSL connection. */
392 SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
393#endif
394
395 if(mosq->tls_ciphers){
396 ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
397 if(ret == 0){
398 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
399 COMPAT_CLOSE(sock);
400 return MOSQ_ERR_TLS;
401 }
402 }
403 if(mosq->tls_cafile || mosq->tls_capath){
404 ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
405 if(ret == 0){
406#ifdef WITH_BROKER
407 if(mosq->tls_cafile && mosq->tls_capath){
408 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
409 }else if(mosq->tls_cafile){
410 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
411 }else{
412 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
413 }
414#else
415 if(mosq->tls_cafile && mosq->tls_capath){
416 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
417 }else if(mosq->tls_cafile){
418 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
419 }else{
420 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
421 }
422#endif
423 COMPAT_CLOSE(sock);
424 return MOSQ_ERR_TLS;
425 }
426 if(mosq->tls_cert_reqs == 0){
427 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
428 }else{
429 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
430 }
431
432 if(mosq->tls_pw_callback){
433 SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
434 SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
435 }
436
437 if(mosq->tls_certfile){
438 ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
439 if(ret != 1){
440#ifdef WITH_BROKER
441 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
442#else
443 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
444#endif
445 COMPAT_CLOSE(sock);
446 return MOSQ_ERR_TLS;
447 }
448 }
449 if(mosq->tls_keyfile){
450 ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
451 if(ret != 1){
452#ifdef WITH_BROKER
453 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
454#else
455 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
456#endif
457 COMPAT_CLOSE(sock);
458 return MOSQ_ERR_TLS;
459 }
460 ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
461 if(ret != 1){
462 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
463 COMPAT_CLOSE(sock);
464 return MOSQ_ERR_TLS;
465 }
466 }
467#ifdef REAL_WITH_TLS_PSK
468 }else if(mosq->tls_psk){
469 SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
470#endif
471 }
472
473 mosq->ssl = SSL_new(mosq->ssl_ctx);
474 if(!mosq->ssl){
475 COMPAT_CLOSE(sock);
476 return MOSQ_ERR_TLS;
477 }
478 SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
479 bio = BIO_new_socket(sock, BIO_NOCLOSE);
480 if(!bio){
481 COMPAT_CLOSE(sock);
482 return MOSQ_ERR_TLS;
483 }
484 SSL_set_bio(mosq->ssl, bio, bio);
485
486 ret = SSL_connect(mosq->ssl);
487 if(ret != 1){
488 ret = SSL_get_error(mosq->ssl, ret);
489 if(ret == SSL_ERROR_WANT_READ){
490 /* We always try to read anyway */
491 }else if(ret == SSL_ERROR_WANT_WRITE){
492 mosq->want_write = true;
493 }else{
494 COMPAT_CLOSE(sock);
495 return MOSQ_ERR_TLS;
496 }
497 }
498 }
499#endif
500
501 mosq->sock = sock;
502
503 return MOSQ_ERR_SUCCESS;
504}
505
506int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
507{
508 assert(packet);
509 if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
510
511 *byte = packet->payload[packet->pos];
512 packet->pos++;
513
514 return MOSQ_ERR_SUCCESS;
515}
516
517void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
518{
519 assert(packet);
520 assert(packet->pos+1 <= packet->packet_length);
521
522 packet->payload[packet->pos] = byte;
523 packet->pos++;
524}
525
526int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
527{
528 assert(packet);
529 if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
530
531 memcpy(bytes, &(packet->payload[packet->pos]), count);
532 packet->pos += count;
533
534 return MOSQ_ERR_SUCCESS;
535}
536
537void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
538{
539 assert(packet);
540 assert(packet->pos+count <= packet->packet_length);
541
542 memcpy(&(packet->payload[packet->pos]), bytes, count);
543 packet->pos += count;
544}
545
546int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
547{
548 uint16_t len;
549 int rc;
550
551 assert(packet);
552 rc = _mosquitto_read_uint16(packet, &len);
553 if(rc) return rc;
554
555 if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
556
557 *str = _mosquitto_calloc(len+1, sizeof(char));
558 if(*str){
559 memcpy(*str, &(packet->payload[packet->pos]), len);
560 packet->pos += len;
561 }else{
562 return MOSQ_ERR_NOMEM;
563 }
564
565 return MOSQ_ERR_SUCCESS;
566}
567
568void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
569{
570 assert(packet);
571 _mosquitto_write_uint16(packet, length);
572 _mosquitto_write_bytes(packet, str, length);
573}
574
575int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
576{
577 uint8_t msb, lsb;
578
579 assert(packet);
580 if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
581
582 msb = packet->payload[packet->pos];
583 packet->pos++;
584 lsb = packet->payload[packet->pos];
585 packet->pos++;
586
587 *word = (msb<<8) + lsb;
588
589 return MOSQ_ERR_SUCCESS;
590}
591
592void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
593{
594 _mosquitto_write_byte(packet, MOSQ_MSB(word));
595 _mosquitto_write_byte(packet, MOSQ_LSB(word));
596}
597
598ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
599{
600#ifdef WITH_TLS
601 int ret;
602 int err;
603 char ebuf[256];
604 unsigned long e;
605#endif
606 assert(mosq);
607 errno = 0;
608#ifdef WITH_TLS
609 if(mosq->ssl){
610 ret = SSL_read(mosq->ssl, buf, count);
611 if(ret <= 0){
612 err = SSL_get_error(mosq->ssl, ret);
613 if(err == SSL_ERROR_WANT_READ){
614 ret = -1;
615 errno = EAGAIN;
616 }else if(err == SSL_ERROR_WANT_WRITE){
617 ret = -1;
618 mosq->want_write = true;
619 errno = EAGAIN;
620 }else{
621 e = ERR_get_error();
622 while(e){
623 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
624 e = ERR_get_error();
625 }
626 errno = EPROTO;
627 }
628 }
629 return (ssize_t )ret;
630 }else{
631 /* Call normal read/recv */
632
633#endif
634
635#ifndef WIN32
636 return read(mosq->sock, buf, count);
637#else
638 return recv(mosq->sock, buf, count, 0);
639#endif
640
641#ifdef WITH_TLS
642 }
643#endif
644}
645
646ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
647{
648#ifdef WITH_TLS
649 int ret;
650 int err;
651 char ebuf[256];
652 unsigned long e;
653#endif
654 assert(mosq);
655
656 errno = 0;
657#ifdef WITH_TLS
658 if(mosq->ssl){
659 ret = SSL_write(mosq->ssl, buf, count);
660 if(ret < 0){
661 err = SSL_get_error(mosq->ssl, ret);
662 if(err == SSL_ERROR_WANT_READ){
663 ret = -1;
664 errno = EAGAIN;
665 }else if(err == SSL_ERROR_WANT_WRITE){
666 ret = -1;
667 mosq->want_write = true;
668 errno = EAGAIN;
669 }else{
670 e = ERR_get_error();
671 while(e){
672 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
673 e = ERR_get_error();
674 }
675 errno = EPROTO;
676 }
677 }
678 return (ssize_t )ret;
679 }else{
680 /* Call normal write/send */
681#endif
682
683#ifndef WIN32
684 return write(mosq->sock, buf, count);
685#else
686 return send(mosq->sock, buf, count, 0);
687#endif
688
689#ifdef WITH_TLS
690 }
691#endif
692}
693
694int _mosquitto_packet_write(struct mosquitto *mosq)
695{
696 ssize_t write_length;
697 struct _mosquitto_packet *packet;
698
699 if(!mosq) return MOSQ_ERR_INVAL;
700 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
701
702 pthread_mutex_lock(&mosq->current_out_packet_mutex);
703 pthread_mutex_lock(&mosq->out_packet_mutex);
704 if(mosq->out_packet && !mosq->current_out_packet){
705 mosq->current_out_packet = mosq->out_packet;
706 mosq->out_packet = mosq->out_packet->next;
707 if(!mosq->out_packet){
708 mosq->out_packet_last = NULL;
709 }
710 }
711 pthread_mutex_unlock(&mosq->out_packet_mutex);
712
713 while(mosq->current_out_packet){
714 packet = mosq->current_out_packet;
715
716 while(packet->to_process > 0){
717 write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
718 if(write_length > 0){
719#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
720 g_bytes_sent += write_length;
721#endif
722 packet->to_process -= write_length;
723 packet->pos += write_length;
724 }else{
725#ifdef WIN32
726 errno = WSAGetLastError();
727#endif
728 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
729 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
730 return MOSQ_ERR_SUCCESS;
731 }else{
732 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
733 switch(errno){
734 case COMPAT_ECONNRESET:
735 return MOSQ_ERR_CONN_LOST;
736 default:
737 return MOSQ_ERR_ERRNO;
738 }
739 }
740 }
741 }
742
743#ifdef WITH_BROKER
744# ifdef WITH_SYS_TREE
745 g_msgs_sent++;
746 if(((packet->command)&0xF6) == PUBLISH){
747 g_pub_msgs_sent++;
748 }
749# endif
750#else
751 if(((packet->command)&0xF6) == PUBLISH){
752 pthread_mutex_lock(&mosq->callback_mutex);
753 if(mosq->on_publish){
754 /* This is a QoS=0 message */
755 mosq->in_callback = true;
756 mosq->on_publish(mosq, mosq->userdata, packet->mid);
757 mosq->in_callback = false;
758 }
759 pthread_mutex_unlock(&mosq->callback_mutex);
760 }else if(((packet->command)&0xF0) == DISCONNECT){
761 /* FIXME what cleanup needs doing here?
762 * incoming/outgoing messages? */
763 _mosquitto_socket_close(mosq);
764
765 /* Start of duplicate, possibly unnecessary code.
766 * This does leave things in a consistent state at least. */
767 /* Free data and reset values */
768 pthread_mutex_lock(&mosq->out_packet_mutex);
769 mosq->current_out_packet = mosq->out_packet;
770 if(mosq->out_packet){
771 mosq->out_packet = mosq->out_packet->next;
772 if(!mosq->out_packet){
773 mosq->out_packet_last = NULL;
774 }
775 }
776 pthread_mutex_unlock(&mosq->out_packet_mutex);
777
778 _mosquitto_packet_cleanup(packet);
779 _mosquitto_free(packet);
780
781 pthread_mutex_lock(&mosq->msgtime_mutex);
782 mosq->last_msg_out = mosquitto_time();
783 pthread_mutex_unlock(&mosq->msgtime_mutex);
784 /* End of duplicate, possibly unnecessary code */
785
786 pthread_mutex_lock(&mosq->callback_mutex);
787 if(mosq->on_disconnect){
788 mosq->in_callback = true;
789 mosq->on_disconnect(mosq, mosq->userdata, 0);
790 mosq->in_callback = false;
791 }
792 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
793 return MOSQ_ERR_SUCCESS;
794 }
795#endif
796
797 /* Free data and reset values */
798 pthread_mutex_lock(&mosq->out_packet_mutex);
799 mosq->current_out_packet = mosq->out_packet;
800 if(mosq->out_packet){
801 mosq->out_packet = mosq->out_packet->next;
802 if(!mosq->out_packet){
803 mosq->out_packet_last = NULL;
804 }
805 }
806 pthread_mutex_unlock(&mosq->out_packet_mutex);
807
808 _mosquitto_packet_cleanup(packet);
809 _mosquitto_free(packet);
810
811 pthread_mutex_lock(&mosq->msgtime_mutex);
812 mosq->last_msg_out = mosquitto_time();
813 pthread_mutex_unlock(&mosq->msgtime_mutex);
814 }
815 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
816 return MOSQ_ERR_SUCCESS;
817}
818
819#ifdef WITH_BROKER
820int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
821#else
822int _mosquitto_packet_read(struct mosquitto *mosq)
823#endif
824{
825 uint8_t byte;
826 ssize_t read_length;
827 int rc = 0;
828
829 if(!mosq) return MOSQ_ERR_INVAL;
830 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
831 /* This gets called if pselect() indicates that there is network data
832 * available - ie. at least one byte. What we do depends on what data we
833 * already have.
834 * If we've not got a command, attempt to read one and save it. This should
835 * always work because it's only a single byte.
836 * Then try to read the remaining length. This may fail because it is may
837 * be more than one byte - will need to save data pending next read if it
838 * does fail.
839 * Then try to read the remaining payload, where 'payload' here means the
840 * combined variable header and actual payload. This is the most likely to
841 * fail due to longer length, so save current data and current position.
842 * After all data is read, send to _mosquitto_handle_packet() to deal with.
843 * Finally, free the memory and reset everything to starting conditions.
844 */
845 if(!mosq->in_packet.command){
846 read_length = _mosquitto_net_read(mosq, &byte, 1);
847 if(read_length == 1){
848 mosq->in_packet.command = byte;
849#ifdef WITH_BROKER
850# ifdef WITH_SYS_TREE
851 g_bytes_received++;
852# endif
853 /* Clients must send CONNECT as their first command. */
854 if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
855#endif
856 }else{
857 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
858#ifdef WIN32
859 errno = WSAGetLastError();
860#endif
861 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
862 return MOSQ_ERR_SUCCESS;
863 }else{
864 switch(errno){
865 case COMPAT_ECONNRESET:
866 return MOSQ_ERR_CONN_LOST;
867 default:
868 return MOSQ_ERR_ERRNO;
869 }
870 }
871 }
872 }
873 if(!mosq->in_packet.have_remaining){
874 do{
875 read_length = _mosquitto_net_read(mosq, &byte, 1);
876 if(read_length == 1){
877 mosq->in_packet.remaining_count++;
878 /* Max 4 bytes length for remaining length as defined by protocol.
879 * Anything more likely means a broken/malicious client.
880 */
881 if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
882
883#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
884 g_bytes_received++;
885#endif
886 mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
887 mosq->in_packet.remaining_mult *= 128;
888 }else{
889 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
890#ifdef WIN32
891 errno = WSAGetLastError();
892#endif
893 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
894 return MOSQ_ERR_SUCCESS;
895 }else{
896 switch(errno){
897 case COMPAT_ECONNRESET:
898 return MOSQ_ERR_CONN_LOST;
899 default:
900 return MOSQ_ERR_ERRNO;
901 }
902 }
903 }
904 }while((byte & 128) != 0);
905
906 if(mosq->in_packet.remaining_length > 0){
907 mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
908 if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
909 mosq->in_packet.to_process = mosq->in_packet.remaining_length;
910 }
911 mosq->in_packet.have_remaining = 1;
912 }
913 while(mosq->in_packet.to_process>0){
914 read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
915 if(read_length > 0){
916#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
917 g_bytes_received += read_length;
918#endif
919 mosq->in_packet.to_process -= read_length;
920 mosq->in_packet.pos += read_length;
921 }else{
922#ifdef WIN32
923 errno = WSAGetLastError();
924#endif
925 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
926 if(mosq->in_packet.to_process > 1000){
927 /* Update last_msg_in time if more than 1000 bytes left to
928 * receive. Helps when receiving large messages.
929 * This is an arbitrary limit, but with some consideration.
930 * If a client can't send 1000 bytes in a second it
931 * probably shouldn't be using a 1 second keep alive. */
932 pthread_mutex_lock(&mosq->msgtime_mutex);
933 mosq->last_msg_in = mosquitto_time();
934 pthread_mutex_unlock(&mosq->msgtime_mutex);
935 }
936 return MOSQ_ERR_SUCCESS;
937 }else{
938 switch(errno){
939 case COMPAT_ECONNRESET:
940 return MOSQ_ERR_CONN_LOST;
941 default:
942 return MOSQ_ERR_ERRNO;
943 }
944 }
945 }
946 }
947
948 /* All data for this packet is read. */
949 mosq->in_packet.pos = 0;
950#ifdef WITH_BROKER
951# ifdef WITH_SYS_TREE
952 g_msgs_received++;
953 if(((mosq->in_packet.command)&0xF5) == PUBLISH){
954 g_pub_msgs_received++;
955 }
956# endif
957 rc = mqtt3_packet_handle(db, mosq);
958#else
959 rc = _mosquitto_packet_handle(mosq);
960#endif
961
962 /* Free data and reset values */
963 _mosquitto_packet_cleanup(&mosq->in_packet);
964
965 pthread_mutex_lock(&mosq->msgtime_mutex);
966 mosq->last_msg_in = mosquitto_time();
967 pthread_mutex_unlock(&mosq->msgtime_mutex);
968 return rc;
969}
970
971int _mosquitto_socket_nonblock(int sock)
972{
973#ifndef WIN32
974 int opt;
975 /* Set non-blocking */
976 opt = fcntl(sock, F_GETFL, 0);
977 if(opt == -1){
978 COMPAT_CLOSE(sock);
979 return 1;
980 }
981 if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
982 /* If either fcntl fails, don't want to allow this client to connect. */
983 COMPAT_CLOSE(sock);
984 return 1;
985 }
986#else
987 opt = 1;
988 if(ioctlsocket(sock, FIONBIO, &opt)){
989 COMPAT_CLOSE(sock);
990 return 1;
991 }
992#endif
993 return 0;
994}
995
996
997#ifndef WITH_BROKER
998int _mosquitto_socketpair(int *pairR, int *pairW)
999{
1000#ifdef WIN32
1001 int family[2] = {AF_INET, AF_INET6};
1002 int i;
1003 struct sockaddr_storage ss;
1004 struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
1005 struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
1006 socklen_t ss_len;
1007 int spR, spW;
1008
1009 int listensock;
1010
1011 *pairR = -1;
1012 *pairW = -1;
1013
1014 for(i=0; i<2; i++){
1015 memset(&ss, 0, sizeof(ss));
1016 if(family[i] == AF_INET){
1017 sa->sin_family = family[i];
1018 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1019 sa->sin_port = 0;
1020 ss_len = sizeof(struct sockaddr_in);
1021 }else if(family[i] == AF_INET6){
1022 sa6->sin6_family = family[i];
1023 sa6->sin6_addr = in6addr_loopback;
1024 sa6->sin6_port = 0;
1025 ss_len = sizeof(struct sockaddr_in6);
1026 }else{
1027 return MOSQ_ERR_INVAL;
1028 }
1029
1030 listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1031 if(listensock == -1){
1032 continue;
1033 }
1034
1035 if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
1036 COMPAT_CLOSE(listensock);
1037 continue;
1038 }
1039
1040 if(listen(listensock, 1) == -1){
1041 COMPAT_CLOSE(listensock);
1042 continue;
1043 }
1044 memset(&ss, 0, sizeof(ss));
1045 ss_len = sizeof(ss);
1046 if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
1047 COMPAT_CLOSE(listensock);
1048 continue;
1049 }
1050
1051 if(_mosquitto_socket_nonblock(listensock)){
1052 continue;
1053 }
1054
1055 if(family[i] == AF_INET){
1056 sa->sin_family = family[i];
1057 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1058 ss_len = sizeof(struct sockaddr_in);
1059 }else if(family[i] == AF_INET6){
1060 sa6->sin6_family = family[i];
1061 sa6->sin6_addr = in6addr_loopback;
1062 ss_len = sizeof(struct sockaddr_in6);
1063 }
1064
1065 spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1066 if(spR == -1){
1067 COMPAT_CLOSE(listensock);
1068 continue;
1069 }
1070 if(_mosquitto_socket_nonblock(spR)){
1071 COMPAT_CLOSE(listensock);
1072 continue;
1073 }
1074 if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
1075#ifdef WIN32
1076 errno = WSAGetLastError();
1077#endif
1078 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1079 COMPAT_CLOSE(spR);
1080 COMPAT_CLOSE(listensock);
1081 continue;
1082 }
1083 }
1084 spW = accept(listensock, NULL, 0);
1085 if(spW == -1){
1086#ifdef WIN32
1087 errno = WSAGetLastError();
1088#endif
1089 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1090 COMPAT_CLOSE(spR);
1091 COMPAT_CLOSE(listensock);
1092 continue;
1093 }
1094 }
1095
1096 if(_mosquitto_socket_nonblock(spW)){
1097 COMPAT_CLOSE(spR);
1098 COMPAT_CLOSE(listensock);
1099 continue;
1100 }
1101 COMPAT_CLOSE(listensock);
1102
1103 *pairR = spR;
1104 *pairW = spW;
1105 return MOSQ_ERR_SUCCESS;
1106 }
1107 return MOSQ_ERR_UNKNOWN;
1108#else
1109 int sv[2];
1110
1111 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
1112 return MOSQ_ERR_ERRNO;
1113 }
1114 if(_mosquitto_socket_nonblock(sv[0])){
1115 COMPAT_CLOSE(sv[0]);
1116 COMPAT_CLOSE(sv[1]);
1117 return MOSQ_ERR_ERRNO;
1118 }
1119 if(_mosquitto_socket_nonblock(sv[1])){
1120 COMPAT_CLOSE(sv[0]);
1121 COMPAT_CLOSE(sv[1]);
1122 return MOSQ_ERR_ERRNO;
1123 }
1124 *pairR = sv[0];
1125 *pairW = sv[1];
1126 return MOSQ_ERR_SUCCESS;
1127#endif
1128}
1129#endif