UNPKG

4.55 kBJavaScriptView Raw
1(function() {
2 var EncodingError, Errors, FILTER_EXCLUDED, FILTER_INCLUDED, FILTER_STOPPED, ReadStream, Readable, consts, defaultOptions, extend, inherits, isFunction, isObject, isString, minimatch, util;
3
4 Readable = require('readable-stream').Readable;
5
6 minimatch = require('minimatch');
7
8 util = require('abstract-object/lib/util');
9
10 Errors = require('./errors');
11
12 consts = require('./consts');
13
14 inherits = util.inherits;
15
16 isFunction = util.isFunction;
17
18 isObject = util.isObject;
19
20 isString = util.isString;
21
22 extend = util._extend;
23
24 FILTER_INCLUDED = consts.FILTER_INCLUDED;
25
26 FILTER_EXCLUDED = consts.FILTER_EXCLUDED;
27
28 FILTER_STOPPED = consts.FILTER_STOPPED;
29
30 EncodingError = Errors.EncodingError;
31
// Baseline stream options; the ReadStream constructor copies these and then
// overlays whatever options the caller supplied.
defaultOptions = {highWaterMark: 100000};
35
36
/*
ReadStream is used to search and read an [abstract-nosql](https://github.com/snowyu/abstract-nosql) database.

The database iterator must implement iterator.next() and iterator.end() for the stream to work (see [abstract-nosql](https://github.com/snowyu/abstract-nosql)).

The resulting stream is a Node.js-style [Readable Stream](http://nodejs.org/docs/latest/api/stream.html#stream_readable_stream)
where, by default, `'data'` events emit objects with `'key'` and `'value'` pairs
(only the key when the `values` option is `false`, only the value when the `keys` option is `false`).

You can also use the `gt`, `lt` and `limit` options to control the
range of keys that are streamed.
*/
48
49 module.exports = ReadStream = (function() {
50 inherits(ReadStream, Readable);
51
52 function ReadStream(db, aOptions, aMakeData) {
53 if (!(this instanceof ReadStream)) {
54 return new ReadStream(db, aOptions, aMakeData);
55 }
56 this._options = extend({}, defaultOptions);
57 this._options = extend(this._options, aOptions || {});
58 aOptions = this._options;
59 Readable.call(this, {
60 objectMode: true,
61 highWaterMark: aOptions.highWaterMark
62 });
63 this._waiting = false;
64 if (db == null) {
65 db = aOptions.db;
66 }
67 if (aMakeData) {
68 this._makeData = aMakeData;
69 } else {
70 this._makeData = aOptions.keys !== false && aOptions.values !== false ? function(key, value) {
71 return {
72 key: key,
73 value: value
74 };
75 } : aOptions.values === false ? function(key) {
76 return key;
77 } : aOptions.keys === false ? function(_, value) {
78 return value;
79 } : function() {};
80 }
81 if (db) {
82 if (!db.isOpen || db.isOpen()) {
83 this.setIterator(db.iterator(aOptions));
84 } else {
85 db.once('ready', (function(_this) {
86 return function() {
87 return _this.setIterator(db.iterator(aOptions));
88 };
89 })(this));
90 }
91 }
92 }
93
94 ReadStream.prototype.setIterator = function(aIterator) {
95 this._iterator = aIterator;
96 if (this._destroyed) {
97 return aIterator.end(function() {});
98 }
99 if (this._waiting) {
100 this._waiting = false;
101 return this._read();
102 }
103 return this;
104 };
105
106 ReadStream.prototype._read = function() {
107 var self;
108 if (this._destroyed) {
109 return;
110 }
111 if (!this._iterator) {
112 return this._waiting = true;
113 }
114 self = this;
115 return this._iterator.next(function(err, key, value) {
116 var e;
117 if (err || (key === void 0 && value === void 0)) {
118 if (!err && !self._destroyed) {
119 self.push(null);
120 }
121 return self._cleanup(err);
122 }
123 try {
124 value = self._makeData(key, value);
125 } catch (_error) {
126 e = _error;
127 return self._cleanup(new EncodingError(e));
128 }
129 if (!self._destroyed) {
130 return self.push(value);
131 }
132 });
133 };
134
135 ReadStream.prototype._cleanup = function(aError) {
136 if (this._destroyed) {
137 return;
138 }
139 this._destroyed = true;
140 if (aError) {
141 this.emit('error', err);
142 }
143 if (this._iterator) {
144 this.emit('last', this._iterator.last);
145 return this._iterator.end((function(_this) {
146 return function() {
147 _this._iterator = null;
148 return _this.emit('close');
149 };
150 })(this));
151 } else {
152 return this.emit('close');
153 }
154 };
155
156 ReadStream.prototype.destroy = function() {
157 return this.cleanup();
158 };
159
160 ReadStream.prototype.toString = function() {
161 return 'NoSQLReadStream';
162 };
163
164 return ReadStream;
165
166 })();
167
168}).call(this);
169
170//# sourceMappingURL=read-stream.js.map