1 | #include <fcntl.h>
|
2 | #include <stdio.h>
|
3 | #include <stdlib.h>
|
4 | #include <string.h>
|
5 | #include <time.h>
|
6 | #include <unistd.h>
|
7 |
|
8 | #include <mosquitto.h>
|
9 |
|
10 | struct msg_list{
|
11 | struct msg_list *next;
|
12 | struct mosquitto_message msg;
|
13 | bool sent;
|
14 | };
|
15 |
|
/*
 * Book-keeping for one subscription request.
 */
struct sub
{
	uint16_t mid;    /* message id filled in by mosquitto_subscribe() */
	char *topic;     /* topic string that was subscribed to */
	int qos;         /* QoS level requested for the subscription */
	bool complete;   /* true once on_subscribe() has seen this mid */
};
|
22 |
|
23 | struct sub subs[3];
|
24 | struct msg_list *messages_received = NULL;
|
25 | struct msg_list *messages_sent = NULL;
|
26 | int sent_count = 0;
|
27 | int received_count = 0;
|
28 |
|
29 | void on_message(void *obj, const struct mosquitto_message *msg)
|
30 | {
|
31 | struct msg_list *tail, *new_list;
|
32 |
|
33 | received_count++;
|
34 |
|
35 | new_list = malloc(sizeof(struct msg_list));
|
36 | if(!new_list){
|
37 | fprintf(stderr, "Error allocating list memory.\n");
|
38 | return;
|
39 | }
|
40 | new_list->next = NULL;
|
41 | if(!mosquitto_message_copy(&new_list->msg, msg)){
|
42 | if(messages_received){
|
43 | tail = messages_received;
|
44 | while(tail->next){
|
45 | tail = tail->next;
|
46 | }
|
47 | tail->next = new_list;
|
48 | }else{
|
49 | messages_received = new_list;
|
50 | }
|
51 | }else{
|
52 | free(new_list);
|
53 | return;
|
54 | }
|
55 | }
|
56 |
|
57 | void on_publish(void *obj, uint16_t mid)
|
58 | {
|
59 | struct msg_list *tail = messages_sent;
|
60 |
|
61 | sent_count++;
|
62 |
|
63 | while(tail){
|
64 | if(tail->msg.mid == mid){
|
65 | tail->sent = true;
|
66 | return;
|
67 | }
|
68 | tail = tail->next;
|
69 | }
|
70 |
|
71 | fprintf(stderr, "ERROR: Invalid on_publish() callback for mid %d\n", mid);
|
72 | }
|
73 |
|
74 | void on_subscribe(void *obj, uint16_t mid, int qos_count, const uint8_t *granted_qos)
|
75 | {
|
76 | int i;
|
77 | for(i=0; i<3; i++){
|
78 | if(subs[i].mid == mid){
|
79 | if(subs[i].complete){
|
80 | fprintf(stderr, "WARNING: Duplicate on_subscribe() callback for mid %d\n", mid);
|
81 | }
|
82 | subs[i].complete = true;
|
83 | return;
|
84 | }
|
85 | }
|
86 | fprintf(stderr, "ERROR: Invalid on_subscribe() callback for mid %d\n", mid);
|
87 | }
|
88 |
|
/*
 * Disconnect callback: report the disconnection on stdout.
 *
 * obj - user data pointer supplied by the library (unused).
 */
