UNPKG

30.9 kBJavaScriptView Raw
1const protocol = require('./constants')
2const empty = Buffer.allocUnsafe(0)
3const zeroBuf = Buffer.from([0])
4const numbers = require('./numbers')
5const nextTick = require('process-nextick-args').nextTick
6const debug = require('debug')('mqtt-packet:writeToStream')
7
8const numCache = numbers.cache
9const generateNumber = numbers.generateNumber
10const generateCache = numbers.generateCache
11const genBufVariableByteInt = numbers.genBufVariableByteInt
12const generate4ByteBuffer = numbers.generate4ByteBuffer
13let writeNumber = writeNumberCached
14let toGenerate = true
15
/**
 * Serialize an MQTT packet onto a stream.
 *
 * When the stream supports corking it is corked for the duration of the call
 * and uncorked on the next tick, so the individual writes are flushed together.
 * The number cache is built lazily on the first call that needs it.
 *
 * @param {Object} packet - packet description; `packet.cmd` selects the serializer
 * @param {Object} stream - destination (`write`, `emit` and optionally `cork`/`uncork`)
 * @param {Object} [opts] - options forwarded untouched to the serializer
 * @returns {boolean} whatever the selected serializer returns, or false after
 *   emitting 'Unknown command' on the stream
 */
function generate (packet, stream, opts) {
  debug('generate called')
  if (stream.cork) {
    stream.cork()
    nextTick(uncork, stream)
  }

  if (toGenerate) {
    toGenerate = false
    generateCache()
  }
  debug('generate: packet.cmd: %s', packet.cmd)

  const cmd = packet.cmd
  if (cmd === 'connect') return connect(packet, stream, opts)
  if (cmd === 'connack') return connack(packet, stream, opts)
  if (cmd === 'publish') return publish(packet, stream, opts)
  if (cmd === 'puback' || cmd === 'pubrec' || cmd === 'pubrel' || cmd === 'pubcomp') {
    return confirmation(packet, stream, opts)
  }
  if (cmd === 'subscribe') return subscribe(packet, stream, opts)
  if (cmd === 'suback') return suback(packet, stream, opts)
  if (cmd === 'unsubscribe') return unsubscribe(packet, stream, opts)
  if (cmd === 'unsuback') return unsuback(packet, stream, opts)
  if (cmd === 'pingreq' || cmd === 'pingresp') return emptyPacket(packet, stream, opts)
  if (cmd === 'disconnect') return disconnect(packet, stream, opts)
  if (cmd === 'auth') return auth(packet, stream, opts)

  stream.emit('error', new Error('Unknown command'))
  return false
}
/**
 * Controls the numbers cache.
 * Set to "false" to allocate number buffers on the fly instead of reading
 * them from the pre-generated cache.
 */
Object.defineProperty(generate, 'cacheNumbers', {
  get: () => writeNumber === writeNumberCached,
  set (value) {
    if (!value) {
      // Caching disabled: never build the cache, generate each buffer on demand.
      toGenerate = false
      writeNumber = writeNumberGenerated
      return
    }
    // Caching enabled: schedule cache generation only if it is still empty.
    const cacheIsEmpty = !numCache || Object.keys(numCache).length === 0
    if (cacheIsEmpty) toGenerate = true
    writeNumber = writeNumberCached
  }
})
78
/**
 * Uncork a stream that generate() corked, letting its buffered writes flush.
 *
 * @param {Object} stream - stream exposing `uncork()`
 */
function uncork (stream) {
  stream.uncork()
}
82
/**
 * Serialize a CONNECT packet.
 *
 * All fields are validated and the remaining length is computed before the
 * first byte is written; on a validation failure an 'error' event is emitted
 * on the stream and nothing is written.
 *
 * @param {Object} packet - connect settings (protocolId, protocolVersion, clean,
 *   keepalive, clientId, username, password, will, properties, bridgeMode)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - unused by this serializer
 * @returns {boolean} true once the packet has been written, false on invalid input
 */
