// Retrieved from UNPKG (raw JavaScript view, 4.25 kB).

1"use strict";
2
3let util = require("util");
4let Stream = require("stream");
5
6let ChunkStream = (module.exports = function () {
7 Stream.call(this);
8
9 this._buffers = [];
10 this._buffered = 0;
11
12 this._reads = [];
13 this._paused = false;
14
15 this._encoding = "utf8";
16 this.writable = true;
17});
18util.inherits(ChunkStream, Stream);
19
20ChunkStream.prototype.read = function (length, callback) {
21 this._reads.push({
22 length: Math.abs(length), // if length < 0 then at most this length
23 allowLess: length < 0,
24 func: callback,
25 });
26
27 process.nextTick(
28 function () {
29 this._process();
30
31 // its paused and there is not enought data then ask for more
32 if (this._paused && this._reads && this._reads.length > 0) {
33 this._paused = false;
34
35 this.emit("drain");
36 }
37 }.bind(this)
38 );
39};
40
41ChunkStream.prototype.write = function (data, encoding) {
42 if (!this.writable) {
43 this.emit("error", new Error("Stream not writable"));
44 return false;
45 }
46
47 let dataBuffer;
48 if (Buffer.isBuffer(data)) {
49 dataBuffer = data;
50 } else {
51 dataBuffer = Buffer.from(data, encoding || this._encoding);
52 }
53
54 this._buffers.push(dataBuffer);
55 this._buffered += dataBuffer.length;
56
57 this._process();
58
59 // ok if there are no more read requests
60 if (this._reads && this._reads.length === 0) {
61 this._paused = true;
62 }
63
64 return this.writable && !this._paused;
65};
66
67ChunkStream.prototype.end = function (data, encoding) {
68 if (data) {
69 this.write(data, encoding);
70 }
71
72 this.writable = false;
73
74 // already destroyed
75 if (!this._buffers) {
76 return;
77 }
78
79 // enqueue or handle end
80 if (this._buffers.length === 0) {
81 this._end();
82 } else {
83 this._buffers.push(null);
84 this._process();
85 }
86};
87
88ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
89
90ChunkStream.prototype._end = function () {
91 if (this._reads.length > 0) {
92 this.emit("error", new Error("Unexpected end of input"));
93 }
94
95 this.destroy();
96};
97
98ChunkStream.prototype.destroy = function () {
99 if (!this._buffers) {
100 return;
101 }
102
103 this.writable = false;
104 this._reads = null;
105 this._buffers = null;
106
107 this.emit("close");
108};
109
110ChunkStream.prototype._processReadAllowingLess = function (read) {
111 // ok there is any data so that we can satisfy this request
112 this._reads.shift(); // == read
113
114 // first we need to peek into first buffer
115 let smallerBuf = this._buffers[0];
116
117 // ok there is more data than we need
118 if (smallerBuf.length > read.length) {
119 this._buffered -= read.length;
120 this._buffers[0] = smallerBuf.slice(read.length);
121
122 read.func.call(this, smallerBuf.slice(0, read.length));
123 } else {
124 // ok this is less than maximum length so use it all
125 this._buffered -= smallerBuf.length;
126 this._buffers.shift(); // == smallerBuf
127
128 read.func.call(this, smallerBuf);
129 }
130};
131
132ChunkStream.prototype._processRead = function (read) {
133 this._reads.shift(); // == read
134
135 let pos = 0;
136 let count = 0;
137 let data = Buffer.alloc(read.length);
138
139 // create buffer for all data
140 while (pos < read.length) {
141 let buf = this._buffers[count++];
142 let len = Math.min(buf.length, read.length - pos);
143
144 buf.copy(data, pos, 0, len);
145 pos += len;
146
147 // last buffer wasn't used all so just slice it and leave
148 if (len !== buf.length) {
149 this._buffers[--count] = buf.slice(len);
150 }
151 }
152
153 // remove all used buffers
154 if (count > 0) {
155 this._buffers.splice(0, count);
156 }
157
158 this._buffered -= read.length;
159
160 read.func.call(this, data);
161};
162
163ChunkStream.prototype._process = function () {
164 try {
165 // as long as there is any data and read requests
166 while (this._buffered > 0 && this._reads && this._reads.length > 0) {
167 let read = this._reads[0];
168
169 // read any data (but no more than length)
170 if (read.allowLess) {
171 this._processReadAllowingLess(read);
172 } else if (this._buffered >= read.length) {
173 // ok we can meet some expectations
174
175 this._processRead(read);
176 } else {
177 // not enought data to satisfy first request in queue
178 // so we need to wait for more
179 break;
180 }
181 }
182
183 if (this._buffers && !this.writable) {
184 this._end();
185 }
186 } catch (ex) {
187 this.emit("error", ex);
188 }
189};