UNPKG

2.15 kBJavaScriptView Raw
1var stream = require('stream')
2var inherits = require('util').inherits
3
inherits(Values, stream.Readable)

/**
 * Object-mode readable stream that emits the items of an array-like, one
 * item per `_read` call, and then ends.
 *
 * @param {Array} v - items to emit, in order. A missing or null value is
 *   treated as an empty list so the stream simply ends.
 */
function Values (v) {
  this.i = 0
  // Fall back to an empty list so _read can always consult `.length`.
  this.values = v == null ? [] : v
  stream.Readable.call(this, {objectMode: true})
}

/**
 * Pushes the next pending item, or `null` (end-of-stream) once every item
 * has been emitted.
 */
Values.prototype._read = function () {
  if (this.i >= this.values.length) {
    this.push(null)
  } else {
    this.push(this.values[this.i++])
  }
}
18
19
inherits(Async, stream.Transform)

/**
 * Object-mode transform stream that runs every chunk through an asynchronous
 * map function and emits the JSON-parsed result.
 *
 * @param {function(*, function(Error=, string=))} fn - called as
 *   `fn(chunk, done)`; it must call `done(err)` on failure or
 *   `done(null, jsonText)` with a JSON string on success.
 */
function Async (fn) {
  this._map = fn
  stream.Transform.call(this, {objectMode: true})
}

/**
 * Maps one chunk. A failure reported by the map function, or a result that
 * is not valid JSON, is handed to the transform callback so it surfaces as
 * a stream error instead of being ignored or thrown from inside the callback.
 */
Async.prototype._transform = function (chunk, _, callback) {
  var self = this
  this._map(chunk, function (err, data) {
    if (err) return setImmediate(callback, err)

    var parsed
    try {
      parsed = JSON.parse(data)
    } catch (parseErr) {
      return setImmediate(callback, parseErr)
    }

    self.push(parsed)
    //it seems that this HAS to be async, which slows this down a lot.
    setImmediate(callback)
  })
}

/**
 * Signals end-of-stream on the readable side once all input has been
 * transformed, then completes the flush asynchronously.
 */
Async.prototype._flush = function (callback) {
  this.push(null)
  setImmediate(callback)
}
39
inherits(Collect, stream.Writable)

/**
 * Object-mode writable sink that gathers every chunk it receives into an
 * array and reports that array once the stream has finished.
 *
 * @param {function(?Error, Array)} cb - called as `cb(null, chunks)` after
 *   `end()` has been called and every written chunk has been collected.
 */
function Collect (cb) {
  var self = this
  this._ary = []
  this._cb = cb
  stream.Writable.call(this, {objectMode: true})

  // Report from 'finish' rather than by overriding .end(): because _write
  // completes asynchronously, chunks can still be buffered when .end() is
  // called, and 'finish' is only emitted after all of them have been written
  // (including a final chunk passed to .end(chunk)).
  this.once('finish', function () {
    self._cb(null, self._ary)
  })
}

/**
 * Stores one chunk and acknowledges it asynchronously.
 */
Collect.prototype._write = function (chunk, _, callback) {
  this._ary.push(chunk)
  setImmediate(callback)
}
69
var bench = require('fastbench')

// Iteration count passed to fastbench, and the divisor used when reporting
// heap growth per iteration. Declared explicitly rather than assigned inside
// the call's argument list, which would create an implicit global.
const N = 100000

// JSON-encoded inputs; the Async stage parses them back into objects.
const values = [
  JSON.stringify({ hello: 'world' }),
  JSON.stringify({ foo: 'bar' }),
  JSON.stringify({ bin: 'baz' })
]

const run = bench([
  // One iteration: pipe the three values through an Async stage whose map
  // function returns its input unchanged, then collect the results.
  function pull3 (done) {
    var c = new Collect(function (err, array) {
      if (err) return console.error(err)
      if (array.length < 3) throw new Error('wrong array')
      setImmediate(done)
    })

    new Values(values)
      .pipe(new Async(function (val, done) {
        done(null, val)
      }))
      .pipe(c)
  }
], N)

// Heap usage before the run; the difference afterwards, divided by N, is
// printed as the heap growth per iteration.
var heap = process.memoryUsage().heapUsed
run(function () {
  console.log((process.memoryUsage().heapUsed - heap) / N)
})
97