UNPKG

9.06 kBtext/x-cView Raw
1/*
2Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
133. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
16
17THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27POSSIBILITY OF SUCH DAMAGE.
28*/
29
30#include <assert.h>
31#include <stdio.h>
32#include <string.h>
33
34#include <config.h>
35
36#include <mosquitto_broker.h>
37#include <mqtt3_protocol.h>
38#include <memory_mosq.h>
39#include <read_handle.h>
40#include <send_mosq.h>
41#include <util_mosq.h>
42
43#ifdef WITH_SYS_TREE
44extern uint64_t g_pub_bytes_received;
45#endif
46
47int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context)
48{
49 if(!context) return MOSQ_ERR_INVAL;
50
51 switch((context->in_packet.command)&0xF0){
52 case PINGREQ:
53 return _mosquitto_handle_pingreq(context);
54 case PINGRESP:
55 return _mosquitto_handle_pingresp(context);
56 case PUBACK:
57 return _mosquitto_handle_pubackcomp(context, "PUBACK");
58 case PUBCOMP:
59 return _mosquitto_handle_pubackcomp(context, "PUBCOMP");
60 case PUBLISH:
61 return mqtt3_handle_publish(db, context);
62 case PUBREC:
63 return _mosquitto_handle_pubrec(context);
64 case PUBREL:
65 return _mosquitto_handle_pubrel(db, context);
66 case CONNECT:
67 return mqtt3_handle_connect(db, context);
68 case DISCONNECT:
69 return mqtt3_handle_disconnect(db, context);
70 case SUBSCRIBE:
71 return mqtt3_handle_subscribe(db, context);
72 case UNSUBSCRIBE:
73 return mqtt3_handle_unsubscribe(db, context);
74#ifdef WITH_BRIDGE
75 case CONNACK:
76 return mqtt3_handle_connack(db, context);
77 case SUBACK:
78 return _mosquitto_handle_suback(context);
79 case UNSUBACK:
80 return _mosquitto_handle_unsuback(context);
81#endif
82 default:
83 /* If we don't recognise the command, return an error straight away. */
84 return MOSQ_ERR_PROTOCOL;
85 }
86}
87
88int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
89{
90 char *topic;
91 void *payload = NULL;
92 uint32_t payloadlen;
93 uint8_t dup, qos, retain;
94 uint16_t mid = 0;
95 int rc = 0;
96 uint8_t header = context->in_packet.command;
97 int res = 0;
98 struct mosquitto_msg_store *stored = NULL;
99 int len;
100 char *topic_mount;
101#ifdef WITH_BRIDGE
102 char *topic_temp;
103 int i;
104 struct _mqtt3_bridge_topic *cur_topic;
105 bool match;
106#endif
107
108 dup = (header & 0x08)>>3;
109 qos = (header & 0x06)>>1;
110 if(qos == 3){
111 _mosquitto_log_printf(NULL, MOSQ_LOG_INFO,
112 "Invalid QoS in PUBLISH from %s, disconnecting.", context->id);
113 return 1;
114 }
115 retain = (header & 0x01);
116
117 if(_mosquitto_read_string(&context->in_packet, &topic)) return 1;
118 if(strlen(topic) == 0){
119 /* Invalid publish topic, disconnect client. */
120 _mosquitto_free(topic);
121 return 1;
122 }
123 if(!strlen(topic)){
124 _mosquitto_free(topic);
125 return 1;
126 }
127#ifdef WITH_BRIDGE
128 if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){
129 for(i=0; i<context->bridge->topic_count; i++){
130 cur_topic = &context->bridge->topics[i];
131 if((cur_topic->direction == bd_both || cur_topic->direction == bd_in)
132 && (cur_topic->remote_prefix || cur_topic->local_prefix)){
133
134 /* Topic mapping required on this topic if the message matches */
135
136 rc = mosquitto_topic_matches_sub(cur_topic->remote_topic, topic, &match);
137 if(rc){
138 _mosquitto_free(topic);
139 return rc;
140 }
141 if(match){
142 if(cur_topic->remote_prefix){
143 /* This prefix needs removing. */
144 if(!strncmp(cur_topic->remote_prefix, topic, strlen(cur_topic->remote_prefix))){
145 topic_temp = _mosquitto_strdup(topic+strlen(cur_topic->remote_prefix));
146 if(!topic_temp){
147 _mosquitto_free(topic);
148 return MOSQ_ERR_NOMEM;
149 }
150 _mosquitto_free(topic);
151 topic = topic_temp;
152 }
153 }
154
155 if(cur_topic->local_prefix){
156 /* This prefix needs adding. */
157 len = strlen(topic) + strlen(cur_topic->local_prefix)+1;
158 topic_temp = _mosquitto_calloc(len+1, sizeof(char));
159 if(!topic_temp){
160 _mosquitto_free(topic);
161 return MOSQ_ERR_NOMEM;
162 }
163 snprintf(topic_temp, len, "%s%s", cur_topic->local_prefix, topic);
164 _mosquitto_free(topic);
165 topic = topic_temp;
166 }
167 break;
168 }
169 }
170 }
171 }
172#endif
173 if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
174 /* Invalid publish topic, just swallow it. */
175 _mosquitto_free(topic);
176 return 1;
177 }
178
179 if(qos > 0){
180 if(_mosquitto_read_uint16(&context->in_packet, &mid)){
181 _mosquitto_free(topic);
182 return 1;
183 }
184 }
185
186 payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
187#ifdef WITH_SYS_TREE
188 g_pub_bytes_received += payloadlen;
189#endif
190 if(context->listener && context->listener->mount_point){
191 len = strlen(context->listener->mount_point) + strlen(topic) + 1;
192 topic_mount = _mosquitto_calloc(len, sizeof(char));
193 if(!topic_mount){
194 _mosquitto_free(topic);
195 return MOSQ_ERR_NOMEM;
196 }
197 snprintf(topic_mount, len, "%s%s", context->listener->mount_point, topic);
198 _mosquitto_free(topic);
199 topic = topic_mount;
200 }
201
202 if(payloadlen){
203 if(db->config->message_size_limit && payloadlen > db->config->message_size_limit){
204 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
205 goto process_bad_message;
206 }
207 payload = _mosquitto_calloc(payloadlen+1, sizeof(uint8_t));
208 if(!payload){
209 _mosquitto_free(topic);
210 return 1;
211 }
212 if(_mosquitto_read_bytes(&context->in_packet, payload, payloadlen)){
213 _mosquitto_free(topic);
214 _mosquitto_free(payload);
215 return 1;
216 }
217 }
218
219 /* Check for topic access */
220 rc = mosquitto_acl_check(db, context, topic, MOSQ_ACL_WRITE);
221 if(rc == MOSQ_ERR_ACL_DENIED){
222 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
223 goto process_bad_message;
224 }else if(rc != MOSQ_ERR_SUCCESS){
225 _mosquitto_free(topic);
226 if(payload) _mosquitto_free(payload);
227 return rc;
228 }
229
230 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
231 if(qos > 0){
232 mqtt3_db_message_store_find(context, mid, &stored);
233 }
234 if(!stored){
235 dup = 0;
236 if(mqtt3_db_message_store(db, context->id, mid, topic, qos, payloadlen, payload, retain, &stored, 0)){
237 _mosquitto_free(topic);
238 if(payload) _mosquitto_free(payload);
239 return 1;
240 }
241 }else{
242 dup = 1;
243 }
244 switch(qos){
245 case 0:
246 if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1;
247 break;
248 case 1:
249 if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1;
250 if(_mosquitto_send_puback(context, mid)) rc = 1;
251 break;
252 case 2:
253 if(!dup){
254 res = mqtt3_db_message_insert(db, context, mid, mosq_md_in, qos, retain, stored);
255 }else{
256 res = 0;
257 }
258 /* mqtt3_db_message_insert() returns 2 to indicate dropped message
259 * due to queue. This isn't an error so don't disconnect them. */
260 if(!res){
261 if(_mosquitto_send_pubrec(context, mid)) rc = 1;
262 }else if(res == 1){
263 rc = 1;
264 }
265 break;
266 }
267 _mosquitto_free(topic);
268 if(payload) _mosquitto_free(payload);
269
270 return rc;
271process_bad_message:
272 _mosquitto_free(topic);
273 if(payload) _mosquitto_free(payload);
274 switch(qos){
275 case 0:
276 return MOSQ_ERR_SUCCESS;
277 case 1:
278 return _mosquitto_send_puback(context, mid);
279 case 2:
280 mqtt3_db_message_store_find(context, mid, &stored);
281 if(!stored){
282 if(mqtt3_db_message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0)){
283 return 1;
284 }
285 res = mqtt3_db_message_insert(db, context, mid, mosq_md_in, qos, false, stored);
286 }else{
287 res = 0;
288 }
289 if(!res){
290 res = _mosquitto_send_pubrec(context, mid);
291 }
292 return res;
293 }
294 return 1;
295}
296