1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include <assert.h>
|
31 | #include <errno.h>
|
32 | #include <fcntl.h>
|
33 | #include <stdio.h>
|
34 | #include <string.h>
|
35 | #ifndef WIN32
|
36 | #include <netdb.h>
|
37 | #include <sys/socket.h>
|
38 | #include <unistd.h>
|
39 | #else
|
40 | #include <winsock2.h>
|
41 | #include <ws2tcpip.h>
|
42 | #endif
|
43 |
|
44 | #ifdef __ANDROID__
|
45 | #include <linux/in.h>
|
46 | #include <linux/in6.h>
|
47 | #include <sys/endian.h>
|
48 | #endif
|
49 |
|
50 | #ifdef __FreeBSD__
|
51 | # include <netinet/in.h>
|
52 | #endif
|
53 |
|
54 | #ifdef __SYMBIAN32__
|
55 | #include <netinet/in.h>
|
56 | #endif
|
57 |
|
58 | #ifdef __QNX__
|
59 | #ifndef AI_ADDRCONFIG
|
60 | #define AI_ADDRCONFIG 0
|
61 | #endif
|
62 | #include <net/netbyte.h>
|
63 | #include <netinet/in.h>
|
64 | #endif
|
65 |
|
66 | #ifdef WITH_TLS
|
67 | #include <openssl/err.h>
|
68 | #include <tls_mosq.h>
|
69 | #endif
|
70 |
|
71 | #ifdef WITH_BROKER
|
72 | # include <mosquitto_broker.h>
|
73 | # ifdef WITH_SYS_TREE
|
74 | extern uint64_t g_bytes_received;
|
75 | extern uint64_t g_bytes_sent;
|
76 | extern unsigned long g_msgs_received;
|
77 | extern unsigned long g_msgs_sent;
|
78 | extern unsigned long g_pub_msgs_received;
|
79 | extern unsigned long g_pub_msgs_sent;
|
80 | # endif
|
81 | #else
|
82 | # include <read_handle.h>
|
83 | #endif
|
84 |
|
85 | #include "logging_mosq.h"
|
86 | #include "memory_mosq.h"
|
87 | #include "mqtt3_protocol.h"
|
88 | #include "net_mosq.h"
|
89 | #include "time_mosq.h"
|
90 | #include "util_mosq.h"
|
91 |
|
92 | #ifdef WITH_TLS
|
93 | int tls_ex_index_mosq = -1;
|
94 | #endif
|
95 |
|
96 | void _mosquitto_net_init(void)
|
97 | {
|
98 | #ifdef WIN32
|
99 | WSADATA wsaData;
|
100 | WSAStartup(MAKEWORD(2,2), &wsaData);
|
101 | #endif
|
102 |
|
103 | #ifdef WITH_SRV
|
104 | ares_library_init(ARES_LIB_INIT_ALL);
|
105 | #endif
|
106 |
|
107 | #ifdef WITH_TLS
|
108 | SSL_load_error_strings();
|
109 | SSL_library_init();
|
110 | OpenSSL_add_all_algorithms();
|
111 | if(tls_ex_index_mosq == -1){
|
112 | tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
|
113 | }
|
114 | #endif
|
115 | }
|
116 |
|
117 | void _mosquitto_net_cleanup(void)
|
118 | {
|
119 | #ifdef WITH_TLS
|
120 | ERR_free_strings();
|
121 | EVP_cleanup();
|
122 | CRYPTO_cleanup_all_ex_data();
|
123 | #endif
|
124 |
|
125 | #ifdef WITH_SRV
|
126 | ares_library_cleanup();
|
127 | #endif
|
128 |
|
129 | #ifdef WIN32
|
130 | WSACleanup();
|
131 | #endif
|
132 | }
|
133 |
|
134 | void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
|
135 | {
|
136 | if(!packet) return;
|
137 |
|
138 |
|
139 | packet->command = 0;
|
140 | packet->have_remaining = 0;
|
141 | packet->remaining_count = 0;
|
142 | packet->remaining_mult = 1;
|
143 | packet->remaining_length = 0;
|
144 | if(packet->payload) _mosquitto_free(packet->payload);
|
145 | packet->payload = NULL;
|
146 | packet->to_process = 0;
|
147 | packet->pos = 0;
|
148 | }
|
149 |
|
150 | int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
|
151 | {
|
152 | #ifndef WITH_BROKER
|
153 | char sockpair_data = 0;
|
154 | #endif
|
155 | assert(mosq);
|
156 | assert(packet);
|
157 |
|
158 | packet->pos = 0;
|
159 | packet->to_process = packet->packet_length;
|
160 |
|
161 | packet->next = NULL;
|
162 | pthread_mutex_lock(&mosq->out_packet_mutex);
|
163 | if(mosq->out_packet){
|
164 | mosq->out_packet_last->next = packet;
|
165 | }else{
|
166 | mosq->out_packet = packet;
|
167 | }
|
168 | mosq->out_packet_last = packet;
|
169 | pthread_mutex_unlock(&mosq->out_packet_mutex);
|
170 | #ifdef WITH_BROKER
|
171 | return _mosquitto_packet_write(mosq);
|
172 | #else
|
173 |
|
174 | |
175 |
|
176 | if(mosq->sockpairW != INVALID_SOCKET){
|
177 | #ifndef WIN32
|
178 | if(write(mosq->sockpairW, &sockpair_data, 1)){
|
179 | }
|
180 | #else
|
181 | send(mosq->sockpairW, &sockpair_data, 1, 0);
|
182 | #endif
|
183 | }
|
184 |
|
185 | if(mosq->in_callback == false && mosq->threaded == false){
|
186 | return _mosquitto_packet_write(mosq);
|
187 | }else{
|
188 | return MOSQ_ERR_SUCCESS;
|
189 | }
|
190 | #endif
|
191 | }
|
192 |
|
193 |
|
194 |
|
195 |
|
196 |
|
197 | int _mosquitto_socket_close(struct mosquitto *mosq)
|
198 | {
|
199 | int rc = 0;
|
200 |
|
201 | assert(mosq);
|
202 | #ifdef WITH_TLS
|
203 | if(mosq->ssl){
|
204 | SSL_shutdown(mosq->ssl);
|
205 | SSL_free(mosq->ssl);
|
206 | mosq->ssl = NULL;
|
207 | }
|
208 | if(mosq->ssl_ctx){
|
209 | SSL_CTX_free(mosq->ssl_ctx);
|
210 | mosq->ssl_ctx = NULL;
|
211 | }
|
212 | #endif
|
213 |
|
214 | if(mosq->sock != INVALID_SOCKET){
|
215 | rc = COMPAT_CLOSE(mosq->sock);
|
216 | mosq->sock = INVALID_SOCKET;
|
217 | }
|
218 |
|
219 | return rc;
|
220 | }
|
221 |
|
222 | #ifdef REAL_WITH_TLS_PSK
|
223 | static unsigned int psk_client_callback(SSL *ssl, const char *hint,
|
224 | char *identity, unsigned int max_identity_len,
|
225 | unsigned char *psk, unsigned int max_psk_len)
|
226 | {
|
227 | struct mosquitto *mosq;
|
228 | int len;
|
229 |
|
230 | mosq = SSL_get_ex_data(ssl, tls_ex_index_mosq);
|
231 | if(!mosq) return 0;
|
232 |
|
233 | snprintf(identity, max_identity_len, "%s", mosq->tls_psk_identity);
|
234 |
|
235 | len = _mosquitto_hex2bin(mosq->tls_psk, psk, max_psk_len);
|
236 | if (len < 0) return 0;
|
237 | return len;
|
238 | }
|
239 | #endif
|
240 |
|
241 | int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
|
242 | {
|
243 | struct addrinfo hints;
|
244 | struct addrinfo *ainfo, *rp;
|
245 | struct addrinfo *ainfo_bind, *rp_bind;
|
246 | int s;
|
247 | int rc;
|
248 | #ifdef WIN32
|
249 | uint32_t val = 1;
|
250 | #endif
|
251 |
|
252 | *sock = INVALID_SOCKET;
|
253 | memset(&hints, 0, sizeof(struct addrinfo));
|
254 | hints.ai_family = PF_UNSPEC;
|
255 | hints.ai_flags = AI_ADDRCONFIG;
|
256 | hints.ai_socktype = SOCK_STREAM;
|
257 |
|
258 | s = getaddrinfo(host, NULL, &hints, &ainfo);
|
259 | if(s){
|
260 | errno = s;
|
261 | return MOSQ_ERR_EAI;
|
262 | }
|
263 |
|
264 | if(bind_address){
|
265 | s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
|
266 | if(s){
|
267 | freeaddrinfo(ainfo);
|
268 | errno = s;
|
269 | return MOSQ_ERR_EAI;
|
270 | }
|
271 | }
|
272 |
|
273 | for(rp = ainfo; rp != NULL; rp = rp->ai_next){
|
274 | *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
|
275 | if(*sock == INVALID_SOCKET) continue;
|
276 |
|
277 | if(rp->ai_family == PF_INET){
|
278 | ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
|
279 | }else if(rp->ai_family == PF_INET6){
|
280 | ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
|
281 | }else{
|
282 | continue;
|
283 | }
|
284 |
|
285 | if(bind_address){
|
286 | for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
|
287 | if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
|
288 | break;
|
289 | }
|
290 | }
|
291 | if(!rp_bind){
|
292 | COMPAT_CLOSE(*sock);
|
293 | continue;
|
294 | }
|
295 | }
|
296 |
|
297 | if(!blocking){
|
298 |
|
299 | if(_mosquitto_socket_nonblock(*sock)){
|
300 | COMPAT_CLOSE(*sock);
|
301 | continue;
|
302 | }
|
303 | }
|
304 |
|
305 | rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
|
306 | #ifdef WIN32
|
307 | errno = WSAGetLastError();
|
308 | #endif
|
309 | if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
|
310 | if(blocking){
|
311 |
|
312 | if(_mosquitto_socket_nonblock(*sock)){
|
313 | COMPAT_CLOSE(*sock);
|
314 | continue;
|
315 | }
|
316 | }
|
317 | break;
|
318 | }
|
319 |
|
320 | COMPAT_CLOSE(*sock);
|
321 | *sock = INVALID_SOCKET;
|
322 | }
|
323 | freeaddrinfo(ainfo);
|
324 | if(bind_address){
|
325 | freeaddrinfo(ainfo_bind);
|
326 | }
|
327 | if(!rp){
|
328 | return MOSQ_ERR_ERRNO;
|
329 | }
|
330 | return MOSQ_ERR_SUCCESS;
|
331 | }
|
332 |
|
333 |
|
334 |
|
335 |
|
336 |
|
337 | int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
|
338 | {
|
339 | int sock = INVALID_SOCKET;
|
340 | int rc;
|
341 | #ifdef WITH_TLS
|
342 | int ret;
|
343 | BIO *bio;
|
344 | #endif
|
345 |
|
346 | if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
|
347 |
|
348 | #ifdef WITH_TLS
|
349 | if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
|
350 | blocking = true;
|
351 | }
|
352 | #endif
|
353 |
|
354 | rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
|
355 | if(rc != MOSQ_ERR_SUCCESS) return rc;
|
356 |
|
357 | #ifdef WITH_TLS
|
358 | if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
|
359 | #if OPENSSL_VERSION_NUMBER >= 0x10001000L
|
360 | if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
|
361 | mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
|
362 | }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
|
363 | mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
|
364 | }else if(!strcmp(mosq->tls_version, "tlsv1")){
|
365 | mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
|
366 | }else{
|
367 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
|
368 | COMPAT_CLOSE(sock);
|
369 | return MOSQ_ERR_INVAL;
|
370 | }
|
371 | #else
|
372 | if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
|
373 | mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
|
374 | }else{
|
375 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
|
376 | COMPAT_CLOSE(sock);
|
377 | return MOSQ_ERR_INVAL;
|
378 | }
|
379 | #endif
|
380 | if(!mosq->ssl_ctx){
|
381 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
|
382 | COMPAT_CLOSE(sock);
|
383 | return MOSQ_ERR_TLS;
|
384 | }
|
385 |
|
386 | #if OPENSSL_VERSION_NUMBER >= 0x10000000
|
387 |
|
388 | SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
|
389 | #endif
|
390 | #ifdef SSL_MODE_RELEASE_BUFFERS
|
391 |
|
392 | SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
|
393 | #endif
|
394 |
|
395 | if(mosq->tls_ciphers){
|
396 | ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
|
397 | if(ret == 0){
|
398 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
|
399 | COMPAT_CLOSE(sock);
|
400 | return MOSQ_ERR_TLS;
|
401 | }
|
402 | }
|
403 | if(mosq->tls_cafile || mosq->tls_capath){
|
404 | ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
|
405 | if(ret == 0){
|
406 | #ifdef WITH_BROKER
|
407 | if(mosq->tls_cafile && mosq->tls_capath){
|
408 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
|
409 | }else if(mosq->tls_cafile){
|
410 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
|
411 | }else{
|
412 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
|
413 | }
|
414 | #else
|
415 | if(mosq->tls_cafile && mosq->tls_capath){
|
416 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
|
417 | }else if(mosq->tls_cafile){
|
418 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
|
419 | }else{
|
420 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
|
421 | }
|
422 | #endif
|
423 | COMPAT_CLOSE(sock);
|
424 | return MOSQ_ERR_TLS;
|
425 | }
|
426 | if(mosq->tls_cert_reqs == 0){
|
427 | SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
|
428 | }else{
|
429 | SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
|
430 | }
|
431 |
|
432 | if(mosq->tls_pw_callback){
|
433 | SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
|
434 | SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
|
435 | }
|
436 |
|
437 | if(mosq->tls_certfile){
|
438 | ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
|
439 | if(ret != 1){
|
440 | #ifdef WITH_BROKER
|
441 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
|
442 | #else
|
443 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
|
444 | #endif
|
445 | COMPAT_CLOSE(sock);
|
446 | return MOSQ_ERR_TLS;
|
447 | }
|
448 | }
|
449 | if(mosq->tls_keyfile){
|
450 | ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
|
451 | if(ret != 1){
|
452 | #ifdef WITH_BROKER
|
453 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
|
454 | #else
|
455 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
|
456 | #endif
|
457 | COMPAT_CLOSE(sock);
|
458 | return MOSQ_ERR_TLS;
|
459 | }
|
460 | ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
|
461 | if(ret != 1){
|
462 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
|
463 | COMPAT_CLOSE(sock);
|
464 | return MOSQ_ERR_TLS;
|
465 | }
|
466 | }
|
467 | #ifdef REAL_WITH_TLS_PSK
|
468 | }else if(mosq->tls_psk){
|
469 | SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
|
470 | #endif
|
471 | }
|
472 |
|
473 | mosq->ssl = SSL_new(mosq->ssl_ctx);
|
474 | if(!mosq->ssl){
|
475 | COMPAT_CLOSE(sock);
|
476 | return MOSQ_ERR_TLS;
|
477 | }
|
478 | SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
|
479 | bio = BIO_new_socket(sock, BIO_NOCLOSE);
|
480 | if(!bio){
|
481 | COMPAT_CLOSE(sock);
|
482 | return MOSQ_ERR_TLS;
|
483 | }
|
484 | SSL_set_bio(mosq->ssl, bio, bio);
|
485 |
|
486 | ret = SSL_connect(mosq->ssl);
|
487 | if(ret != 1){
|
488 | ret = SSL_get_error(mosq->ssl, ret);
|
489 | if(ret == SSL_ERROR_WANT_READ){
|
490 |
|
491 | }else if(ret == SSL_ERROR_WANT_WRITE){
|
492 | mosq->want_write = true;
|
493 | }else{
|
494 | COMPAT_CLOSE(sock);
|
495 | return MOSQ_ERR_TLS;
|
496 | }
|
497 | }
|
498 | }
|
499 | #endif
|
500 |
|
501 | mosq->sock = sock;
|
502 |
|
503 | return MOSQ_ERR_SUCCESS;
|
504 | }
|
505 |
|
506 | int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
|
507 | {
|
508 | assert(packet);
|
509 | if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
|
510 |
|
511 | *byte = packet->payload[packet->pos];
|
512 | packet->pos++;
|
513 |
|
514 | return MOSQ_ERR_SUCCESS;
|
515 | }
|
516 |
|
517 | void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
|
518 | {
|
519 | assert(packet);
|
520 | assert(packet->pos+1 <= packet->packet_length);
|
521 |
|
522 | packet->payload[packet->pos] = byte;
|
523 | packet->pos++;
|
524 | }
|
525 |
|
526 | int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
|
527 | {
|
528 | assert(packet);
|
529 | if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
|
530 |
|
531 | memcpy(bytes, &(packet->payload[packet->pos]), count);
|
532 | packet->pos += count;
|
533 |
|
534 | return MOSQ_ERR_SUCCESS;
|
535 | }
|
536 |
|
537 | void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
|
538 | {
|
539 | assert(packet);
|
540 | assert(packet->pos+count <= packet->packet_length);
|
541 |
|
542 | memcpy(&(packet->payload[packet->pos]), bytes, count);
|
543 | packet->pos += count;
|
544 | }
|
545 |
|
546 | int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
|
547 | {
|
548 | uint16_t len;
|
549 | int rc;
|
550 |
|
551 | assert(packet);
|
552 | rc = _mosquitto_read_uint16(packet, &len);
|
553 | if(rc) return rc;
|
554 |
|
555 | if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
|
556 |
|
557 | *str = _mosquitto_calloc(len+1, sizeof(char));
|
558 | if(*str){
|
559 | memcpy(*str, &(packet->payload[packet->pos]), len);
|
560 | packet->pos += len;
|
561 | }else{
|
562 | return MOSQ_ERR_NOMEM;
|
563 | }
|
564 |
|
565 | return MOSQ_ERR_SUCCESS;
|
566 | }
|
567 |
|
568 | void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
|
569 | {
|
570 | assert(packet);
|
571 | _mosquitto_write_uint16(packet, length);
|
572 | _mosquitto_write_bytes(packet, str, length);
|
573 | }
|
574 |
|
575 | int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
|
576 | {
|
577 | uint8_t msb, lsb;
|
578 |
|
579 | assert(packet);
|
580 | if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
|
581 |
|
582 | msb = packet->payload[packet->pos];
|
583 | packet->pos++;
|
584 | lsb = packet->payload[packet->pos];
|
585 | packet->pos++;
|
586 |
|
587 | *word = (msb<<8) + lsb;
|
588 |
|
589 | return MOSQ_ERR_SUCCESS;
|
590 | }
|
591 |
|
592 | void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
|
593 | {
|
594 | _mosquitto_write_byte(packet, MOSQ_MSB(word));
|
595 | _mosquitto_write_byte(packet, MOSQ_LSB(word));
|
596 | }
|
597 |
|
598 | ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
|
599 | {
|
600 | #ifdef WITH_TLS
|
601 | int ret;
|
602 | int err;
|
603 | char ebuf[256];
|
604 | unsigned long e;
|
605 | #endif
|
606 | assert(mosq);
|
607 | errno = 0;
|
608 | #ifdef WITH_TLS
|
609 | if(mosq->ssl){
|
610 | ret = SSL_read(mosq->ssl, buf, count);
|
611 | if(ret <= 0){
|
612 | err = SSL_get_error(mosq->ssl, ret);
|
613 | if(err == SSL_ERROR_WANT_READ){
|
614 | ret = -1;
|
615 | errno = EAGAIN;
|
616 | }else if(err == SSL_ERROR_WANT_WRITE){
|
617 | ret = -1;
|
618 | mosq->want_write = true;
|
619 | errno = EAGAIN;
|
620 | }else{
|
621 | e = ERR_get_error();
|
622 | while(e){
|
623 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
|
624 | e = ERR_get_error();
|
625 | }
|
626 | errno = EPROTO;
|
627 | }
|
628 | }
|
629 | return (ssize_t )ret;
|
630 | }else{
|
631 |
|
632 |
|
633 | #endif
|
634 |
|
635 | #ifndef WIN32
|
636 | return read(mosq->sock, buf, count);
|
637 | #else
|
638 | return recv(mosq->sock, buf, count, 0);
|
639 | #endif
|
640 |
|
641 | #ifdef WITH_TLS
|
642 | }
|
643 | #endif
|
644 | }
|
645 |
|
646 | ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
|
647 | {
|
648 | #ifdef WITH_TLS
|
649 | int ret;
|
650 | int err;
|
651 | char ebuf[256];
|
652 | unsigned long e;
|
653 | #endif
|
654 | assert(mosq);
|
655 |
|
656 | errno = 0;
|
657 | #ifdef WITH_TLS
|
658 | if(mosq->ssl){
|
659 | ret = SSL_write(mosq->ssl, buf, count);
|
660 | if(ret < 0){
|
661 | err = SSL_get_error(mosq->ssl, ret);
|
662 | if(err == SSL_ERROR_WANT_READ){
|
663 | ret = -1;
|
664 | errno = EAGAIN;
|
665 | }else if(err == SSL_ERROR_WANT_WRITE){
|
666 | ret = -1;
|
667 | mosq->want_write = true;
|
668 | errno = EAGAIN;
|
669 | }else{
|
670 | e = ERR_get_error();
|
671 | while(e){
|
672 | _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
|
673 | e = ERR_get_error();
|
674 | }
|
675 | errno = EPROTO;
|
676 | }
|
677 | }
|
678 | return (ssize_t )ret;
|
679 | }else{
|
680 |
|
681 | #endif
|
682 |
|
683 | #ifndef WIN32
|
684 | return write(mosq->sock, buf, count);
|
685 | #else
|
686 | return send(mosq->sock, buf, count, 0);
|
687 | #endif
|
688 |
|
689 | #ifdef WITH_TLS
|
690 | }
|
691 | #endif
|
692 | }
|
693 |
|
694 | int _mosquitto_packet_write(struct mosquitto *mosq)
|
695 | {
|
696 | ssize_t write_length;
|
697 | struct _mosquitto_packet *packet;
|
698 |
|
699 | if(!mosq) return MOSQ_ERR_INVAL;
|
700 | if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
|
701 |
|
702 | pthread_mutex_lock(&mosq->current_out_packet_mutex);
|
703 | pthread_mutex_lock(&mosq->out_packet_mutex);
|
704 | if(mosq->out_packet && !mosq->current_out_packet){
|
705 | mosq->current_out_packet = mosq->out_packet;
|
706 | mosq->out_packet = mosq->out_packet->next;
|
707 | if(!mosq->out_packet){
|
708 | mosq->out_packet_last = NULL;
|
709 | }
|
710 | }
|
711 | pthread_mutex_unlock(&mosq->out_packet_mutex);
|
712 |
|
713 | while(mosq->current_out_packet){
|
714 | packet = mosq->current_out_packet;
|
715 |
|
716 | while(packet->to_process > 0){
|
717 | write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
|
718 | if(write_length > 0){
|
719 | #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
|
720 | g_bytes_sent += write_length;
|
721 | #endif
|
722 | packet->to_process -= write_length;
|
723 | packet->pos += write_length;
|
724 | }else{
|
725 | #ifdef WIN32
|
726 | errno = WSAGetLastError();
|
727 | #endif
|
728 | if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
|
729 | pthread_mutex_unlock(&mosq->current_out_packet_mutex);
|
730 | return MOSQ_ERR_SUCCESS;
|
731 | }else{
|
732 | pthread_mutex_unlock(&mosq->current_out_packet_mutex);
|
733 | switch(errno){
|
734 | case COMPAT_ECONNRESET:
|
735 | return MOSQ_ERR_CONN_LOST;
|
736 | default:
|
737 | return MOSQ_ERR_ERRNO;
|
738 | }
|
739 | }
|
740 | }
|
741 | }
|
742 |
|
743 | #ifdef WITH_BROKER
|
744 | # ifdef WITH_SYS_TREE
|
745 | g_msgs_sent++;
|
746 | if(((packet->command)&0xF6) == PUBLISH){
|
747 | g_pub_msgs_sent++;
|
748 | }
|
749 | # endif
|
750 | #else
|
751 | if(((packet->command)&0xF6) == PUBLISH){
|
752 | pthread_mutex_lock(&mosq->callback_mutex);
|
753 | if(mosq->on_publish){
|
754 |
|
755 | mosq->in_callback = true;
|
756 | mosq->on_publish(mosq, mosq->userdata, packet->mid);
|
757 | mosq->in_callback = false;
|
758 | }
|
759 | pthread_mutex_unlock(&mosq->callback_mutex);
|
760 | }else if(((packet->command)&0xF0) == DISCONNECT){
|
761 | |
762 |
|
763 | _mosquitto_socket_close(mosq);
|
764 |
|
765 | |
766 |
|
767 |
|
768 | pthread_mutex_lock(&mosq->out_packet_mutex);
|
769 | mosq->current_out_packet = mosq->out_packet;
|
770 | if(mosq->out_packet){
|
771 | mosq->out_packet = mosq->out_packet->next;
|
772 | if(!mosq->out_packet){
|
773 | mosq->out_packet_last = NULL;
|
774 | }
|
775 | }
|
776 | pthread_mutex_unlock(&mosq->out_packet_mutex);
|
777 |
|
778 | _mosquitto_packet_cleanup(packet);
|
779 | _mosquitto_free(packet);
|
780 |
|
781 | pthread_mutex_lock(&mosq->msgtime_mutex);
|
782 | mosq->last_msg_out = mosquitto_time();
|
783 | pthread_mutex_unlock(&mosq->msgtime_mutex);
|
784 |
|
785 |
|
786 | pthread_mutex_lock(&mosq->callback_mutex);
|
787 | if(mosq->on_disconnect){
|
788 | mosq->in_callback = true;
|
789 | mosq->on_disconnect(mosq, mosq->userdata, 0);
|
790 | mosq->in_callback = false;
|
791 | }
|
792 | pthread_mutex_unlock(&mosq->current_out_packet_mutex);
|
793 | return MOSQ_ERR_SUCCESS;
|
794 | }
|
795 | #endif
|
796 |
|
797 |
|
798 | pthread_mutex_lock(&mosq->out_packet_mutex);
|
799 | mosq->current_out_packet = mosq->out_packet;
|
800 | if(mosq->out_packet){
|
801 | mosq->out_packet = mosq->out_packet->next;
|
802 | if(!mosq->out_packet){
|
803 | mosq->out_packet_last = NULL;
|
804 | }
|
805 | }
|
806 | pthread_mutex_unlock(&mosq->out_packet_mutex);
|
807 |
|
808 | _mosquitto_packet_cleanup(packet);
|
809 | _mosquitto_free(packet);
|
810 |
|
811 | pthread_mutex_lock(&mosq->msgtime_mutex);
|
812 | mosq->last_msg_out = mosquitto_time();
|
813 | pthread_mutex_unlock(&mosq->msgtime_mutex);
|
814 | }
|
815 | pthread_mutex_unlock(&mosq->current_out_packet_mutex);
|
816 | return MOSQ_ERR_SUCCESS;
|
817 | }
|
818 |
|
819 | #ifdef WITH_BROKER
|
820 | int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
|
821 | #else
|
822 | int _mosquitto_packet_read(struct mosquitto *mosq)
|
823 | #endif
|
824 | {
|
825 | uint8_t byte;
|
826 | ssize_t read_length;
|
827 | int rc = 0;
|
828 |
|
829 | if(!mosq) return MOSQ_ERR_INVAL;
|
830 | if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
|
831 | |
832 |
|
833 |
|
834 |
|
835 |
|
836 |
|
837 |
|
838 |
|
839 |
|
840 |
|
841 |
|
842 |
|
843 |
|
844 |
|
845 | if(!mosq->in_packet.command){
|
846 | read_length = _mosquitto_net_read(mosq, &byte, 1);
|
847 | if(read_length == 1){
|
848 | mosq->in_packet.command = byte;
|
849 | #ifdef WITH_BROKER
|
850 | # ifdef WITH_SYS_TREE
|
851 | g_bytes_received++;
|
852 | # endif
|
853 |
|
854 | if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
|
855 | #endif
|
856 | }else{
|
857 | if(read_length == 0) return MOSQ_ERR_CONN_LOST;
|
858 | #ifdef WIN32
|
859 | errno = WSAGetLastError();
|
860 | #endif
|
861 | if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
|
862 | return MOSQ_ERR_SUCCESS;
|
863 | }else{
|
864 | switch(errno){
|
865 | case COMPAT_ECONNRESET:
|
866 | return MOSQ_ERR_CONN_LOST;
|
867 | default:
|
868 | return MOSQ_ERR_ERRNO;
|
869 | }
|
870 | }
|
871 | }
|
872 | }
|
873 | if(!mosq->in_packet.have_remaining){
|
874 | do{
|
875 | read_length = _mosquitto_net_read(mosq, &byte, 1);
|
876 | if(read_length == 1){
|
877 | mosq->in_packet.remaining_count++;
|
878 | |
879 |
|
880 |
|
881 | if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
|
882 |
|
883 | #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
|
884 | g_bytes_received++;
|
885 | #endif
|
886 | mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
|
887 | mosq->in_packet.remaining_mult *= 128;
|
888 | }else{
|
889 | if(read_length == 0) return MOSQ_ERR_CONN_LOST;
|
890 | #ifdef WIN32
|
891 | errno = WSAGetLastError();
|
892 | #endif
|
893 | if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
|
894 | return MOSQ_ERR_SUCCESS;
|
895 | }else{
|
896 | switch(errno){
|
897 | case COMPAT_ECONNRESET:
|
898 | return MOSQ_ERR_CONN_LOST;
|
899 | default:
|
900 | return MOSQ_ERR_ERRNO;
|
901 | }
|
902 | }
|
903 | }
|
904 | }while((byte & 128) != 0);
|
905 |
|
906 | if(mosq->in_packet.remaining_length > 0){
|
907 | mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
|
908 | if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
|
909 | mosq->in_packet.to_process = mosq->in_packet.remaining_length;
|
910 | }
|
911 | mosq->in_packet.have_remaining = 1;
|
912 | }
|
913 | while(mosq->in_packet.to_process>0){
|
914 | read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
|
915 | if(read_length > 0){
|
916 | #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
|
917 | g_bytes_received += read_length;
|
918 | #endif
|
919 | mosq->in_packet.to_process -= read_length;
|
920 | mosq->in_packet.pos += read_length;
|
921 | }else{
|
922 | #ifdef WIN32
|
923 | errno = WSAGetLastError();
|
924 | #endif
|
925 | if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
|
926 | if(mosq->in_packet.to_process > 1000){
|
927 | |
928 |
|
929 |
|
930 |
|
931 |
|
932 | pthread_mutex_lock(&mosq->msgtime_mutex);
|
933 | mosq->last_msg_in = mosquitto_time();
|
934 | pthread_mutex_unlock(&mosq->msgtime_mutex);
|
935 | }
|
936 | return MOSQ_ERR_SUCCESS;
|
937 | }else{
|
938 | switch(errno){
|
939 | case COMPAT_ECONNRESET:
|
940 | return MOSQ_ERR_CONN_LOST;
|
941 | default:
|
942 | return MOSQ_ERR_ERRNO;
|
943 | }
|
944 | }
|
945 | }
|
946 | }
|
947 |
|
948 |
|
949 | mosq->in_packet.pos = 0;
|
950 | #ifdef WITH_BROKER
|
951 | # ifdef WITH_SYS_TREE
|
952 | g_msgs_received++;
|
953 | if(((mosq->in_packet.command)&0xF5) == PUBLISH){
|
954 | g_pub_msgs_received++;
|
955 | }
|
956 | # endif
|
957 | rc = mqtt3_packet_handle(db, mosq);
|
958 | #else
|
959 | rc = _mosquitto_packet_handle(mosq);
|
960 | #endif
|
961 |
|
962 |
|
963 | _mosquitto_packet_cleanup(&mosq->in_packet);
|
964 |
|
965 | pthread_mutex_lock(&mosq->msgtime_mutex);
|
966 | mosq->last_msg_in = mosquitto_time();
|
967 | pthread_mutex_unlock(&mosq->msgtime_mutex);
|
968 | return rc;
|
969 | }
|
970 |
|
971 | int _mosquitto_socket_nonblock(int sock)
|
972 | {
|
973 | #ifndef WIN32
|
974 | int opt;
|
975 |
|
976 | opt = fcntl(sock, F_GETFL, 0);
|
977 | if(opt == -1){
|
978 | COMPAT_CLOSE(sock);
|
979 | return 1;
|
980 | }
|
981 | if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
|
982 |
|
983 | COMPAT_CLOSE(sock);
|
984 | return 1;
|
985 | }
|
986 | #else
|
987 | opt = 1;
|
988 | if(ioctlsocket(sock, FIONBIO, &opt)){
|
989 | COMPAT_CLOSE(sock);
|
990 | return 1;
|
991 | }
|
992 | #endif
|
993 | return 0;
|
994 | }
|
995 |
|
996 |
|
997 | #ifndef WITH_BROKER
|
998 | int _mosquitto_socketpair(int *pairR, int *pairW)
|
999 | {
|
1000 | #ifdef WIN32
|
1001 | int family[2] = {AF_INET, AF_INET6};
|
1002 | int i;
|
1003 | struct sockaddr_storage ss;
|
1004 | struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
|
1005 | struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
|
1006 | socklen_t ss_len;
|
1007 | int spR, spW;
|
1008 |
|
1009 | int listensock;
|
1010 |
|
1011 | *pairR = -1;
|
1012 | *pairW = -1;
|
1013 |
|
1014 | for(i=0; i<2; i++){
|
1015 | memset(&ss, 0, sizeof(ss));
|
1016 | if(family[i] == AF_INET){
|
1017 | sa->sin_family = family[i];
|
1018 | sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
1019 | sa->sin_port = 0;
|
1020 | ss_len = sizeof(struct sockaddr_in);
|
1021 | }else if(family[i] == AF_INET6){
|
1022 | sa6->sin6_family = family[i];
|
1023 | sa6->sin6_addr = in6addr_loopback;
|
1024 | sa6->sin6_port = 0;
|
1025 | ss_len = sizeof(struct sockaddr_in6);
|
1026 | }else{
|
1027 | return MOSQ_ERR_INVAL;
|
1028 | }
|
1029 |
|
1030 | listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
|
1031 | if(listensock == -1){
|
1032 | continue;
|
1033 | }
|
1034 |
|
1035 | if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
|
1036 | COMPAT_CLOSE(listensock);
|
1037 | continue;
|
1038 | }
|
1039 |
|
1040 | if(listen(listensock, 1) == -1){
|
1041 | COMPAT_CLOSE(listensock);
|
1042 | continue;
|
1043 | }
|
1044 | memset(&ss, 0, sizeof(ss));
|
1045 | ss_len = sizeof(ss);
|
1046 | if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
|
1047 | COMPAT_CLOSE(listensock);
|
1048 | continue;
|
1049 | }
|
1050 |
|
1051 | if(_mosquitto_socket_nonblock(listensock)){
|
1052 | continue;
|
1053 | }
|
1054 |
|
1055 | if(family[i] == AF_INET){
|
1056 | sa->sin_family = family[i];
|
1057 | sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
1058 | ss_len = sizeof(struct sockaddr_in);
|
1059 | }else if(family[i] == AF_INET6){
|
1060 | sa6->sin6_family = family[i];
|
1061 | sa6->sin6_addr = in6addr_loopback;
|
1062 | ss_len = sizeof(struct sockaddr_in6);
|
1063 | }
|
1064 |
|
1065 | spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
|
1066 | if(spR == -1){
|
1067 | COMPAT_CLOSE(listensock);
|
1068 | continue;
|
1069 | }
|
1070 | if(_mosquitto_socket_nonblock(spR)){
|
1071 | COMPAT_CLOSE(listensock);
|
1072 | continue;
|
1073 | }
|
1074 | if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
|
1075 | #ifdef WIN32
|
1076 | errno = WSAGetLastError();
|
1077 | #endif
|
1078 | if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
|
1079 | COMPAT_CLOSE(spR);
|
1080 | COMPAT_CLOSE(listensock);
|
1081 | continue;
|
1082 | }
|
1083 | }
|
1084 | spW = accept(listensock, NULL, 0);
|
1085 | if(spW == -1){
|
1086 | #ifdef WIN32
|
1087 | errno = WSAGetLastError();
|
1088 | #endif
|
1089 | if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
|
1090 | COMPAT_CLOSE(spR);
|
1091 | COMPAT_CLOSE(listensock);
|
1092 | continue;
|
1093 | }
|
1094 | }
|
1095 |
|
1096 | if(_mosquitto_socket_nonblock(spW)){
|
1097 | COMPAT_CLOSE(spR);
|
1098 | COMPAT_CLOSE(listensock);
|
1099 | continue;
|
1100 | }
|
1101 | COMPAT_CLOSE(listensock);
|
1102 |
|
1103 | *pairR = spR;
|
1104 | *pairW = spW;
|
1105 | return MOSQ_ERR_SUCCESS;
|
1106 | }
|
1107 | return MOSQ_ERR_UNKNOWN;
|
1108 | #else
|
1109 | int sv[2];
|
1110 |
|
1111 | if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
|
1112 | return MOSQ_ERR_ERRNO;
|
1113 | }
|
1114 | if(_mosquitto_socket_nonblock(sv[0])){
|
1115 | COMPAT_CLOSE(sv[0]);
|
1116 | COMPAT_CLOSE(sv[1]);
|
1117 | return MOSQ_ERR_ERRNO;
|
1118 | }
|
1119 | if(_mosquitto_socket_nonblock(sv[1])){
|
1120 | COMPAT_CLOSE(sv[0]);
|
1121 | COMPAT_CLOSE(sv[1]);
|
1122 | return MOSQ_ERR_ERRNO;
|
1123 | }
|
1124 | *pairR = sv[0];
|
1125 | *pairW = sv[1];
|
1126 | return MOSQ_ERR_SUCCESS;
|
1127 | #endif
|
1128 | }
|
1129 | #endif
|