1 | 'use strict'
|
2 |
|
3 | const stream = require('../lib/destroyable-stream')
|
4 |
|
// Size in bytes of the length prefix that precedes every message; the decoder
// reads it as a big-endian uint16 (readUInt16BE), hence 2.
const FRAME_PREFIX_SIZE = 2
|
6 |
|
/**
 * Transform stream that turns a length-prefixed byte stream into decoded
 * message objects.
 *
 * The writable side receives raw bytes made of consecutive frames. Each frame
 * is a FRAME_PREFIX_SIZE-byte big-endian uint16 holding the payload length,
 * followed by that many payload bytes. Every complete payload is passed to
 * `messageType.decode()` and the returned value is pushed to the readable
 * (object mode) side. Frames may be split arbitrarily across chunks.
 */
class AbstractDecoder extends stream.Transform {
  /**
   * @param {{decode: function(Buffer): *}} messageType - Object whose `decode`
   *   method converts one payload buffer into a message.
   * @param {object} [options] - Additional stream options. They are merged
   *   over the defaults below and can therefore override them.
   */
  constructor (messageType, options) {
    super(Object.assign({
      readableObjectMode: true,
      writableObjectMode: false
    }, options))

    this._messageType = messageType

    // Bytes received so far that do not yet form a complete prefix/payload.
    this._buffers = []
    this._bufferedLength = 0
    // Number of bytes required before the next parsing step can run: the
    // prefix size while awaiting a prefix, the payload length otherwise.
    this._nextMessageLength = FRAME_PREFIX_SIZE
    this._awaitFramePrefix = true
  }

  /**
   * Consumes one chunk of bytes and pushes every message completed by it.
   *
   * @param {Buffer} chunk - Next slice of the framed byte stream.
   * @param {string} encoding - Unused; chunks are handled as buffers.
   * @param {function(?Error=): void} callback - Called exactly once: with
   *   `null` when the chunk was consumed, or with the error raised while
   *   reading a prefix or decoding a payload.
   */
  _transform (chunk, encoding, callback) {
    let pending = chunk

    // Only concatenate once the stored bytes plus this chunk are enough to
    // make progress; otherwise the chunk is simply stored below.
    if (this._bufferedLength > 0 &&
        this._bufferedLength + pending.length >= this._nextMessageLength) {
      pending = Buffer.concat(
        this._buffers.concat([pending]),
        this._bufferedLength + pending.length
      )
      this._buffers = []
      this._bufferedLength = 0
    }

    // Errors must be reported through the callback: throwing out of
    // _transform is undefined behaviour for Node.js streams.
    try {
      // Alternate between reading a prefix and decoding a payload for as long
      // as enough bytes are available for the current step.
      while (pending.length >= this._nextMessageLength) {
        if (this._awaitFramePrefix) {
          this._nextMessageLength = pending.readUInt16BE(0)
          pending = pending.subarray(FRAME_PREFIX_SIZE)
          this._awaitFramePrefix = false
        } else {
          this.push(
            this._messageType.decode(
              pending.subarray(0, this._nextMessageLength)
            )
          )
          pending = pending.subarray(this._nextMessageLength)
          this._nextMessageLength = FRAME_PREFIX_SIZE
          this._awaitFramePrefix = true
        }
      }
    } catch (err) {
      callback(err)
      return
    }

    // Keep the incomplete tail for the next call.
    if (pending.length > 0) {
      this._buffers.push(pending)
      this._bufferedLength += pending.length
    }

    callback(null)
  }
}
|
60 |
|
61 | module.exports = AbstractDecoder
|