1 | 'use strict'
|
2 |
|
3 | var mqtt = require('..')
|
4 | var should = require('should')
|
5 | var fork = require('child_process').fork
|
6 | var path = require('path')
|
7 | var abstractClientTests = require('./abstract_client')
|
8 | var net = require('net')
|
9 | var eos = require('end-of-stream')
|
10 | var Connection = require('mqtt-connection')
|
11 | var Server = require('./server')
|
12 | var port = 9876
|
13 | var server
|
14 |
|
15 |
|
16 |
|
17 |
|
/**
 * Build a test broker whose connections acknowledge everything they receive.
 *
 * For every connection handed to the Server callback:
 *  - CONNECT is answered with CONNACK, return code 2 when the client id is
 *    'invalid' and 0 otherwise;
 *  - PUBLISH is acknowledged from a setImmediate callback: PUBACK for QoS 1,
 *    PUBREC for QoS 2, nothing for any other QoS;
 *  - PUBREL -> PUBCOMP, PUBREC -> PUBREL, SUBSCRIBE -> SUBACK (granting the
 *    requested QoS values), UNSUBSCRIBE -> UNSUBACK, PINGREQ -> PINGRESP;
 *  - PUBCOMP is accepted and ignored.
 *
 * @returns {Server} the new Server instance (callers call `.listen()` on it)
 */