function connect (packet, stream, opts) {
  const settings = packet || {}
  const protocolId = settings.protocolId || 'MQTT'
  const protocolVersion = settings.protocolVersion || 4
  const will = settings.will
  let clean = settings.clean
  const keepalive = settings.keepalive || 0
  const clientId = settings.clientId || ''
  const username = settings.username
  const password = settings.password
  /* mqtt5 new options */
  const properties = settings.properties

  if (clean === undefined) clean = true

  let length = 0

  // Must be a non-empty string or Buffer
  if (!protocolId ||
    (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
    stream.emit('error', new Error('Invalid protocolId'))
    return false
  }
  // Count encoded bytes rather than UTF-16 code units so the remaining length
  // agrees with what writeStringOrBuffer puts on the wire.
  length += byteLength(protocolId) + 2

  // Must be 3 or 4 or 5
  if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
    stream.emit('error', new Error('Invalid protocol version'))
    return false
  }
  length += 1

  // Decided once, from the validated version, so the bridge flag applied to
  // the version byte below cannot change which optional sections are written.
  const isV5 = protocolVersion === 5

  // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
  if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
    (clientId || protocolVersion >= 4) && (clientId || clean)) {
    length += Buffer.byteLength(clientId) + 2
  } else {
    if (protocolVersion < 4) {
      stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
      return false
    }
    if ((clean * 1) === 0) {
      stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
      return false
    }
  }

  // Must be a two byte number
  if (typeof keepalive !== 'number' ||
    keepalive < 0 ||
    keepalive > 65535 ||
    keepalive % 1 !== 0) {
    stream.emit('error', new Error('Invalid keepalive'))
    return false
  }
  length += 2

  // Connect flags
  length += 1

  let propertiesData
  let willProperties

  // Properties
  if (isV5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // If will exists...
  if (will) {
    // It must be an object
    if (typeof will !== 'object') {
      stream.emit('error', new Error('Invalid will'))
      return false
    }
    // It must have a non-empty string topic
    if (!will.topic || typeof will.topic !== 'string') {
      stream.emit('error', new Error('Invalid will topic'))
      return false
    }
    length += Buffer.byteLength(will.topic) + 2

    // Payload: two length bytes are always written, even without a payload
    length += 2
    if (will.payload) {
      if (will.payload.length >= 0) {
        if (typeof will.payload === 'string') {
          length += Buffer.byteLength(will.payload)
        } else {
          length += will.payload.length
        }
      } else {
        stream.emit('error', new Error('Invalid will payload'))
        return false
      }
    }
    // will properties
    willProperties = {}
    if (isV5) {
      willProperties = getProperties(stream, will.properties)
      if (!willProperties) { return false }
      length += willProperties.length
    }
  }

  // Username
  let providedUsername = false
  if (username != null) {
    if (isStringOrBuffer(username)) {
      providedUsername = true
      length += Buffer.byteLength(username) + 2
    } else {
      stream.emit('error', new Error('Invalid username'))
      return false
    }
  }

  // Password
  if (password != null) {
    if (!providedUsername) {
      stream.emit('error', new Error('Username is required to use password'))
      return false
    }

    if (isStringOrBuffer(password)) {
      length += byteLength(password) + 2
    } else {
      stream.emit('error', new Error('Invalid password'))
      return false
    }
  }

  // Generate header
  stream.write(protocol.CONNECT_HEADER)

  // Generate length
  writeVarByteInt(stream, length)

  // Generate protocol ID
  writeStringOrBuffer(stream, protocolId)

  // Version byte; bridge mode sets its top bit (adds 128).
  const wireVersion = settings.bridgeMode ? protocolVersion + 128 : protocolVersion
  let versionBuffer
  switch (wireVersion) {
    case 131:
      versionBuffer = protocol.VERSION131
      break
    case 132:
      versionBuffer = protocol.VERSION132
      break
    case 5:
      versionBuffer = protocol.VERSION5
      break
    case 4:
      versionBuffer = protocol.VERSION4
      break
    case 3:
      versionBuffer = protocol.VERSION3
      break
    default:
      // Only reachable for version 5 in bridge mode (133): no pre-built
      // buffer is referenced for it here, so encode the byte directly.
      versionBuffer = Buffer.from([wireVersion])
  }
  stream.write(versionBuffer)

  // Connect flags
  let flags = 0
  flags |= (username != null) ? protocol.USERNAME_MASK : 0
  flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  flags |= will ? protocol.WILL_FLAG_MASK : 0
  flags |= clean ? protocol.CLEAN_SESSION_MASK : 0

  stream.write(Buffer.from([flags]))

  // Keepalive
  writeNumber(stream, keepalive)

  // Properties (already counted in the remaining length for version 5)
  if (isV5) {
    propertiesData.write()
  }

  // Client ID
  writeStringOrBuffer(stream, clientId)

  // Will
  if (will) {
    if (isV5) {
      willProperties.write()
    }
    writeString(stream, will.topic)
    writeStringOrBuffer(stream, will.payload)
  }

  // Username and password
  if (username != null) {
    writeStringOrBuffer(stream, username)
  }
  if (password != null) {
    writeStringOrBuffer(stream, password)
  }
  // This is a small packet that happens only once on a stream
  // We assume the stream is always free to receive more data after this
  return true
}
282
/**
 * Serialize a CONNACK packet.
 *
 * The code is read from `reasonCode` for protocol version 5 and from
 * `returnCode` otherwise; version 5 additionally carries a properties block.
 *
 * @param {Object} packet - connack settings (sessionPresent, returnCode/reasonCode, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when opts is absent)
 * @returns {boolean} true when written, false after emitting an error
 */
