UNPKG

7.96 kBtext/x-cView Raw
1/*
2Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <stdio.h>
32#include <string.h>
33
34#include "mosquitto.h"
35#include "logging_mosq.h"
36#include "memory_mosq.h"
37#include "messages_mosq.h"
38#include "mqtt3_protocol.h"
39#include "net_mosq.h"
40#include "read_handle.h"
41#include "send_mosq.h"
42#include "util_mosq.h"
43#ifdef WITH_BROKER
44#include "mosquitto_broker.h"
45#endif
46
47int _mosquitto_handle_pingreq(struct mosquitto *mosq)
48{
49 assert(mosq);
50#ifdef WITH_STRICT_PROTOCOL
51 if(mosq->in_packet.remaining_length != 0){
52 return MOSQ_ERR_PROTOCOL;
53 }
54#endif
55#ifdef WITH_BROKER
56 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGREQ from %s", mosq->id);
57#else
58 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGREQ", mosq->id);
59#endif
60 return _mosquitto_send_pingresp(mosq);
61}
62
63int _mosquitto_handle_pingresp(struct mosquitto *mosq)
64{
65 assert(mosq);
66#ifdef WITH_STRICT_PROTOCOL
67 if(mosq->in_packet.remaining_length != 0){
68 return MOSQ_ERR_PROTOCOL;
69 }
70#endif
71 mosq->ping_t = 0; /* No longer waiting for a PINGRESP. */
72#ifdef WITH_BROKER
73 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGRESP from %s", mosq->id);
74#else
75 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGRESP", mosq->id);
76#endif
77 return MOSQ_ERR_SUCCESS;
78}
79
80int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type)
81{
82 uint16_t mid;
83 int rc;
84
85 assert(mosq);
86#ifdef WITH_STRICT_PROTOCOL
87 if(mosq->in_packet.remaining_length != 2){
88 return MOSQ_ERR_PROTOCOL;
89 }
90#endif
91 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
92 if(rc) return rc;
93#ifdef WITH_BROKER
94 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
95
96 if(mid){
97 rc = mqtt3_db_message_delete(mosq, mid, mosq_md_out);
98 if(rc) return rc;
99 }
100#else
101 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);
102
103 if(!_mosquitto_message_delete(mosq, mid, mosq_md_out)){
104 /* Only inform the client the message has been sent once. */
105 pthread_mutex_lock(&mosq->callback_mutex);
106 if(mosq->on_publish){
107 mosq->in_callback = true;
108 mosq->on_publish(mosq, mosq->userdata, mid);
109 mosq->in_callback = false;
110 }
111 pthread_mutex_unlock(&mosq->callback_mutex);
112 }
113#endif
114
115 return MOSQ_ERR_SUCCESS;
116}
117
118int _mosquitto_handle_pubrec(struct mosquitto *mosq)
119{
120 uint16_t mid;
121 int rc;
122
123 assert(mosq);
124#ifdef WITH_STRICT_PROTOCOL
125 if(mosq->in_packet.remaining_length != 2){
126 return MOSQ_ERR_PROTOCOL;
127 }
128#endif
129 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
130 if(rc) return rc;
131#ifdef WITH_BROKER
132 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);
133
134 rc = mqtt3_db_message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp);
135#else
136 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);
137
138 rc = _mosquitto_message_out_update(mosq, mid, mosq_ms_wait_for_pubcomp);
139#endif
140 if(rc) return rc;
141 rc = _mosquitto_send_pubrel(mosq, mid, false);
142 if(rc) return rc;
143
144 return MOSQ_ERR_SUCCESS;
145}
146
147int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
148{
149 uint16_t mid;
150#ifndef WITH_BROKER
151 struct mosquitto_message_all *message = NULL;
152#endif
153 int rc;
154
155 assert(mosq);
156#ifdef WITH_STRICT_PROTOCOL
157 if(mosq->in_packet.remaining_length != 2){
158 return MOSQ_ERR_PROTOCOL;
159 }
160#endif
161 if(mosq->protocol == mosq_p_mqtt311){
162 if((mosq->in_packet.command&0x0F) != 0x02){
163 return MOSQ_ERR_PROTOCOL;
164 }
165 }
166 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
167 if(rc) return rc;
168#ifdef WITH_BROKER
169 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid);
170
171 if(mqtt3_db_message_release(db, mosq, mid, mosq_md_in)){
172 /* Message not found. Still send a PUBCOMP anyway because this could be
173 * due to a repeated PUBREL after a client has reconnected. */
174 }
175#else
176 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);
177
178 if(!_mosquitto_message_remove(mosq, mid, mosq_md_in, &message)){
179 /* Only pass the message on if we have removed it from the queue - this
180 * prevents multiple callbacks for the same message. */
181 pthread_mutex_lock(&mosq->callback_mutex);
182 if(mosq->on_message){
183 mosq->in_callback = true;
184 mosq->on_message(mosq, mosq->userdata, &message->msg);
185 mosq->in_callback = false;
186 }
187 pthread_mutex_unlock(&mosq->callback_mutex);
188 _mosquitto_message_cleanup(&message);
189 }
190#endif
191 rc = _mosquitto_send_pubcomp(mosq, mid);
192 if(rc) return rc;
193
194 return MOSQ_ERR_SUCCESS;
195}
196
197int _mosquitto_handle_suback(struct mosquitto *mosq)
198{
199 uint16_t mid;
200 uint8_t qos;
201 int *granted_qos;
202 int qos_count;
203 int i = 0;
204 int rc;
205
206 assert(mosq);
207#ifdef WITH_BROKER
208 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received SUBACK from %s", mosq->id);
209#else
210 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received SUBACK", mosq->id);
211#endif
212 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
213 if(rc) return rc;
214
215 qos_count = mosq->in_packet.remaining_length - mosq->in_packet.pos;
216 granted_qos = _mosquitto_malloc(qos_count*sizeof(int));
217 if(!granted_qos) return MOSQ_ERR_NOMEM;
218 while(mosq->in_packet.pos < mosq->in_packet.remaining_length){
219 rc = _mosquitto_read_byte(&mosq->in_packet, &qos);
220 if(rc){
221 _mosquitto_free(granted_qos);
222 return rc;
223 }
224 granted_qos[i] = (int)qos;
225 i++;
226 }
227#ifndef WITH_BROKER
228 pthread_mutex_lock(&mosq->callback_mutex);
229 if(mosq->on_subscribe){
230 mosq->in_callback = true;
231 mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
232 mosq->in_callback = false;
233 }
234 pthread_mutex_unlock(&mosq->callback_mutex);
235#endif
236 _mosquitto_free(granted_qos);
237
238 return MOSQ_ERR_SUCCESS;
239}
240
241int _mosquitto_handle_unsuback(struct mosquitto *mosq)
242{
243 uint16_t mid;
244 int rc;
245
246 assert(mosq);
247#ifdef WITH_STRICT_PROTOCOL
248 if(mosq->in_packet.remaining_length != 2){
249 return MOSQ_ERR_PROTOCOL;
250 }
251#endif
252#ifdef WITH_BROKER
253 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBACK from %s", mosq->id);
254#else
255 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received UNSUBACK", mosq->id);
256#endif
257 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
258 if(rc) return rc;
259#ifndef WITH_BROKER
260 pthread_mutex_lock(&mosq->callback_mutex);
261 if(mosq->on_unsubscribe){
262 mosq->in_callback = true;
263 mosq->on_unsubscribe(mosq, mosq->userdata, mid);
264 mosq->in_callback = false;
265 }
266 pthread_mutex_unlock(&mosq->callback_mutex);
267#endif
268
269 return MOSQ_ERR_SUCCESS;
270}
271