UNPKG

10.1 kBJavaScriptView Raw
1'use strict'
2
3var mqtt = require('..')
4var should = require('should')
5var fork = require('child_process').fork
6var path = require('path')
7var abstractClientTests = require('./abstract_client')
8var net = require('net')
9var eos = require('end-of-stream')
10var Connection = require('mqtt-connection')
11var Server = require('./server')
12var port = 9876
13var server
14
15/**
16 * Test server
17 */
18function buildServer () {
19 return new Server(function (client) {
20 client.on('connect', function (packet) {
21 if (packet.clientId === 'invalid') {
22 client.connack({returnCode: 2})
23 } else {
24 client.connack({returnCode: 0})
25 }
26 })
27
28 client.on('publish', function (packet) {
29 setImmediate(function () {
30 switch (packet.qos) {
31 case 0:
32 break
33 case 1:
34 client.puback(packet)
35 break
36 case 2:
37 client.pubrec(packet)
38 break
39 }
40 })
41 })
42
43 client.on('pubrel', function (packet) {
44 client.pubcomp(packet)
45 })
46
47 client.on('pubrec', function (packet) {
48 client.pubrel(packet)
49 })
50
51 client.on('pubcomp', function () {
52 // Nothing to be done
53 })
54
55 client.on('subscribe', function (packet) {
56 client.suback({
57 messageId: packet.messageId,
58 granted: packet.subscriptions.map(function (e) {
59 return e.qos
60 })
61 })
62 })
63
64 client.on('unsubscribe', function (packet) {
65 client.unsuback(packet)
66 })
67
68 client.on('pingreq', function () {
69 client.pingresp()
70 })
71 })
72}
73
74server = buildServer().listen(port)
75
76describe('MqttClient', function () {
77 describe('creating', function () {
78 it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
79 should(function () {
80 var client
81 try {
82 client = mqtt.MqttClient(function () {
83 throw Error('break')
84 }, {})
85 client.end()
86 } catch (err) {
87 if (err.message !== 'break') {
88 throw err
89 }
90 done()
91 }
92 }).not.throw('Object #<Object> has no method \'_setupStream\'')
93 })
94 })
95
96 var config = { protocol: 'mqtt', port: port }
97 abstractClientTests(server, config)
98
99 describe('message ids', function () {
100 it('should increment the message id', function () {
101 var client = mqtt.connect(config)
102 var currentId = client._nextId()
103
104 client._nextId().should.equal(currentId + 1)
105 client.end()
106 })
107
108 it('should return 1 once the interal counter reached limit', function () {
109 var client = mqtt.connect(config)
110 client.nextId = 65535
111
112 client._nextId().should.equal(65535)
113 client._nextId().should.equal(1)
114 client.end()
115 })
116 })
117
118 describe('reconnecting', function () {
119 it('should attempt to reconnect once server is down', function (done) {
120 this.timeout(15000)
121
122 var innerServer = fork(path.join(__dirname, 'helpers', 'server_process.js'))
123 var client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })
124
125 client.once('connect', function () {
126 innerServer.kill('SIGINT') // mocks server shutdown
127
128 client.once('close', function () {
129 should.exist(client.reconnectTimer)
130 client.end()
131 done()
132 })
133 })
134 })
135
136 it('should reconnect to multiple host-ports combination if servers is passed', function (done) {
137 this.timeout(15000)
138
139 var server2 = buildServer().listen(port + 42)
140
141 server2.on('client', function (c) {
142 c.stream.destroy()
143 server2.close()
144 })
145
146 server2.on('listening', function () {
147 var client = mqtt.connect({
148 servers: [
149 { port: port + 42, host: 'localhost' },
150 { port: port, host: 'localhost' }
151 ],
152 keepalive: 50
153 })
154
155 server.once('client', function () {
156 client.end()
157 done()
158 })
159
160 client.once('connect', function () {
161 client.stream.destroy()
162 })
163 })
164 })
165
166 it('should reconnect if a connack is not received in an interval', function (done) {
167 this.timeout(2000)
168
169 var server2 = net.createServer().listen(port + 43)
170
171 server2.on('connection', function (c) {
172 eos(c, function () {
173 server2.close()
174 })
175 })
176
177 server2.on('listening', function () {
178 var client = mqtt.connect({
179 servers: [
180 { port: port + 43, host: 'localhost_fake' },
181 { port: port, host: 'localhost' }
182 ],
183 connectTimeout: 500
184 })
185
186 server.once('client', function () {
187 client.end()
188 done()
189 })
190
191 client.once('connect', function () {
192 client.stream.destroy()
193 })
194 })
195 })
196
197 it('shoud not be cleared by the connack timer', function (done) {
198 this.timeout(4000)
199
200 var server2 = net.createServer().listen(port + 44)
201
202 server2.on('connection', function (c) {
203 c.destroy()
204 })
205
206 server2.once('listening', function () {
207 var reconnects = 0
208 var connectTimeout = 1000
209 var reconnectPeriod = 100
210 var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
211 var client = mqtt.connect({
212 port: port + 44,
213 host: 'localhost',
214 connectTimeout: connectTimeout,
215 reconnectPeriod: reconnectPeriod
216 })
217
218 client.on('reconnect', function () {
219 reconnects++
220 if (reconnects >= expectedReconnects) {
221 client.end()
222 done()
223 }
224 })
225 })
226 })
227
228 it('shoud not keep requeueing the first message when offline', function (done) {
229 this.timeout(2500)
230
231 var server2 = buildServer().listen(port + 45)
232 var client = mqtt.connect({
233 port: port + 45,
234 host: 'localhost',
235 connectTimeout: 350,
236 reconnectPeriod: 300
237 })
238
239 server2.on('client', function (c) {
240 client.publish('hello', 'world', { qos: 1 }, function () {
241 c.destroy()
242 server2.close()
243 client.publish('hello', 'world', { qos: 1 })
244 })
245 })
246
247 setTimeout(function () {
248 if (client.queue.length === 0) {
249 client.end(true)
250 done()
251 } else {
252 client.end(true)
253 }
254 }, 2000)
255 })
256
257 it('should not send the same subcribe multiple times on a flaky connection', function (done) {
258 this.timeout(3500)
259
260 var KILL_COUNT = 4
261 var killedConnections = 0
262 var subIds = {}
263 var client = mqtt.connect({
264 port: port + 46,
265 host: 'localhost',
266 connectTimeout: 350,
267 reconnectPeriod: 300
268 })
269
270 var server2 = new Server(function (client) {
271 client.on('error', function () {})
272 client.on('connect', function (packet) {
273 if (packet.clientId === 'invalid') {
274 client.connack({returnCode: 2})
275 } else {
276 client.connack({returnCode: 0})
277 }
278 })
279 }).listen(port + 46)
280
281 server2.on('client', function (c) {
282 client.subscribe('topic', function () {
283 done()
284 client.end(true)
285 c.destroy()
286 server2.close()
287 })
288
289 c.on('subscribe', function (packet) {
290 if (killedConnections < KILL_COUNT) {
291 // Kill the first few sub attempts to simulate a flaky connection
292 killedConnections++
293 c.destroy()
294 } else {
295 // Keep track of acks
296 if (!subIds[packet.messageId]) {
297 subIds[packet.messageId] = 0
298 }
299 subIds[packet.messageId]++
300 if (subIds[packet.messageId] > 1) {
301 done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
302 client.end(true)
303 c.destroy()
304 server2.destroy()
305 }
306
307 c.suback({
308 messageId: packet.messageId,
309 granted: packet.subscriptions.map(function (e) {
310 return e.qos
311 })
312 })
313 }
314 })
315 })
316 })
317
318 it('should not send the same publish multiple times on a flaky connection', function (done) {
319 this.timeout(3500)
320
321 var KILL_COUNT = 4
322 var killedConnections = 0
323 var pubIds = {}
324 var client = mqtt.connect({
325 port: port + 47,
326 host: 'localhost',
327 connectTimeout: 350,
328 reconnectPeriod: 300
329 })
330
331 var server2 = net.createServer(function (stream) {
332 var client = new Connection(stream)
333 client.on('error', function () {})
334 client.on('connect', function (packet) {
335 if (packet.clientId === 'invalid') {
336 client.connack({returnCode: 2})
337 } else {
338 client.connack({returnCode: 0})
339 }
340 })
341
342 this.emit('client', client)
343 }).listen(port + 47)
344
345 server2.on('client', function (c) {
346 client.publish('topic', 'data', { qos: 1 }, function () {
347 done()
348 client.end(true)
349 c.destroy()
350 server2.destroy()
351 })
352
353 c.on('publish', function onPublish (packet) {
354 if (killedConnections < KILL_COUNT) {
355 // Kill the first few pub attempts to simulate a flaky connection
356 killedConnections++
357 c.destroy()
358
359 // to avoid receiving inflight messages
360 c.removeListener('publish', onPublish)
361 } else {
362 // Keep track of acks
363 if (!pubIds[packet.messageId]) {
364 pubIds[packet.messageId] = 0
365 }
366
367 pubIds[packet.messageId]++
368
369 if (pubIds[packet.messageId] > 1) {
370 done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
371 client.end(true)
372 c.destroy()
373 server2.destroy()
374 }
375
376 c.puback(packet)
377 }
378 })
379 })
380 })
381 })
382})