function connack (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const isV5 = version === 5
  const code = isV5 ? settings.reasonCode : settings.returnCode
  const properties = settings.properties

  if (typeof code !== 'number') {
    stream.emit('error', new Error('Invalid return code'))
    return false
  }

  // Session-present byte plus the code byte
  let remaining = 2
  let props = null
  if (isV5) {
    props = getProperties(stream, properties)
    if (!props) return false
    remaining += props.length
  }

  stream.write(protocol.CONNACK_HEADER)
  writeVarByteInt(stream, remaining)

  const sessionByte = settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf
  stream.write(sessionByte)
  stream.write(Buffer.from([code]))

  if (props != null) props.write()
  return true
}
314
/**
 * Serialize a PUBLISH packet.
 *
 * @param {Object} packet - publish settings (topic, payload, qos, retain, dup,
 *   messageId, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when opts is absent)
 * @returns {boolean} result of writing the payload, or false after emitting an error
 */
function publish (packet, stream, opts) {
  debug('publish: packet: %o', packet)
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const qos = settings.qos || 0
  const retain = settings.retain ? protocol.RETAIN_MASK : 0
  const topic = settings.topic
  const payload = settings.payload || empty
  const id = settings.messageId
  const properties = settings.properties

  // Topic must be a string or a Buffer; it is length-prefixed with two bytes
  let length
  if (typeof topic === 'string') {
    length = Buffer.byteLength(topic) + 2
  } else if (Buffer.isBuffer(topic)) {
    length = topic.length + 2
  } else {
    stream.emit('error', new Error('Invalid topic'))
    return false
  }

  // Payload is written raw, without a length prefix
  length += Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)

  // A message id is only present (and required) when qos is set
  if (qos) {
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    length += 2
  }

  // mqtt5 properties
  let propertiesData = null
  if (version === 5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) return false
    length += propertiesData.length
  }

  // Fixed header, selected by qos / dup / retain
  const dupIndex = settings.dup ? 1 : 0
  const retainIndex = retain ? 1 : 0
  stream.write(protocol.PUBLISH_HEADER[qos][dupIndex][retainIndex])

  // Remaining length
  writeVarByteInt(stream, length)

  // Topic
  writeNumber(stream, byteLength(topic))
  stream.write(topic)

  // Message ID
  if (qos > 0) writeNumber(stream, id)

  // Properties
  if (propertiesData != null) propertiesData.write()

  // Payload
  debug('publish: payload: %o', payload)
  return stream.write(payload)
}
376
/**
 * Serialize a PUBACK, PUBREC, PUBREL or PUBCOMP packet.
 *
 * @param {Object} packet - settings (cmd, messageId, dup, reasonCode, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when
 *   opts is absent); also passed on for the maximum-packet-size check
 * @returns {boolean} true when written, false after emitting an error or when
 *   the properties cannot be produced
 */
