1 |
|
2 |
|
3 |
|
4 |
|
5 | module.exports = BlockStream
|
6 |
|
7 | var Stream = require("stream").Stream
|
8 | , inherits = require("inherits")
|
9 | , assert = require("assert").ok
|
10 | , debug = process.env.DEBUG ? console.error : function () {}
|
11 |
|
12 | function BlockStream (size, opt) {
|
13 | this.writable = this.readable = true
|
14 | this._opt = opt || {}
|
15 | this._chunkSize = size || 512
|
16 | this._offset = 0
|
17 | this._buffer = []
|
18 | this._bufferLength = 0
|
19 | if (this._opt.nopad) this._zeroes = false
|
20 | else {
|
21 | this._zeroes = new Buffer(this._chunkSize)
|
22 | for (var i = 0; i < this._chunkSize; i ++) {
|
23 | this._zeroes[i] = 0
|
24 | }
|
25 | }
|
26 | }
|
27 |
|
28 | inherits(BlockStream, Stream)
|
29 |
|
30 | BlockStream.prototype.write = function (c) {
|
31 |
|
32 | if (this._ended) throw new Error("BlockStream: write after end")
|
33 | if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
|
34 | if (c.length) {
|
35 | this._buffer.push(c)
|
36 | this._bufferLength += c.length
|
37 | }
|
38 |
|
39 | if (this._bufferLength >= this._chunkSize) {
|
40 | if (this._paused) {
|
41 |
|
42 | this._needDrain = true
|
43 | return false
|
44 | }
|
45 | this._emitChunk()
|
46 | }
|
47 | return true
|
48 | }
|
49 |
|
50 | BlockStream.prototype.pause = function () {
|
51 |
|
52 | this._paused = true
|
53 | }
|
54 |
|
55 | BlockStream.prototype.resume = function () {
|
56 |
|
57 | this._paused = false
|
58 | return this._emitChunk()
|
59 | }
|
60 |
|
61 | BlockStream.prototype.end = function (chunk) {
|
62 |
|
63 | if (typeof chunk === "function") cb = chunk, chunk = null
|
64 | if (chunk) this.write(chunk)
|
65 | this._ended = true
|
66 | this.flush()
|
67 | }
|
68 |
|
69 | BlockStream.prototype.flush = function () {
|
70 | this._emitChunk(true)
|
71 | }
|
72 |
|
73 | BlockStream.prototype._emitChunk = function (flush) {
|
74 |
|
75 |
|
76 |
|
77 | if (flush && this._zeroes) {
|
78 |
|
79 |
|
80 | var padBytes = (this._bufferLength % this._chunkSize)
|
81 | if (padBytes !== 0) padBytes = this._chunkSize - padBytes
|
82 | if (padBytes > 0) {
|
83 |
|
84 | this._buffer.push(this._zeroes.slice(0, padBytes))
|
85 | this._bufferLength += padBytes
|
86 |
|
87 | }
|
88 | }
|
89 |
|
90 | if (this._emitting || this._paused) return
|
91 | this._emitting = true
|
92 |
|
93 |
|
94 | var bufferIndex = 0
|
95 | while (this._bufferLength >= this._chunkSize &&
|
96 | (flush || !this._paused)) {
|
97 |
|
98 |
|
99 | var out
|
100 | , outOffset = 0
|
101 | , outHas = this._chunkSize
|
102 |
|
103 | while (outHas > 0 && (flush || !this._paused) ) {
|
104 |
|
105 | var cur = this._buffer[bufferIndex]
|
106 | , curHas = cur.length - this._offset
|
107 |
|
108 |
|
109 |
|
110 |
|
111 |
|
112 |
|
113 |
|
114 | if (out || curHas < outHas) {
|
115 | out = out || new Buffer(this._chunkSize)
|
116 | cur.copy(out, outOffset,
|
117 | this._offset, this._offset + Math.min(curHas, outHas))
|
118 | } else if (cur.length === outHas && this._offset === 0) {
|
119 |
|
120 | out = cur
|
121 | } else {
|
122 |
|
123 | out = cur.slice(this._offset, this._offset + outHas)
|
124 | }
|
125 |
|
126 | if (curHas > outHas) {
|
127 |
|
128 |
|
129 | this._offset += outHas
|
130 | outHas = 0
|
131 | } else {
|
132 |
|
133 |
|
134 | outHas -= curHas
|
135 | outOffset += curHas
|
136 | bufferIndex ++
|
137 | this._offset = 0
|
138 | }
|
139 | }
|
140 |
|
141 | this._bufferLength -= this._chunkSize
|
142 | assert(out.length === this._chunkSize)
|
143 |
|
144 |
|
145 | this.emit("data", out)
|
146 | out = null
|
147 | }
|
148 |
|
149 |
|
150 |
|
151 | this._buffer = this._buffer.slice(bufferIndex)
|
152 | if (this._paused) {
|
153 |
|
154 | this._needsDrain = true
|
155 | this._emitting = false
|
156 | return
|
157 | }
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 | var l = this._buffer.length
|
164 | if (flush && !this._zeroes && l) {
|
165 | if (l === 1) {
|
166 | if (this._offset) {
|
167 | this.emit("data", this._buffer[0].slice(this._offset))
|
168 | } else {
|
169 | this.emit("data", this._buffer[0])
|
170 | }
|
171 | } else {
|
172 | var outHas = this._bufferLength
|
173 | , out = new Buffer(outHas)
|
174 | , outOffset = 0
|
175 | for (var i = 0; i < l; i ++) {
|
176 | var cur = this._buffer[i]
|
177 | , curHas = cur.length - this._offset
|
178 | cur.copy(out, outOffset, this._offset)
|
179 | this._offset = 0
|
180 | outOffset += curHas
|
181 | this._bufferLength -= curHas
|
182 | }
|
183 | this.emit("data", out)
|
184 | }
|
185 |
|
186 | this._buffer.length = 0
|
187 | this._bufferLength = 0
|
188 | this._offset = 0
|
189 | }
|
190 |
|
191 |
|
192 |
|
193 |
|
194 | if (this._needDrain) {
|
195 |
|
196 | this._needDrain = false
|
197 | this.emit("drain")
|
198 | }
|
199 |
|
200 | if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
|
201 |
|
202 | this._endEmitted = true
|
203 | this.emit("end")
|
204 | }
|
205 |
|
206 | this._emitting = false
|
207 |
|
208 |
|
209 | }
|