UNPKG

2.9 kBtext/x-cView Raw
1/* This provides a crude manner of testing the performance of a broker in messages/s. */
2
3#include <stdbool.h>
4#include <stdint.h>
5#include <stdio.h>
6#include <stdlib.h>
7#include <sys/time.h>
8#include <mosquitto.h>
9
10#include <msgsps_common.h>
11
/* Cleared by the disconnect callback; main's publish loop stops once false. */
static bool run = true;

/* Number of publish callbacks received so far. */
static int message_count = 0;

/* Timestamps taken in the connect callback and after the final publish. */
static struct timeval start;
static struct timeval stop;
15
16void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
17{
18 printf("rc: %d\n", rc);
19 gettimeofday(&start, NULL);
20}
21
22void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
23{
24 run = false;
25}
26
27void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
28{
29 message_count++;
30 //printf("%d ", message_count);
31 if(message_count == MESSAGE_COUNT){
32 gettimeofday(&stop, NULL);
33 mosquitto_disconnect((struct mosquitto *)obj);
34 }
35}
36
37int create_data(void)
38{
39 int i;
40 FILE *fptr, *rnd;
41 int rc = 0;
42 char buf[MESSAGE_SIZE];
43
44 fptr = fopen("msgsps_pub.dat", "rb");
45 if(fptr){
46 fseek(fptr, 0, SEEK_END);
47 if(ftell(fptr) >= MESSAGE_SIZE*MESSAGE_COUNT){
48 fclose(fptr);
49 return 0;
50 }
51 fclose(fptr);
52 }
53
54 fptr = fopen("msgsps_pub.dat", "wb");
55 if(!fptr) return 1;
56 rnd = fopen("/dev/urandom", "rb");
57 if(!rnd){
58 fclose(fptr);
59 return 1;
60 }
61
62 for(i=0; i<MESSAGE_COUNT; i++){
63 if(fread(buf, sizeof(char), MESSAGE_SIZE, rnd) != MESSAGE_SIZE){
64 rc = 1;
65 break;
66 }
67 if(fwrite(buf, sizeof(char), MESSAGE_SIZE, fptr) != MESSAGE_SIZE){
68 rc = 1;
69 break;
70 }
71 }
72 fclose(rnd);
73 fclose(fptr);
74
75 return rc;
76}
77
78int main(int argc, char *argv[])
79{
80 struct mosquitto *mosq;
81 int i;
82 double dstart, dstop, diff;
83 FILE *fptr;
84 uint8_t *buf;
85
86 buf = malloc(MESSAGE_SIZE*MESSAGE_COUNT);
87 if(!buf){
88 printf("Error: Out of memory.\n");
89 return 1;
90 }
91
92 start.tv_sec = 0;
93 start.tv_usec = 0;
94 stop.tv_sec = 0;
95 stop.tv_usec = 0;
96
97 if(create_data()){
98 printf("Error: Unable to create random input data.\n");
99 return 1;
100 }
101 fptr = fopen("msgsps_pub.dat", "rb");
102 if(!fptr){
103 printf("Error: Unable to open random input data.\n");
104 return 1;
105 }
106 fread(buf, sizeof(uint8_t), MESSAGE_SIZE*MESSAGE_COUNT, fptr);
107 fclose(fptr);
108
109 mosquitto_lib_init();
110
111 mosq = mosquitto_new("perftest", true, NULL);
112 mosquitto_connect_callback_set(mosq, my_connect_callback);
113 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
114 mosquitto_publish_callback_set(mosq, my_publish_callback);
115
116 mosquitto_connect(mosq, "127.0.0.1", 1884, 600);
117
118 i=0;
119 while(!mosquitto_loop(mosq, 1, 10) && run){
120 if(i<MESSAGE_COUNT){
121 mosquitto_publish(mosq, NULL, "perf/test", MESSAGE_SIZE, &buf[i*MESSAGE_SIZE], 0, false);
122 i++;
123 }
124 }
125 dstart = (double)start.tv_sec*1.0e6 + (double)start.tv_usec;
126 dstop = (double)stop.tv_sec*1.0e6 + (double)stop.tv_usec;
127 diff = (dstop-dstart)/1.0e6;
128
129 printf("Start: %g\nStop: %g\nDiff: %g\nMessages/s: %g\n", dstart, dstop, diff, (double)MESSAGE_COUNT/diff);
130
131 mosquitto_destroy(mosq);
132 mosquitto_lib_cleanup();
133
134 return 0;
135}