UNPKG

14.2 kBJavaScriptView Raw
1// A bit simpler than readable streams.
2// Implement an async ._write(chunk, encoding, cb), and it'll handle all
3// the drain event emission and buffering.
4
5
import {inherits, deprecate} from 'util';
import {Buffer} from 'buffer';
import {EventEmitter} from 'events';
import {Duplex} from './duplex';
import {nextTick} from 'process';

// Writable and WritableState are function declarations further down in this
// file, so they are already usable here.
// Expose the state constructor as a static property of Writable.
Writable.WritableState = WritableState;
// Writable extends EventEmitter.
inherits(Writable, EventEmitter);
13
// Do-nothing function; write() substitutes it when no callback is supplied.
function nop() {
  // intentionally empty
}
15
// One queued write: the chunk, its encoding, the callback to invoke for it,
// and a `next` link (initially null) to the request queued after it.
function WriteReq(chunk, encoding, cb) {
  this.chunk = chunk;
  this.encoding = encoding;
  this.callback = cb;
  // Filled in when another request is appended behind this one.
  this.next = null;
}
22
// Bookkeeping for the writable side of a stream: settings read from
// `options` plus the flags, counters and request queue used while writing.
// `stream` is the owning stream; it is used for the Duplex check and is
// captured by the `onwrite` callback.
function WritableState(options, stream) {
  // Deprecated `buffer` accessor: forwards to getBuffer() through
  // util.deprecate with the message below.
  const deprecatedBufferGetter = deprecate(function () {
    return this.getBuffer();
  }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.');
  Object.defineProperty(this, 'buffer', { get: deprecatedBufferGetter });

  const opts = options || {};

  // Whether the stream carries arbitrary objects instead of buffers/strings.
  // For a Duplex, `writableObjectMode` can also switch it on.
  this.objectMode = Boolean(opts.objectMode);
  if (stream instanceof Duplex && !this.objectMode) {
    this.objectMode = Boolean(opts.writableObjectMode);
  }

  // Threshold at which write() starts returning false. An explicit 0 is
  // honoured: write() then returns false unless everything was flushed
  // immediately. The chosen value is truncated to an integer.
  const requestedHwm = opts.highWaterMark;
  const fallbackHwm = this.objectMode ? 16 : 16 * 1024;
  const useRequested = Boolean(requestedHwm) || requestedHwm === 0;
  this.highWaterMark = ~~(useRequested ? requestedHwm : fallbackHwm);

  this.needDrain = false;
  // set at the start of end()
  this.ending = false;
  // set once end() has been called and has returned
  this.ended = false;
  // set when 'finish' is emitted
  this.finished = false;

  // Strings are converted to buffers before _write() unless the caller
  // passed `decodeStrings: false`.
  this.decodeStrings = opts.decodeStrings !== false;

  // Encoding assumed for string chunks when none is given; 'utf8' unless
  // overridden through options.
  this.defaultEncoding = opts.defaultEncoding || 'utf8';

  // Amount accepted by write() but not yet acknowledged by the _write
  // callback. This is a measurement, not an actual buffer.
  this.length = 0;

  // true while a _write/_writev call is outstanding
  this.writing = false;

  // cork() depth; while non-zero, writes are queued instead of dispatched
  this.corked = 0;

  // Tells onwrite whether the _write callback fired during the _write call
  // itself or on a later tick. Starts out true.
  this.sync = true;

  // true while clearBuffer() is walking the queue, so onwrite does not
  // start a second, overlapping drain of the queue
  this.bufferProcessing = false;

  // callback handed to _write()/_writev()
  this.onwrite = (er) => {
    onwrite(stream, er);
  };

  // user callback belonging to the write currently in flight
  this.writecb = null;

  // size of the write currently in flight
  this.writelen = 0;

  // head and tail of the linked list of queued WriteReq objects
  this.bufferedRequest = null;
  this.lastBufferedRequest = null;

  // number of user callbacks still owed; must be 0 before 'finish'
  this.pendingcb = 0;

  // set once 'prefinish' has been emitted
  this.prefinished = false;

  // set once an 'error' has been emitted for a failed write
  this.errorEmitted = false;

  // number of requests currently queued
  this.bufferedRequestCount = 0;

  // One spare CorkedRequest is always kept allocated for the _writev path.
  this.corkedRequestsFree = new CorkedRequest(this);
}
120
// Returns the queued write requests as a new array, in queue order, by
// following the `next` links from `bufferedRequest`.
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
  const queued = [];
  for (let req = this.bufferedRequest; req; req = req.next) {
    queued.push(req);
  }
  return queued;
};
130
export default Writable;