function confirmation (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const type = settings.cmd || 'puback'
  const id = settings.messageId
  const dup = (settings.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
  const qos = type === 'pubrel' ? 1 : 0
  const reasonCode = settings.reasonCode
  const properties = settings.properties
  const isV5 = version === 5

  // Message id, plus one reason-code byte in version 5
  let length = isV5 ? 3 : 2

  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }

  // Version 5: a properties block is only added when properties were supplied,
  // so no empty property length is emitted otherwise.
  let propertiesData = null
  if (isV5 && typeof properties === 'object') {
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) return false
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.ACKS[type][qos][dup][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Reason code
  if (isV5) stream.write(Buffer.from([reasonCode]))

  // Properties
  if (propertiesData !== null) propertiesData.write()

  return true
}
428
/**
 * Serialize a SUBSCRIBE packet.
 *
 * Every subscription is validated before anything is written. A missing or
 * null subscription list, or a null entry inside it, is reported through an
 * 'error' event instead of throwing.
 *
 * @param {Object} packet - settings (messageId, dup, subscriptions, properties);
 *   each subscription has `topic` and `qos`, plus `nl`, `rap`, `rh` for version 5
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when opts is absent)
 * @returns {boolean} result of the last stream write, or false after emitting an error
 */
function subscribe (packet, stream, opts) {
  debug('subscribe: packet: %o', packet)
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const id = settings.messageId
  const subs = settings.subscriptions
  const properties = settings.properties

  let length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // properties mqtt 5
  let propertiesData = null
  if (version === 5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Check subscriptions; `typeof null` is 'object', so null is rejected explicitly
  if (subs === null || typeof subs !== 'object' || !subs.length) {
    stream.emit('error', new Error('Invalid subscriptions'))
    return false
  }

  for (let i = 0; i < subs.length; i += 1) {
    // A null/undefined entry is treated as an empty one so it fails the
    // topic check below rather than throwing on property access.
    const entry = subs[i] == null ? {} : subs[i]
    const itopic = entry.topic
    const iqos = entry.qos

    if (typeof itopic !== 'string') {
      stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
      return false
    }
    if (typeof iqos !== 'number') {
      stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
      return false
    }

    if (version === 5) {
      const nl = entry.nl || false
      if (typeof nl !== 'boolean') {
        stream.emit('error', new Error('Invalid subscriptions - invalid No Local'))
        return false
      }
      const rap = entry.rap || false
      if (typeof rap !== 'boolean') {
        stream.emit('error', new Error('Invalid subscriptions - invalid Retain as Published'))
        return false
      }
      const rh = entry.rh || 0
      if (typeof rh !== 'number' || rh > 2) {
        stream.emit('error', new Error('Invalid subscriptions - invalid Retain Handling'))
        return false
      }
    }

    // Length-prefixed topic plus one options byte
    length += Buffer.byteLength(itopic) + 2 + 1
  }

  // Generate header
  debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER)
  stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Generate length
  writeVarByteInt(stream, length)

  // Generate message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  }

  let result = true

  // Generate subs
  for (const sub of subs) {
    const jtopic = sub.topic
    const jqos = sub.qos
    const jnl = +sub.nl
    const jrap = +sub.rap
    const jrh = sub.rh
    let joptions

    // Write topic string
    writeString(stream, jtopic)

    // Options byte: qos bits, plus the version 5 flags when applicable
    joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos]
    if (version === 5) {
      joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0
      joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0
      joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0
    }
    // Write options
    result = stream.write(Buffer.from([joptions]))
  }

  return result
}
536
/**
 * Serialize a SUBACK packet.
 *
 * A missing or null `granted` list is reported through an 'error' event
 * instead of throwing.
 *
 * @param {Object} packet - settings (messageId, granted, properties); `granted`
 *   is a non-empty list of numbers
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when
 *   opts is absent); also passed on for the maximum-packet-size check
 * @returns {boolean} result of writing the granted vector, or false on failure
 */
