1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #include "config.h"
|
31 |
|
32 | #ifndef WIN32
|
33 | #include <unistd.h>
|
34 | #endif
|
35 |
|
36 | #include "mosquitto_internal.h"
|
37 | #include "net_mosq.h"
|
38 |
|
39 | void *_mosquitto_thread_main(void *obj);
|
40 |
|
41 | int mosquitto_loop_start(struct mosquitto *mosq)
|
42 | {
|
43 | #ifdef WITH_THREADING
|
44 | if(!mosq || mosq->threaded) return MOSQ_ERR_INVAL;
|
45 |
|
46 | mosq->threaded = true;
|
47 | pthread_create(&mosq->thread_id, NULL, _mosquitto_thread_main, mosq);
|
48 | return MOSQ_ERR_SUCCESS;
|
49 | #else
|
50 | return MOSQ_ERR_NOT_SUPPORTED;
|
51 | #endif
|
52 | }
|
53 |
|
54 | int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
|
55 | {
|
56 | #ifdef WITH_THREADING
|
57 | # ifndef WITH_BROKER
|
58 | char sockpair_data = 0;
|
59 | # endif
|
60 |
|
61 | if(!mosq || !mosq->threaded) return MOSQ_ERR_INVAL;
|
62 |
|
63 |
|
64 | |
65 |
|
66 | if(mosq->sockpairW != INVALID_SOCKET){
|
67 | #ifndef WIN32
|
68 | if(write(mosq->sockpairW, &sockpair_data, 1)){
|
69 | }
|
70 | #else
|
71 | send(mosq->sockpairW, &sockpair_data, 1, 0);
|
72 | #endif
|
73 | }
|
74 |
|
75 | if(force){
|
76 | pthread_cancel(mosq->thread_id);
|
77 | }
|
78 | pthread_join(mosq->thread_id, NULL);
|
79 | mosq->thread_id = pthread_self();
|
80 | mosq->threaded = false;
|
81 |
|
82 | return MOSQ_ERR_SUCCESS;
|
83 | #else
|
84 | return MOSQ_ERR_NOT_SUPPORTED;
|
85 | #endif
|
86 | }
|
87 |
|
#ifdef WITH_THREADING
/*
 * Entry point of the background loop thread created by
 * mosquitto_loop_start().
 *
 * obj - the struct mosquitto instance, passed through pthread_create().
 *
 * If the client state is mosq_cs_connect_async when the thread starts,
 * mosquitto_reconnect() is called first. The thread then runs
 * mosquitto_loop_forever() until that call returns.
 *
 * Returns obj unchanged, or NULL if obj is NULL.
 */
void *_mosquitto_thread_main(void *obj)
{
	struct mosquitto *mosq = obj;

	if(!mosq) return NULL;

	/* Read the state under its mutex, but release the mutex before calling
	 * mosquitto_reconnect(). */
	pthread_mutex_lock(&mosq->state_mutex);
	if(mosq->state == mosq_cs_connect_async){
		pthread_mutex_unlock(&mosq->state_mutex);
		mosquitto_reconnect(mosq);
	}else{
		pthread_mutex_unlock(&mosq->state_mutex);
	}

	if(!mosq->keepalive){
		/* Keepalive disabled: use a one-day timeout (in milliseconds).
		 * The value must not be derived from keepalive here, because
		 * 0*1000*86400 is 0 and would yield a zero timeout. */
		mosquitto_loop_forever(mosq, 1000*86400, 1);
	}else{
		/* Use the keepalive interval, converted from seconds to
		 * milliseconds, as the loop timeout. */
		mosquitto_loop_forever(mosq, mosq->keepalive*1000, 1);
	}

	return obj;
}
#endif
|
114 |
|