UNPKG

4.18 kBJavaScriptView Raw
1var stream = require('stream'),
2 sys = require('sys'),
3 util = require('util');
4
5function MemoryStream(data, options) {
6
7 stream.Stream.call(this);
8 var self = this;
9
10 this.queue = [];
11
12 if(data){
13 if(!Array.isArray(data))
14 data = [data];
15
16 data.forEach(function(chunk){
17 if ( ! (chunk instanceof Buffer)) {
18 chunk = new Buffer(chunk);
19 }
20
21 self.queue.push(chunk);
22 });
23 }
24
25 this.paused = false;
26 this.reachmaxbuf = false;
27
28 options = options || {};
29
30 this.readableVal = options.hasOwnProperty('readable') ? options.readable : true;
31
32 this.__defineGetter__("readable", function(){
33 return self.readableVal;
34 });
35
36 this.__defineSetter__("readable", function(val){
37 self.readableVal = val;
38 if(val)
39 self._next();
40 });
41
42 this.writable = options.hasOwnProperty('writable') ? options.writable : true;
43 this.maxbufsize = options.hasOwnProperty('maxbufsize') ? options.maxbufsize : null;
44 this.bufoverflow = options.hasOwnProperty('bufoveflow') ? options.bufoveflow : null;
45
46 var self = this;
47 process.nextTick(function(){
48 self._next();
49 });
50}
51module.exports = MemoryStream;
52
53util.inherits(MemoryStream, stream.Stream);
54
55
56MemoryStream.prototype._next = function() {
57 var self = this;
58 function next(){
59 if( self.flush() && self.readable)
60 process.nextTick(next);
61 }
62 next();
63};
64
65MemoryStream.prototype.getAll = function() {
66 var self = this;
67 var ret = '';
68 this.queue.forEach(function(data){
69 if (self._decoder) {
70 var string = self._decoder.write(data);
71 if (string.length) ret += data;
72 } else {
73 ret+=data;
74 }
75 });
76 return ret;
77};
78
79MemoryStream.prototype.setEncoding = function(encoding) {
80 var StringDecoder = require('string_decoder').StringDecoder;
81 this._decoder = new StringDecoder(encoding);
82};
83
84MemoryStream.prototype.pipe = function(destination, options) {
85
86 var pump = sys.pump || util.pump;
87
88 pump(this, destination);
89};
90
91MemoryStream.prototype.pause = function() {
92
93 this.paused = true;
94};
95
96MemoryStream.prototype.resume = function() {
97
98 this.paused = false;
99
100 this._next();
101};
102
103MemoryStream.prototype.end = function(chunk, encoding) {
104
105 if (typeof chunk !== 'undefined') {
106
107 this.write(chunk, encoding);
108 }
109
110 this.writable = false;
111
112 if (this.queue.length === 0) {
113
114 this.readable = false;
115 }
116
117 this.emit('end');
118};
119
120MemoryStream.prototype._getQueueSize = function() {
121 var queuesize = 0;
122 for(var i = 0; i < this.queue.length; i++ ){
123 queuesize += Array.isArray(this.queue[i]) ? this.queue[i][0].length : this.queue[i].length;
124 }
125 return queuesize;
126};
127
128MemoryStream.prototype.flush = function() {
129
130 if ( ! this.paused && this.readable && this.queue.length > 0) {
131 var data = this.queue.shift();
132 var cb;
133
134 if(Array.isArray(data)){
135 cb = data[1];
136 data = data[0];
137 }
138
139 if (this._decoder) {
140 var string = this._decoder.write(data);
141 if (string.length) this.emit('data', string);
142 } else {
143 this.emit('data', data);
144 }
145
146 if(cb) cb(null);
147
148 if(this.reachmaxbuf && this.maxbufsize >= this._getQueueSize()){
149 this.reachmaxbuf = false;
150 this.emit('drain');
151 }
152
153 return true;
154 }
155
156 if(!this.writable && !this.queue.length){
157 this.emit('end');
158 }
159
160 return false;
161};
162
163MemoryStream.prototype.write = function(chunk, encoding, callback) {
164
165 if ( ! this.writable) {
166
167 throw new Error('The memory stream is no longer writable.');
168 }
169
170 if (typeof encoding === 'function') {
171
172 callback = encoding;
173 encoding = undefined;
174 }
175
176 if ( ! (chunk instanceof Buffer)) {
177
178 chunk = new Buffer(chunk, encoding);
179 }
180
181 var queuesize = chunk.length;
182 if(this.maxbufsize || this.bufoverflow){
183 queuesize += this._getQueueSize();
184 if(this.bufoveflow && queuesize > this.bufoveflow){
185 this.emit('error',"Buffer overflowed (" + this.bufoverflow + "/"+ queuesize + ")");
186 return;
187 }
188 }
189
190 if(typeof callback === 'function'){
191 this.queue.push([chunk,callback]);
192 }else{
193 this.queue.push(chunk);
194 }
195
196 this._next();
197
198 if(this.maxbufsize && queuesize > this.maxbufsize){
199 this.reachmaxbuf = true;
200 return false;
201 }
202
203 return true;
204};
205
206MemoryStream.prototype.destroy = function() {
207
208 this.end();
209
210 this.queue = [];
211
212 this.readable = false;
213 this.writable = false;
214};