UNPKG

2.57 kBJavaScriptView Raw
1'use strict'
2
3var generateStream = require('./lib/generateStream')
4var parseStream = require('./lib/parseStream')
5var writeToStream = require('./lib/writeToStream')
6var Duplexify = require('duplexify')
7var inherits = require('inherits')
8
9function emitPacket (packet) {
10 this.emit(packet.cmd, packet)
11}
12
13function Connection (duplex, opts, cb) {
14 if (!(this instanceof Connection)) {
15 return new Connection(duplex, opts)
16 }
17
18 if (typeof opts === 'function') {
19 cb = opts
20 opts = {}
21 }
22
23 opts = opts || {}
24
25 this._generator = writeToStream(duplex, opts)
26 this._parser = parseStream(opts)
27
28 // defer piping, so consumer can attach event listeners
29 // otherwise we might lose events
30 process.nextTick(() => {
31 duplex.pipe(this._parser)
32 })
33
34 this._generator.on('error', this.emit.bind(this, 'error'))
35 this._parser.on('error', this.emit.bind(this, 'error'))
36
37 this.stream = duplex
38
39 duplex.on('error', this.emit.bind(this, 'error'))
40 duplex.on('close', this.emit.bind(this, 'close'))
41
42 Duplexify.call(this, this._generator, this._parser, { objectMode: true })
43
44 // MQTT.js basic default
45 if (opts.notData !== true) {
46 var that = this
47 this.once('data', function (connectPacket) {
48 that.setOptions(connectPacket, opts)
49 that.on('data', emitPacket)
50 if (cb) {
51 cb()
52 }
53 that.emit('data', connectPacket)
54 })
55 }
56}
57
58inherits(Connection, Duplexify)
59
60;['connect',
61 'connack',
62 'publish',
63 'puback',
64 'pubrec',
65 'pubrel',
66 'pubcomp',
67 'subscribe',
68 'suback',
69 'unsubscribe',
70 'unsuback',
71 'pingreq',
72 'pingresp',
73 'disconnect',
74 'auth'
75].forEach(function (cmd) {
76 Connection.prototype[cmd] = function (opts, cb) {
77 opts = opts || {}
78 opts.cmd = cmd
79
80 // Flush the buffer if needed
81 // UGLY hack, we should listen for the 'drain' event
82 // and start writing again, but this works too
83 this.write(opts)
84 if (cb) setImmediate(cb)
85 }
86})
87
88Connection.prototype.destroy = function () {
89 if (this.stream.destroy) this.stream.destroy()
90 else this.stream.end()
91}
92
93Connection.prototype.setOptions = function (packet, opts) {
94 let options = {}
95 Object.assign(options, packet)
96 // Specifically set the protocol version for client connections
97 if (options.cmd === 'connack') {
98 options.protocolVersion = opts && opts.protocolVersion ? opts.protocolVersion : 4
99 }
100 this.options = options
101 this._parser.setOptions(options)
102 this._generator.setOptions(options)
103}
104
105module.exports = Connection
106module.exports.parseStream = parseStream
107module.exports.generateStream = generateStream