1 | var stream = require('stream'),
|
2 | sys = require('sys'),
|
3 | util = require('util');
|
4 |
|
/**
 * In-memory stream built on the legacy `stream.Stream` base.
 *
 * Chunks are kept in `this.queue` and handed out as 'data' events by
 * `flush()`; the first flush attempt is scheduled on the next tick so that
 * callers can attach listeners after construction.
 *
 * @param {(string|Buffer|Array)} [data] Initial chunk, or array of chunks.
 *     Anything that is not already a Buffer is converted with `Buffer.from`.
 * @param {Object} [options]
 * @param {boolean} [options.readable=true] Initial value of `readable`.
 * @param {boolean} [options.writable=true] Initial value of `writable`.
 * @param {?number} [options.maxbufsize=null] Stored on `this.maxbufsize`.
 * @param {?number} [options.bufoverflow=null] Stored on `this.bufoverflow`.
 *     The historical misspelling `bufoveflow` is still accepted.
 */
function MemoryStream(data, options) {
  stream.Stream.call(this);

  var self = this;
  var hasOwn = Object.prototype.hasOwnProperty;

  options = options || {};

  // Returns the caller-supplied own option, or `fallback` when absent.
  // Going through Object.prototype keeps this working for option objects
  // created without a prototype.
  function option(name, fallback) {
    return hasOwn.call(options, name) ? options[name] : fallback;
  }

  this.queue = [];

  if (data) {
    (Array.isArray(data) ? data : [data]).forEach(function(chunk) {
      // Buffer.from replaces the deprecated `new Buffer(...)`, which hands
      // back uninitialised memory when given a number.
      self.queue.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk));
    });
  }

  this.paused = false;
  this.reachmaxbuf = false;

  this.readableVal = option('readable', true);

  // `readable` is an accessor so that switching it back on restarts flushing.
  Object.defineProperty(this, 'readable', {
    enumerable: true,
    configurable: true,
    get: function() {
      return self.readableVal;
    },
    set: function(val) {
      self.readableVal = val;
      if (val) {
        self._next();
      }
    }
  });

  this.writable = option('writable', true);
  this.maxbufsize = option('maxbufsize', null);
  // Prefer the correctly spelled key; fall back to the legacy typo so that
  // existing callers keep working.
  this.bufoverflow = option('bufoverflow', option('bufoveflow', null));

  process.nextTick(function() {
    self._next();
  });
}
|
// MemoryStream extends the legacy Stream base; the prototype chain is wired
// up before any prototype methods are attached below.
util.inherits(MemoryStream, stream.Stream);

// The constructor is the module's sole export.
module.exports = MemoryStream;
|
54 |
|
55 |
|
/**
 * Drives the flush loop: flushes one queue entry immediately and, for as
 * long as `flush()` reports progress and the stream is still readable,
 * schedules the following flush on the next tick.
 */
MemoryStream.prototype._next = function() {
  var owner = this;

  (function step() {
    // `readable` is only consulted when the flush actually did something.
    if (owner.flush() && owner.readable) {
      process.nextTick(step);
    }
  })();
};
|
64 |
|
/**
 * Returns everything currently queued as a single string, without removing
 * anything from the queue.
 *
 * When an encoding has been set, each chunk is run through the stream's
 * decoder and the decoded text is appended; otherwise each chunk is
 * appended using its default string conversion.
 *
 * NOTE(review): this feeds the same decoder instance that `flush()` uses,
 * so any bytes the decoder holds back here are still held when `flush()`
 * runs later — confirm whether callers mix getAll() with streaming reads.
 *
 * @returns {string} Concatenated contents of the queue.
 */
MemoryStream.prototype.getAll = function() {
  var decoder = this._decoder;
  var ret = '';

  this.queue.forEach(function(entry) {
    // write() stores chunks that have a callback as [chunk, callback];
    // only the chunk belongs in the output.
    var chunk = Array.isArray(entry) ? entry[0] : entry;

    // Append the decoded text, not the raw chunk, when a decoder is set.
    ret += decoder ? decoder.write(chunk) : chunk;
  });

  return ret;
};
|
78 |
|
/**
 * Switches the stream to string output by installing a StringDecoder for
 * the given encoding on `this._decoder`.
 *
 * @param {string} encoding Encoding name passed to StringDecoder.
 */
MemoryStream.prototype.setEncoding = function(encoding) {
  // Loaded on demand, so the module is only required once an encoding is set.
  var decoding = require('string_decoder');

  this._decoder = new decoding.StringDecoder(encoding);
};
|
83 |
|
/**
 * Pipes this stream into `destination`.
 *
 * Delegates to the legacy `Stream.prototype.pipe` rather than the old
 * `pump` helper, which current Node.js releases no longer provide.
 *
 * @param {Object} destination Writable stream that receives the data.
 * @param {Object} [options] Forwarded unchanged to `Stream.prototype.pipe`.
 * @returns {Object} Whatever `Stream.prototype.pipe` returns (the
 *     destination, which allows chaining).
 */
MemoryStream.prototype.pipe = function(destination, options) {
  return stream.Stream.prototype.pipe.call(this, destination, options);
};
|
90 |
|
/**
 * Suspends delivery: while `paused` is set, `flush()` leaves the queue
 * untouched and emits no 'data' events.
 */
MemoryStream.prototype.pause = function() {
  this.paused = true;
};
|
95 |
|
/**
 * Clears the paused flag and restarts the flush loop so that queued chunks
 * start flowing again.
 */