function suback (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const id = settings.messageId
  const granted = settings.granted
  const properties = settings.properties
  let length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // Check granted qos vector; `typeof null` is 'object', so null is rejected explicitly
  if (granted === null || typeof granted !== 'object' || !granted.length) {
    stream.emit('error', new Error('Invalid qos vector'))
    return false
  }
  for (let i = 0; i < granted.length; i += 1) {
    if (typeof granted[i] !== 'number') {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    length += 1
  }

  // properties mqtt 5
  let propertiesData = null
  if (version === 5) {
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // header
  stream.write(protocol.SUBACK_HEADER)

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  }

  return stream.write(Buffer.from(granted))
}
589
/**
 * Serialize an UNSUBSCRIBE packet.
 *
 * A missing or null `unsubscriptions` list is reported through an 'error'
 * event instead of throwing.
 *
 * @param {Object} packet - settings (messageId, dup, unsubscriptions, properties);
 *   `unsubscriptions` is a non-empty list of topic strings
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when opts is absent)
 * @returns {boolean} result of writing the last topic, or false after emitting an error
 */
function unsubscribe (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const id = settings.messageId
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const unsubs = settings.unsubscriptions
  const properties = settings.properties

  let length = 0

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  length += 2

  // Check unsubs; `typeof null` is 'object', so null is rejected explicitly
  if (unsubs === null || typeof unsubs !== 'object' || !unsubs.length) {
    stream.emit('error', new Error('Invalid unsubscriptions'))
    return false
  }
  for (let i = 0; i < unsubs.length; i += 1) {
    if (typeof unsubs[i] !== 'string') {
      stream.emit('error', new Error('Invalid unsubscriptions'))
      return false
    }
    // Each topic is length-prefixed with two bytes
    length += Buffer.byteLength(unsubs[i]) + 2
  }

  // properties mqtt 5
  let propertiesData = null
  if (version === 5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // Unsubs
  let result = true
  for (let j = 0; j < unsubs.length; j++) {
    result = writeString(stream, unsubs[j])
  }

  return result
}
650
/**
 * Serialize an UNSUBACK packet.
 *
 * For protocol version 5 the packet also carries properties and the `granted`
 * list of numeric codes; a missing or null `granted` is reported through an
 * 'error' event instead of throwing.
 *
 * @param {Object} packet - settings (cmd, messageId, dup, granted, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when
 *   opts is absent); also passed on for the maximum-packet-size check
 * @returns {boolean} true when written, false on failure
 */
function unsuback (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const id = settings.messageId
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const granted = settings.granted
  const properties = settings.properties
  const type = settings.cmd
  const qos = 0

  // Message id
  let length = 2

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }

  let propertiesData = null
  if (version === 5) {
    // Check granted; `typeof null` is 'object', so null is rejected explicitly
    if (granted === null || typeof granted !== 'object' || !granted.length) {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    for (let i = 0; i < granted.length; i += 1) {
      if (typeof granted[i] !== 'number') {
        stream.emit('error', new Error('Invalid qos vector'))
        return false
      }
      length += 1
    }

    // properties mqtt 5
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.ACKS[type][qos][dup][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // properties mqtt 5
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // payload
  if (version === 5) {
    stream.write(Buffer.from(granted))
  }
  return true
}
713
/**
 * Write a body-less packet (used for pingreq / pingresp) by looking up its
 * pre-built bytes in `protocol.EMPTY`.
 *
 * @param {Object} packet - `packet.cmd` is the lookup key
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - unused
 * @returns {boolean} result of stream.write
 */
function emptyPacket (packet, stream, opts) {
  const bytes = protocol.EMPTY[packet.cmd]
  return stream.write(bytes)
}
717
/**
 * Serialize a DISCONNECT packet.
 *
 * Protocol version 5 adds a reason-code byte and a properties block; other
 * versions write only the fixed header with a zero remaining length.
 *
 * @param {Object} packet - settings (reasonCode, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` selects the wire format (4 when
 *   opts is absent); also passed on for the maximum-packet-size check
 * @returns {boolean} true when written, false when the properties cannot be produced
 */
function disconnect (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const reasonCode = settings.reasonCode
  const properties = settings.properties
  const isV5 = version === 5
  let length = isV5 ? 1 : 0

  let propertiesData = null
  if (isV5) {
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) return false
    length += propertiesData.length
  }

  // Header
  stream.write(Buffer.from([protocol.codes.disconnect << 4]))

  // Length
  writeVarByteInt(stream, length)

  // Reason code
  if (isV5) stream.write(Buffer.from([reasonCode]))

  // Properties
  if (propertiesData !== null) propertiesData.write()

  return true
}
751
/**
 * Serialize an AUTH packet (protocol version 5 only).
 *
 * For any other protocol version an 'error' event is emitted and nothing is
 * written.
 *
 * @param {Object} packet - settings (reasonCode, properties)
 * @param {Object} stream - destination stream
 * @param {Object} [opts] - `opts.protocolVersion` must be 5; also passed on for
 *   the maximum-packet-size check
 * @returns {boolean} true when written, false on a wrong version or when the
 *   properties cannot be produced
 */
function auth (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  const settings = packet || {}
  const reasonCode = settings.reasonCode
  const properties = settings.properties

  // Stop here: writing the packet after reporting the error would put an
  // AUTH packet on a connection whose protocol version does not define it.
  if (version !== 5) {
    stream.emit('error', new Error('Invalid mqtt version for auth packet'))
    return false
  }

  // One byte for the reason code
  let length = 1

  // properties mqtt 5
  const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
  if (!propertiesData) { return false }
  length += propertiesData.length

  // Header
  stream.write(Buffer.from([protocol.codes.auth << 4]))

  // Length
  writeVarByteInt(stream, length)

  // Reason code
  stream.write(Buffer.from([reasonCode]))

  // properties mqtt 5
  propertiesData.write()
  return true
}
781
/**
 * writeVarByteInt - write an MQTT style variable byte integer to the stream
 *
 * Encoded buffers for values below 16384 are memoised in `varByteIntCache`.
 *
 * @param {Object} stream - destination stream
 * @param {Number} num - value to encode
 * @returns {Boolean} result of stream.write, or false (after emitting an
 *   error) when num exceeds protocol.VARBYTEINT_MAX
 *
 * @api private
 */

const varByteIntCache = {}
function writeVarByteInt (stream, num) {
  if (num > protocol.VARBYTEINT_MAX) {
    stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
    return false
  }

  const cached = varByteIntCache[num]
  const encoded = cached || genBufVariableByteInt(num)
  if (!cached && num < 16384) {
    varByteIntCache[num] = encoded
  }

  debug('writeVarByteInt: writing to stream: %o', encoded)
  return stream.write(encoded)
}
809
/**
 * writeString - write a length-prefixed utf8 string to the stream
 *
 * The prefix is the string's byte length, written through writeNumber.
 *
 * @param {Object} stream - destination stream
 * @param {String} string - string to write
 * @return {Boolean} result of writing the string itself
 *
 * @api private
 */

function writeString (stream, string) {
  writeNumber(stream, Buffer.byteLength(string))

  debug('writeString: %s', string)
  return stream.write(string, 'utf8')
}
828
/**
 * writeStringPair - write a name/value pair as two length-prefixed utf8 strings
 *
 * @param {Object} stream - destination stream
 * @param {String} name - string name to write
 * @param {String} value - string value to write
 * @return {undefined} nothing is returned
 *
 * @api private
 */
function writeStringPair (stream, name, value) {
  for (const part of [name, value]) {
    writeString(stream, part)
  }
}
843
/**
 * writeNumberCached - write a two byte number to the stream using the
 * pre-generated buffer stored in `numCache`
 *
 * @param {Object} stream - destination stream
 * @param {Number} number - number to write (used as the cache key)
 * @return {Boolean} result of stream.write
 *
 * @api private
 */
function writeNumberCached (stream, number) {
  const cachedBuffer = numCache[number]
  debug('writeNumberCached: number: %d', number)
  debug('writeNumberCached: %o', cachedBuffer)
  return stream.write(cachedBuffer)
}
/**
 * writeNumberGenerated - write a number using a buffer built on demand by
 * generateNumber (the non-cached counterpart of writeNumberCached)
 *
 * @param {Object} stream - destination stream
 * @param {Number} number - number to write
 * @return {Boolean} result of stream.write
 */
function writeNumberGenerated (stream, number) {
  const encoded = generateNumber(number)
  debug('writeNumberGenerated: %o', encoded)
  return stream.write(encoded)
}
/**
 * write4ByteNumber - write a number using the buffer produced by
 * generate4ByteBuffer
 *
 * @param {Object} stream - destination stream
 * @param {Number} number - number to write
 * @return {Boolean} result of stream.write
 */
function write4ByteNumber (stream, number) {
  const encoded = generate4ByteBuffer(number)
  debug('write4ByteNumber: %o', encoded)
  return stream.write(encoded)
}
/**
 * writeStringOrBuffer - write a String or Buffer with its length prefix
 *
 * A falsy value that is not a string is written as a zero length with no data.
 *
 * @param {Object} stream - destination stream
 * @param {String|Buffer} toWrite - value to write
 * @return {undefined} nothing is returned
 */
function writeStringOrBuffer (stream, toWrite) {
  if (typeof toWrite === 'string') {
    writeString(stream, toWrite)
    return
  }
  if (!toWrite) {
    writeNumber(stream, 0)
    return
  }
  writeNumber(stream, toWrite.length)
  stream.write(toWrite)
}
885
/**
 * Validate a properties object and compute its encoded size.
 *
 * Nothing is written here: the returned object carries the total `length`
 * (property bytes plus the variable-byte length prefix) and a `write()`
 * function that emits the block through writeProperties.
 *
 * When `properties` is null/undefined, not an object, or has a `length`
 * (array-like), an empty block of length 1 is returned.
 *
 * Only own enumerable properties are counted, matching what writeProperties
 * emits. A property whose encoded size is zero (for example an empty
 * user-properties object) is valid and contributes nothing.
 *
 * @param {Object} stream - stream used for 'error' events and by `write()`
 * @param {Object} [properties] - property name to value (or array of values)
 * @returns {{length: number, write: Function}|false} the size/writer pair, or
 *   false after emitting an 'error' event for an invalid property
 */
function getProperties (stream, properties) {
  /* connect properties */
  // `typeof null` is 'object', so null has to be excluded before reading `.length`
  if (properties == null || typeof properties !== 'object' || properties.length != null) {
    return {
      length: 1,
      write () {
        writeProperties(stream, {}, 0)
      }
    }
  }

  function reject (message) {
    stream.emit('error', new Error(message))
    return false
  }

  // Returns the encoded size of one value (identifier byte included),
  // or false after reporting the problem.
  function getLengthProperty (name, value) {
    const type = protocol.propertiesTypes[name]
    switch (type) {
      case 'byte': {
        if (typeof value !== 'boolean') return reject(`Invalid ${name}: ${value}`)
        return 1 + 1
      }
      case 'int8': {
        if (typeof value !== 'number' || value < 0 || value > 0xff) {
          return reject(`Invalid ${name}: ${value}`)
        }
        return 1 + 1
      }
      case 'binary': {
        // Must be something the writer can length-prefix and write:
        // a string or a Buffer/Uint8Array.
        if (typeof value !== 'string' && !(value instanceof Uint8Array)) {
          return reject(`Invalid ${name}: ${value}`)
        }
        return 1 + Buffer.byteLength(value) + 2
      }
      case 'int16': {
        if (typeof value !== 'number' || value < 0 || value > 0xffff) {
          return reject(`Invalid ${name}: ${value}`)
        }
        return 1 + 2
      }
      case 'int32': {
        if (typeof value !== 'number' || value < 0 || value > 0xffffffff) {
          return reject(`Invalid ${name}: ${value}`)
        }
        return 1 + 4
      }
      case 'var': {
        // upper bound accepted for a variable byte integer value
        if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) {
          return reject(`Invalid ${name}: ${value}`)
        }
        return 1 + Buffer.byteLength(genBufVariableByteInt(value))
      }
      case 'string': {
        if (typeof value !== 'string') return reject(`Invalid ${name}: ${value}`)
        return 1 + 2 + Buffer.byteLength(value.toString())
      }
      case 'pair': {
        if (value === null || typeof value !== 'object') {
          return reject(`Invalid ${name}: ${value}`)
        }
        // Every key/value combination is written as its own property:
        // identifier byte + two length-prefixed strings.
        return Object.getOwnPropertyNames(value).reduce((result, key) => {
          const currentValue = value[key]
          const keyBytes = Buffer.byteLength(key.toString())
          if (Array.isArray(currentValue)) {
            return currentValue.reduce(
              (sum, item) => sum + 1 + 2 + keyBytes + 2 + Buffer.byteLength(item.toString()),
              result
            )
          }
          return result + 1 + 2 + keyBytes + 2 + Buffer.byteLength(currentValue.toString())
        }, 0)
      }
      default: {
        return reject(`Invalid property ${name}: ${value}`)
      }
    }
  }

  let propertiesLength = 0
  for (const propName in properties) {
    if (!Object.prototype.hasOwnProperty.call(properties, propName)) continue
    const propValue = properties[propName]
    const values = Array.isArray(propValue) ? propValue : [propValue]
    for (let valueIndex = 0; valueIndex < values.length; valueIndex++) {
      const valueLength = getLengthProperty(propName, values[valueIndex])
      // Only `false` signals a failure; a size of 0 is a legitimate result.
      if (valueLength === false) { return false }
      propertiesLength += valueLength
    }
  }
  const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))

  return {
    length: propertiesLengthLength + propertiesLength,
    write () {
      writeProperties(stream, properties, propertiesLength)
    }
  }
}
1013
/**
 * Build the properties block, dropping optional properties when the packet
 * would exceed the peer's maximum packet size.
 *
 * The limit is read from `opts.properties.maximumPacketSize`; without one the
 * result of getProperties is returned as is. Otherwise `reasonString` and then
 * `userProperties` are deleted from `properties` (the caller's object is
 * modified) until `length` plus the block fits. A droppable property that is
 * absent is skipped so the next candidate still gets its chance.
 *
 * @param {Object} stream - stream passed through to getProperties
 * @param {Object} [properties] - packet properties; may be mutated
 * @param {Object} [opts] - options carrying `properties.maximumPacketSize`
 * @param {number} length - packet length already accounted for by the caller
 * @returns {Object|false} the getProperties result, or false when the
 *   properties are invalid or the packet cannot be made to fit
 */
