UNPKG

4.19 kBtext/x-cView Raw
1#include <fcntl.h>
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <time.h>
6#include <unistd.h>
7
8#include <mosquitto.h>
9
10struct msg_list{
11 struct msg_list *next;
12 struct mosquitto_message msg;
13 bool sent;
14};
15
16struct sub{
17 uint16_t mid;
18 char *topic;
19 int qos;
20 bool complete;
21};
22
23struct sub subs[3];
24struct msg_list *messages_received = NULL;
25struct msg_list *messages_sent = NULL;
26int sent_count = 0;
27int received_count = 0;
28
29void on_message(void *obj, const struct mosquitto_message *msg)
30{
31 struct msg_list *tail, *new_list;
32
33 received_count++;
34
35 new_list = malloc(sizeof(struct msg_list));
36 if(!new_list){
37 fprintf(stderr, "Error allocating list memory.\n");
38 return;
39 }
40 new_list->next = NULL;
41 if(!mosquitto_message_copy(&new_list->msg, msg)){
42 if(messages_received){
43 tail = messages_received;
44 while(tail->next){
45 tail = tail->next;
46 }
47 tail->next = new_list;
48 }else{
49 messages_received = new_list;
50 }
51 }else{
52 free(new_list);
53 return;
54 }
55}
56
57void on_publish(void *obj, uint16_t mid)
58{
59 struct msg_list *tail = messages_sent;
60
61 sent_count++;
62
63 while(tail){
64 if(tail->msg.mid == mid){
65 tail->sent = true;
66 return;
67 }
68 tail = tail->next;
69 }
70
71 fprintf(stderr, "ERROR: Invalid on_publish() callback for mid %d\n", mid);
72}
73
74void on_subscribe(void *obj, uint16_t mid, int qos_count, const uint8_t *granted_qos)
75{
76 int i;
77 for(i=0; i<3; i++){
78 if(subs[i].mid == mid){
79 if(subs[i].complete){
80 fprintf(stderr, "WARNING: Duplicate on_subscribe() callback for mid %d\n", mid);
81 }
82 subs[i].complete = true;
83 return;
84 }
85 }
86 fprintf(stderr, "ERROR: Invalid on_subscribe() callback for mid %d\n", mid);
87}
88
89void on_disconnect(void *obj)
90{
91 printf("Disconnected cleanly.\n");
92}
93
94void rand_publish(struct mosquitto *mosq, const char *topic, int qos)
95{
96 int fd = open("/dev/urandom", O_RDONLY);
97 uint8_t buf[100];
98 uint16_t mid;
99 struct msg_list *new_list, *tail;
100
101 if(fd >= 0){
102 if(read(fd, buf, 100) == 100){
103 if(!mosquitto_publish(mosq, &mid, topic, 100, buf, qos, false)){
104 new_list = malloc(sizeof(struct msg_list));
105 if(new_list){
106 new_list->msg.mid = mid;
107 new_list->msg.topic = strdup(topic);
108 new_list->msg.payloadlen = 100;
109 new_list->msg.payload = malloc(100);
110 memcpy(new_list->msg.payload, buf, 100);
111 new_list->msg.retain = false;
112 new_list->next = NULL;
113 new_list->sent = false;
114
115 if(messages_sent){
116 tail = messages_sent;
117 while(tail->next){
118 tail = tail->next;
119 }
120 tail->next = new_list;
121 }else{
122 messages_sent = new_list;
123 }
124 }
125 }
126 }
127 close(fd);
128 }
129}
130
131int main(int argc, char *argv[])
132{
133 struct mosquitto *mosq;
134 int i;
135 time_t start;
136
137 mosquitto_lib_init();
138
139 mosq = mosquitto_new("qos-test", NULL);
140 mosquitto_log_init(mosq, MOSQ_LOG_ALL, MOSQ_LOG_STDOUT);
141 mosquitto_message_callback_set(mosq, on_message);
142 mosquitto_publish_callback_set(mosq, on_publish);
143 mosquitto_subscribe_callback_set(mosq, on_subscribe);
144 mosquitto_disconnect_callback_set(mosq, on_disconnect);
145
146 mosquitto_connect(mosq, "127.0.0.1", 1883, 60, true);
147 subs[0].topic = "qos-test/0";
148 subs[0].qos = 0;
149 subs[0].complete = false;
150 subs[1].topic = "qos-test/1";
151 subs[1].qos = 1;
152 subs[1].complete = false;
153 subs[2].topic = "qos-test/2";
154 subs[2].qos = 2;
155 subs[2].complete = false;
156 mosquitto_subscribe(mosq, &subs[0].mid, subs[0].topic, subs[0].qos);
157 mosquitto_subscribe(mosq, &subs[1].mid, subs[1].topic, subs[1].qos);
158 mosquitto_subscribe(mosq, &subs[2].mid, subs[2].topic, subs[2].qos);
159
160 for(i=0; i<1; i++){
161 rand_publish(mosq, "qos-test/0", 0);
162 rand_publish(mosq, "qos-test/0", 1);
163 rand_publish(mosq, "qos-test/0", 2);
164 rand_publish(mosq, "qos-test/1", 0);
165 rand_publish(mosq, "qos-test/1", 1);
166 rand_publish(mosq, "qos-test/1", 2);
167 rand_publish(mosq, "qos-test/2", 0);
168 rand_publish(mosq, "qos-test/2", 1);
169 rand_publish(mosq, "qos-test/2", 2);
170 }
171 start = time(NULL);
172 while(!mosquitto_loop(mosq, -1)){
173 if(time(NULL)-start > 20){
174 mosquitto_disconnect(mosq);
175 }
176 }
177
178 mosquitto_destroy(mosq);
179
180 mosquitto_lib_cleanup();
181
182 printf("Sent messages: %d\n", sent_count);
183 printf("Received messages: %d\n", received_count);
184 return 0;
185}
186