// UNPKG - 3.33 kB - JavaScript - View Raw
1
/**
 * Module dependencies
 */

var Stream = require('stream').Stream;
var fs = require('fs');
var StringDecoder = require('string_decoder').StringDecoder;
8
/**
 * expose
 *
 * The constructor itself is the module's export (and is also bound to the
 * local `exports` alias).
 *
 * @ignore
 */

module.exports = exports = GridReadStream;
15
/**
 * GridReadStream
 *
 * Legacy (`Stream`-based) readable stream backed by a `GridStore`.
 * May be called with or without `new`.
 *
 * The second argument is interpreted as follows:
 *  - an object exposing `toHexString` is used directly as the file id;
 *  - otherwise, if `grid.tryParseObjectId(filename)` returns a truthy value,
 *    that value becomes the file id;
 *  - otherwise a string is used as the file name;
 *  - a plain options object may be passed in this slot instead of a name,
 *    in which case it is used as `options` and the third argument is ignored.
 *
 * The store is not opened here; `_open()` is scheduled for the next tick.
 *
 * @param {Grid} grid object providing `db`, `mongo.GridStore` and `tryParseObjectId()`
 * @param {String} filename (optional) file name, file id, or options object
 * @param {Object} options (optional) passed through to the `GridStore` constructor
 */

function GridReadStream (grid, filename, options) {
  if (!(this instanceof GridReadStream))
    return new GridReadStream(grid, filename, options);

  Stream.call(this);
  this.paused = false;
  this.readable = true;

  this._grid = grid;

  // lookup using string filename or _id

  this.name = '';

  var _id;
  if (filename && filename.toHexString) {
    this.id = _id = filename;
  } else if (_id = grid.tryParseObjectId(filename)) {
    this.id = _id;
  } else if ('string' == typeof filename) {
    // do not set this.id, only passing _id so driver is happy
    _id = this.name = filename;
  }

  // Detect an options object passed in the filename slot. Objects created
  // with Object.create(null) have no `constructor` at all, so look the
  // constructor up defensively instead of dereferencing it blindly (which
  // used to throw a TypeError for such objects).
  var ctor = filename ? filename.constructor : undefined;
  var hasNoPrototype = !!filename
    && 'object' === typeof filename
    && null === Object.getPrototypeOf(filename);
  var isOptionsObject = hasNoPrototype
    || (ctor != null && 'Object' === ctor.name);

  this.options = isOptionsObject
    ? filename
    : options || {};

  this.mode = 'r';

  this._store = new grid.mongo.GridStore(grid.db, _id, this.name, this.mode, this.options);

  // Defer opening so the caller can attach listeners first.
  var self = this;
  process.nextTick(function () {
    self._open();
  });
}
61
/**
 * Inherit from Stream
 *
 * Replacing the prototype object discards the default `constructor` link,
 * so it is restored here (non-enumerable) to keep
 * `instance.constructor === GridReadStream`.
 *
 * @ignore
 */

GridReadStream.prototype = Object.create(Stream.prototype, {
  constructor: {
    value: GridReadStream,
    writable: true,
    configurable: true,
    enumerable: false
  }
});
68
// public api

// No-op property reads, kept as markers for the public instance fields
// that the constructor assigns.
GridReadStream.prototype.readable;
GridReadStream.prototype.paused;

/**
 * setEncoding
 *
 * Installs a string decoder on `this._decoder`; `_read()` passes every
 * chunk through it before emitting 'data'.
 *
 * Implemented locally rather than borrowed from `fs.ReadStream`: the
 * borrowed method belongs to the newer Readable implementation and does not
 * set `_decoder` on a legacy `Stream` instance.
 *
 * @param {String} encoding any encoding accepted by `StringDecoder`
 * @return {GridReadStream} this
 * @api public
 */

GridReadStream.prototype.setEncoding = function setEncoding (encoding) {
  this._decoder = new StringDecoder(encoding);
  return this;
};
75
/**
 * pause
 *
 * Default implementation: only records the paused state on the instance.
 * `_read()` replaces it with a per-instance version that also flags the
 * underlying store stream.
 *
 * @api public
 */

GridReadStream.prototype.pause = function pause () {
  // Overridden when the GridStore opens.
  this.paused = true;
};
86
/**
 * resume
 *
 * Default implementation: only clears the paused state on the instance.
 * `_read()` replaces it with a per-instance version that also resumes the
 * underlying store stream.
 *
 * @api public
 */

GridReadStream.prototype.resume = function resume () {
  // Overridden when the GridStore opens.
  this.paused = false;
};
97
/**
 * destroy
 *
 * Default implementation: marks the stream as no longer readable, which
 * makes `_open()` and `_read()` bail out. `_read()` replaces it with a
 * per-instance version that also destroys the underlying store stream.
 *
 * @api public
 */

GridReadStream.prototype.destroy = function destroy () {
  // Overridden when the GridStore opens.
  this.readable = false;
};
108
// private api

/**
 * Opens the backing store.
 *
 * On failure the error is routed through `_error()`. On success, if
 * `readable` has been cleared in the meantime nothing further happens;
 * otherwise 'open' is emitted and reading starts via `_read()`.
 *
 * @api private
 */

GridReadStream.prototype._open = function _open () {
  var self = this;
  this._store.open(function (err) {
    if (err) return self._error(err);
    if (!self.readable) return;
    self.emit('open');
    self._read();
  });
};
120
/**
 * Creates the underlying store stream, forwards its events and installs
 * per-instance `pause`, `resume` and `destroy` overrides that act on it.
 *
 * Does nothing when the stream is no longer readable or when reading has
 * already been started.
 *
 * A stream paused before the store opened is still wired up here, with the
 * underlying stream flagged as paused. Returning early in that case would
 * leave the default `resume()` in place, which only flips a flag, so the
 * stream could never be started afterwards.
 *
 * @api private
 */

GridReadStream.prototype._read = function _read () {
  if (!this.readable || this.reading) {
    return;
  }

  this.reading = true;

  var self = this;
  var stream = this._stream = this._store.stream();

  // Carry over a pause requested before the store was open.
  // NOTE(review): relies on the driver stream honouring a directly assigned
  // `paused` flag, the same mechanism the pause() override below uses.
  stream.paused = this.paused;

  stream.on('data', function (data) {
    if (self._decoder) {
      // The decoder may buffer an incomplete character and return ''.
      var str = self._decoder.write(data);
      if (str.length) self.emit('data', str);
    } else {
      self.emit('data', data);
    }
  });

  stream.on('end', function (data) {
    self.emit('end', data);
  });

  stream.on('error', function (err) {
    self._error(err);
  });

  stream.on('close', function (data) {
    self.emit('close', data);
  });

  this.pause = function () {
    // native doesn't always pause.
    // bypass its pause() method to hack it
    self.paused = stream.paused = true;
  };

  this.resume = function () {
    self.paused = false;
    stream.resume();
    self.readable = stream.readable;
  };

  this.destroy = function () {
    self.readable = false;
    stream.destroy();
  };
};
170
/**
 * Marks the stream as no longer readable and emits 'error' with `err`.
 *
 * @param {Error} err
 * @api private
 */

GridReadStream.prototype._error = function _error (err) {
  this.readable = false;
  this.emit('error', err);
};
175