UNPKG

1.74 kBJavaScriptView Raw
1'use strict'
2
3const stream = require('../lib/destroyable-stream')
4
const FRAME_PREFIX_SIZE = 2 // uint16 is 2 bytes

/**
 * Transform stream that turns a byte stream of length-prefixed frames into
 * decoded message objects.
 *
 * Wire format, repeated: a 2-byte big-endian unsigned length followed by that
 * many payload bytes. Every complete payload is passed to
 * `messageType.decode()` and the returned value is pushed to the readable
 * (object mode) side. Frames may be split across chunks, or several frames
 * may arrive in a single chunk.
 */
class AbstractDecoder extends stream.Transform {
  /**
   * @param {{decode: Function}} messageType - Object whose `decode(buffer)`
   *   method converts one frame payload into a message.
   * @param {object} [options] - Extra stream options. They are merged over
   *   the defaults below, so they take precedence when keys collide.
   */
  constructor (messageType, options) {
    super(Object.assign({
      readableObjectMode: true,
      writableObjectMode: false
    }, options))

    this._messageType = messageType

    // Bytes received so far that do not yet complete the next unit
    // (a frame prefix or a frame payload), kept as a list of chunks so they
    // are only concatenated once enough data has arrived.
    this._buffers = []
    this._bufferedLength = 0

    // Two-state machine: either waiting for the 2-byte length prefix, or
    // waiting for `_nextMessageLength` bytes of payload.
    this._nextMessageLength = FRAME_PREFIX_SIZE
    this._awaitFramePrefix = true
  }

  /**
   * Consumes one incoming chunk and pushes every message it completes.
   *
   * @param {Buffer} chunk - Raw bytes from the writable side.
   * @param {string} encoding - Unused; chunks are handled as buffers.
   * @param {function(?Error)} callback - Called exactly once: with `null`
   *   on success, or with the error thrown by `messageType.decode()`. On a
   *   decode error the failing frame is treated as consumed and any bytes
   *   following it stay buffered.
   */
  _transform (chunk, encoding, callback) {
    let data = chunk

    // Only pay for a concatenation when the buffered bytes together with
    // this chunk are enough to complete the unit we are waiting for.
    if (this._bufferedLength > 0 &&
        this._bufferedLength + chunk.length >= this._nextMessageLength) {
      data = Buffer.concat(this._buffers.concat([chunk]))
      this._buffers = []
      this._bufferedLength = 0
    }

    let decodeError = null

    // Decode for as long as the next unit is fully available.
    while (data.length >= this._nextMessageLength) {
      if (this._awaitFramePrefix) {
        this._nextMessageLength = data.readUInt16BE(0)
        data = data.subarray(FRAME_PREFIX_SIZE)
        this._awaitFramePrefix = false
        continue
      }

      const payload = data.subarray(0, this._nextMessageLength)

      // Advance the state machine before decoding so that a throwing
      // decoder leaves the framing state consistent.
      data = data.subarray(this._nextMessageLength)
      this._nextMessageLength = FRAME_PREFIX_SIZE
      this._awaitFramePrefix = true

      let message
      try {
        message = this._messageType.decode(payload)
      } catch (err) {
        // Report through the callback instead of throwing out of
        // _transform, so the failure reaches the stream's error handling.
        decodeError = err
        break
      }
      this.push(message)
    }

    // Keep whatever is left for the next call.
    if (data.length > 0) {
      this._buffers.push(data)
      this._bufferedLength += data.length
    }

    callback(decodeError)
  }
}
60
61module.exports = AbstractDecoder