function buildServer () {
  function wireClient (client) {
    // Produce a handler that answers a packet by echoing it through `method`.
    function replyWith (method) {
      return function (packet) {
        client[method](packet)
      }
    }

    function onConnect (packet) {
      var code = packet.clientId === 'invalid' ? 2 : 0
      client.connack({returnCode: code})
    }

    function onPublish (packet) {
      // The acknowledgement is deferred to a later tick, never sent inline.
      setImmediate(function acknowledge () {
        if (packet.qos === 1) {
          client.puback(packet)
        } else if (packet.qos === 2) {
          client.pubrec(packet)
        }
      })
    }

    function onSubscribe (packet) {
      var messageId = packet.messageId
      var granted = packet.subscriptions.map(function (subscription) {
        return subscription.qos
      })
      client.suback({ messageId: messageId, granted: granted })
    }

    function onPingreq () {
      client.pingresp()
    }

    client.on('connect', onConnect)
    client.on('publish', onPublish)
    client.on('pubrel', replyWith('pubcomp'))
    client.on('pubrec', replyWith('pubrel'))
    // End of a QoS 2 exchange: nothing to send back.
    client.on('pubcomp', function () {})
    client.on('subscribe', onSubscribe)
    client.on('unsubscribe', replyWith('unsuback'))
    client.on('pingreq', onPingreq)
  }

  return new Server(wireClient)
}
|
73 |
|
// Start the shared test broker on `port`; `server` is reused by the tests below.
server = buildServer().listen(port)
|
75 |
|
76 | describe('MqttClient', function () {
|
// Construction of MqttClient when it is called as a plain function.
describe('creating', function () {
  it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
    // Stream builder that throws a sentinel error whenever it is invoked.
    function failingStreamBuilder () {
      throw Error('break')
    }

    // Calls MqttClient without `new`; the sentinel error counts as success,
    // any other error is re-thrown for the surrounding assertion to inspect.
    function instantiateWithoutNew () {
      var client
      try {
        client = mqtt.MqttClient(failingStreamBuilder, {})
        client.end()
      } catch (err) {
        if (err.message === 'break') {
          done()
          return
        }
        throw err
      }
    }

    should(instantiateWithoutNew).not.throw('Object #<Object> has no method \'_setupStream\'')
  })
})
|
95 |
|
// Connection options reused by the tests below; they point at the broker listening on `port`.
var config = { protocol: 'mqtt', port: port }
// Hand the shared broker and options to the abstract client suite
// (presumably it registers the transport-independent tests - see ./abstract_client).
abstractClientTests(server, config)
|
98 |
|
// Behaviour of the client's message id generator (`_nextId`).
describe('message ids', function () {
  it('should increment the message id', function () {
    var client = mqtt.connect(config)

    // try/finally: the client must be ended even when an assertion throws,
    // otherwise its connection outlives the failed test.
    try {
      var currentId = client._nextId()

      client._nextId().should.equal(currentId + 1)
    } finally {
      client.end()
    }
  })

  it('should return 1 once the internal counter reached limit', function () {
    var client = mqtt.connect(config)

    try {
      // Put the counter on the value the test treats as its upper bound.
      client.nextId = 65535

      client._nextId().should.equal(65535)
      // The id following the limit wraps around to 1.
      client._nextId().should.equal(1)
    } finally {
      client.end()
    }
  })
})
|
117 |
|
118 | describe('reconnecting', function () {
|
// Kills a broker running in a child process and checks that the client has
// a reconnect timer armed by the time its 'close' event fires.
it('should attempt to reconnect once server is down', function (done) {
  this.timeout(15000)

  // NOTE(review): helpers/server_process.js presumably starts a broker on
  // port 3000, the port the client connects to - confirm against the helper.
  var serverScript = path.join(__dirname, 'helpers', 'server_process.js')
  var innerServer = fork(serverScript)
  var client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })

  function onClosed () {
    should.exist(client.reconnectTimer)
    client.end()
    done()
  }

  function onConnected () {
    // Take the broker away first, then wait for the client to notice.
    innerServer.kill('SIGINT')
    client.once('close', onClosed)
  }

  client.once('connect', onConnected)
})
|
135 |
|
// With a `servers` list, the client is expected to reach the shared broker
// after the first broker in the list drops it and stops listening.
it('should reconnect to multiple host-ports combination if servers is passed', function (done) {
  this.timeout(15000)

  var firstPort = port + 42
  var server2 = buildServer().listen(firstPort)

  // The first broker drops whoever connects to it and then stops listening.
  function dropAndClose (connection) {
    connection.stream.destroy()
    server2.close()
  }

  function startClient () {
    var brokers = [
      { port: firstPort, host: 'localhost' },
      { port: port, host: 'localhost' }
    ]
    var client = mqtt.connect({ servers: brokers, keepalive: 50 })

    // A client showing up on the shared broker ends the test.
    server.once('client', function onSharedBrokerClient () {
      client.end()
      done()
    })

    // Sever the first established connection from the client side as well.
    client.once('connect', function onFirstConnect () {
      client.stream.destroy()
    })
  }

  server2.on('client', dropAndClose)
  server2.on('listening', startClient)
})
|
165 |
|
// The first entry of `servers` never yields a CONNACK; the test passes once
// a client shows up on the shared broker instead.
it('should reconnect if a connack is not received in an interval', function (done) {
  this.timeout(2000)

  var silentPort = port + 43
  // Plain TCP server with no connection handler of its own, so this test
  // never sends anything back on its sockets.
  var server2 = net.createServer().listen(silentPort)

  // Stop listening as soon as an accepted socket has ended.
  function closeWhenSocketEnds (socket) {
    eos(socket, function onSocketEnded () {
      server2.close()
    })
  }

  function startClient () {
    var options = {
      servers: [
        { port: silentPort, host: 'localhost_fake' },
        { port: port, host: 'localhost' }
      ],
      connectTimeout: 500
    }
    var client = mqtt.connect(options)

    server.once('client', function onSharedBrokerClient () {
      client.end()
      done()
    })

    client.once('connect', function onFirstConnect () {
      client.stream.destroy()
    })
  }

  server2.on('connection', closeWhenSocketEnds)
  server2.on('listening', startClient)
})
|
196 |
|
// Counts 'reconnect' events against a server that drops every socket, and
// passes once as many attempts were seen as reconnect periods fit into one
// connack timeout.
it('should not be cleared by the connack timer', function (done) {
  this.timeout(4000)

  var server2 = net.createServer().listen(port + 44)

  // Drop every incoming socket right away; this test never sends a CONNACK.
  server2.on('connection', function (c) {
    c.destroy()
  })

  server2.once('listening', function () {
    var reconnects = 0
    var connectTimeout = 1000
    var reconnectPeriod = 100
    // Number of reconnect periods that fit into one connack timeout.
    var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
    var client = mqtt.connect({
      port: port + 44,
      host: 'localhost',
      connectTimeout: connectTimeout,
      reconnectPeriod: reconnectPeriod
    })

    client.on('reconnect', function () {
      reconnects++
      if (reconnects >= expectedReconnects) {
        client.end()
        // Stop listening so the port is released once the test is over.
        server2.close()
        done()
      }
    })
  })
})
|
227 |
|
// Publishes once, removes the broker, publishes again, and after two
// seconds expects the client's queue to be empty.
it('should not keep requeueing the first message when offline', function (done) {
  this.timeout(2500)

  var server2 = buildServer().listen(port + 45)
  var client = mqtt.connect({
    port: port + 45,
    host: 'localhost',
    connectTimeout: 350,
    reconnectPeriod: 300
  })

  server2.on('client', function (c) {
    client.publish('hello', 'world', { qos: 1 }, function () {
      // The first publish has completed: take the broker away, then publish
      // again with nothing left to talk to.
      c.destroy()
      server2.close()
      client.publish('hello', 'world', { qos: 1 })
    })
  })

  setTimeout(function () {
    // Read the queue before the forced end, as that may alter client state.
    var queued = client.queue.length

    client.end(true)
    if (queued === 0) {
      done()
    } else {
      // Report the failure explicitly instead of waiting for the mocha timeout.
      done(new Error('Expected an empty queue but found ' + queued + ' queued packet(s)'))
    }
  }, 2000)
})
|
256 |
|
// Kills the first KILL_COUNT connections that carry a SUBSCRIBE, then
// acknowledges subscriptions and fails if any message id is seen twice.
it('should not send the same subscribe multiple times on a flaky connection', function (done) {
  this.timeout(3500)

  var KILL_COUNT = 4
  var killedConnections = 0
  // messageId -> number of SUBSCRIBE packets seen once killing has stopped.
  var subIds = {}
  var client = mqtt.connect({
    port: port + 46,
    host: 'localhost',
    connectTimeout: 350,
    reconnectPeriod: 300
  })

  // Broker that only answers CONNECT; SUBSCRIBE handling is attached per
  // connection further down.
  var server2 = new Server(function (conn) {
    // Connections are destroyed on purpose below; ignore the resulting errors.
    conn.on('error', function () {})
    conn.on('connect', function (packet) {
      if (packet.clientId === 'invalid') {
        conn.connack({returnCode: 2})
      } else {
        conn.connack({returnCode: 0})
      }
    })
  }).listen(port + 46)

  // Stop the client, the given connection and the broker. Uses close(), the
  // same shutdown call this file uses on its other Server instances.
  function teardown (c) {
    client.end(true)
    c.destroy()
    server2.close()
  }

  server2.on('client', function (c) {
    client.subscribe('topic', function () {
      done()
      teardown(c)
    })

    c.on('subscribe', function (packet) {
      if (killedConnections < KILL_COUNT) {
        // Simulate a flaky link: drop the connection without acknowledging.
        killedConnections++
        c.destroy()
        return
      }

      subIds[packet.messageId] = (subIds[packet.messageId] || 0) + 1
      if (subIds[packet.messageId] > 1) {
        done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
        teardown(c)
        // The test has already failed: do not acknowledge on the destroyed connection.
        return
      }

      c.suback({
        messageId: packet.messageId,
        granted: packet.subscriptions.map(function (e) {
          return e.qos
        })
      })
    })
  })
})
|
317 |
|
// Kills the first KILL_COUNT connections that carry a PUBLISH, then
// acknowledges publishes and fails if any message id is seen twice.
it('should not send the same publish multiple times on a flaky connection', function (done) {
  this.timeout(3500)

  var KILL_COUNT = 4
  var killedConnections = 0
  // messageId -> number of PUBLISH packets seen once killing has stopped.
  var pubIds = {}
  var client = mqtt.connect({
    port: port + 47,
    host: 'localhost',
    connectTimeout: 350,
    reconnectPeriod: 300
  })

  // Raw TCP server wrapping each socket in an MQTT Connection that only
  // answers CONNECT; PUBLISH handling is attached per connection further down.
  var server2 = net.createServer(function (stream) {
    var conn = new Connection(stream)
    // Connections are destroyed on purpose below; ignore the resulting errors.
    conn.on('error', function () {})
    conn.on('connect', function (packet) {
      if (packet.clientId === 'invalid') {
        conn.connack({returnCode: 2})
      } else {
        conn.connack({returnCode: 0})
      }
    })

    // `this` is the net.Server; re-emit the wrapped connection as 'client'.
    this.emit('client', conn)
  }).listen(port + 47)

  // Stop the client, the given connection and the server. net.Server has no
  // destroy() method, so the server is shut down with close().
  function teardown (c) {
    client.end(true)
    c.destroy()
    server2.close()
  }

  server2.on('client', function (c) {
    client.publish('topic', 'data', { qos: 1 }, function () {
      done()
      teardown(c)
    })

    c.on('publish', function onPublish (packet) {
      if (killedConnections < KILL_COUNT) {
        // Simulate a flaky link: drop the connection without acknowledging
        // and stop listening for publishes on it.
        killedConnections++
        c.destroy()
        c.removeListener('publish', onPublish)
        return
      }

      pubIds[packet.messageId] = (pubIds[packet.messageId] || 0) + 1
      if (pubIds[packet.messageId] > 1) {
        done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
        teardown(c)
        // The test has already failed: do not acknowledge on the destroyed connection.
        return
      }

      c.puback(packet)
    })
  })
})
|
381 | })
|
382 | })
|