// Source: UNPKG raw file view (23.8 kB, JavaScript).
const bl = require('bl')
const EventEmitter = require('events')
const Packet = require('./packet')
const constants = require('./constants')
const debug = require('debug')('mqtt-packet:parser')
6
/**
 * Incremental parser for MQTT control packets.
 *
 * Bytes are fed in through `parse(buf)`. Every fully decoded packet is
 * announced with a `'packet'` event and every protocol violation with an
 * `'error'` event. Parsing is driven by a four-step state machine
 * (`_parseHeader` -> `_parseLength` -> `_parsePayload` -> `_newPacket`) that
 * simply pauses whenever more input is needed.
 *
 * The state machine is initialised by `Parser.parser(opt)`; instances should
 * be obtained through that factory.
 */
class Parser extends EventEmitter {
  constructor () {
    super()
    // Make the static factory reachable from the instance; when invoked that
    // way it (re)initialises this very instance.
    this.parser = this.constructor.parser
  }

  /**
   * Factory / initialiser.
   *
   * Called on the class it creates a new instance and initialises it; called
   * on an instance it (re)initialises that instance.
   *
   * @param {Object} [opt] Settings. This class reads `protocolVersion` and
   *   `bridgeMode` from it. It is replaced by the CONNECT packet once one has
   *   been parsed successfully.
   * @returns {Parser} The initialised parser.
   */
  static parser (opt) {
    if (!(this instanceof Parser)) return (new Parser()).parser(opt)

    this.settings = opt || {}

    // Names of the methods that make up one parsing cycle, in order.
    this._states = [
      '_parseHeader',
      '_parseLength',
      '_parsePayload',
      '_newPacket'
    ]

    this._resetState()
    return this
  }

  /**
   * Drops all buffered input and returns the state machine to its first step.
   */
  _resetState () {
    debug('_resetState: resetting packet, error, _list, and _stateCounter')
    this.packet = new Packet()
    this.error = null
    this._list = bl()
    this._stateCounter = 0
    // The read offset must be cleared too: `_parseLength` reads the remaining
    // length at `_pos`, so an offset left over from a packet that failed
    // mid-payload would make the next packet's length be read from the wrong
    // place.
    this._pos = 0
  }

  /**
   * Feeds a chunk of bytes to the parser.
   *
   * If the previous call ended with an error, the parser state (including any
   * buffered bytes) is discarded first.
   *
   * @param {Buffer} buf Chunk to append to the internal buffer list.
   * @returns {number} Number of bytes still buffered after parsing.
   */
  parse (buf) {
    if (this.error) this._resetState()

    this._list.append(buf)
    debug('parse: current state: %s', this._states[this._stateCounter])
    // Run the current step while there is something to work on; a step that
    // returns a falsy value needs more input, and an error stops the loop
    // without advancing the state.
    while ((this.packet.length !== -1 || this._list.length > 0) &&
      this[this._states[this._stateCounter]]() &&
      !this.error) {
      this._stateCounter++
      debug('parse: state complete. _stateCounter is now: %d', this._stateCounter)
      debug('parse: packet.length: %d, buffer list length: %d', this.packet.length, this._list.length)
      if (this._stateCounter >= this._states.length) this._stateCounter = 0
    }
    debug('parse: exited while loop. packet: %d, buffer list length: %d', this.packet.length, this._list.length)
    return this._list.length
  }