function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
  const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0

  let propertiesData = getProperties(stream, properties)
  if (!propertiesData) return false
  if (!maximumPacketSize) return propertiesData

  // Properties that may be left out, in the order they are given up
  const mayEmptyProps = ['reasonString', 'userProperties']

  while (length + propertiesData.length > maximumPacketSize) {
    let dropped = false
    while (!dropped && mayEmptyProps.length > 0) {
      const candidate = mayEmptyProps.shift()
      if (properties && properties[candidate]) {
        delete properties[candidate]
        dropped = true
      }
    }
    // Nothing left to drop: the packet cannot be shrunk any further
    if (!dropped) return false

    propertiesData = getProperties(stream, properties)
    if (!propertiesData) return false
  }
  return propertiesData
}
1032
/**
 * Write a single property value to the stream: the property identifier byte
 * (looked up in `protocol.properties`) followed by the value, encoded
 * according to the type registered in `protocol.propertiesTypes`.
 *
 * For the 'pair' type every own key of `value` is written as its own
 * property; an array under a key produces one property per element.
 *
 * @param {Object} stream - destination stream
 * @param {String} propName - property name
 * @param {*} value - value to encode
 * @returns {false|undefined} false (after emitting an error) for an unknown
 *   property type, otherwise undefined
 */