void on_disconnect(void *obj)
{
	fputs("Disconnected cleanly.\n", stdout);
}
|
93 |
|
94 | void rand_publish(struct mosquitto *mosq, const char *topic, int qos)
|
95 | {
|
96 | int fd = open("/dev/urandom", O_RDONLY);
|
97 | uint8_t buf[100];
|
98 | uint16_t mid;
|
99 | struct msg_list *new_list, *tail;
|
100 |
|
101 | if(fd >= 0){
|
102 | if(read(fd, buf, 100) == 100){
|
103 | if(!mosquitto_publish(mosq, &mid, topic, 100, buf, qos, false)){
|
104 | new_list = malloc(sizeof(struct msg_list));
|
105 | if(new_list){
|
106 | new_list->msg.mid = mid;
|
107 | new_list->msg.topic = strdup(topic);
|
108 | new_list->msg.payloadlen = 100;
|
109 | new_list->msg.payload = malloc(100);
|
110 | memcpy(new_list->msg.payload, buf, 100);
|
111 | new_list->msg.retain = false;
|
112 | new_list->next = NULL;
|
113 | new_list->sent = false;
|
114 |
|
115 | if(messages_sent){
|
116 | tail = messages_sent;
|
117 | while(tail->next){
|
118 | tail = tail->next;
|
119 | }
|
120 | tail->next = new_list;
|
121 | }else{
|
122 | messages_sent = new_list;
|
123 | }
|
124 | }
|
125 | }
|
126 | }
|
127 | close(fd);
|
128 | }
|
129 | }
|
130 |
|
131 | int main(int argc, char *argv[])
|
132 | {
|
133 | struct mosquitto *mosq;
|
134 | int i;
|
135 | time_t start;
|
136 |
|
137 | mosquitto_lib_init();
|
138 |
|
139 | mosq = mosquitto_new("qos-test", NULL);
|
140 | mosquitto_log_init(mosq, MOSQ_LOG_ALL, MOSQ_LOG_STDOUT);
|
141 | mosquitto_message_callback_set(mosq, on_message);
|
142 | mosquitto_publish_callback_set(mosq, on_publish);
|
143 | mosquitto_subscribe_callback_set(mosq, on_subscribe);
|
144 | mosquitto_disconnect_callback_set(mosq, on_disconnect);
|
145 |
|
146 | mosquitto_connect(mosq, "127.0.0.1", 1883, 60, true);
|
147 | subs[0].topic = "qos-test/0";
|
148 | subs[0].qos = 0;
|
149 | subs[0].complete = false;
|
150 | subs[1].topic = "qos-test/1";
|
151 | subs[1].qos = 1;
|
152 | subs[1].complete = false;
|
153 | subs[2].topic = "qos-test/2";
|
154 | subs[2].qos = 2;
|
155 | subs[2].complete = false;
|
156 | mosquitto_subscribe(mosq, &subs[0].mid, subs[0].topic, subs[0].qos);
|
157 | mosquitto_subscribe(mosq, &subs[1].mid, subs[1].topic, subs[1].qos);
|
158 | mosquitto_subscribe(mosq, &subs[2].mid, subs[2].topic, subs[2].qos);
|
159 |
|
160 | for(i=0; i<1; i++){
|
161 | rand_publish(mosq, "qos-test/0", 0);
|
162 | rand_publish(mosq, "qos-test/0", 1);
|
163 | rand_publish(mosq, "qos-test/0", 2);
|
164 | rand_publish(mosq, "qos-test/1", 0);
|
165 | rand_publish(mosq, "qos-test/1", 1);
|
166 | rand_publish(mosq, "qos-test/1", 2);
|
167 | rand_publish(mosq, "qos-test/2", 0);
|
168 | rand_publish(mosq, "qos-test/2", 1);
|
169 | rand_publish(mosq, "qos-test/2", 2);
|
170 | }
|
171 | start = time(NULL);
|
172 | while(!mosquitto_loop(mosq, -1)){
|
173 | if(time(NULL)-start > 20){
|
174 | mosquitto_disconnect(mosq);
|
175 | }
|
176 | }
|
177 |
|
178 | mosquitto_destroy(mosq);
|
179 |
|
180 | mosquitto_lib_cleanup();
|
181 |
|
182 | printf("Sent messages: %d\n", sent_count);
|
183 | printf("Received messages: %d\n", received_count);
|
184 | return 0;
|
185 | }
|
186 |
|