UNPKG

1.01 kBJavaScriptView Raw
1const Streamz = require('streamz');
2const Promise = require('bluebird');
3const util = require('util');
4
5function Prescan(count,fn) {
6 if (!(this instanceof Prescan))
7 return new Prescan(count,fn);
8
9 Streamz.call(this);
10 // Allow a custom collection function as first argument
11 this.count = count;
12 this.fn = fn;
13 this.buffer = [];
14 this.i = 0;
15}
16
17util.inherits(Prescan,Streamz);
18
19Prescan.prototype.buffer = undefined;
20
21Prescan.prototype._push = function() {
22 if (!this.buffer)
23 return Promise.resolve();
24
25 const buffer = this.buffer;
26 this.buffer = undefined;
27
28 return Promise.try(() =>this.fn(buffer))
29 .then(() => buffer.forEach(d => this.push(d)));
30};
31
32Prescan.prototype._fn = function(d) {
33 if (!this.buffer)
34 return d;
35
36 this.i += d.length || 1;
37 this.buffer.push(d);
38
39 if (this.i >= this.count)
40 return this._push();
41};
42
43Prescan.prototype._flush = function(cb) {
44 this._push()
45 .then( () => setImmediate( () => Streamz.prototype._flush(cb)));
46};
47
48module.exports = Prescan;
\No newline at end of file