1 | const Streamz = require('streamz');
|
2 | const Promise = require('bluebird');
|
3 | const util = require('util');
|
4 |
|
/**
 * Streamz-based stream that buffers incoming data so `fn` can inspect it
 * before anything is released downstream.
 *
 * Can be invoked with or without `new`.
 *
 * @param {number} count - Threshold compared against the running item tally
 *   (`this.i`); once the tally reaches it, the buffered data is handed to `fn`.
 * @param {Function} fn - Callback invoked with the array of buffered items.
 * @returns {Prescan} The constructed stream instance.
 */
function Prescan(count, fn) {
  // Allow factory-style use: `Prescan(count, fn)` behaves like `new Prescan(count, fn)`.
  const calledAsConstructor = this instanceof Prescan;
  if (!calledAsConstructor) {
    return new Prescan(count, fn);
  }

  // Run the parent constructor on this instance.
  Streamz.call(this);

  // Per-instance state: threshold, callback, pending items and running tally.
  Object.assign(this, {
    count,
    fn,
    buffer: [],
    i: 0,
  });
}
|
16 |
|
// Make Prescan inherit from Streamz via Node's util.inherits.
util.inherits(Prescan,Streamz);

// Prototype-level default for `buffer`. The constructor gives every instance
// its own array; `_push` resets the instance value to undefined, which `_fn`
// and `_push` treat as "nothing left to buffer".
Prescan.prototype.buffer = undefined;
|
20 |
|
/**
 * Hands the buffered items to `this.fn` and, once that settles successfully,
 * pushes every buffered item downstream in its original order.
 *
 * If the buffer has already been released, this is a no-op.
 *
 * @returns {Promise} Resolves once all buffered items have been pushed (or
 *   immediately when there is no buffer); rejects if `fn` throws or rejects.
 */
Prescan.prototype._push = function() {
  const pending = this.buffer;

  // Buffer already released: nothing to do.
  if (!pending) {
    return Promise.resolve();
  }

  // Clear the buffer before calling `fn` so `_fn` stops buffering from now on.
  this.buffer = undefined;

  const releaseAll = () => {
    pending.forEach(item => {
      this.push(item);
    });
  };

  // Promise.try turns a synchronous throw from `fn` into a rejection.
  return Promise.try(() => this.fn(pending)).then(releaseAll);
};
|
31 |
|
/**
 * Per-item handler. While a buffer exists, the item is stored and the tally
 * `this.i` grows by `d.length` (or by 1 when that is missing or zero). Once
 * the tally reaches `this.count`, the buffer is released through `_push`.
 * After the buffer has been released, items are returned unchanged.
 *
 * @param {*} d - Incoming item.
 * @returns {*} `d` itself when no buffer exists, the result of `_push()` when
 *   the threshold is reached, otherwise undefined.
 */
Prescan.prototype._fn = function(d) {
  const pending = this.buffer;

  // Buffer already released: pass the item straight through.
  if (!pending) {
    return d;
  }

  // Items with a length count as that many; everything else counts as one.
  this.i += d.length || 1;
  pending.push(d);

  const thresholdReached = this.i >= this.count;
  return thresholdReached ? this._push() : undefined;
};
|
42 |
|
/**
 * Flush hook: releases anything still buffered (for input that ended before
 * the tally reached `count`), then defers to the base Streamz flush on the
 * next tick of the event loop.
 *
 * @param {Function} cb - Flush callback. It is passed on to the base
 *   `_flush`, or called with the error if releasing the buffer fails.
 */
Prescan.prototype._flush = function(cb) {
  this._push().then(
    // Invoke the base flush with this instance as receiver, not the shared prototype.
    () => setImmediate(() => Streamz.prototype._flush.call(this, cb)),
    // A failing `fn` must not leave the stream hanging: report it through the callback.
    err => cb(err)
  );
};
|
47 |
|
// The Prescan constructor is this module's sole export.
module.exports = Prescan;
\ | No newline at end of file |