UNPKG

11.1 kBJavaScriptView Raw
"use strict";

// Flag the export object as coming from a transpiled ES module; this is the
// `__esModule` marker that `_interopRequireDefault` looks for.
Object.defineProperty(exports, "__esModule", {
  value: true
});
// Placeholder; the real default export is assigned at the bottom of the file.
exports.default = void 0;
var _bl = _interopRequireDefault(require("bl"));
var _stream = require("stream");
var _message = _interopRequireDefault(require("./message"));
var _packet = require("./packet");
var _errors = require("./errors");
/**
 * Normalise a required module so that its default export can always be read
 * from the `default` property.
 *
 * @param {*} obj The value returned by `require()`.
 * @returns {*} `obj` itself when it is truthy and carries a truthy
 *   `__esModule` flag, otherwise a new `{ default: obj }` wrapper.
 */
function _interopRequireDefault(obj) {
  return obj && obj.__esModule ? obj : { default: obj };
}
/**
 * IncomingMessageStream
 *
 * Transform received TDS data into individual IncomingMessage streams.
 *
 * Bytes written to this stream are buffered and cut into packets using the
 * length field found in each packet header. The payload of every packet is
 * forwarded to a message object; each new message object is pushed once on
 * the (object mode) readable side of this stream.
 */
class IncomingMessageStream extends _stream.Transform {
  /**
   * @param debug Object whose `packet(direction, packet)` and `data(packet)`
   *   methods are invoked for every packet that is received.
   */
  constructor(debug) {
    super({
      readableObjectMode: true
    });
    this.debug = debug;
    // Message currently being assembled; `undefined` between messages.
    this.currentMessage = undefined;
    // Collects incoming chunks until at least one full packet is available.
    this.bl = new _bl.default();
  }

  /**
   * Pause this stream and, if there is one, the message in progress.
   *
   * @returns {this}
   */
  pause() {
    super.pause();
    if (this.currentMessage) {
      this.currentMessage.pause();
    }
    return this;
  }

  /**
   * Resume this stream and, if there is one, the message in progress.
   *
   * @returns {this}
   */
  resume() {
    super.resume();
    if (this.currentMessage) {
      this.currentMessage.resume();
    }
    return this;
  }

  /**
   * Extract as many complete packets as possible from the internal buffer
   * and hand their payload to the current message.
   *
   * @param {Function} callback Called without arguments once no further
   *   complete packet is buffered, or with a `ConnectionError` when a header
   *   announces a packet length smaller than the header itself. The call is
   *   deferred while waiting for the current message to emit `end` or
   *   `drain`.
   */
  processBufferedData(callback) {
    // The packet header is always 8 bytes of length.
    while (this.bl.length >= _packet.HEADER_LENGTH) {
      // Bytes 2-3 of the header hold the full packet length (header included).
      const length = this.bl.readUInt16BE(2);
      if (length < _packet.HEADER_LENGTH) {
        return callback(new _errors.ConnectionError('Unable to process incoming packet'));
      }

      if (this.bl.length < length) {
        // The packet has not been received completely yet.
        break;
      }

      const data = this.bl.slice(0, length);
      this.bl.consume(length);

      // TODO: Get rid of creating `Packet` instances here.
      const packet = new _packet.Packet(data);
      this.debug.packet('Received', packet);
      this.debug.data(packet);

      let message = this.currentMessage;
      if (message === undefined) {
        // First packet of a new message: create the message and expose it
        // to the consumer before any payload is written to it.
        this.currentMessage = message = new _message.default({
          type: packet.type(),
          resetConnection: false
        });
        this.push(message);
      }

      if (packet.isLast()) {
        // Wait until the current message was fully processed before we
        // continue processing any remaining messages.
        message.once('end', () => {
          this.currentMessage = undefined;
          this.processBufferedData(callback);
        });
        message.end(packet.data());
        return;
      }

      if (!message.write(packet.data())) {
        // If too much data is buffering up in the
        // current message, wait for it to drain.
        message.once('drain', () => {
          this.processBufferedData(callback);
        });
        return;
      }
    }

    // Not enough data to read the next packet. Stop here and wait for
    // the next call to `_transform`.
    callback();
  }

  /**
   * Transform implementation: buffer the chunk, then process every packet
   * that is now complete.
   */
  _transform(chunk, _encoding, callback) {
    this.bl.append(chunk);
    this.processBufferedData(callback);
  }
}
// Publish the class as the `default` export, then replace `module.exports`
// with the class itself so that `require()` returns it directly.
var _default = exports.default = IncomingMessageStream;
module.exports = IncomingMessageStream;
97//# sourceMappingURL=data:application/json;charset=utf-8;base64,
\No newline at end of file