1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 | #define _GNU_SOURCE
|
31 |
|
32 | #include <config.h>
|
33 |
|
34 | #include <assert.h>
|
35 | #ifndef WIN32
|
36 | #include <poll.h>
|
37 | #else
|
38 | #include <process.h>
|
39 | #include <winsock2.h>
|
40 | #include <ws2tcpip.h>
|
41 | #endif
|
42 |
|
43 | #include <errno.h>
|
44 | #include <signal.h>
|
45 | #include <stdio.h>
|
46 | #include <string.h>
|
47 |
|
48 | #include <mosquitto_broker.h>
|
49 | #include <memory_mosq.h>
|
50 | #include <time_mosq.h>
|
51 | #include <util_mosq.h>
|
52 |
|
53 | extern bool flag_reload;
|
54 | #ifdef WITH_PERSISTENCE
|
55 | extern bool flag_db_backup;
|
56 | #endif
|
57 | extern bool flag_tree_print;
|
58 | extern int run;
|
59 | #ifdef WITH_SYS_TREE
|
60 | extern int g_clients_expired;
|
61 | #endif
|
62 |
|
63 | static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds);
|
64 | static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
|
65 |
|
66 | int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max)
|
67 | {
|
68 | time_t start_time = mosquitto_time();
|
69 | time_t last_backup = mosquitto_time();
|
70 | time_t last_store_clean = mosquitto_time();
|
71 | time_t now;
|
72 | int time_count;
|
73 | int fdcount;
|
74 | #ifndef WIN32
|
75 | sigset_t sigblock, origsig;
|
76 | #endif
|
77 | int i;
|
78 | struct pollfd *pollfds = NULL;
|
79 | int pollfd_count = 0;
|
80 | int pollfd_index;
|
81 | #ifdef WITH_BRIDGE
|
82 | int bridge_sock;
|
83 | int rc;
|
84 | #endif
|
85 |
|
86 | #ifndef WIN32
|
87 | sigemptyset(&sigblock);
|
88 | sigaddset(&sigblock, SIGINT);
|
89 | #endif
|
90 |
|
91 | while(run){
|
92 | #ifdef WITH_SYS_TREE
|
93 | if(db->config->sys_interval > 0){
|
94 | mqtt3_db_sys_update(db, db->config->sys_interval, start_time);
|
95 | }
|
96 | #endif
|
97 |
|
98 | if(listensock_count + db->context_count > pollfd_count || !pollfds){
|
99 | pollfd_count = listensock_count + db->context_count;
|
100 | pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count);
|
101 | if(!pollfds){
|
102 | _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
103 | return MOSQ_ERR_NOMEM;
|
104 | }
|
105 | }
|
106 |
|
107 | memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count);
|
108 |
|
109 | pollfd_index = 0;
|
110 | for(i=0; i<listensock_count; i++){
|
111 | pollfds[pollfd_index].fd = listensock[i];
|
112 | pollfds[pollfd_index].events = POLLIN;
|
113 | pollfds[pollfd_index].revents = 0;
|
114 | pollfd_index++;
|
115 | }
|
116 |
|
117 | time_count = 0;
|
118 | for(i=0; i<db->context_count; i++){
|
119 | if(db->contexts[i]){
|
120 | if(time_count > 0){
|
121 | time_count--;
|
122 | }else{
|
123 | time_count = 1000;
|
124 | now = mosquitto_time();
|
125 | }
|
126 | db->contexts[i]->pollfd_index = -1;
|
127 |
|
128 | if(db->contexts[i]->sock != INVALID_SOCKET){
|
129 | #ifdef WITH_BRIDGE
|
130 | if(db->contexts[i]->bridge){
|
131 | _mosquitto_check_keepalive(db->contexts[i]);
|
132 | if(db->contexts[i]->bridge->round_robin == false
|
133 | && db->contexts[i]->bridge->cur_address != 0
|
134 | && now > db->contexts[i]->bridge->primary_retry){
|
135 |
|
136 |
|
137 | if(_mosquitto_try_connect(db->contexts[i]->bridge->addresses[0].address, db->contexts[i]->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){
|
138 | COMPAT_CLOSE(bridge_sock);
|
139 | _mosquitto_socket_close(db->contexts[i]);
|
140 | db->contexts[i]->bridge->cur_address = db->contexts[i]->bridge->address_count-1;
|
141 | }
|
142 | }
|
143 | }
|
144 | #endif
|
145 |
|
146 |
|
147 | if(!(db->contexts[i]->keepalive)
|
148 | || db->contexts[i]->bridge
|
149 | || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){
|
150 |
|
151 | if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){
|
152 | pollfds[pollfd_index].fd = db->contexts[i]->sock;
|
153 | pollfds[pollfd_index].events = POLLIN;
|
154 | pollfds[pollfd_index].revents = 0;
|
155 | if(db->contexts[i]->current_out_packet){
|
156 | pollfds[pollfd_index].events |= POLLOUT;
|
157 | }
|
158 | db->contexts[i]->pollfd_index = pollfd_index;
|
159 | pollfd_index++;
|
160 | }else{
|
161 | mqtt3_context_disconnect(db, db->contexts[i]);
|
162 | }
|
163 | }else{
|
164 | if(db->config->connection_messages == true){
|
165 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->contexts[i]->id);
|
166 | }
|
167 |
|
168 | mqtt3_context_disconnect(db, db->contexts[i]);
|
169 | }
|
170 | }else{
|
171 | #ifdef WITH_BRIDGE
|
172 | if(db->contexts[i]->bridge){
|
173 |
|
174 | if(!db->contexts[i]->bridge->restart_t){
|
175 | db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
|
176 | db->contexts[i]->bridge->cur_address++;
|
177 | if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
|
178 | db->contexts[i]->bridge->cur_address = 0;
|
179 | }
|
180 | if(db->contexts[i]->bridge->round_robin == false && db->contexts[i]->bridge->cur_address != 0){
|
181 | db->contexts[i]->bridge->primary_retry = now + 5;
|
182 | }
|
183 | }else{
|
184 | if(db->contexts[i]->bridge->start_type == bst_lazy && db->contexts[i]->bridge->lazy_reconnect){
|
185 | rc = mqtt3_bridge_connect(db, db->contexts[i]);
|
186 | if(rc){
|
187 | db->contexts[i]->bridge->cur_address++;
|
188 | if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
|
189 | db->contexts[i]->bridge->cur_address = 0;
|
190 | }
|
191 | }
|
192 | }
|
193 | if(db->contexts[i]->bridge->start_type == bst_automatic && now > db->contexts[i]->bridge->restart_t){
|
194 | db->contexts[i]->bridge->restart_t = 0;
|
195 | rc = mqtt3_bridge_connect(db, db->contexts[i]);
|
196 | if(rc == MOSQ_ERR_SUCCESS){
|
197 | pollfds[pollfd_index].fd = db->contexts[i]->sock;
|
198 | pollfds[pollfd_index].events = POLLIN;
|
199 | pollfds[pollfd_index].revents = 0;
|
200 | if(db->contexts[i]->current_out_packet){
|
201 | pollfds[pollfd_index].events |= POLLOUT;
|
202 | }
|
203 | db->contexts[i]->pollfd_index = pollfd_index;
|
204 | pollfd_index++;
|
205 | }else{
|
206 |
|
207 | db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
|
208 |
|
209 | db->contexts[i]->bridge->cur_address++;
|
210 | if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
|
211 | db->contexts[i]->bridge->cur_address = 0;
|
212 | }
|
213 | }
|
214 | }
|
215 | }
|
216 | }else{
|
217 | #endif
|
218 | if(db->contexts[i]->clean_session == true){
|
219 | mqtt3_context_cleanup(db, db->contexts[i], true);
|
220 | db->contexts[i] = NULL;
|
221 | }else if(db->config->persistent_client_expiration > 0){
|
222 | |
223 |
|
224 |
|
225 |
|
226 |
|
227 | if(now > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){
|
228 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db->contexts[i]->id);
|
229 | #ifdef WITH_SYS_TREE
|
230 | g_clients_expired++;
|
231 | #endif
|
232 | db->contexts[i]->clean_session = true;
|
233 | mqtt3_context_cleanup(db, db->contexts[i], true);
|
234 | db->contexts[i] = NULL;
|
235 | }
|
236 | }
|
237 | #ifdef WITH_BRIDGE
|
238 | }
|
239 | #endif
|
240 | }
|
241 | }
|
242 | }
|
243 |
|
244 | mqtt3_db_message_timeout_check(db, db->config->retry_interval);
|
245 |
|
246 | #ifndef WIN32
|
247 | sigprocmask(SIG_SETMASK, &sigblock, &origsig);
|
248 | fdcount = poll(pollfds, pollfd_index, 100);
|
249 | sigprocmask(SIG_SETMASK, &origsig, NULL);
|
250 | #else
|
251 | fdcount = WSAPoll(pollfds, pollfd_index, 100);
|
252 | #endif
|
253 | if(fdcount == -1){
|
254 | loop_handle_errors(db, pollfds);
|
255 | }else{
|
256 | loop_handle_reads_writes(db, pollfds);
|
257 |
|
258 | for(i=0; i<listensock_count; i++){
|
259 | if(pollfds[i].revents & (POLLIN | POLLPRI)){
|
260 | while(mqtt3_socket_accept(db, listensock[i]) != -1){
|
261 | }
|
262 | }
|
263 | }
|
264 | }
|
265 | #ifdef WITH_PERSISTENCE
|
266 | if(db->config->persistence && db->config->autosave_interval){
|
267 | if(db->config->autosave_on_changes){
|
268 | if(db->persistence_changes > db->config->autosave_interval){
|
269 | mqtt3_db_backup(db, false, false);
|
270 | db->persistence_changes = 0;
|
271 | }
|
272 | }else{
|
273 | if(last_backup + db->config->autosave_interval < mosquitto_time()){
|
274 | mqtt3_db_backup(db, false, false);
|
275 | last_backup = mosquitto_time();
|
276 | }
|
277 | }
|
278 | }
|
279 | #endif
|
280 | if(!db->config->store_clean_interval || last_store_clean + db->config->store_clean_interval < mosquitto_time()){
|
281 | mqtt3_db_store_clean(db);
|
282 | last_store_clean = mosquitto_time();
|
283 | }
|
284 | #ifdef WITH_PERSISTENCE
|
285 | if(flag_db_backup){
|
286 | mqtt3_db_backup(db, false, false);
|
287 | flag_db_backup = false;
|
288 | }
|
289 | #endif
|
290 | if(flag_reload){
|
291 | _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
|
292 | mqtt3_config_read(db->config, true);
|
293 | mosquitto_security_cleanup(db, true);
|
294 | mosquitto_security_init(db, true);
|
295 | mosquitto_security_apply(db);
|
296 | mqtt3_log_init(db->config->log_type, db->config->log_dest);
|
297 | flag_reload = false;
|
298 | }
|
299 | if(flag_tree_print){
|
300 | mqtt3_sub_tree_print(&db->subs, 0);
|
301 | flag_tree_print = false;
|
302 | }
|
303 | }
|
304 |
|
305 | if(pollfds) _mosquitto_free(pollfds);
|
306 | return MOSQ_ERR_SUCCESS;
|
307 | }
|
308 |
|
309 | static void do_disconnect(struct mosquitto_db *db, int context_index)
|
310 | {
|
311 | if(db->config->connection_messages == true){
|
312 | if(db->contexts[context_index]->state != mosq_cs_disconnecting){
|
313 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", db->contexts[context_index]->id);
|
314 | }else{
|
315 | _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", db->contexts[context_index]->id);
|
316 | }
|
317 | }
|
318 | mqtt3_context_disconnect(db, db->contexts[context_index]);
|
319 | }
|
320 |
|
321 |
|
322 |
|
323 |
|
324 | static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds)
|
325 | {
|
326 | int i;
|
327 |
|
328 | for(i=0; i<db->context_count; i++){
|
329 | if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
|
330 | if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
|
331 | do_disconnect(db, i);
|
332 | }
|
333 | }
|
334 | }
|
335 | }
|
336 |
|
337 | static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
|
338 | {
|
339 | int i;
|
340 |
|
341 | for(i=0; i<db->context_count; i++){
|
342 | if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
|
343 | assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
|
344 | #ifdef WITH_TLS
|
345 | if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT ||
|
346 | db->contexts[i]->want_write ||
|
347 | (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
|
348 | #else
|
349 | if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT){
|
350 | #endif
|
351 | if(_mosquitto_packet_write(db->contexts[i])){
|
352 | do_disconnect(db, i);
|
353 | }
|
354 | }
|
355 | }
|
356 | if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
|
357 | assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
|
358 | #ifdef WITH_TLS
|
359 | if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN ||
|
360 | (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
|
361 | #else
|
362 | if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN){
|
363 | #endif
|
364 | if(_mosquitto_packet_read(db, db->contexts[i])){
|
365 | do_disconnect(db, i);
|
366 | }
|
367 | }
|
368 | }
|
369 | if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
|
370 | if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
|
371 | do_disconnect(db, i);
|
372 | }
|
373 | }
|
374 | }
|
375 | }
|
376 |
|