// Writable stream constructor. Usable with or without `new`. `options` may
// carry `write` / `writev` functions, which are installed as the instance's
// _write / _writev.
export function Writable(options) {
  // The constructor is also applied to Duplex instances, which are not
  // instanceof Writable, so both cases count as "already constructed".
  const alreadyInstance = this instanceof Writable || this instanceof Duplex;
  if (!alreadyInstance) return new Writable(options);

  this._writableState = new WritableState(options, this);

  // legacy flag
  this.writable = true;

  if (options) {
    const { write, writev } = options;
    if (typeof write === 'function') this._write = write;
    if (typeof writev === 'function') this._writev = writev;
  }

  EventEmitter.call(this);
}
151
// A writable stream has nothing to read, so pipe() only reports an error
// by emitting 'error' on the stream.
Writable.prototype.pipe = function () {
  const notReadable = new Error('Cannot pipe, not readable');
  this.emit('error', notReadable);
};
156
// Reports a write issued after the stream has ended: 'error' is emitted
// immediately, while the write callback receives the same error on the
// next tick.
function writeAfterEnd(stream, cb) {
  const failure = new Error('write after end');
  // TODO: defer error events consistently everywhere, not just the cb
  stream.emit('error', failure);
  nextTick(cb, failure);
}
163
// Decides whether `chunk` may be written. null is always rejected. Outside
// objectMode only buffers, strings and undefined are accepted; in
// objectMode any other value passes. On rejection a TypeError is emitted
// as 'error' on the stream and handed to `cb` on the next tick.
// Returns true when the chunk is acceptable, false otherwise.
function validChunk(stream, state, chunk, cb) {
  let problem = null;

  if (chunk === null) {
    problem = new TypeError('May not write null values to stream');
  } else {
    const acceptable = Buffer.isBuffer(chunk) || typeof chunk === 'string' || chunk === undefined || state.objectMode;
    if (!acceptable) {
      problem = new TypeError('Invalid non-string/buffer chunk');
    }
  }

  if (problem === null) return true;

  stream.emit('error', problem);
  nextTick(cb, problem);
  return false;
}
187
// Public write(). `encoding` may be omitted, in which case the second
// argument is taken as the callback. Returns false when the write was
// rejected (stream already ended, or invalid chunk); otherwise returns
// whatever writeOrBuffer reports.
Writable.prototype.write = function (chunk, encoding, cb) {
  const state = this._writableState;
  let enc = encoding;
  let done = cb;

  // write(chunk, cb) form
  if (typeof enc === 'function') {
    done = enc;
    enc = null;
  }

  if (Buffer.isBuffer(chunk)) {
    enc = 'buffer';
  } else if (!enc) {
    enc = state.defaultEncoding;
  }

  if (typeof done !== 'function') done = nop;

  if (state.ended) {
    writeAfterEnd(this, done);
    return false;
  }
  if (!validChunk(this, state, chunk, done)) return false;

  // One more user callback is now owed.
  state.pendingcb++;
  return writeOrBuffer(this, state, chunk, enc, done);
};
208
// Increases the cork depth by one; while it is non-zero, writes are queued.
Writable.prototype.cork = function () {
  this._writableState.corked++;
};
214
// Decreases the cork depth by one (no-op when not corked). When the depth
// reaches zero and nothing else is in progress, the queued requests are
// flushed through clearBuffer.
Writable.prototype.uncork = function () {
  const state = this._writableState;
  if (!state.corked) return;

  state.corked--;

  const idle = !state.writing && !state.corked && !state.finished && !state.bufferProcessing;
  if (idle && state.bufferedRequest) {
    clearBuffer(this, state);
  }
};
224
// Sets the encoding assumed for string chunks written without one.
// String input is lower-cased first. Throws a TypeError for anything not
// in the accepted list. Returns the stream so calls can be chained.
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  const accepted = ['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'];

  // node::ParseEncoding() requires lower case.
  const normalized = typeof encoding === 'string' ? encoding.toLowerCase() : encoding;

  if (!accepted.includes((normalized + '').toLowerCase())) {
    throw new TypeError('Unknown encoding: ' + normalized);
  }

  this._writableState.defaultEncoding = normalized;
  return this;
};
232
// Converts a string chunk to a Buffer using `encoding`, unless the stream
// is in objectMode or string decoding was switched off. Any other chunk
// is returned unchanged.
function decodeChunk(state, chunk, encoding) {
  const shouldDecode = typeof chunk === 'string' && !state.objectMode && state.decodeStrings !== false;
  return shouldDecode ? Buffer.from(chunk, encoding) : chunk;
}
239
// Accounts for a new chunk and either dispatches it to _write right away
// or, when a write is already in flight or the stream is corked, appends
// it to the request queue. Returns false once the pending length has
// reached the high-water mark; in that case needDrain is also set.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  const payload = decodeChunk(state, chunk, encoding);
  const enc = Buffer.isBuffer(payload) ? 'buffer' : encoding;
  const size = state.objectMode ? 1 : payload.length;

  state.length += size;

  const belowMark = state.length < state.highWaterMark;
  // Only ever raise needDrain here; an earlier true value must survive.
  if (!belowMark) state.needDrain = true;

  if (!state.writing && !state.corked) {
    doWrite(stream, state, false, size, payload, enc, cb);
    return belowMark;
  }

  // Busy or corked: append to the tail of the linked list.
  const request = new WriteReq(payload, enc, cb);
  const tail = state.lastBufferedRequest;
  state.lastBufferedRequest = request;
  if (tail) {
    tail.next = request;
  } else {
    state.bufferedRequest = request;
  }
  state.bufferedRequestCount += 1;

  return belowMark;
}
270
// Records the in-flight write on `state` and hands the data to the
// stream's _writev (when `writev` is truthy) or _write. `state.sync` is
// true only for the duration of that call, which lets onwrite tell
// whether the callback fired during the call or afterwards.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;

  if (writev) {
    stream._writev(chunk, state.onwrite);
  } else {
    stream._write(chunk, encoding, state.onwrite);
  }

  state.sync = false;
}
279
// Handles a failed write: settles one pending callback with the error
// (deferred to the next tick when the failure was reported synchronously),
// marks the error as emitted and emits 'error' on the stream.
function onwriteError(stream, state, sync, er, cb) {
  state.pendingcb -= 1;

  if (sync) {
    nextTick(cb, er);
  } else {
    cb(er);
  }

  stream._writableState.errorEmitted = true;
  stream.emit('error', er);
}
287
// Clears the in-flight write from `state`: no longer writing, no pending
// user callback, and the write's size removed from the pending length.
function onwriteStateUpdate(state) {
  const completed = state.writelen;
  state.writing = false;
  state.writecb = null;
  state.length -= completed;
  state.writelen = 0;
}
294
// Completion handler for _write/_writev. Clears the in-flight write from
// the state, then either reports the error or continues: flushes queued
// requests when allowed and runs afterWrite (deferred to the next tick
// when the callback fired synchronously inside _write).
function onwrite(stream, er) {
  const state = stream._writableState;
  // Capture these before onwriteStateUpdate resets the state.
  const firedSync = state.sync;
  const userCb = state.writecb;

  onwriteStateUpdate(state);

  if (er) {
    onwriteError(stream, state, firedSync, er, userCb);
    return;
  }

  // Check if we're actually ready to finish, but don't emit yet.
  const finished = needFinish(state);

  const shouldFlush = !finished && !state.corked && !state.bufferProcessing && state.bufferedRequest;
  if (shouldFlush) {
    clearBuffer(stream, state);
  }

  if (firedSync) {
    nextTick(afterWrite, stream, state, finished, userCb);
  } else {
    afterWrite(stream, state, finished, userCb);
  }
}
319
// Runs after a successful write: possibly emits 'drain' (skipped when the
// stream was already ready to finish), settles one pending callback by
// invoking `cb`, then checks whether the stream can finish.
function afterWrite(stream, state, finished, cb) {
  if (!finished) {
    onwriteDrain(stream, state);
  }
  state.pendingcb -= 1;
  cb();
  finishMaybe(stream, state);
}
326
// Emits 'drain' once nothing is pending and a drain was requested, and
// clears the request flag. Callers must make sure this does not run
// before the write() caller has seen the `false` return value and had a
// chance to attach a 'drain' listener.
function onwriteDrain(stream, state) {
  const drained = state.length === 0 && state.needDrain;
  if (!drained) return;

  state.needDrain = false;
  stream.emit('drain');
}
336
// Flushes the queue of buffered write requests.
//
// When the stream implements _writev and at least two requests are queued,
// the whole queue is handed over in a single _writev call. Otherwise the
// requests are dispatched to _write one at a time, stopping as soon as a
// write is still in flight after doWrite returns; the remaining requests
// stay queued.
//
// state.bufferedRequestCount is kept in step with the queue: it drops by
// one per request dispatched in the one-by-one path, and is zeroed only
// when the entire queue has been handed to _writev.
function clearBuffer(stream, state) {
  state.bufferProcessing = true;
  let entry = state.bufferedRequest;

  if (stream._writev && entry && entry.next) {
    // Fast case, write everything using _writev()
    const batch = new Array(state.bufferedRequestCount);
    const holder = state.corkedRequestsFree;
    holder.entry = entry;

    let count = 0;
    while (entry) {
      batch[count] = entry;
      entry = entry.next;
      count += 1;
    }

    doWrite(stream, state, true, state.length, batch, '', holder.finish);

    // doWrite is almost always async, defer these to save a bit of time
    // as the hot path ends with doWrite
    state.pendingcb++;
    state.lastBufferedRequest = null;
    if (holder.next) {
      state.corkedRequestsFree = holder.next;
      holder.next = null;
    } else {
      state.corkedRequestsFree = new CorkedRequest(state);
    }
    // Every queued request went out in the batch above.
    state.bufferedRequestCount = 0;
  } else {
    // Slow case, write chunks one-by-one
    while (entry) {
      const chunk = entry.chunk;
      const encoding = entry.encoding;
      const cb = entry.callback;
      const len = state.objectMode ? 1 : chunk.length;

      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
      // This request has left the queue, whether or not its write has
      // completed yet.
      state.bufferedRequestCount--;
      // If onwrite was not called immediately, the write is still in
      // flight: stop here and leave the rest of the queue for later.
      if (state.writing) {
        break;
      }
    }

    if (entry === null) state.lastBufferedRequest = null;
  }

  state.bufferedRequest = entry;
  state.bufferProcessing = false;
}
394
// Default _write: reports through the callback that no implementation
// was provided.
Writable.prototype._write = function (chunk, encoding, cb) {
  const notImplemented = new Error('not implemented');
  cb(notImplemented);
};

