1 | "use strict";
|
2 |
|
3 | let util = require("util");
|
4 | let Stream = require("stream");
|
5 |
|
/**
 * ChunkStream constructor (also assigned to `module.exports`).
 *
 * Runs the base `Stream` constructor on the new instance and then sets up the
 * bookkeeping used by the prototype methods:
 *  - `_buffers`  : queue of written chunks not yet consumed
 *  - `_buffered` : running total of bytes held in `_buffers`
 *  - `_reads`    : queue of pending read requests
 *  - `_paused`   : whether the last `write()` found no pending reads
 *  - `_encoding` : encoding applied to non-Buffer data when none is given
 *  - `writable`  : cleared by `end()` / `destroy()`
 *
 * Kept as a plain function (not a `class`) so it can still be invoked with
 * `.call(this)` from a derived constructor.
 */
let ChunkStream = (module.exports = function () {
  Stream.call(this);

  Object.assign(this, {
    _buffers: [],
    _buffered: 0,
    _reads: [],
    _paused: false,
    _encoding: "utf8",
    writable: true,
  });
});
util.inherits(ChunkStream, Stream);
|
19 |
|
/**
 * Queue a read request.
 *
 * @param {number} length - Number of bytes wanted. A positive value is stored
 *   as an exact-length request; a negative value is stored as an "allow less"
 *   request for at most `Math.abs(length)` bytes.
 * @param {Function} callback - Stored as the request's `func`; the processing
 *   methods call it with `this` set to the stream and the data as a Buffer.
 *
 * Processing is deferred to the next tick. If the stream was paused by
 * `write()` and a read is still pending after processing, the pause is lifted
 * and "drain" is emitted so the producer can resume writing.
 *
 * If the stream has already been destroyed (its read queue is `null`), an
 * "error" event is emitted instead of dereferencing the missing queue.
 */
ChunkStream.prototype.read = function (length, callback) {
  // destroy() nulls the read queue; report misuse the same way write() does
  // rather than failing with an opaque TypeError on `null.push`.
  if (!this._reads) {
    this.emit("error", new Error("Stream not readable"));
    return;
  }

  this._reads.push({
    length: Math.abs(length), // a negative length means "at most this many bytes"
    allowLess: length < 0,
    func: callback,
  });

  process.nextTick(() => {
    this._process();

    // Paused and a read is still waiting for data: ask the producer for more.
    if (this._paused && this._reads && this._reads.length > 0) {
      this._paused = false;

      this.emit("drain");
    }
  });
};
|
40 |
|
/**
 * Append data to the internal queue and try to satisfy pending reads.
 *
 * @param {Buffer|string} data - Chunk to queue. Non-Buffer input is converted
 *   with `Buffer.from(data, encoding)`.
 * @param {string} [encoding] - Encoding for non-Buffer data; falls back to
 *   `this._encoding` when omitted.
 * @returns {boolean} `false` when the stream is not writable or has just been
 *   paused because no reads are pending; `true` otherwise.
 *
 * Emits "error" (and returns `false`) when called on a non-writable stream.
 */
ChunkStream.prototype.write = function (data, encoding) {
  if (!this.writable) {
    this.emit("error", new Error("Stream not writable"));
    return false;
  }

  const dataBuffer = Buffer.isBuffer(data)
    ? data
    : Buffer.from(data, encoding || this._encoding);

  // Do not queue zero-length chunks: they add nothing to `_buffered`, yet an
  // "allow less" read would later be consumed delivering an empty Buffer even
  // though real data is queued right behind it.
  if (dataBuffer.length > 0) {
    this._buffers.push(dataBuffer);
    this._buffered += dataBuffer.length;
  }

  this._process();

  // Nobody is waiting for data: signal back-pressure until the next read().
  // `_reads` may have been nulled if processing destroyed the stream.
  if (this._reads && this._reads.length === 0) {
    this._paused = true;
  }

  return this.writable && !this._paused;
};
|
66 |
|
/**
 * Stop accepting writes, optionally writing one last chunk first.
 *
 * @param {Buffer|string} [data] - Final chunk; passed to `write()` when truthy.
 * @param {string} [encoding] - Encoding forwarded to `write()`.
 *
 * After clearing `writable`:
 *  - if the stream was already destroyed (no buffer queue) nothing more happens;
 *  - if the buffer queue is empty, `_end()` is called directly;
 *  - otherwise a `null` entry is appended to the queue and `_process()` runs.
 */
ChunkStream.prototype.end = function (data, encoding) {
  if (data) {
    this.write(data, encoding);
  }

  this.writable = false;

  const queue = this._buffers;

  // destroy() has already run
  if (!queue) {
    return;
  }

  // Data still queued: append the end marker and let _process() take over.
  if (queue.length > 0) {
    queue.push(null);
    this._process();
    return;
  }

  // Nothing queued: finish right away.
  this._end();
};