function writeProperty (stream, propName, value) {
  const type = protocol.propertiesTypes[propName]
  const writeIdentifier = () => stream.write(Buffer.from([protocol.properties[propName]]))
  const writePair = (key, item) => {
    writeIdentifier()
    writeStringPair(stream, key.toString(), item.toString())
  }

  if (type === 'byte') {
    writeIdentifier()
    stream.write(Buffer.from([+value]))
  } else if (type === 'int8') {
    writeIdentifier()
    stream.write(Buffer.from([value]))
  } else if (type === 'binary') {
    writeIdentifier()
    writeStringOrBuffer(stream, value)
  } else if (type === 'int16') {
    writeIdentifier()
    writeNumber(stream, value)
  } else if (type === 'int32') {
    writeIdentifier()
    write4ByteNumber(stream, value)
  } else if (type === 'var') {
    writeIdentifier()
    writeVarByteInt(stream, value)
  } else if (type === 'string') {
    writeIdentifier()
    writeString(stream, value)
  } else if (type === 'pair') {
    for (const key of Object.getOwnPropertyNames(value)) {
      const entry = value[key]
      if (Array.isArray(entry)) {
        entry.forEach(item => writePair(key, item))
      } else {
        writePair(key, entry)
      }
    }
  } else {
    stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))
    return false
  }
}
1092
/**
 * Write a properties block: the variable-byte length prefix followed by every
 * own, non-null property. An array value is written as one property per element.
 *
 * @param {Object} stream - destination stream
 * @param {Object} properties - property name to value (or array of values)
 * @param {number} propertiesLength - encoded size of the properties, written as the prefix
 */
function writeProperties (stream, properties, propertiesLength) {
  /* write properties to stream */
  writeVarByteInt(stream, propertiesLength)

  for (const propName in properties) {
    const isOwn = Object.prototype.hasOwnProperty.call(properties, propName)
    if (!isOwn || properties[propName] === null) continue

    const value = properties[propName]
    if (!Array.isArray(value)) {
      writeProperty(stream, propName, value)
      continue
    }
    for (const item of value) {
      writeProperty(stream, propName, item)
    }
  }
}
1109
/**
 * Byte length of a Buffer or string; any falsy input counts as 0.
 *
 * @param {Buffer|String} bufOrString - value to measure
 * @returns {number} `length` for a Buffer, `Buffer.byteLength` otherwise
 */
function byteLength (bufOrString) {
  if (!bufOrString) return 0
  return bufOrString instanceof Buffer
    ? bufOrString.length
    : Buffer.byteLength(bufOrString)
}
1115
/**
 * Tell whether a field is a string primitive or a Buffer instance.
 *
 * @param {*} field - value to test
 * @returns {boolean} true for strings and Buffers, false for anything else
 */
function isStringOrBuffer (field) {
  if (typeof field === 'string') return true
  return field instanceof Buffer
}
1119
1120module.exports = generate