UNPKG

9.62 kB · JavaScript · View Raw
"use strict";
// Compiled CommonJS output: flag the module for ES-module interop and
// pre-declare the named exports, which are assigned at their definition
// sites further down in this file.
Object.defineProperty(exports, "__esModule", { value: true });
exports.WriteStream = exports.ReadStream = exports.ReadAfterReleasedError = exports.ReadAfterDestroyedError = void 0;
4const crypto_1 = require("crypto");
5const fs_1 = require("fs");
6const os_1 = require("os");
7const path_1 = require("path");
8const stream_1 = require("stream");
9const events_1 = require("events");
/**
 * Thrown by `WriteStream.createReadStream()` when the capacitor has already
 * been destroyed (its temporary file closed and unlinked).
 */
class ReadAfterDestroyedError extends Error {
    constructor(...args) {
        super(...args);
        // Without this, instances report the generic "Error" as their name in
        // logs and stack traces; `instanceof` checks are unaffected.
        this.name = 'ReadAfterDestroyedError';
    }
}
12exports.ReadAfterDestroyedError = ReadAfterDestroyedError;
/**
 * Thrown by `WriteStream.createReadStream()` when the capacitor has already
 * been released and is therefore no longer accepting new readers.
 */
class ReadAfterReleasedError extends Error {
    constructor(...args) {
        super(...args);
        // Without this, instances report the generic "Error" as their name in
        // logs and stack traces; `instanceof` checks are unaffected.
        this.name = 'ReadAfterReleasedError';
    }
}
15exports.ReadAfterReleasedError = ReadAfterReleasedError;
// Use a “proxy” event emitter configured to have an infinite maximum number of
// listeners to prevent Node.js max listeners exceeded warnings if many
// `fs-capacitor` `ReadStream` instances are created at the same time. See:
// https://github.com/mike-marcacci/fs-capacitor/issues/30
const processExitProxy = new events_1.EventEmitter();
processExitProxy.setMaxListeners(Infinity);
// A single real `process` "exit" listener fans out to every capacitor's
// synchronous cleanup handler registered on the proxy.
process.once('exit', () => processExitProxy.emit('exit'));
/**
 * A Readable stream that replays the bytes written to a parent
 * `WriteStream`'s backing temporary file. Reading starts at offset 0 and
 * follows the writer: when the requested bytes have not been written yet,
 * the reader waits for the writer's "write"/"finish" events and retries.
 */
class ReadStream extends stream_1.Readable {
    constructor(writeStream, options) {
        super({
            highWaterMark: options === null || options === void 0 ? void 0 : options.highWaterMark,
            encoding: options === null || options === void 0 ? void 0 : options.encoding,
            autoDestroy: true,
        });
        // Byte offset into the backing file of the next read.
        this._pos = 0;
        this._writeStream = writeStream;
    }
    _read(n) {
        if (this.destroyed)
            return;
        // The writer's backing file may not be open yet; wait for its fd.
        if (typeof this._writeStream['_fd'] !== 'number') {
            this._writeStream.once('ready', () => this._read(n));
            return;
        }
        // Using `allocUnsafe` here is OK because we return a slice the length of
        // `bytesRead`, and discard the rest. This prevents node from having to zero
        // out the entire allocation first.
        const buf = Buffer.allocUnsafe(n);
        (0, fs_1.read)(this._writeStream['_fd'], buf, 0, n, this._pos, (error, bytesRead) => {
            if (error) {
                this.destroy(error);
                // BUG FIX: stop here. Previously the handler fell through after
                // `destroy(error)`, which could push data into — or attach retry
                // listeners for — a stream that had just been destroyed.
                return;
            }
            // Push any read bytes into the local stream buffer.
            if (bytesRead) {
                this._pos += bytesRead;
                this.push(buf.slice(0, bytesRead));
                return;
            }
            // If there were no more bytes to read and the write stream is finished,
            // then this stream has reached the end.
            if (this._writeStream._writableState.finished) {
                // Check if we have consumed the whole file up to where
                // the write stream has written before ending the stream.
                if (this._pos < this._writeStream._pos)
                    this._read(n);
                else
                    this.push(null);
                return;
            }
            // Otherwise, wait for the write stream to add more data or finish.
            const retry = () => {
                this._writeStream.off('finish', retry);
                this._writeStream.off('write', retry);
                this._read(n);
            };
            this._writeStream.on('finish', retry);
            this._writeStream.on('write', retry);
        });
    }
}
75exports.ReadStream = ReadStream;
/**
 * A Writable stream that buffers its input in a randomly named temporary
 * file (the "capacitor"), from which any number of `ReadStream`s can replay
 * the data — even while it is still being written.
 *
 * Lifecycle: the backing file is created asynchronously in the constructor
 * ("ready" is emitted once the fd is open); it is closed and unlinked when
 * the stream is destroyed, or — after `release()` — once the last attached
 * read stream closes.
 */
