UNPKG

12.4 kBtext/x-cView Raw
1/*
2Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#define _GNU_SOURCE
31
32#include <config.h>
33
34#include <assert.h>
35#ifndef WIN32
36#include <poll.h>
37#else
38#include <process.h>
39#include <winsock2.h>
40#include <ws2tcpip.h>
41#endif
42
43#include <errno.h>
44#include <signal.h>
45#include <stdio.h>
46#include <string.h>
47
48#include <mosquitto_broker.h>
49#include <memory_mosq.h>
50#include <time_mosq.h>
51#include <util_mosq.h>
52
53extern bool flag_reload;
54#ifdef WITH_PERSISTENCE
55extern bool flag_db_backup;
56#endif
57extern bool flag_tree_print;
58extern int run;
59#ifdef WITH_SYS_TREE
60extern int g_clients_expired;
61#endif
62
63static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds);
64static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
65
66int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max)
67{
68 time_t start_time = mosquitto_time();
69 time_t last_backup = mosquitto_time();
70 time_t last_store_clean = mosquitto_time();
71 time_t now;
72 int time_count;
73 int fdcount;
74#ifndef WIN32
75 sigset_t sigblock, origsig;
76#endif
77 int i;
78 struct pollfd *pollfds = NULL;
79 int pollfd_count = 0;
80 int pollfd_index;
81#ifdef WITH_BRIDGE
82 int bridge_sock;
83 int rc;
84#endif
85
86#ifndef WIN32
87 sigemptyset(&sigblock);
88 sigaddset(&sigblock, SIGINT);
89#endif
90
91 while(run){
92#ifdef WITH_SYS_TREE
93 if(db->config->sys_interval > 0){
94 mqtt3_db_sys_update(db, db->config->sys_interval, start_time);
95 }
96#endif
97
98 if(listensock_count + db->context_count > pollfd_count || !pollfds){
99 pollfd_count = listensock_count + db->context_count;
100 pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count);
101 if(!pollfds){
102 _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
103 return MOSQ_ERR_NOMEM;
104 }
105 }
106
107 memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count);
108
109 pollfd_index = 0;
110 for(i=0; i<listensock_count; i++){
111 pollfds[pollfd_index].fd = listensock[i];
112 pollfds[pollfd_index].events = POLLIN;
113 pollfds[pollfd_index].revents = 0;
114 pollfd_index++;
115 }
116
117 time_count = 0;
118 for(i=0; i<db->context_count; i++){
119 if(db->contexts[i]){
120 if(time_count > 0){
121 time_count--;
122 }else{
123 time_count = 1000;
124 now = mosquitto_time();
125 }
126 db->contexts[i]->pollfd_index = -1;
127
128 if(db->contexts[i]->sock != INVALID_SOCKET){
129#ifdef WITH_BRIDGE
130 if(db->contexts[i]->bridge){
131 _mosquitto_check_keepalive(db->contexts[i]);
132 if(db->contexts[i]->bridge->round_robin == false
133 && db->contexts[i]->bridge->cur_address != 0
134 && now > db->contexts[i]->bridge->primary_retry){
135
136 /* FIXME - this should be non-blocking */
137 if(_mosquitto_try_connect(db->contexts[i]->bridge->addresses[0].address, db->contexts[i]->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){
138 COMPAT_CLOSE(bridge_sock);
139 _mosquitto_socket_close(db->contexts[i]);
140 db->contexts[i]->bridge->cur_address = db->contexts[i]->bridge->address_count-1;
141 }
142 }
143 }
144#endif
145
146 /* Local bridges never time out in this fashion. */
147 if(!(db->contexts[i]->keepalive)
148 || db->contexts[i]->bridge
149 || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){
150
151 if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){
152 pollfds[pollfd_index].fd = db->contexts[i]->sock;
153 pollfds[pollfd_index].events = POLLIN;
154 pollfds[pollfd_index].revents = 0;
155 if(db->contexts[i]->current_out_packet){
156 pollfds[pollfd_index].events |= POLLOUT;
157 }
158 db->contexts[i]->pollfd_index = pollfd_index;
159 pollfd_index++;
160 }else{
161 mqtt3_context_disconnect(db, db->contexts[i]);
162 }
163 }else{
164 if(db->config->connection_messages == true){
165 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->contexts[i]->id);
166 }
167 /* Client has exceeded keepalive*1.5 */
168 mqtt3_context_disconnect(db, db->contexts[i]);
169 }
170 }else{
171#ifdef WITH_BRIDGE
172 if(db->contexts[i]->bridge){
173 /* Want to try to restart the bridge connection */
174 if(!db->contexts[i]->bridge->restart_t){
175 db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
176 db->contexts[i]->bridge->cur_address++;
177 if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
178 db->contexts[i]->bridge->cur_address = 0;
179 }
180 if(db->contexts[i]->bridge->round_robin == false && db->contexts[i]->bridge->cur_address != 0){
181 db->contexts[i]->bridge->primary_retry = now + 5;
182 }
183 }else{
184 if(db->contexts[i]->bridge->start_type == bst_lazy && db->contexts[i]->bridge->lazy_reconnect){
185 rc = mqtt3_bridge_connect(db, db->contexts[i]);
186 if(rc){
187 db->contexts[i]->bridge->cur_address++;
188 if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
189 db->contexts[i]->bridge->cur_address = 0;
190 }
191 }
192 }
193 if(db->contexts[i]->bridge->start_type == bst_automatic && now > db->contexts[i]->bridge->restart_t){
194 db->contexts[i]->bridge->restart_t = 0;
195 rc = mqtt3_bridge_connect(db, db->contexts[i]);
196 if(rc == MOSQ_ERR_SUCCESS){
197 pollfds[pollfd_index].fd = db->contexts[i]->sock;
198 pollfds[pollfd_index].events = POLLIN;
199 pollfds[pollfd_index].revents = 0;
200 if(db->contexts[i]->current_out_packet){
201 pollfds[pollfd_index].events |= POLLOUT;
202 }
203 db->contexts[i]->pollfd_index = pollfd_index;
204 pollfd_index++;
205 }else{
206 /* Retry later. */
207 db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
208
209 db->contexts[i]->bridge->cur_address++;
210 if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
211 db->contexts[i]->bridge->cur_address = 0;
212 }
213 }
214 }
215 }
216 }else{
217#endif
218 if(db->contexts[i]->clean_session == true){
219 mqtt3_context_cleanup(db, db->contexts[i], true);
220 db->contexts[i] = NULL;
221 }else if(db->config->persistent_client_expiration > 0){
222 /* This is a persistent client, check to see if the
223 * last time it connected was longer than
224 * persistent_client_expiration seconds ago. If so,
225 * expire it and clean up.
226 */
227 if(now > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){
228 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db->contexts[i]->id);
229#ifdef WITH_SYS_TREE
230 g_clients_expired++;
231#endif
232 db->contexts[i]->clean_session = true;
233 mqtt3_context_cleanup(db, db->contexts[i], true);
234 db->contexts[i] = NULL;
235 }
236 }
237#ifdef WITH_BRIDGE
238 }
239#endif
240 }
241 }
242 }
243
244 mqtt3_db_message_timeout_check(db, db->config->retry_interval);
245
246#ifndef WIN32
247 sigprocmask(SIG_SETMASK, &sigblock, &origsig);
248 fdcount = poll(pollfds, pollfd_index, 100);
249 sigprocmask(SIG_SETMASK, &origsig, NULL);
250#else
251 fdcount = WSAPoll(pollfds, pollfd_index, 100);
252#endif
253 if(fdcount == -1){
254 loop_handle_errors(db, pollfds);
255 }else{
256 loop_handle_reads_writes(db, pollfds);
257
258 for(i=0; i<listensock_count; i++){
259 if(pollfds[i].revents & (POLLIN | POLLPRI)){
260 while(mqtt3_socket_accept(db, listensock[i]) != -1){
261 }
262 }
263 }
264 }
265#ifdef WITH_PERSISTENCE
266 if(db->config->persistence && db->config->autosave_interval){
267 if(db->config->autosave_on_changes){
268 if(db->persistence_changes > db->config->autosave_interval){
269 mqtt3_db_backup(db, false, false);
270 db->persistence_changes = 0;
271 }
272 }else{
273 if(last_backup + db->config->autosave_interval < mosquitto_time()){
274 mqtt3_db_backup(db, false, false);
275 last_backup = mosquitto_time();
276 }
277 }
278 }
279#endif
280 if(!db->config->store_clean_interval || last_store_clean + db->config->store_clean_interval < mosquitto_time()){
281 mqtt3_db_store_clean(db);
282 last_store_clean = mosquitto_time();
283 }
284#ifdef WITH_PERSISTENCE
285 if(flag_db_backup){
286 mqtt3_db_backup(db, false, false);
287 flag_db_backup = false;
288 }
289#endif
290 if(flag_reload){
291 _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
292 mqtt3_config_read(db->config, true);
293 mosquitto_security_cleanup(db, true);
294 mosquitto_security_init(db, true);
295 mosquitto_security_apply(db);
296 mqtt3_log_init(db->config->log_type, db->config->log_dest);
297 flag_reload = false;
298 }
299 if(flag_tree_print){
300 mqtt3_sub_tree_print(&db->subs, 0);
301 flag_tree_print = false;
302 }
303 }
304
305 if(pollfds) _mosquitto_free(pollfds);
306 return MOSQ_ERR_SUCCESS;
307}
308
309static void do_disconnect(struct mosquitto_db *db, int context_index)
310{
311 if(db->config->connection_messages == true){
312 if(db->contexts[context_index]->state != mosq_cs_disconnecting){
313 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", db->contexts[context_index]->id);
314 }else{
315 _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", db->contexts[context_index]->id);
316 }
317 }
318 mqtt3_context_disconnect(db, db->contexts[context_index]);
319}
320
321/* Error ocurred, probably an fd has been closed.
322 * Loop through and check them all.
323 */
324static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds)
325{
326 int i;
327
328 for(i=0; i<db->context_count; i++){
329 if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
330 if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
331 do_disconnect(db, i);
332 }
333 }
334 }
335}
336
337static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
338{
339 int i;
340
341 for(i=0; i<db->context_count; i++){
342 if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
343 assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
344#ifdef WITH_TLS
345 if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT ||
346 db->contexts[i]->want_write ||
347 (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
348#else
349 if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT){
350#endif
351 if(_mosquitto_packet_write(db->contexts[i])){
352 do_disconnect(db, i);
353 }
354 }
355 }
356 if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
357 assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
358#ifdef WITH_TLS
359 if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN ||
360 (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
361#else
362 if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN){
363#endif
364 if(_mosquitto_packet_read(db, db->contexts[i])){
365 do_disconnect(db, i);
366 }
367 }
368 }
369 if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
370 if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
371 do_disconnect(db, i);
372 }
373 }
374 }
375}
376