// No batched-write implementation by default.
Writable.prototype._writev = null;
400
// Public end(). Accepts end(cb), end(chunk, cb) or end(chunk, encoding, cb).
// Writes the final chunk if one was given, fully uncorks the stream, and
// starts the ending sequence unless it has already started or completed.
Writable.prototype.end = function (chunk, encoding, cb) {
  const state = this._writableState;
  let finalChunk = chunk;
  let enc = encoding;
  let done = cb;

  if (typeof finalChunk === 'function') {
    // end(cb)
    done = finalChunk;
    finalChunk = null;
    enc = null;
  } else if (typeof enc === 'function') {
    // end(chunk, cb)
    done = enc;
    enc = null;
  }

  if (finalChunk != null) {
    this.write(finalChunk, enc);
  }

  // .end() fully uncorks: force the depth to 1 so one uncork() releases it.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // ignore unnecessary end() calls.
  const alreadyEnding = state.ending || state.finished;
  if (!alreadyEnding) {
    endWritable(this, state, done);
  }
};
424
// Truthy when the stream is ending, nothing is pending or queued, no
// write is in flight, and 'finish' has not been emitted yet.
function needFinish(state) {
  return (
    state.ending &&
    state.length === 0 &&
    state.bufferedRequest === null &&
    !state.finished &&
    !state.writing
  );
}
428
// Emits 'prefinish' at most once per stream, tracked by state.prefinished.
function prefinish(stream, state) {
  if (state.prefinished) return;

  state.prefinished = true;
  stream.emit('prefinish');
}
435
// If the stream is ready to finish, emits 'prefinish' (once) and, when no
// user callbacks are still owed, marks the stream finished and emits
// 'finish'. Returns the result of the readiness check.
function finishMaybe(stream, state) {
  const ready = needFinish(state);
  if (!ready) return ready;

  // Sample the callback count before 'prefinish' listeners run.
  const callbacksSettled = state.pendingcb === 0;
  prefinish(stream, state);

  if (callbacksSettled) {
    state.finished = true;
    stream.emit('finish');
  }

  return ready;
}
449
// Starts the ending sequence: flags the stream as ending, finishes right
// away if possible, and arranges for `cb` (if given) to run once the
// stream has finished. On the next tick if it already has, otherwise on
// the 'finish' event. Finally marks the stream ended and not writable.
function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);

  if (cb) {
    if (state.finished) {
      nextTick(cb);
    } else {
      stream.once('finish', cb);
    }
  }

  state.ended = true;
  stream.writable = false;
}
459
// Holder for a batch of requests written through _writev. It looks like a
// linked-list node but is not used as a list: there will be only two of
// these per stream. `entry` points at the first request of the batch;
// `finish` is the completion callback for the batch.
function CorkedRequest(state) {
  this.next = null;
  this.entry = null;

  // Invokes every batched request's callback with `err`, settling one
  // pending callback per request, then makes this holder available for
  // reuse via state.corkedRequestsFree.
  this.finish = (err) => {
    const first = this.entry;
    this.entry = null;

    for (let req = first; req; req = req.next) {
      const done = req.callback;
      state.pendingcb--;
      done(err);
    }

    const spare = state.corkedRequestsFree;
    if (spare) {
      spare.next = this;
    } else {
      state.corkedRequestsFree = this;
    }
  };
}