  /**
   * Decodes the first byte of the fixed header (packet type and flags).
   *
   * @returns {boolean|undefined} `true` once the byte has been consumed,
   *   `undefined` after an error has been emitted.
   */
  _parseHeader () {
    // There is at least one byte in the buffer
    const zero = this._list.readUInt8(0)
    const cmdIndex = zero >> constants.CMD_SHIFT
    this.packet.cmd = constants.types[cmdIndex]
    const headerFlags = zero & 0xf
    const requiredHeaderFlags = constants.requiredHeaderFlags[cmdIndex]
    if (requiredHeaderFlags != null && headerFlags !== requiredHeaderFlags) {
      // Where a flag bit is marked as “Reserved” in Table 2.2 - Flag Bits, it is reserved for future use and MUST be set to the value listed in that table [MQTT-2.2.2-1]. If invalid flags are received, the receiver MUST close the Network Connection [MQTT-2.2.2-2]
      return this._emitError(new Error(constants.requiredHeaderFlagsErrors[cmdIndex]))
    }
    this.packet.retain = (zero & constants.RETAIN_MASK) !== 0
    this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK
    if (this.packet.qos > 2) {
      return this._emitError(new Error('Packet must not have both QoS bits set to 1'))
    }
    this.packet.dup = (zero & constants.DUP_MASK) !== 0
    debug('_parseHeader: packet: %o', this.packet)

    this._list.consume(1)

    return true
  }

  /**
   * Decodes the variable-length "remaining length" field of the fixed header.
   *
   * @returns {boolean} `true` when the length was decoded and consumed,
   *   `false` when more bytes are needed (or the field is malformed).
   */
  _parseLength () {
    // There is at least one byte in the list
    const result = this._parseVarByteNum(true)

    if (result) {
      this.packet.length = result.value
      this._list.consume(result.bytes)
    }
    debug('_parseLength %d', result.value)
    return !!result
  }

  /**
   * Dispatches to the packet-type specific parser once the whole packet body
   * is buffered.
   *
   * @returns {boolean} `true` when the body was processed (successfully or
   *   with an error), `false` when more bytes are needed.
   */
  _parsePayload () {
    debug('_parsePayload: payload %O', this._list)
    let result = false

    // Do we have a payload? Do we have enough data to complete the payload?
    // PINGs have no payload
    if (this.packet.length === 0 || this._list.length >= this.packet.length) {
      this._pos = 0

      switch (this.packet.cmd) {
        case 'connect':
          this._parseConnect()
          break
        case 'connack':
          this._parseConnack()
          break
        case 'publish':
          this._parsePublish()
          break
        case 'puback':
        case 'pubrec':
        case 'pubrel':
        case 'pubcomp':
          this._parseConfirmation()
          break
        case 'subscribe':
          this._parseSubscribe()
          break
        case 'suback':
          this._parseSuback()
          break
        case 'unsubscribe':
          this._parseUnsubscribe()
          break
        case 'unsuback':
          this._parseUnsuback()
          break
        case 'pingreq':
        case 'pingresp':
          // These are empty, nothing to do
          break
        case 'disconnect':
          this._parseDisconnect()
          break
        case 'auth':
          this._parseAuth()
          break
        default:
          this._emitError(new Error('Not supported'))
      }

      result = true
    }
    debug('_parsePayload complete result: %s', result)
    return result
  }

  /**
   * Parses a CONNECT body. On success the parsed packet also becomes
   * `this.settings`, so later packets are decoded with the negotiated
   * protocol version.
   *
   * @returns {Object|undefined} The packet, or `undefined` after an error.
   */
  _parseConnect () {
    debug('_parseConnect')
    const flags = {}
    const packet = this.packet

    // Parse protocolId
    const protocolId = this._parseString()

    if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId'))
    if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') {
      return this._emitError(new Error('Invalid protocolId'))
    }

    packet.protocolId = protocolId

    // Parse constants version number
    if (this._pos >= this._list.length) return this._emitError(new Error('Packet too short'))

    packet.protocolVersion = this._list.readUInt8(this._pos)

    // The top bit of the version byte marks a bridge connection.
    if (packet.protocolVersion >= 128) {
      packet.bridgeMode = true
      packet.protocolVersion = packet.protocolVersion - 128
    }

