1 | var Readable = require('readable-stream').Readable
|
2 | var inherits = require('inherits')
|
3 |
|
// Public API: the module itself is the `from2` factory, with the object-mode
// helper and the class factory attached as properties.
module.exports = from2
from2.obj = obj
from2.ctor = ctor

// Stream class built once with no options and no read function; `from2`
// instantiates it for every stream it creates.
var Proto = ctor()
|
10 |
|
/**
 * Turns an array into a read function of the form `(size, cb)`.
 *
 * The array is copied up front, so the caller's array is never modified.
 * Each call removes the next entry from the copy and reports it through `cb`:
 *   - an `Error` instance is delivered as `cb(error, null)`
 *   - any other entry is delivered as `cb(null, entry)`
 *   - once the copy is empty, every call delivers `cb(null, null)`
 *
 * @param {Array} list - entries to hand out, in order
 * @returns {Function} read function; its first argument is ignored
 */
function toFunction(list) {
  var queue = list.slice()

  return function (_, cb) {
    var next = queue.length > 0 ? queue.shift() : null
    var failed = next instanceof Error

    cb(failed ? next : null, failed ? null : next)
  }
}
|
24 |
|
/**
 * Creates a stream instance of the default class (`Proto`).
 *
 * Accepts `from2(opts, read)` or `from2(read)`: when the first argument is
 * not a non-array object (e.g. a function or an array), it is taken as `read`
 * and the options default to `{}`.
 *
 * @param {Object} [opts] - options handed to the stream constructor
 * @param {Function|Array} [read] - read function stored on the instance as
 *   `_from`; an array is first wrapped with `toFunction`, and a falsy value
 *   falls back to `noop`
 * @returns {Object} the new stream instance
 */
function from2(opts, read) {
  var hasOptions = typeof opts === 'object' && !Array.isArray(opts)
  if (!hasOptions) {
    read = opts
    opts = {}
  }

  var stream = new Proto(opts)

  if (Array.isArray(read)) {
    stream._from = toFunction(read)
  } else {
    stream._from = read || noop
  }

  return stream
}
|
35 |
|
/**
 * Builds a stream class that inherits from `Readable` and pulls its data
 * from a read function.
 *
 * Accepts `ctor(opts, read)` or `ctor(read)`.
 *
 * @param {Object} [opts] - default options, used when the class is
 *   instantiated without an override
 * @param {Function} [read] - default read function, installed on the
 *   prototype as `_from`; it is called as `_from(size, callback)`. Falls back
 *   to `noop` when omitted.
 * @returns {Function} the class; it can be called with or without `new`
 */
function ctor(opts, read) {
  if (typeof opts === 'function') {
    read = opts
    opts = {}
  }

  opts = defaults(opts)

  function Class(override) {
    // Allow calling the class without `new`.
    if (!(this instanceof Class)) return new Class(override)

    var stream = this

    // Same order as before: instance state first, then the Readable setup.
    this._reading = false
    this._callback = onResult
    this.destroyed = false
    Readable.call(this, override || opts)

    var hwm = this._readableState.highWaterMark

    // Callback given to the read function on every `_read`.
    function onResult(err, data) {
      if (stream.destroyed) return
      if (err) return stream.destroy(err)
      // `null` is forwarded to push() as is; `_reading` is left set, so later
      // `_read` calls are ignored.
      if (data === null) return stream.push(null)
      stream._reading = false
      // Keep reading while push() reports that more data is wanted.
      if (stream.push(data)) stream._read(hwm)
    }
  }

  // Must run before the prototype assignments below.
  inherits(Class, Readable)

  Class.prototype._from = read || noop

  // Asks the read function for data unless a read is already pending or the
  // stream has been destroyed.
  Class.prototype._read = function(size) {
    var busy = this._reading || this.destroyed
    if (busy) return

    this._reading = true
    this._from(size, this._callback)
  }

  // Marks the stream as destroyed (once) and, on the next tick, emits
  // 'error' when an error was given, followed by 'close'.
  Class.prototype.destroy = function(err) {
    if (this.destroyed) return
    this.destroyed = true

    var stream = this
    process.nextTick(function() {
      if (err) stream.emit('error', err)
      stream.emit('close')
    })
  }

  return Class
}
|
84 |
|
/**
 * Creates an object-mode stream through `from2`.
 *
 * Accepts `obj(opts, read)` or `obj(read)`, where `read` is a function or an
 * array. `objectMode` is always set to `true` and `highWaterMark` is always
 * set to `16`, overriding any values supplied in `opts`.
 *
 * The options are written onto a shallow copy, so the object passed in by
 * the caller is left unmodified.
 *
 * @param {Object} [opts] - stream options
 * @param {Function|Array} [read] - read function or array of items
 * @returns {Object} the stream instance returned by `from2`
 */
function obj(opts, read) {
  if (typeof opts === 'function' || Array.isArray(opts)) {
    read = opts
    opts = {}
  }

  // `defaults` hands back the caller's own object when one was supplied, so
  // copy its enumerable properties instead of assigning onto it.
  var supplied = defaults(opts)
  var settings = {}
  for (var key in supplied) settings[key] = supplied[key]

  settings.objectMode = true
  settings.highWaterMark = 16

  return from2(settings, read)
}
|
97 |
|
/**
 * Placeholder read function: accepts any arguments, does nothing and
 * returns `undefined`.
 */
function noop() {}
|
99 |
|
/**
 * Normalizes an options argument.
 *
 * @param {*} opts - value supplied by the caller
 * @returns {*} `opts` itself when it is truthy, otherwise a new empty object
 */
function defaults(opts) {
  return opts ? opts : {}
}
|