MemoryStream.prototype.resume = function() {
  this.paused = false;
  this._next();
};
|
102 |
|
/**
 * Closes the writable side, optionally writing one final chunk first.
 *
 * 'end' is emitted at most once per stream from this method, so calling
 * `end()` again (or `destroy()`, which calls `end()`) does not notify
 * listeners a second time.
 *
 * Note that 'end' is emitted immediately, even if chunks are still queued.
 *
 * @param {(string|Buffer)} [chunk] Final chunk, passed to `write()`.
 * @param {string} [encoding] Encoding for a string chunk.
 */
MemoryStream.prototype.end = function(chunk, encoding) {
  if (typeof chunk !== 'undefined') {
    this.write(chunk, encoding);
  }

  this.writable = false;

  // Nothing left to deliver, so the readable side is finished as well.
  if (this.queue.length === 0) {
    this.readable = false;
  }

  // Guard against duplicate 'end' events on repeated end()/destroy() calls.
  if (!this._ended) {
    this._ended = true;
    this.emit('end');
  }
};
|
119 |
|
/**
 * Adds up the `length` of every chunk waiting in the queue.
 *
 * Entries stored as `[chunk, callback]` pairs contribute the length of
 * their chunk only.
 *
 * @returns {number} Combined length of all queued chunks.
 */
MemoryStream.prototype._getQueueSize = function() {
  return this.queue.reduce(function(total, entry) {
    var chunk = Array.isArray(entry) ? entry[0] : entry;
    return total + chunk.length;
  }, 0);
};
|
127 |
|
/**
 * Delivers at most one queued chunk to 'data' listeners.
 *
 * When the stream is not paused, is readable and has something queued, the
 * oldest entry is removed, emitted (decoded first if an encoding was set),
 * its write callback — if any — is invoked with `null`, and 'drain' is
 * emitted once the queue has shrunk back to `maxbufsize` after a write
 * reported it as exceeded.
 *
 * Otherwise, if the stream is no longer writable and the queue is empty,
 * 'end' is emitted — at most once from this method, no matter how often
 * `flush()` is called afterwards.
 *
 * @returns {boolean} `true` if a chunk was taken off the queue, else `false`.
 */
MemoryStream.prototype.flush = function() {
  if (!this.paused && this.readable && this.queue.length > 0) {
    var entry = this.queue.shift();
    var data = entry;
    var cb;

    // write() stores chunks that have a callback as [chunk, callback].
    if (Array.isArray(entry)) {
      data = entry[0];
      cb = entry[1];
    }

    if (this._decoder) {
      var text = this._decoder.write(data);
      // The decoder may return nothing for this chunk; skip empty emits.
      if (text.length) {
        this.emit('data', text);
      }
    } else {
      this.emit('data', data);
    }

    if (cb) {
      cb(null);
    }

    if (this.reachmaxbuf && this.maxbufsize >= this._getQueueSize()) {
      this.reachmaxbuf = false;
      this.emit('drain');
    }

    return true;
  }

  // Without the guard every later flush() (e.g. from resume()) would emit
  // 'end' again.
  if (!this.writable && !this.queue.length && !this._ended) {
    this._ended = true;
    this.emit('end');
  }

  return false;
};
|
162 |
|
/**
 * Queues a chunk for delivery and kicks the flush loop.
 *
 * @param {(string|Buffer)} chunk Data to queue; non-Buffers are converted
 *     with `Buffer.from(chunk, encoding)`.
 * @param {(string|Function)} [encoding] Encoding for a string chunk, or the
 *     callback when no encoding is given.
 * @param {Function} [callback] Called with `null` by `flush()` once the
 *     chunk has been emitted.
 * @returns {(boolean|undefined)} `true` when the chunk was accepted within
 *     limits; `false` when it was accepted but the queued size now exceeds
 *     `maxbufsize` (a 'drain' event follows once the queue shrinks);
 *     `undefined` when the chunk was rejected because it would exceed
 *     `bufoverflow` (an 'error' event is emitted and nothing is queued).
 * @throws {Error} If the stream is no longer writable.
 */
MemoryStream.prototype.write = function(chunk, encoding, callback) {
  if (!this.writable) {
    throw new Error('The memory stream is no longer writable.');
  }

  if (typeof encoding === 'function') {
    callback = encoding;
    encoding = undefined;
  }

  if (!(chunk instanceof Buffer)) {
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    chunk = Buffer.from(chunk, encoding);
  }

  // Size the queue would have with this chunk included; the queued part is
  // only computed when a limit is actually configured.
  var queuesize = chunk.length;
  if (this.maxbufsize || this.bufoverflow) {
    queuesize += this._getQueueSize();

    // Hard limit: reject the chunk outright. The property set by the
    // constructor is `bufoverflow`.
    if (this.bufoverflow && queuesize > this.bufoverflow) {
      this.emit('error', new Error("Buffer overflowed (" + this.bufoverflow + "/" + queuesize + ")"));
      return;
    }
  }

  if (typeof callback === 'function') {
    this.queue.push([chunk, callback]);
  } else {
    this.queue.push(chunk);
  }

  this._next();

  // Soft limit: the chunk stays queued, but the caller is told to back off.
  if (this.maxbufsize && queuesize > this.maxbufsize) {
    this.reachmaxbuf = true;
    return false;
  }

  return true;
};
|
205 |
|
/**
 * Shuts the stream down: runs `end()`, throws away whatever is still
 * queued, and marks the stream as neither writable nor readable.
 */
MemoryStream.prototype.destroy = function() {
  this.end();

  // Replace rather than truncate, exactly as before: the old array object
  // is left untouched.
  this.queue = [];

  this.writable = false;
  this.readable = false;
};
|