class WriteStream extends stream_1.Writable {
    constructor(options) {
        super({
            highWaterMark: options === null || options === void 0 ? void 0 : options.highWaterMark,
            defaultEncoding: options === null || options === void 0 ? void 0 : options.defaultEncoding,
            // Destruction is managed manually so readers can outlive the writer.
            autoDestroy: false,
        });
        // File descriptor of the temporary file; null until "ready" fires.
        this._fd = null;
        // Absolute path of the temporary file; null until generated below.
        this._path = null;
        // Number of bytes flushed to the file so far.
        this._pos = 0;
        // All read streams currently attached via createReadStream().
        this._readStreams = new Set();
        // True once release() has been called; no new read streams after that.
        this._released = false;
        // Asynchronously close the fd and unlink the file, then invoke
        // `callback` with the first relevant error (unlink error wins).
        this._cleanup = (callback) => {
            const fd = this._fd;
            const path = this._path;
            if (typeof fd !== 'number' || typeof path !== 'string') {
                callback(null);
                return;
            }
            // Close the file descriptor.
            (0, fs_1.close)(fd, (closeError) => {
                // An error here probably means the fd was already closed, but we can
                // still try to unlink the file.
                (0, fs_1.unlink)(path, (unlinkError) => {
                    // If we are unable to unlink the file, the operating system will
                    // clean up on next restart, since we store these in `os.tmpdir()`.
                    this._fd = null;
                    // We avoid removing this until now in case an exit occurs while
                    // asynchronously cleaning up.
                    processExitProxy.off('exit', this._cleanupSync);
                    callback(unlinkError !== null && unlinkError !== void 0 ? unlinkError : closeError);
                });
            });
        };
        // Synchronous, best-effort variant of `_cleanup` for the process "exit"
        // event, where asynchronous I/O would never get a chance to run.
        this._cleanupSync = () => {
            processExitProxy.off('exit', this._cleanupSync);
            if (typeof this._fd === 'number')
                try {
                    (0, fs_1.closeSync)(this._fd);
                }
                catch (error) {
                    // An error here probably means the fd was already closed, but we can
                    // still try to unlink the file.
                }
            try {
                if (this._path !== null) {
                    (0, fs_1.unlinkSync)(this._path);
                }
            }
            catch (error) {
                // If we are unable to unlink the file, the operating system will clean
                // up on next restart, since we store these in `os.tmpdir()`.
            }
        };
        // Generate a random filename.
        (0, crypto_1.randomBytes)(16, (error, buffer) => {
            var _a;
            if (error) {
                this.destroy(error);
                return;
            }
            this._path = (0, path_1.join)(((_a = options === null || options === void 0 ? void 0 : options.tmpdir) !== null && _a !== void 0 ? _a : os_1.tmpdir)(), `capacitor-${buffer.toString('hex')}.tmp`);
            // Create a file in the OS's temporary files directory.
            (0, fs_1.open)(this._path, 'wx+', 0o600, (error, fd) => {
                if (error) {
                    this.destroy(error);
                    return;
                }
                // Cleanup when the process exits or is killed.
                processExitProxy.once('exit', this._cleanupSync);
                this._fd = fd;
                this.emit('ready');
            });
        });
    }
    // Defer finishing until the backing file is open, so "finish" implies
    // every buffered chunk has reached the file.
    _final(callback) {
        if (typeof this._fd !== 'number') {
            this.once('ready', () => this._final(callback));
            return;
        }
        callback();
    }
    // Write `chunk` at the current end of the file, advance `_pos`, and
    // notify waiting read streams via the "write" event.
    _write(chunk, encoding, callback) {
        if (typeof this._fd !== 'number') {
            // The backing file is not open yet; retry once it is.
            this.once('ready', () => this._write(chunk, encoding, callback));
            return;
        }
        (0, fs_1.write)(this._fd, chunk, 0, chunk.length, this._pos, (error) => {
            if (error) {
                callback(error);
                return;
            }
            // It's safe to increment `this._pos` after flushing to the filesystem
            // because node streams ensure that only one `_write()` is active at a
            // time. If this assumption is broken, the behavior of this library is
            // undefined, regardless of where this is incremented. Relocating this
            // to increment synchronously would result in correct file contents, but
            // the out-of-order writes would still open the potential for read streams
            // to scan positions that have not yet been written.
            this._pos += chunk.length;
            this.emit('write');
            callback();
        });
    }
    // Mark the capacitor as no longer needed by the writer: destroy it now if
    // no read streams remain, otherwise once the last one closes.
    release() {
        this._released = true;
        if (this._readStreams.size === 0)
            this.destroy();
    }
    // Tear down the capacitor, its readers, and its backing file.
    _destroy(error, callback) {
        // Destroy all attached read streams.
        for (const readStream of this._readStreams) {
            readStream.destroy(error || undefined);
        }
        // This capacitor is fully initialized.
        if (typeof this._fd === 'number' && typeof this._path === 'string') {
            this._cleanup((cleanupError) => callback(cleanupError !== null && cleanupError !== void 0 ? cleanupError : error));
            return;
        }
        // This capacitor has not yet finished initialization; if initialization
        // does complete, immediately clean up after.
        this.once('ready', () => {
            this._cleanup((cleanupError) => {
                if (cleanupError) {
                    this.emit('error', cleanupError);
                }
            });
        });
        callback(error);
    }
    /**
     * Create a `ReadStream` that replays this capacitor's contents from the
     * beginning, following ongoing writes until the writer finishes.
     *
     * @throws {ReadAfterDestroyedError} if the capacitor was destroyed.
     * @throws {ReadAfterReleasedError} if `release()` was already called.
     */
    createReadStream(options) {
        if (this.destroyed)
            throw new ReadAfterDestroyedError('A ReadStream cannot be created from a destroyed WriteStream.');
        if (this._released)
            throw new ReadAfterReleasedError('A ReadStream cannot be created from a released WriteStream.');
        const readStream = new ReadStream(this, options);
        this._readStreams.add(readStream);
        readStream.once('close', () => {
            this._readStreams.delete(readStream);
            // Honor a pending release() once the last reader detaches.
            if (this._released && this._readStreams.size === 0) {
                this.destroy();
            }
        });
        return readStream;
    }
}
exports.WriteStream = WriteStream;
// Aggregate default export mirroring the named exports above, for consumers
// using a default-import interop style.
exports.default = {
    WriteStream,
    ReadStream,
    ReadAfterDestroyedError,
    ReadAfterReleasedError,
};
//# sourceMappingURL=fs-capacitor.js.map
\No newline at end of file