1 | var stream = require('stream')
|
2 | var inherits = require('util').inherits
|
3 |
|
4 | inherits(Values, stream.Readable)
|
5 |
|
6 | function Values (v) {
|
7 | this.i = 0
|
8 | this.values = v
|
9 | stream.Readable.call(this, {objectMode: true})
|
10 | }
|
11 |
|
12 | Values.prototype._read = function () {
|
13 | if(this.i >= this.values.length)
|
14 | this.push(null)
|
15 | else
|
16 | this.push(this.values[this.i++])
|
17 | }
|
18 |
|
19 |
|
20 | inherits(Async, stream.Transform)
|
21 |
|
22 | function Async (fn) {
|
23 | this._map = fn
|
24 | stream.Transform.call(this, {objectMode: true})
|
25 | }
|
26 |
|
27 | Async.prototype._transform = function (chunk, _, callback) {
|
28 | var self = this
|
29 | this._map(chunk, function (err, data) {
|
30 | self.push(JSON.parse(data))
|
31 |
|
32 | setImmediate(callback)
|
33 | })
|
34 | }
|
35 | Async.prototype._flush = function (callback) {
|
36 | this.push(null)
|
37 | setImmediate(callback)
|
38 | }
|
39 |
|
40 | inherits(Collect, stream.Writable)
|
41 |
|
42 | function Collect (cb) {
|
43 | this._ary = []
|
44 | this._cb = cb
|
45 | stream.Writable.call(this, {objectMode: true})
|
46 | }
|
47 |
|
48 | Collect.prototype._write = function (chunk, _, callback) {
|
49 | this._ary.push(chunk)
|
50 | setImmediate(callback)
|
51 | }
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
59 |
|
60 |
|
61 |
|
62 |
|
63 |
|
64 |
|
65 |
|
66 | Collect.prototype.end = function () {
|
67 | this._cb(null, this._ary)
|
68 | }
|
69 |
|
70 | var bench = require('fastbench')
|
71 | const values = [
|
72 | JSON.stringify({ hello: 'world' }),
|
73 | JSON.stringify({ foo: 'bar' }),
|
74 | JSON.stringify({ bin: 'baz' })
|
75 | ]
|
76 |
|
77 | const run = bench([
|
78 | function pull3 (done) {
|
79 | var c = new Collect(function (err, array) {
|
80 | if (err) return console.error(err)
|
81 | if(array.length < 3) throw new Error('wrong array')
|
82 | setImmediate(done)
|
83 | })
|
84 |
|
85 | new Values(values)
|
86 | .pipe(new Async(function (val, done) {
|
87 | done(null, val)
|
88 | }))
|
89 | .pipe(c)
|
90 | }]
|
91 | , N=100000)
|
92 |
|
93 | var heap = process.memoryUsage().heapUsed
|
94 | run(function () {
|
95 | console.log((process.memoryUsage().heapUsed - heap)/N)
|
96 | })
|
97 |
|