UNPKG

40.6 kBJavaScriptView Raw
1'use strict'
2
3/**
4 * Testing dependencies
5 */
6var should = require('should')
7var sinon = require('sinon')
8var mqtt = require('../')
9var xtend = require('xtend')
10
11module.exports = function (server, config) {
// Create a client for one test: the per-test `opts` are layered over the
// `config` object this suite was given, and the result goes to mqtt.connect.
function connect (opts) {
  var merged = xtend(config, opts)
  return mqtt.connect(merged)
}
16
// Closing behaviour: what the client reports and cleans up when its stream
// ends underneath it, and when end() is called explicitly.
describe('closing', function () {
  it('should emit close if stream closes', function (done) {
    var mqttClient = connect()

    mqttClient.once('close', function () {
      mqttClient.end()
      done()
    })
    mqttClient.once('connect', function () {
      // end the underlying stream rather than the client itself
      mqttClient.stream.end()
    })
  })

  it('should mark the client as disconnected', function (done) {
    var mqttClient = connect()

    mqttClient.once('connect', function () {
      mqttClient.stream.end()
    })
    mqttClient.once('close', function () {
      mqttClient.end()
      if (mqttClient.connected) {
        return done(new Error('Not marked as disconnected'))
      }
      done()
    })
  })

  it('should stop ping timer if stream closes', function (done) {
    var mqttClient = connect()

    mqttClient.once('connect', function () {
      // the timer has to exist first, otherwise its absence proves nothing
      should.exist(mqttClient.pingTimer)
      mqttClient.stream.end()
    })
    mqttClient.once('close', function () {
      should.not.exist(mqttClient.pingTimer)
      mqttClient.end()
      done()
    })
  })

  it('should emit close after end called', function (done) {
    var mqttClient = connect()

    mqttClient.once('connect', function () {
      mqttClient.end()
    })
    mqttClient.once('close', function () {
      done()
    })
  })

  it('should return `this` if end called twice', function (done) {
    var mqttClient = connect()

    mqttClient.once('connect', function () {
      mqttClient.end()
      var returned = mqttClient.end()
      if (returned !== mqttClient) {
        return done(new Error('Not returning client.'))
      }
      done()
    })
  })

  it('should stop ping timer after end called', function (done) {
    var mqttClient = connect()

    mqttClient.once('connect', function () {
      should.exist(mqttClient.pingTimer)
      mqttClient.end()
      should.not.exist(mqttClient.pingTimer)
      done()
    })
  })
})
98
// Connection establishment: what the CONNECT packet seen by the server
// carries, and which events / flags the client exposes while connecting.
describe('connecting', function () {
  it('should connect to the broker', function (done) {
    var client = connect()
    client.on('error', done)

    server.once('client', function () {
      client.end()
      done()
    })
  })

  it('should send a default client id', function (done) {
    var client = connect()
    client.on('error', done)

    server.once('client', function (serverClient) {
      serverClient.once('connect', function (packet) {
        packet.clientId.should.match(/mqttjs.*/)
        serverClient.disconnect()
        // end the client as well so it does not outlive the test
        client.end()
        done()
      })
    })
  })

  it('should send be clean by default', function (done) {
    var client = connect()
    client.on('error', done)

    server.once('client', function (serverClient) {
      serverClient.once('connect', function (packet) {
        // explicit call: a bare `.should.be.true` property access asserts
        // nothing on should.js versions where `true` is a method
        packet.clean.should.equal(true)
        serverClient.disconnect()
        client.end()
        done()
      })
    })
  })

  it('should connect with the given client id', function (done) {
    var client = connect({clientId: 'testclient'})
    client.on('error', done)

    server.once('client', function (serverClient) {
      serverClient.once('connect', function (packet) {
        packet.clientId.should.match(/testclient/)
        serverClient.disconnect()
        client.end()
        done()
      })
    })
  })

  it('should connect with the client id and unclean state', function (done) {
    var client = connect({clientId: 'testclient', clean: false})
    client.on('error', done)

    server.once('client', function (serverClient) {
      serverClient.once('connect', function (packet) {
        packet.clientId.should.match(/testclient/)
        packet.clean.should.equal(false)
        serverClient.disconnect()
        client.end()
        done()
      })
    })
  })

  it('should require a clientId with clean=false', function (done) {
    var client
    try {
      client = connect({ clean: false })
    } catch (err) {
      // connect() throwing synchronously is the outcome this test accepts
      return done()
    }
    // connect() did not throw: an 'error' event fails the test with that error
    client.on('error', function (err) {
      done(err)
    })
  })

  // NOTE(review): this body matches 'should connect with the given client id';
  // nothing here inspects the host that was used - confirm the intent.
  it('should default to localhost', function (done) {
    var client = connect({clientId: 'testclient'})
    client.on('error', done)

    server.once('client', function (serverClient) {
      serverClient.once('connect', function (packet) {
        packet.clientId.should.match(/testclient/)
        serverClient.disconnect()
        client.end()
        done()
      })
    })
  })

  it('should emit connect', function (done) {
    var client = connect()
    client.once('connect', function () {
      client.end()
      done()
    })
    client.once('error', done)
  })

  it('should provide connack packet with connect event', function (done) {
    // first connection is acknowledged with sessionPresent=true, the
    // following one with sessionPresent=false
    server.once('client', function (serverClient) {
      serverClient.connack({returnCode: 0, sessionPresent: true})

      server.once('client', function (serverClient) {
        serverClient.connack({returnCode: 0, sessionPresent: false})
      })
    })

    var client = connect()
    client.once('connect', function (packet) {
      should(packet.sessionPresent).be.equal(true)
      client.once('connect', function (packet) {
        should(packet.sessionPresent).be.equal(false)
        client.end()
        done()
      })
    })
  })

  it('should mark the client as connected', function (done) {
    var client = connect()
    client.once('connect', function () {
      client.end()
      if (client.connected) {
        done()
      } else {
        done(new Error('Not marked as connected'))
      }
    })
  })

  it('should emit error', function (done) {
    var client = connect({clientId: 'invalid'})
    client.once('connect', function () {
      done(new Error('Should not emit connect'))
    })
    client.once('error', function (/* error */) {
      // TODO: check the error message and validate it is the expected one
      client.end()
      done()
    })
  })

  it('should have different client ids', function (done) {
    var client1 = connect()
    var client2 = connect()

    client1.options.clientId.should.not.equal(client2.options.clientId)
    client1.end(true)
    client2.end(true)
    setImmediate(done)
  })
})
259
// 'offline' event: emitted after an established connection is dropped, and
// when the very first connection attempt cannot reach a server.
describe('handling offline states', function () {
  it('should emit offline events once when the client transitions from connected states to disconnected ones', function (done) {
    var mqttClient = connect({reconnectPeriod: 20})

    mqttClient.on('offline', function () {
      mqttClient.end(true, done)
    })

    mqttClient.on('connect', function () {
      // drop the connection as soon as it is up
      mqttClient.stream.end()
    })
  })

  it('should emit offline events once when the client (at first) can NOT connect to servers', function (done) {
    // fake a port
    var unreachable = { reconnectPeriod: 20, port: 4557 }
    var mqttClient = connect(unreachable)

    mqttClient.on('offline', function () {
      mqttClient.end(true, done)
    })
  })
})
282
// Topic filter validation performed by subscribe(): valid filters succeed,
// invalid ones are reported through the subscribe callback.
describe('topic validations when subscribing', function () {
  var invalidTopics = ['#/event', 'event#', 'system/#/event', 'system/+/#/event']

  // Register a test asserting that subscribing to `topics` (a string or an
  // array of strings) hands an error to the subscribe callback.
  function itShouldRejectSubscription (title, topics) {
    it(title, function (done) {
      var client = connect()
      client.subscribe(topics, function (err) {
        client.end()
        if (err) {
          return done()
        }
        done(new Error('Validations do NOT work'))
      })
    })
  }

  it('should be ok for well-formated topics', function (done) {
    var client = connect()
    client.subscribe(
      [
        '+', '+/event', 'event/+', '#', 'event/#', 'system/event/+',
        'system/+/event', 'system/registry/event/#', 'system/+/event/#',
        'system/registry/event/new_device', 'system/+/+/new_device'
      ],
      function (err) {
        client.end()
        // hand the original error to mocha instead of re-wrapping it
        done(err)
      }
    )
  })

  // Previously shared its title with the single-topic '#/event' test below.
  itShouldRejectSubscription(
    'should return an error (via callbacks) for an array of invalid topics',
    ['#/event', 'event#', 'event+']
  )

  invalidTopics.forEach(function (topic) {
    itShouldRejectSubscription(
      'should return an error (via callbacks) for topic ' + topic,
      topic
    )
  })
})
357
// Behaviour of publish/subscribe/unsubscribe calls made while the client is
// not connected, and of end() while packets are still pending.
describe('offline messages', function () {
  it('should queue message until connected', function (done) {
    var client = connect()

    client.publish('test', 'test')
    client.subscribe('test')
    client.unsubscribe('test')
    client.queue.length.should.equal(3)

    client.once('connect', function () {
      client.queue.length.should.equal(0)
      client.end(true, done)
    })
  })

  it('should not queue qos 0 messages if queueQoSZero is false', function (done) {
    var client = connect({queueQoSZero: false})

    client.publish('test', 'test', {qos: 0})
    client.queue.length.should.equal(0)
    client.end(true, done)
  })

  it('should still queue qos != 0 messages if queueQoSZero is false', function (done) {
    var client = connect({queueQoSZero: false})

    client.publish('test', 'test', {qos: 1})
    client.publish('test', 'test', {qos: 2})
    client.subscribe('test')
    client.unsubscribe('test')
    client.queue.length.should.equal(4)
    client.end(true, done)
  })

  it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
    var client = connect({queueQoSZero: false})

    client.publish('test', 'test', {qos: 0}, function () {
      client.end(true, done)
    })
  })

  // These tests are not registered when the TRAVIS environment variable is set.
  if (!process.env.TRAVIS) {
    it('should delay ending up until all inflight messages are delivered', function (done) {
      var client = connect()

      client.on('connect', function () {
        client.subscribe('test', function () {
          done()
        })
        client.publish('test', 'test', function () {
          client.end()
        })
      })
    })

    it('wait QoS 1 publish messages', function (done) {
      var client = connect()

      client.on('connect', function () {
        client.subscribe('test')
        client.publish('test', 'test', { qos: 1 }, function () {
          client.end()
        })
        client.on('message', function () {
          done()
        })
      })

      // echo every publish back once the client has subscribed
      server.once('client', function (serverClient) {
        serverClient.on('subscribe', function () {
          serverClient.on('publish', function (packet) {
            serverClient.publish(packet)
          })
        })
      })
    })

    it('does not wait acks when force-closing', function (done) {
      // non-running broker: connect() merges its argument as an options
      // object, so the unused port is passed as an option (a URL string
      // would not override the configured broker)
      var client = connect({ port: 8993 })

      client.publish('test', 'test', { qos: 1 })
      client.end(true, done)
    })
  }
})
445
// publish(): the packet that reaches the server, the events emitted on the
// client, and the completion callback for each QoS level.
describe('publishing', function () {
  it('should publish a message (offline)', function (done) {
    var client = connect()
    var payload = 'test'
    var topic = 'test'

    // issued before the client is connected
    client.publish(topic, payload)

    server.once('client', function (serverClient) {
      serverClient.once('publish', function (packet) {
        packet.topic.should.equal(topic)
        packet.payload.toString().should.equal(payload)
        packet.qos.should.equal(0)
        packet.retain.should.equal(false)
        client.end()
        done()
      })
    })
  })

  it('should publish a message (online)', function (done) {
    var client = connect()
    var payload = 'test'
    var topic = 'test'

    client.on('connect', function () {
      client.publish(topic, payload)
    })

    server.once('client', function (serverClient) {
      serverClient.once('publish', function (packet) {
        packet.topic.should.equal(topic)
        packet.payload.toString().should.equal(payload)
        packet.qos.should.equal(0)
        packet.retain.should.equal(false)
        client.end()
        done()
      })
    })
  })

  it('should emit a packetsend event', function (done) {
    var client = connect()
    var payload = 'test_payload'
    var testTopic = 'testTopic'

    client.on('packetsend', function (packet) {
      // other packet types are sent too; only the publish is of interest
      if (packet.cmd === 'publish') {
        packet.qos.should.equal(0)
        packet.topic.should.equal(testTopic)
        packet.payload.should.equal(payload)
        packet.retain.should.equal(false)
        client.end()
        done()
      }
    })

    client.publish(testTopic, payload)
  })

  it('should accept options', function (done) {
    var client = connect()
    var payload = 'test'
    var topic = 'test'
    var opts = {
      retain: true,
      qos: 1
    }

    client.once('connect', function () {
      client.publish(topic, payload, opts)
    })

    server.once('client', function (serverClient) {
      serverClient.once('publish', function (packet) {
        packet.topic.should.equal(topic)
        packet.payload.toString().should.equal(payload)
        packet.qos.should.equal(opts.qos, 'incorrect qos')
        packet.retain.should.equal(opts.retain, 'incorrect ret')
        client.end()
        done()
      })
    })
  })

  // In the callback tests below, an error handed to the publish callback is
  // forwarded to mocha so that it fails the test.
  it('should fire a callback (qos 0)', function (done) {
    var client = connect()

    client.once('connect', function () {
      client.publish('a', 'b', function (err) {
        client.end()
        done(err)
      })
    })
  })

  it('should fire a callback (qos 1)', function (done) {
    var client = connect()
    var opts = { qos: 1 }

    client.once('connect', function () {
      client.publish('a', 'b', opts, function (err) {
        client.end()
        done(err)
      })
    })
  })

  it('should fire a callback (qos 2)', function (done) {
    var client = connect()
    var opts = { qos: 2 }

    client.once('connect', function () {
      client.publish('a', 'b', opts, function (err) {
        client.end()
        done(err)
      })
    })
  })

  it('should support UTF-8 characters in topic', function (done) {
    var client = connect()

    client.once('connect', function () {
      client.publish('中国', 'hello', function (err) {
        client.end()
        done(err)
      })
    })
  })

  it('should support UTF-8 characters in payload', function (done) {
    var client = connect()

    client.once('connect', function () {
      client.publish('hello', '中国', function (err) {
        client.end()
        done(err)
      })
    })
  })

  it('Publish 10 QoS 2 and receive them', function (done) {
    var client = connect()
    // number of pubrel packets the server side has seen so far
    var count = 0

    client.on('connect', function () {
      client.subscribe('test')
      client.publish('test', 'test', { qos: 2 })
    })

    // every received message triggers the next publish until the server has
    // counted at least 10 pubrels
    client.on('message', function () {
      if (count >= 10) {
        client.end()
        done()
      } else {
        client.publish('test', 'test', { qos: 2 })
      }
    })

    server.once('client', function (serverClient) {
      serverClient.on('offline', function () {
        client.end()
        // fail with a real Error rather than a bare string
        done(new Error('error went offline... didnt see this happen'))
      })

      serverClient.on('subscribe', function () {
        serverClient.on('publish', function (packet) {
          serverClient.publish(packet)
        })
      })

      serverClient.on('pubrel', function () {
        count++
      })
    })
  })
})
624
// unsubscribe(): the packet that reaches the server, the events emitted on
// the client, and the callback fired on unsuback.
describe('unsubscribing', function () {
  it('should send an unsubscribe packet (offline)', function (done) {
    var client = connect()

    // issued before the client is connected
    client.unsubscribe('test')

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        packet.unsubscriptions.should.containEql('test')
        client.end()
        done()
      })
    })
  })

  it('should send an unsubscribe packet', function (done) {
    var client = connect()
    var topic = 'topic'

    client.once('connect', function () {
      client.unsubscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        packet.unsubscriptions.should.containEql(topic)
        client.end()
        done()
      })
    })
  })

  // Exercises unsubscribe; the subscribe variant of this test lives in the
  // 'subscribing' suite.
  it('should emit a packetsend event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.unsubscribe(testTopic)
    })

    client.on('packetsend', function (packet) {
      if (packet.cmd === 'unsubscribe') {
        client.end()
        done()
      }
    })
  })

  // Exercises unsuback; the suback variant of this test lives in the
  // 'subscribing' suite.
  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.unsubscribe(testTopic)
    })

    client.on('packetreceive', function onPacketReceive (packet) {
      if (packet.cmd === 'unsuback') {
        // finish exactly once, even if more than one unsuback arrives
        client.removeListener('packetreceive', onPacketReceive)
        client.end()
        done()
      }
    })

    // acknowledge explicitly, as 'should fire a callback on unsuback' does
    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        serverClient.unsuback(packet)
      })
    })
  })

  it('should accept an array of unsubs', function (done) {
    var client = connect()
    var topics = ['topic1', 'topic2']

    client.once('connect', function () {
      client.unsubscribe(topics)
    })

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        packet.unsubscriptions.should.eql(topics)
        client.end()
        done()
      })
    })
  })

  it('should fire a callback on unsuback', function (done) {
    var client = connect()
    var topic = 'topic'

    client.once('connect', function () {
      // `done` is the unsubscribe callback itself
      client.unsubscribe(topic, done)
    })

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        serverClient.unsuback(packet)
        client.end()
      })
    })
  })

  it('should unsubscribe from a chinese topic', function (done) {
    var client = connect()
    var topic = '中国'

    client.once('connect', function () {
      client.unsubscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        packet.unsubscriptions.should.containEql(topic)
        client.end()
        done()
      })
    })
  })
})
738
// Keepalive scheduling, driven by sinon fake timers so the interval can be
// advanced with tick() instead of waiting. `_checkPing` is replaced by a spy
// and its call count is what each test inspects.
describe('keepalive', function () {
  var fakeClock

  beforeEach(function () {
    fakeClock = sinon.useFakeTimers()
  })

  afterEach(function () {
    fakeClock.restore()
  })

  it('should checkPing at keepalive interval', function (done) {
    var keepaliveSeconds = 3
    var mqttClient = connect({ keepalive: keepaliveSeconds })

    mqttClient._checkPing = sinon.spy()

    mqttClient.once('connect', function () {
      var expectedCalls

      // one more call is expected after each elapsed interval
      for (expectedCalls = 1; expectedCalls <= 3; expectedCalls++) {
        fakeClock.tick(keepaliveSeconds * 1000)
        mqttClient._checkPing.callCount.should.equal(expectedCalls)
      }

      mqttClient.end()
      done()
    })
  })

  it('should not checkPing if publishing at a higher rate than keepalive', function (done) {
    var keepaliveMs = 3000
    var mqttClient = connect({keepalive: keepaliveMs / 1000})

    mqttClient._checkPing = sinon.spy()

    mqttClient.once('connect', function () {
      mqttClient.publish('foo', 'bar')
      // publish again 1ms before the interval would elapse, then step past it
      fakeClock.tick(keepaliveMs - 1)
      mqttClient.publish('foo', 'bar')
      fakeClock.tick(2)
      mqttClient._checkPing.callCount.should.equal(0)
      mqttClient.end()
      done()
    })
  })

  it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function (done) {
    var keepaliveMs = 3000
    var options = {
      keepalive: keepaliveMs / 1000,
      reschedulePings: false
    }
    var mqttClient = connect(options)

    mqttClient._checkPing = sinon.spy()

    mqttClient.once('connect', function () {
      mqttClient.publish('foo', 'bar')
      fakeClock.tick(keepaliveMs - 1)
      mqttClient.publish('foo', 'bar')
      fakeClock.tick(2)
      mqttClient._checkPing.callCount.should.equal(1)
      mqttClient.end()
      done()
    })
  })
})
808
// Ping timer management and the client's reaction to missing ping responses.
// These tests run on real timers.
describe('pinging', function () {
  it('should set a ping timer', function (done) {
    var client = connect({keepalive: 3})
    client.once('connect', function () {
      should.exist(client.pingTimer)
      client.end()
      done()
    })
  })

  it('should not set a ping timer keepalive=0', function (done) {
    var client = connect({keepalive: 0})
    client.on('connect', function () {
      should.not.exist(client.pingTimer)
      client.end()
      done()
    })
  })

  it('should reconnect if pingresp is not sent', function (done) {
    var client = connect({keepalive: 1, reconnectPeriod: 100})

    // Fake no pingresp being send by stubbing the _handlePingresp function
    client._handlePingresp = function () {}

    // the second 'connect' event is the reconnection
    client.once('connect', function () {
      client.once('connect', function () {
        client.end()
        done()
      })
    })
  })

  it('should not reconnect if pingresp is successful', function (done) {
    var client = connect({keepalive: 100})
    var timer

    function onClose () {
      // a close inside the observation window fails the test; cancel the
      // timer so done is not called a second time
      clearTimeout(timer)
      client.end()
      done(new Error('Client closed connection'))
    }

    client.once('close', onClose)

    timer = setTimeout(function () {
      // drop the listener before ending the client: the close caused by
      // end() must not be reported as a failure after the test is over
      client.removeListener('close', onClose)
      client.end()
      done()
    }, 1000)
  })

  it('should defer the next ping when sending a control packet', function (done) {
    var client = connect({keepalive: 1})

    client.once('connect', function () {
      client._checkPing = sinon.spy()

      // three publishes, 75ms apart; no ping check may happen in between
      client.publish('foo', 'bar')
      setTimeout(function () {
        client._checkPing.callCount.should.equal(0)
        client.publish('foo', 'bar')

        setTimeout(function () {
          client._checkPing.callCount.should.equal(0)
          client.publish('foo', 'bar')

          setTimeout(function () {
            client._checkPing.callCount.should.equal(0)
            client.end()
            done()
          }, 75)
        }, 75)
      }, 75)
    })
  })
})
874
// subscribe(): accepted argument shapes, the packet that reaches the server,
// the events emitted on the client, and the suback callback. Each test ends
// its client before finishing so no connection outlives the test.
describe('subscribing', function () {
  it('should send a subscribe message (offline)', function (done) {
    var client = connect()

    // issued before the client is connected
    client.subscribe('test')

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function () {
        client.end()
        done()
      })
    })
  })

  it('should send a subscribe message', function (done) {
    var client = connect()
    var topic = 'test'

    client.once('connect', function () {
      client.subscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        packet.subscriptions.should.containEql({
          topic: topic,
          qos: 0
        })
        client.end()
        done()
      })
    })
  })

  it('should emit a packetsend event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.subscribe(testTopic)
    })

    client.on('packetsend', function (packet) {
      if (packet.cmd === 'subscribe') {
        client.end()
        done()
      }
    })
  })

  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.subscribe(testTopic)
    })

    client.on('packetreceive', function (packet) {
      if (packet.cmd === 'suback') {
        client.end()
        done()
      }
    })
  })

  it('should accept an array of subscriptions', function (done) {
    var client = connect()
    var subs = ['test1', 'test2']

    client.once('connect', function () {
      client.subscribe(subs)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
        var expected = subs.map(function (i) {
          return {topic: i, qos: 0}
        })

        packet.subscriptions.should.eql(expected)
        client.end()
        done()
      })
    })
  })

  it('should accept an hash of subscriptions', function (done) {
    var client = connect()
    var topics = {
      test1: 0,
      test2: 1
    }

    client.once('connect', function () {
      client.subscribe(topics)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        // one {topic, qos} entry per own key of the hash
        var expected = Object.keys(topics).map(function (k) {
          return {topic: k, qos: topics[k]}
        })

        packet.subscriptions.should.eql(expected)
        client.end()
        done()
      })
    })
  })

  it('should accept an options parameter', function (done) {
    var client = connect()
    var topic = 'test'
    var opts = {qos: 1}

    client.once('connect', function () {
      client.subscribe(topic, opts)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        var expected = [{
          topic: topic,
          qos: 1
        }]

        packet.subscriptions.should.eql(expected)
        client.end()
        done()
      })
    })
  })

  it('should fire a callback on suback', function (done) {
    var client = connect()
    var topic = 'test'

    client.once('connect', function () {
      client.subscribe(topic, { qos: 2 }, function (err, granted) {
        client.end()
        if (err) {
          return done(err)
        }
        should.exist(granted, 'granted not given')
        granted.should.containEql({topic: 'test', qos: 2})
        done()
      })
    })
  })

  it('should fire a callback with error if disconnected (options provided)', function (done) {
    var client = connect()
    var topic = 'test'
    client.once('connect', function () {
      // subscribe only after the forced end() has completed
      client.end(true, function () {
        client.subscribe(topic, {qos: 2}, function (err, granted) {
          should.not.exist(granted, 'granted given')
          should.exist(err, 'no error given')
          done()
        })
      })
    })
  })

  it('should fire a callback with error if disconnected (options not provided)', function (done) {
    var client = connect()
    var topic = 'test'

    client.once('connect', function () {
      client.end(true, function () {
        client.subscribe(topic, function (err, granted) {
          should.not.exist(granted, 'granted given')
          should.exist(err, 'no error given')
          done()
        })
      })
    })
  })

  it('should subscribe with a chinese topic', function (done) {
    var client = connect()
    var topic = '中国'

    client.once('connect', function () {
      client.subscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        packet.subscriptions.should.containEql({
          topic: topic,
          qos: 0
        })
        client.end()
        done()
      })
    })
  })
})
1076
// Delivery of publish packets sent by the server: the 'message' and
// 'packetreceive' events and the arguments they carry.
describe('receiving messages', function () {
  // Build the publish packet that the server side sends to the client.
  function buildTestPacket (topic, qos) {
    return {
      topic: topic,
      payload: 'message',
      retain: true,
      qos: qos,
      messageId: 5
    }
  }

  // On the next client the server sees, answer every subscribe by publishing
  // `packet` the given number of times (once when `times` is omitted).
  function publishOnSubscribe (packet, times) {
    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        var remaining = times || 1
        while (remaining-- > 0) {
          serverClient.publish(packet)
        }
      })
    })
  }

  // Compare the received packet with the one that was sent. This replaces a
  // `packet.should.equal(packet)` self-comparison, which could never fail.
  function assertPacketMatches (packet, expected) {
    packet.topic.should.equal(expected.topic)
    packet.qos.should.equal(expected.qos)
    packet.retain.should.equal(expected.retain)
  }

  it('should fire the message event', function (done) {
    var client = connect()
    var testPacket = buildTestPacket('test', 1)

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      assertPacketMatches(packet, testPacket)
      client.end()
      done()
    })

    publishOnSubscribe(testPacket)
  })

  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var testPacket = buildTestPacket('test', 1)

    client.subscribe(testPacket.topic)
    client.on('packetreceive', function (packet) {
      // other packet types are received too; only the publish is of interest
      if (packet.cmd === 'publish') {
        packet.qos.should.equal(1)
        packet.topic.should.equal(testPacket.topic)
        packet.payload.toString().should.equal(testPacket.payload)
        packet.retain.should.equal(true)
        client.end()
        done()
      }
    })

    publishOnSubscribe(testPacket)
  })

  it('should support binary data', function (done) {
    var client = connect({ encoding: 'binary' })
    var testPacket = buildTestPacket('test', 1)

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.should.be.an.instanceOf(Buffer)
      message.toString().should.equal(testPacket.payload)
      assertPacketMatches(packet, testPacket)
      client.end()
      done()
    })

    publishOnSubscribe(testPacket)
  })

  it('should emit a message event (qos=2)', function (done) {
    var client = connect()
    var testPacket = buildTestPacket('test', 2)

    server.testPublish = testPacket

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      assertPacketMatches(packet, testPacket)
      client.end()
      done()
    })

    publishOnSubscribe(testPacket)
  })

  it('should emit a message event (qos=2) - repeated publish', function (done) {
    var client = connect()
    var testPacket = buildTestPacket('test', 2)

    server.testPublish = testPacket

    client.subscribe(testPacket.topic)
    // `on`, not `once`, and the client is left open: a second delivery of
    // the duplicate would reach this handler and call done() again, which
    // mocha reports as a failure
    client.on('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      assertPacketMatches(packet, testPacket)
      done()
    })

    // published twice; the second copy should be ignored
    publishOnSubscribe(testPacket, 2)
  })

  it('should support chinese topic', function (done) {
    var client = connect({ encoding: 'binary' })
    var testPacket = buildTestPacket('国', 1)

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.should.be.an.instanceOf(Buffer)
      message.toString().should.equal(testPacket.payload)
      assertPacketMatches(packet, testPacket)
      client.end()
      done()
    })

    publishOnSubscribe(testPacket)
  })
})
1241
1242 describe('qos handling', function () {
1243 it('should follow qos 0 semantics (trivial)', function (done) {
1244 var client = connect()
1245 var testTopic = 'test'
1246 var testMessage = 'message'
1247
1248 client.once('connect', function () {
1249 client.subscribe(testTopic, {qos: 0})
1250 })
1251
1252 server.once('client', function (serverClient) {
1253 serverClient.once('subscribe', function () {
1254 serverClient.publish({
1255 topic: testTopic,
1256 payload: testMessage,
1257 qos: 0,
1258 retain: false
1259 })
1260 done()
1261 })
1262 })
1263 })
1264
// QoS 1: once the client has subscribed, the server publishes with a
// message id and expects a 'puback' from the client carrying that same id.
it('should follow qos 1 semantics', function (done) {
  var mqttClient = connect()
  var topicName = 'test'
  var body = 'message'
  var expectedId = 50

  function onSubscribed () {
    mqttClient.subscribe(topicName, {qos: 1})
  }

  mqttClient.once('connect', onSubscribed)

  server.once('client', function (peer) {
    peer.once('subscribe', function () {
      var outgoing = {
        topic: topicName,
        payload: body,
        messageId: expectedId,
        qos: 1
      }
      peer.publish(outgoing)
    })

    // The acknowledgement must reference the published message id.
    peer.once('puback', function (ack) {
      ack.messageId.should.equal(expectedId)
      done()
    })
  })
})
1291
// QoS 2: once the client has subscribed, the server publishes with a
// message id and the test passes when the server side receives the final
// 'pubcomp' of the exchange, carrying that same id.
it('should follow qos 2 semantics', function (done) {
  var client = connect()
  var testTopic = 'test'
  var testMessage = 'message'
  var mid = 253

  client.once('connect', function () {
    client.subscribe(testTopic, {qos: 2})
  })

  server.once('client', function (serverClient) {
    serverClient.once('subscribe', function () {
      serverClient.publish({
        topic: testTopic,
        payload: testMessage,
        qos: 2,
        messageId: mid
      })
    })

    // Check the id on the completion packet, mirroring the puback check
    // in the QoS 1 test, so a pubcomp for a different message fails.
    serverClient.once('pubcomp', function (packet) {
      packet.messageId.should.equal(mid)
      done()
    })
  })
})
1317 })
1318
1319 describe('auto reconnect', function () {
// Synchronous check: immediately after end() the client's `disconnecting`
// flag must be true.
it('should mark the client disconnecting if #end called', function () {
  var mqttClient = connect()
  var flag

  mqttClient.end()
  flag = mqttClient.disconnecting
  flag.should.eql(true)
})
1326
// Ends the underlying stream on the first 'connect' and passes when a
// later 'connect' event arrives.
it('should reconnect after stream disconnect', function (done) {
  var mqttClient = connect()
  var streamDropped = false

  mqttClient.on('connect', function () {
    if (!streamDropped) {
      // First connection: cut the stream so the client has to reconnect.
      mqttClient.stream.end()
      streamDropped = true
      return
    }

    mqttClient.end()
    done()
  })
})
1341
// Ends the stream on the first 'connect'; by the time a later 'connect'
// arrives, a 'reconnect' event must already have been observed.
it('should emit \'reconnect\' when reconnecting', function (done) {
  var mqttClient = connect()
  var streamDropped = false
  var sawReconnect = false

  function noteReconnect () {
    sawReconnect = true
  }

  mqttClient.on('reconnect', noteReconnect)

  mqttClient.on('connect', function () {
    if (!streamDropped) {
      mqttClient.stream.end()
      streamDropped = true
      return
    }

    sawReconnect.should.equal(true)
    mqttClient.end()
    done()
  })
})
1362
// Ends the stream on the first 'connect'; by the time a later 'connect'
// arrives, an 'offline' event must already have been observed.
it('should emit \'offline\' after going offline', function (done) {
  var mqttClient = connect()
  var streamDropped = false
  var sawOffline = false

  function noteOffline () {
    sawOffline = true
  }

  mqttClient.on('offline', noteOffline)

  mqttClient.on('connect', function () {
    if (!streamDropped) {
      mqttClient.stream.end()
      streamDropped = true
      return
    }

    sawOffline.should.equal(true)
    mqttClient.end()
    done()
  })
})
1383
// Ends the client on every 'connect'. The listener stays attached via on(),
// so an unwanted reconnect would invoke done() a second time.
it('should not reconnect if it was ended by the user', function (done) {
  var mqttClient = connect()

  function onConnect () {
    mqttClient.end()
    // done() raises an exception if it is called two times
    done()
  }

  mqttClient.on('connect', onConnect)
})
1392
// `reconnectTimer` must be absent while connected and present once the
// stream has been ended and 'close' has fired.
it('should setup a reconnect timer on disconnect', function (done) {
  var mqttClient = connect()

  function whenConnected () {
    should.not.exist(mqttClient.reconnectTimer)
    mqttClient.stream.end()
  }

  function whenClosed () {
    should.exist(mqttClient.reconnectTimer)
    mqttClient.end()
    done()
  }

  mqttClient.once('connect', whenConnected)
  mqttClient.once('close', whenClosed)
})
1407
// Connects with a 200 ms reconnectPeriod, drops the stream on the first
// 'connect', and checks that the time from client creation to the next
// 'connect' is at least one period.
it('should allow specification of a reconnect period', function (done) {
  var period = 200
  var mqttClient = connect({reconnectPeriod: period})
  var streamDropped = false
  var startedAt = Date.now()

  mqttClient.on('connect', function () {
    var elapsed

    if (!streamDropped) {
      mqttClient.stream.end()
      streamDropped = true
      return
    }

    mqttClient.end()
    elapsed = Date.now() - startedAt
    if (elapsed < period) {
      done(new Error('Strange reconnect period'))
      return
    }

    // Reconnected no sooner than the configured period (200 ms), good enough
    done()
  })
})
1431
// The first server-side connection is destroyed right after its 'connect'
// event. The test passes once the second connection has received a
// 'publish' and the client's publish callback has fired.
it('should resend in-flight QoS 1 publish messages from the client', function (done) {
  var mqttClient = connect({reconnectPeriod: 200})
  var publishSeenByServer = false
  var publishCallbackFired = false

  // Finishes the test only when both conditions have been met.
  function finishIfReady () {
    if (!(publishSeenByServer && publishCallbackFired)) {
      return
    }
    mqttClient.end()
    done()
  }

  server.once('client', function (firstPeer) {
    firstPeer.on('connect', function () {
      setImmediate(function () {
        firstPeer.stream.destroy()
      })
    })

    // Registered only after the first connection, so this handler sees
    // the next client connection.
    server.once('client', function (secondPeer) {
      secondPeer.on('publish', function () {
        publishSeenByServer = true
        finishIfReady()
      })
    })
  })

  mqttClient.publish('hello', 'world', { qos: 1 }, function () {
    publishCallbackFired = true
    finishIfReady()
  })
})
1464
// The first server-side connection is destroyed right after it receives
// the 'publish'. The test passes once the second connection has received
// a 'pubrel' and the client's publish callback has fired.
it('should resend in-flight QoS 2 publish messages from the client', function (done) {
  var mqttClient = connect({reconnectPeriod: 200})
  var pubrelSeenByServer = false
  var publishCallbackFired = false

  // Finishes the test only when both conditions have been met.
  function finishIfReady () {
    if (!(pubrelSeenByServer && publishCallbackFired)) {
      return
    }
    mqttClient.end()
    done()
  }

  server.once('client', function (firstPeer) {
    // ignore errors
    firstPeer.on('error', function () {})
    firstPeer.on('publish', function () {
      setImmediate(function () {
        firstPeer.stream.destroy()
      })
    })

    // Registered only after the first connection, so this handler sees
    // the next client connection.
    server.once('client', function (secondPeer) {
      secondPeer.on('pubrel', function () {
        pubrelSeenByServer = true
        finishIfReady()
      })
    })
  })

  mqttClient.publish('hello', 'world', { qos: 2 }, function () {
    publishCallbackFired = true
    finishIfReady()
  })
})
1499
// Subscribes on the first 'connect', drops the stream once the subscribe
// callback fires, and passes when the next server-side connection receives
// a 'subscribe'.
it('should resubscribe when reconnecting', function (done) {
  var mqttClient = connect({ reconnectPeriod: 100 })
  var firstConnect = true
  var sawReconnect = false

  mqttClient.on('reconnect', function () {
    sawReconnect = true
  })

  mqttClient.on('connect', function () {
    if (!firstConnect) {
      // Any later connect must have been preceded by a 'reconnect' event.
      sawReconnect.should.equal(true)
      return
    }

    mqttClient.subscribe('hello', function () {
      mqttClient.stream.end()

      // Watch the next incoming connection for the repeated subscription.
      server.once('client', function (peer) {
        peer.on('subscribe', function () {
          mqttClient.end()
          done()
        })
      })
    })

    firstConnect = false
  })
})
1528 })
1529}