// destroySoon() is an alias of end().
ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
|
89 |
|
/**
 * Finish the stream: emit an "Unexpected end of input" error if any read
 * requests are still queued, then destroy the stream.
 */
ChunkStream.prototype._end = function () {
  const pendingReads = this._reads.length;

  if (pendingReads > 0) {
    this.emit("error", new Error("Unexpected end of input"));
  }

  this.destroy();
};
|
97 |
|
/**
 * Tear the stream down: mark it non-writable, drop the buffer and read
 * queues, and emit "close". Calling it again after the queues have been
 * dropped is a no-op.
 */
ChunkStream.prototype.destroy = function () {
  const alreadyDestroyed = !this._buffers;
  if (alreadyDestroyed) {
    return;
  }

  this.writable = false;
  this._buffers = null;
  this._reads = null;

  this.emit("close");
};
|
109 |
|
/**
 * Serve an "allow less" read from the first queued buffer only.
 *
 * @param {{length: number, func: Function}} read - The request at the head of
 *   `_reads`; it is removed from the queue before the callback runs.
 *
 * If the first buffer holds more than `read.length` bytes, the callback gets
 * the first `read.length` bytes and the remainder stays at the head of the
 * queue. Otherwise the callback gets the whole first buffer, which may be
 * shorter than requested. The delivered chunk is a view on the queued buffer
 * (no copy is made).
 */
ChunkStream.prototype._processReadAllowingLess = function (read) {
  // Dequeue first so the callback may queue further reads.
  this._reads.shift();

  const head = this._buffers[0];

  if (head.length > read.length) {
    // More than enough: hand over the front part, keep the rest queued.
    // `subarray` replaces the deprecated `Buffer#slice`; both return a view.
    this._buffered -= read.length;
    this._buffers[0] = head.subarray(read.length);

    read.func.call(this, head.subarray(0, read.length));
  } else {
    // Exactly enough or less: hand over the whole buffer.
    this._buffered -= head.length;
    this._buffers.shift();

    read.func.call(this, head);
  }
};
|
131 |
|
/**
 * Serve an exact-length read by copying bytes from the front of the buffer
 * queue into one freshly allocated Buffer.
 *
 * @param {{length: number, func: Function}} read - The request at the head of
 *   `_reads`; it is removed from the queue before the callback runs. The
 *   caller is expected to have checked that at least `read.length` bytes are
 *   buffered.
 *
 * Fully copied buffers are removed from the queue; a partially copied buffer
 * is replaced by a view of its unread tail. The callback is invoked with
 * `this` set to the stream and the assembled Buffer.
 */
ChunkStream.prototype._processRead = function (read) {
  this._reads.shift();

  const data = Buffer.alloc(read.length);
  let pos = 0; // bytes copied into `data` so far
  let consumed = 0; // leading queue entries that were copied completely

  while (pos < read.length) {
    const buf = this._buffers[consumed];
    const len = Math.min(buf.length, read.length - pos);

    buf.copy(data, pos, 0, len);
    pos += len;

    if (len === buf.length) {
      consumed++;
    } else {
      // Only part of this buffer was needed: keep its tail at the same slot.
      // `subarray` replaces the deprecated `Buffer#slice`; both return a view.
      this._buffers[consumed] = buf.subarray(len);
    }
  }

  // Drop the buffers that were used up entirely.
  if (consumed > 0) {
    this._buffers.splice(0, consumed);
  }

  this._buffered -= read.length;

  read.func.call(this, data);
};
|
162 |
|
/**
 * Serve as many queued reads as the buffered data allows.
 *
 * Reads are handled in queue order while data is buffered: an "allow less"
 * read is always served; an exact-length read is served only once enough
 * bytes are buffered, otherwise processing stops until more data arrives.
 * Afterwards, if the stream is no longer writable and has not been destroyed,
 * `_end()` is called. Any exception thrown along the way (including from a
 * read callback) is re-emitted as an "error" event.
 */
ChunkStream.prototype._process = function () {
  try {
    // `_reads` is re-checked every pass: a callback may destroy the stream.
    while (this._reads && this._reads.length > 0 && this._buffered > 0) {
      const pending = this._reads[0];

      if (pending.allowLess) {
        // Partial data is acceptable for this request.
        this._processReadAllowingLess(pending);
        continue;
      }

      if (!(this._buffered >= pending.length)) {
        // Not enough data yet for an exact-length read; wait for more.
        break;
      }

      this._processRead(pending);
    }

    if (!this.writable && this._buffers) {
      this._end();
    }
  } catch (err) {
    this.emit("error", err);
  }
};
|