    if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4 && packet.protocolVersion !== 5) {
      return this._emitError(new Error('Invalid protocol version'))
    }

    this._pos++

    if (this._pos >= this._list.length) {
      return this._emitError(new Error('Packet too short'))
    }

    // All connect flags live in this single byte.
    const connectFlags = this._list.readUInt8(this._pos)

    if (connectFlags & 0x1) {
      // The Server MUST validate that the reserved flag in the CONNECT Control Packet is set to zero and disconnect the Client if it is not zero [MQTT-3.1.2-3]
      return this._emitError(new Error('Connect flag bit 0 must be 0, but got 1'))
    }
    // Parse connect flags
    flags.username = (connectFlags & constants.USERNAME_MASK)
    flags.password = (connectFlags & constants.PASSWORD_MASK)
    flags.will = (connectFlags & constants.WILL_FLAG_MASK)

    const willRetain = !!(connectFlags & constants.WILL_RETAIN_MASK)
    const willQos = (connectFlags & constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT

    if (flags.will) {
      packet.will = {}
      packet.will.retain = willRetain
      packet.will.qos = willQos
    } else {
      if (willRetain) {
        return this._emitError(new Error('Will Retain Flag must be set to zero when Will Flag is set to 0'))
      }
      if (willQos) {
        return this._emitError(new Error('Will QoS must be set to zero when Will Flag is set to 0'))
      }
    }

    packet.clean = (connectFlags & constants.CLEAN_SESSION_MASK) !== 0
    this._pos++

    // Parse keepalive
    packet.keepalive = this._parseNum()
    if (packet.keepalive === -1) return this._emitError(new Error('Packet too short'))

    // parse properties
    if (packet.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }
    // Parse clientId
    const clientId = this._parseString()
    if (clientId === null) return this._emitError(new Error('Packet too short'))
    packet.clientId = clientId
    debug('_parseConnect: packet.clientId: %s', packet.clientId)

    if (flags.will) {
      if (packet.protocolVersion === 5) {
        const willProperties = this._parseProperties()
        if (Object.getOwnPropertyNames(willProperties).length) {
          packet.will.properties = willProperties
        }
      }
      // Parse will topic
      const topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse will topic'))
      packet.will.topic = topic
      debug('_parseConnect: packet.will.topic: %s', packet.will.topic)

      // Parse will payload
      const payload = this._parseBuffer()
      if (payload === null) return this._emitError(new Error('Cannot parse will payload'))
      packet.will.payload = payload
      debug('_parseConnect: packet.will.paylaod: %s', packet.will.payload)
    }

    // Parse username
    if (flags.username) {
      const username = this._parseString()
      if (username === null) return this._emitError(new Error('Cannot parse username'))
      packet.username = username
      debug('_parseConnect: packet.username: %s', packet.username)
    }

    // Parse password
    if (flags.password) {
      const password = this._parseBuffer()
      if (password === null) return this._emitError(new Error('Cannot parse password'))
      packet.password = password
    }
    // need for right parse auth packet and self set up
    this.settings = packet
    debug('_parseConnect: complete')
    return packet
  }

  /**
   * Parses a CONNACK body.
   *
   * All bounds are checked against the packet's own remaining length rather
   * than against the whole buffer list, so bytes of a following packet are
   * never mistaken for fields of this one.
   *
   * @returns {null|undefined} `null` when the body is too short to hold the
   *   expected fields, `undefined` otherwise.
   */
  _parseConnack () {
    debug('_parseConnack')
    const packet = this.packet
    const isV5 = this.settings.protocolVersion === 5

    if (packet.length < 1) return null
    const flags = this._list.readUInt8(this._pos++)
    if (flags > 1) {
      return this._emitError(new Error('Invalid connack flags, bits 7-1 must be set to 0'))
    }
    packet.sessionPresent = !!(flags & constants.SESSIONPRESENT_MASK)

    if (isV5) {
      // A missing reason code is treated as 0.
      packet.reasonCode = packet.length >= 2 ? this._list.readUInt8(this._pos++) : 0
    } else {
      if (packet.length < 2) return null
      packet.returnCode = this._list.readUInt8(this._pos++)
    }

    // mqtt 5 properties, only when this packet still has bytes left
    if (isV5 && this._pos < packet.length) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }
    debug('_parseConnack: complete')
  }

  /**
   * Parses a PUBLISH body: topic, optional message id (QoS > 0), optional
   * MQTT 5 properties, and the application payload.
   */
  _parsePublish () {
    debug('_parsePublish')
    const packet = this.packet
    packet.topic = this._parseString()

    if (packet.topic === null) return this._emitError(new Error('Cannot parse topic'))

    // Parse messageId
    if (packet.qos > 0 && !this._parseMessageId()) return

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    // Whatever is left of the packet is the payload.
    packet.payload = this._list.slice(this._pos, packet.length)
    debug('_parsePublish: payload from buffer list: %o', packet.payload)
  }

  /**
   * Parses a SUBSCRIBE body into `packet.subscriptions`.
   */
  _parseSubscribe () {
    debug('_parseSubscribe')
    const packet = this.packet

    packet.subscriptions = []

    if (!this._parseMessageId()) return

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    while (this._pos < packet.length) {
      // Parse topic
      const topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse topic'))
      if (this._pos >= packet.length) return this._emitError(new Error('Malformed Subscribe Payload'))

      const options = this._parseByte()

      if (this.settings.protocolVersion === 5) {
        if (options & 0xc0) {
          return this._emitError(new Error('Invalid subscribe topic flag bits, bits 7-6 must be 0'))
        }
      } else {
        if (options & 0xfc) {
          return this._emitError(new Error('Invalid subscribe topic flag bits, bits 7-2 must be 0'))
        }
      }

      const qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK
      if (qos > 2) {
        return this._emitError(new Error('Invalid subscribe QoS, must be <= 2'))
      }
      const nl = ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & constants.SUBSCRIBE_OPTIONS_NL_MASK) !== 0
      const rap = ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== 0
      const rh = (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & constants.SUBSCRIBE_OPTIONS_RH_MASK

      if (rh > 2) {
        return this._emitError(new Error('Invalid retain handling, must be <= 2'))
      }

      const subscription = { topic, qos }

      // mqtt 5 options
      if (this.settings.protocolVersion === 5) {
        subscription.nl = nl
        subscription.rap = rap
        subscription.rh = rh
      } else if (this.settings.bridgeMode) {
        // Bridge connections get fixed option values.
        subscription.rh = 0
        subscription.rap = true
        subscription.nl = true
      }

      // Push pair to subscriptions
      debug('_parseSubscribe: push subscription `%s` to subscription', subscription)
      packet.subscriptions.push(subscription)
    }
  }

  /**
   * Parses a SUBACK body into `packet.granted`.
   */
  _parseSuback () {
    debug('_parseSuback')
    const packet = this.packet
    packet.granted = []

    if (!this._parseMessageId()) return

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    // Parse granted QoSes
    while (this._pos < packet.length) {
      const code = this._list.readUInt8(this._pos++)
      if (this.settings.protocolVersion === 5) {
        if (!constants.MQTT5_SUBACK_CODES[code]) {
          return this._emitError(new Error('Invalid suback code'))
        }
      } else {
        if (code > 2) {
          return this._emitError(new Error('Invalid suback QoS, must be <= 2'))
        }
      }
      packet.granted.push(code)
    }
  }

  /**
   * Parses an UNSUBSCRIBE body into `packet.unsubscriptions`.
   */
  _parseUnsubscribe () {
    debug('_parseUnsubscribe')
    const packet = this.packet

    packet.unsubscriptions = []

    // Parse messageId
    if (!this._parseMessageId()) return

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    while (this._pos < packet.length) {
      // Parse topic
      const topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse topic'))

      // Push topic to unsubscriptions
      debug('_parseUnsubscribe: push topic `%s` to unsubscriptions', topic)
      packet.unsubscriptions.push(topic)
    }
  }

  /**
   * Parses an UNSUBACK body; for MQTT 5 the reason codes end up in
   * `packet.granted`.
   */
  _parseUnsuback () {
    debug('_parseUnsuback')
    const packet = this.packet
    // `_parseMessageId` has already emitted the error; emitting it again here
    // would report the same failure twice.
    if (!this._parseMessageId()) return
    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
      // Parse granted QoSes
      packet.granted = []
      while (this._pos < packet.length) {
        const code = this._list.readUInt8(this._pos++)
        if (!constants.MQTT5_UNSUBACK_CODES[code]) {
          return this._emitError(new Error('Invalid unsuback code'))
        }
        packet.granted.push(code)
      }
    }
  }

  /**
   * Parses the acknowledgement packets puback, pubrec, pubrel and pubcomp.
   *
   * @returns {boolean|undefined} `true` on success, `undefined` after an
   *   error has been emitted.
   */
  _parseConfirmation () {
    debug('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd)
    const packet = this.packet

    // Nothing else can be trusted when the packet id itself is missing.
    if (!this._parseMessageId()) return

    if (this.settings.protocolVersion === 5) {
      if (packet.length > 2) {
        // response code
        packet.reasonCode = this._parseByte()
        switch (packet.cmd) {
          case 'puback':
          case 'pubrec':
            if (!constants.MQTT5_PUBACK_PUBREC_CODES[packet.reasonCode]) {
              return this._emitError(new Error('Invalid ' + packet.cmd + ' reason code'))
            }
            break
          case 'pubrel':
          case 'pubcomp':
            if (!constants.MQTT5_PUBREL_PUBCOMP_CODES[packet.reasonCode]) {
              return this._emitError(new Error('Invalid ' + packet.cmd + ' reason code'))
            }
            break
        }
        debug('_parseConfirmation: packet.reasonCode `%d`', packet.reasonCode)
      } else {
        // A missing reason code is treated as 0.
        packet.reasonCode = 0
      }

      if (packet.length > 3) {
        // properties mqtt 5
        const properties = this._parseProperties()
        if (Object.getOwnPropertyNames(properties).length) {
          packet.properties = properties
        }
      }
    }

    return true
  }

  /**
   * Parses a DISCONNECT body. Only MQTT 5 packets carry any content.
   *
   * Presence of the reason code and of the properties is decided from the
   * packet's own remaining length, never from the amount of buffered data:
   * an empty DISCONNECT may be followed by another packet in the same chunk.
   *
   * @returns {boolean|undefined} `true` on success, `undefined` after an
   *   error has been emitted.
   */
  _parseDisconnect () {
    const packet = this.packet
    debug('_parseDisconnect')

    if (this.settings.protocolVersion === 5) {
      // response code
      if (packet.length > 0) {
        packet.reasonCode = this._parseByte()
        if (!constants.MQTT5_DISCONNECT_CODES[packet.reasonCode]) {
          return this._emitError(new Error('Invalid disconnect reason code'))
        }
      } else {
        packet.reasonCode = 0
      }
      // properties mqtt 5, only when this packet still has bytes left
      if (this._pos < packet.length) {
        const properties = this._parseProperties()
        if (Object.getOwnPropertyNames(properties).length) {
          packet.properties = properties
        }
      }
    }

    debug('_parseDisconnect result: true')
    return true
  }

  /**
   * Parses an AUTH body (MQTT 5 only).
   *
   * @returns {boolean|undefined} `true` on success, `undefined` after an
   *   error has been emitted.
   */
  _parseAuth () {
    debug('_parseAuth')
    const packet = this.packet

    if (this.settings.protocolVersion !== 5) {
      return this._emitError(new Error('Not supported auth packet for this version MQTT'))
    }

    // response code
    packet.reasonCode = this._parseByte()
    if (!constants.MQTT5_AUTH_CODES[packet.reasonCode]) {
      return this._emitError(new Error('Invalid auth reason code'))
    }
    // properties mqtt 5
    const properties = this._parseProperties()
    if (Object.getOwnPropertyNames(properties).length) {
      packet.properties = properties
    }

    debug('_parseAuth: result: true')
    return true
  }

  /**
   * Reads the two-byte packet identifier into `packet.messageId`.
   *
   * @returns {boolean} `true` on success; `false` after emitting
   *   'Cannot parse messageId' when fewer than two bytes are available.
   */
  _parseMessageId () {
    const packet = this.packet

    packet.messageId = this._parseNum()

    // `_parseNum` reports failure with -1 (it never returns null).
    if (packet.messageId === -1) {
      this._emitError(new Error('Cannot parse messageId'))
      return false
    }

    debug('_parseMessageId: packet.messageId %d', packet.messageId)
    return true
  }

  /**
   * Reads a length-prefixed UTF-8 string at the current position.
   *
   * @param {*} [maybeBuffer] Unused; kept for interface compatibility.
   * @returns {string|null} The string, or `null` when it does not fit in the
   *   buffered data or in the current packet.
   */
  _parseString (maybeBuffer) {
    const length = this._parseNum()
    const end = length + this._pos

    if (length === -1 || end > this._list.length || end > this.packet.length) return null

    const result = this._list.toString('utf8', this._pos, end)
    this._pos += length
    debug('_parseString: result: %s', result)
    return result
  }

  /**
   * Reads two consecutive length-prefixed strings.
   *
   * @returns {{name: (string|null), value: (string|null)}}
   */
  _parseStringPair () {
    debug('_parseStringPair')
    return {
      name: this._parseString(),
      value: this._parseString()
    }
  }

  /**
   * Reads length-prefixed binary data at the current position.
   *
   * @returns {Buffer|null} The bytes, or `null` when they do not fit in the
   *   buffered data or in the current packet.
   */
  _parseBuffer () {
    const length = this._parseNum()
    const end = length + this._pos

    if (length === -1 || end > this._list.length || end > this.packet.length) return null

    const result = this._list.slice(this._pos, end)

    this._pos += length
    debug('_parseBuffer: result: %o', result)
    return result
  }

  /**
   * Reads a big-endian unsigned 16-bit integer and advances the position.
   *
   * @returns {number} The value, or -1 when fewer than two bytes remain.
   */
  _parseNum () {
    if (this._list.length - this._pos < 2) return -1

    const result = this._list.readUInt16BE(this._pos)
    this._pos += 2
    debug('_parseNum: result: %s', result)
    return result
  }

  /**
   * Reads a big-endian unsigned 32-bit integer and advances the position.
   *
   * @returns {number} The value, or -1 when fewer than four bytes remain.
   */
  _parse4ByteNum () {
    if (this._list.length - this._pos < 4) return -1

    const result = this._list.readUInt32BE(this._pos)
    this._pos += 4
    debug('_parse4ByteNum: result: %s', result)
    return result
  }

  /**
   * Decodes a variable byte integer (at most four bytes) starting at the
   * current position.
   *
   * The position is advanced only when it is non-zero; the fixed-header
   * length is read at position 0 and consumed from the list by the caller.
   *
   * @param {boolean} [fullInfoFlag] When truthy, return `{ bytes, value }`
   *   instead of the bare value.
   * @returns {number|{bytes: number, value: number}|false} The decoded value,
   *   or `false` when the integer is incomplete or malformed.
   */
  _parseVarByteNum (fullInfoFlag) {
    debug('_parseVarByteNum')
    const maxBytes = 4
    const padding = this._pos ? this._pos : 0
    let bytes = 0
    let mul = 1
    let value = 0
    let complete = false

    while (bytes < maxBytes && (padding + bytes) < this._list.length) {
      const current = this._list.readUInt8(padding + bytes++)
      value += mul * (current & constants.VARBYTEINT_MASK)
      mul *= 0x80

      // A cleared continuation bit marks the last byte.
      if ((current & constants.VARBYTEINT_FIN_MASK) === 0) {
        complete = true
        break
      }
    }

    // Four bytes were read and the last one still asked for more.
    if (!complete && bytes === maxBytes) {
      this._emitError(new Error('Invalid variable byte integer'))
    }

    if (padding) {
      this._pos += bytes
    }

    let result = false
    if (complete) {
      result = fullInfoFlag ? { bytes, value } : value
    }

    debug('_parseVarByteNum: result: %o', result)
    return result
  }

  /**
   * Reads one byte and advances the position.
   *
   * @returns {number|undefined} The byte, or `undefined` at end of data.
   */
  _parseByte () {
    let result
    if (this._pos < this._list.length) {
      result = this._list.readUInt8(this._pos)
      this._pos++
    }
    debug('_parseByte: result: %o', result)
    return result
  }

  /**
   * Reads a value of the given property wire type.
   *
   * @param {string} type One of 'byte', 'int8', 'int16', 'int32', 'var',
   *   'string', 'pair' or 'binary'.
   * @returns {*} The decoded value; `undefined` for an unknown type.
   */
  _parseByType (type) {
    debug('_parseByType: type: %s', type)
    switch (type) {
      case 'byte': {
        return this._parseByte() !== 0
      }
      case 'int8': {
        return this._parseByte()
      }
      case 'int16': {
        return this._parseNum()
      }
      case 'int32': {
        return this._parse4ByteNum()
      }
      case 'var': {
        return this._parseVarByteNum()
      }
      case 'string': {
        return this._parseString()
      }
      case 'pair': {
        return this._parseStringPair()
      }
      case 'binary': {
        return this._parseBuffer()
      }
    }
  }

  /**
   * Parses an MQTT 5 property block (length prefix followed by properties).
   *
   * A property seen more than once is collected into an array; user
   * properties are gathered in a prototype-less map under `userProperties`.
   *
   * @returns {Object|false} Map of property name to value, or `false` after
   *   an error has been emitted.
   */
  _parseProperties () {
    debug('_parseProperties')
    const length = this._parseVarByteNum()
    const start = this._pos
    const end = start + length
    const result = {}
    while (this._pos < end) {
      const type = this._parseByte()
      if (!type) {
        this._emitError(new Error('Cannot parse property code type'))
        return false
      }
      const name = constants.propertiesCodes[type]
      if (!name) {
        this._emitError(new Error('Unknown property'))
        return false
      }
      const value = this._parseByType(constants.propertiesTypes[name])

      // user properties process
      if (name === 'userProperties') {
        if (!result[name]) {
          result[name] = Object.create(null)
        }
        const userProperties = result[name]
        const existing = userProperties[value.name]
        if (!existing) {
          userProperties[value.name] = value.value
        } else if (Array.isArray(existing)) {
          existing.push(value.value)
        } else {
          userProperties[value.name] = [existing, value.value]
        }
        continue
      }

      const previous = result[name]
      if (!previous) {
        result[name] = value
      } else if (Array.isArray(previous)) {
        previous.push(value)
      } else {
        result[name] = [previous, value]
      }
    }
    return result
  }

  /**
   * Final step of a cycle: drops the parsed body from the buffer, emits the
   * packet and prepares a blank one for the next cycle.
   *
   * @returns {boolean} Always `true`.
   */
  _newPacket () {
    debug('_newPacket')
    if (this.packet) {
      this._list.consume(this.packet.length)
      debug('_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', this.packet.cmd, this.packet.payload, this.packet.length)
      this.emit('packet', this.packet)
    }
    debug('_newPacket: new packet')
    this.packet = new Packet()

    this._pos = 0

    return true
  }

  /**
   * Records the error (which stops the parse loop) and emits it.
   *
   * @param {Error} err The error to report.
   */
  _emitError (err) {
    debug('_emitError')
    this.error = err
    this.emit('error', err)
  }
}
794
// Public entry point of this module: the Parser class.
module.exports = Parser