1 | var util = require('util')
|
2 | var Stream = require('stream')
|
3 | var StringDecoder = require('string_decoder').StringDecoder
|
4 |
|
5 | module.exports = StringStream
|
6 | module.exports.AlignedStringDecoder = AlignedStringDecoder
|
7 |
|
8 | function StringStream(from, to) {
|
9 | if (!(this instanceof StringStream)) return new StringStream(from, to)
|
10 |
|
11 | Stream.call(this)
|
12 |
|
13 | if (from == null) from = 'utf8'
|
14 |
|
15 | this.readable = this.writable = true
|
16 | this.paused = false
|
17 | this.toEncoding = (to == null ? from : to)
|
18 | this.fromEncoding = (to == null ? '' : from)
|
19 | this.decoder = new AlignedStringDecoder(this.toEncoding)
|
20 | }
|
21 | util.inherits(StringStream, Stream)
|
22 |
|
23 | StringStream.prototype.write = function(data) {
|
24 | if (!this.writable) {
|
25 | var err = new Error('stream not writable')
|
26 | err.code = 'EPIPE'
|
27 | this.emit('error', err)
|
28 | return false
|
29 | }
|
30 | if (this.fromEncoding) {
|
31 | if (Buffer.isBuffer(data)) data = data.toString()
|
32 | data = new Buffer(data, this.fromEncoding)
|
33 | }
|
34 | var string = this.decoder.write(data)
|
35 | if (string.length) this.emit('data', string)
|
36 | return !this.paused
|
37 | }
|
38 |
|
39 | StringStream.prototype.flush = function() {
|
40 | if (this.decoder.flush) {
|
41 | var string = this.decoder.flush()
|
42 | if (string.length) this.emit('data', string)
|
43 | }
|
44 | }
|
45 |
|
46 | StringStream.prototype.end = function() {
|
47 | if (!this.writable && !this.readable) return
|
48 | this.flush()
|
49 | this.emit('end')
|
50 | this.writable = this.readable = false
|
51 | this.destroy()
|
52 | }
|
53 |
|
54 | StringStream.prototype.destroy = function() {
|
55 | this.decoder = null
|
56 | this.writable = this.readable = false
|
57 | this.emit('close')
|
58 | }
|
59 |
|
60 | StringStream.prototype.pause = function() {
|
61 | this.paused = true
|
62 | }
|
63 |
|
64 | StringStream.prototype.resume = function () {
|
65 | if (this.paused) this.emit('drain')
|
66 | this.paused = false
|
67 | }
|
68 |
|
69 | function AlignedStringDecoder(encoding) {
|
70 | StringDecoder.call(this, encoding)
|
71 |
|
72 | switch (this.encoding) {
|
73 | case 'base64':
|
74 | this.write = alignedWrite
|
75 | this.alignedBuffer = new Buffer(3)
|
76 | this.alignedBytes = 0
|
77 | break
|
78 | }
|
79 | }
|
80 | util.inherits(AlignedStringDecoder, StringDecoder)
|
81 |
|
82 | AlignedStringDecoder.prototype.flush = function() {
|
83 | if (!this.alignedBuffer || !this.alignedBytes) return ''
|
84 | var leftover = this.alignedBuffer.toString(this.encoding, 0, this.alignedBytes)
|
85 | this.alignedBytes = 0
|
86 | return leftover
|
87 | }
|
88 |
|
89 | function alignedWrite(buffer) {
|
90 | var rem = (this.alignedBytes + buffer.length) % this.alignedBuffer.length
|
91 | if (!rem && !this.alignedBytes) return buffer.toString(this.encoding)
|
92 |
|
93 | var returnBuffer = new Buffer(this.alignedBytes + buffer.length - rem)
|
94 |
|
95 | this.alignedBuffer.copy(returnBuffer, 0, 0, this.alignedBytes)
|
96 | buffer.copy(returnBuffer, this.alignedBytes, 0, buffer.length - rem)
|
97 |
|
98 | buffer.copy(this.alignedBuffer, 0, buffer.length - rem, buffer.length)
|
99 | this.alignedBytes = rem
|
100 |
|
101 | return returnBuffer.toString(this.encoding)
|
102 | }
|