UNPKG

8.93 kBJavaScriptView Raw
1import { randomBytes } from "crypto";
2import { read, open, closeSync, unlinkSync, write, close, unlink } from "fs";
3import { tmpdir } from "os";
4import { join } from "path";
5import { Readable, Writable } from "stream";
6import { EventEmitter } from "events";
/**
 * Error thrown by `WriteStream#createReadStream()` when the write stream has
 * already been destroyed.
 *
 * `name` is set explicitly so that logs and `String(error)` identify the
 * error type instead of reporting the generic "Error".
 */
export class ReadAfterDestroyedError extends Error {
  constructor(...args) {
    super(...args);
    this.name = "ReadAfterDestroyedError";
  }
}
/**
 * Error thrown by `WriteStream#createReadStream()` when the write stream has
 * already been released.
 *
 * `name` is set explicitly so that logs and `String(error)` identify the
 * error type instead of reporting the generic "Error".
 */
export class ReadAfterReleasedError extends Error {
  constructor(...args) {
    super(...args);
    this.name = "ReadAfterReleasedError";
  }
}
11// Use a “proxy” event emitter configured to have an infinite maximum number of
12// listeners to prevent Node.js max listeners exceeded warnings if many
13// `fs-capacitor` `ReadStream` instances are created at the same time. See:
14// https://github.com/mike-marcacci/fs-capacitor/issues/30
15const processExitProxy = new EventEmitter();
16processExitProxy.setMaxListeners(Infinity);
17process.once("exit", () => processExitProxy.emit("exit"));
/**
 * Readable stream that reads, from position 0, the file descriptor owned by a
 * write stream. When it catches up with what has been written so far, it waits
 * for the write stream to emit "write" or "finish" and then tries again; it
 * ends once a read returns no bytes and the write stream has finished.
 */
export class ReadStream extends Readable {
  /**
   * @param {object} writeStream - The owning write stream. This class reads
   *   its `_fd` and `_writableState.finished` properties and listens for its
   *   "ready", "write" and "finish" events.
   * @param {object} [options]
   * @param {number} [options.highWaterMark] - Forwarded to `Readable`.
   * @param {string} [options.encoding] - Forwarded to `Readable`.
   */
  constructor(writeStream, options) {
    const { highWaterMark, encoding } = options == null ? {} : options;
    super({
      highWaterMark,
      encoding,
      autoDestroy: true,
    });
    // Offset in the backing file of the next byte to read.
    this._pos = 0;
    this._writeStream = writeStream;
  }

  /**
   * Reads up to `n` bytes from the write stream's file descriptor at the
   * current position and pushes them into this stream's buffer.
   *
   * @param {number} n - Maximum number of bytes to read.
   */
  _read(n) {
    if (this.destroyed) {
      return;
    }

    // The write stream opens its file asynchronously; until it has a file
    // descriptor, defer the read to its "ready" event.
    const fd = this._writeStream._fd;
    if (typeof fd !== "number") {
      this._writeStream.once("ready", () => this._read(n));
      return;
    }

    // Using `allocUnsafe` here is OK because we push only the first
    // `bytesRead` bytes and discard the rest. This prevents node from having
    // to zero out the entire allocation first.
    const buf = Buffer.allocUnsafe(n);
    read(fd, buf, 0, n, this._pos, (error, bytesRead) => {
      if (error) {
        // Stop here: falling through would either end the stream or attach
        // retry listeners to the write stream on behalf of a dead reader.
        this.destroy(error);
        return;
      }

      // The stream may have been destroyed while the read was in flight; in
      // that case there is nothing left to push or to wait for.
      if (this.destroyed) {
        return;
      }

      // Push any read bytes into the local stream buffer.
      if (bytesRead) {
        this._pos += bytesRead;
        this.push(buf.subarray(0, bytesRead));
        return;
      }

      // If there were no more bytes to read and the write stream is finished,
      // then this stream has reached the end.
      if (this._writeStream._writableState.finished) {
        this.push(null);
        return;
      }

      // Otherwise, wait for the write stream to add more data or finish.
      // Whichever event fires first removes both listeners before retrying.
      const retry = () => {
        this._writeStream.off("finish", retry);
        this._writeStream.off("write", retry);
        this._read(n);
      };
      this._writeStream.on("finish", retry);
      this._writeStream.on("write", retry);
    });
  }
}
/**
 * Writable stream that stores every chunk it receives in a randomly named
 * file, created exclusively with mode 0o600 inside the directory returned by
 * `options.tmpdir()` (default: `os.tmpdir()`). Read streams created with
 * `createReadStream()` read that file back. The file descriptor is closed and
 * the file unlinked when the stream is destroyed or the process exits.
 */
export class WriteStream extends Writable {
  /**
   * @param {object} [options]
   * @param {number} [options.highWaterMark] - Forwarded to `Writable`.
   * @param {string} [options.defaultEncoding] - Forwarded to `Writable`.
   * @param {() => string} [options.tmpdir] - Returns the directory in which
   *   the backing file is created; defaults to `os.tmpdir`.
   */
  constructor(options) {
    const hasOptions = options != null;
    super({
      highWaterMark: hasOptions ? options.highWaterMark : undefined,
      defaultEncoding: hasOptions ? options.defaultEncoding : undefined,
      autoDestroy: false,
    });

    // File descriptor and path of the backing file; both stay `null` until
    // the asynchronous initialization below has completed.
    this._fd = null;
    this._path = null;
    // Offset in the backing file at which the next chunk is written.
    this._pos = 0;
    // Read streams created from this stream that have not yet closed.
    this._readStreams = new Set();
    this._released = false;

    // Asynchronous cleanup: close the descriptor, then unlink the file.
    this._cleanup = (callback) => {
      const { _fd: descriptor, _path: file } = this;
      const initialized =
        typeof descriptor === "number" && typeof file === "string";
      if (!initialized) {
        callback(null);
        return;
      }

      // A failure to close probably means the descriptor was already closed,
      // so the unlink is attempted regardless.
      close(descriptor, (closeError) => {
        unlink(file, (unlinkError) => {
          // If the file cannot be unlinked, the operating system will clean
          // up on next restart, since these files are stored in `os.tmpdir()`.
          this._fd = null;

          // The exit handler is kept registered until this point in case the
          // process exits while the asynchronous cleanup is still running.
          processExitProxy.off("exit", this._cleanupSync);

          // Report the unlink error in preference to the close error.
          callback(unlinkError != null ? unlinkError : closeError);
        });
      });
    };

    // Synchronous, best-effort cleanup used when the process is exiting.
    this._cleanupSync = () => {
      processExitProxy.off("exit", this._cleanupSync);

      if (typeof this._fd === "number") {
        try {
          closeSync(this._fd);
        } catch (closeError) {
          // A failure here probably means the descriptor was already closed;
          // unlinking the file is still attempted below.
        }
      }

      if (this._path !== null) {
        try {
          unlinkSync(this._path);
        } catch (unlinkError) {
          // If the file cannot be unlinked, the operating system will clean
          // up on next restart, since these files are stored in `os.tmpdir()`.
        }
      }
    };

    // Pick a random file name, then create the file.
    randomBytes(16, (randomError, bytes) => {
      if (randomError) {
        this.destroy(randomError);
        return;
      }

      const customTmpdir = options != null ? options.tmpdir : undefined;
      const resolveTmpdir = customTmpdir != null ? customTmpdir : tmpdir;
      this._path = join(
        resolveTmpdir(),
        `capacitor-${bytes.toString("hex")}.tmp`
      );

      // "wx+" opens for reading and writing and fails if the path exists.
      open(this._path, "wx+", 0o600, (openError, fd) => {
        if (openError) {
          this.destroy(openError);
          return;
        }

        // Clean up when the process exits.
        processExitProxy.once("exit", this._cleanupSync);
        this._fd = fd;
        this.emit("ready");
      });
    });
  }

  /**
   * Completes the stream once the backing file has been opened.
   *
   * @param {(error?: Error | null) => void} callback
   */
  _final(callback) {
    if (typeof this._fd === "number") {
      callback();
      return;
    }
    // Not initialized yet: try again once the file has been opened.
    this.once("ready", () => this._final(callback));
  }

  /**
   * Writes `chunk` to the backing file at the current position, then advances
   * the position and emits "write".
   *
   * @param {Buffer} chunk
   * @param {string} encoding
   * @param {(error?: Error | null) => void} callback
   */
  _write(chunk, encoding, callback) {
    if (typeof this._fd !== "number") {
      // Not initialized yet: try again once the file has been opened.
      this.once("ready", () => this._write(chunk, encoding, callback));
      return;
    }

    const onWritten = (writeError) => {
      if (writeError) {
        callback(writeError);
        return;
      }

      // Advancing `this._pos` only after the write has completed is safe
      // because node streams keep a single `_write()` active at a time. If
      // that assumption were broken, the behavior of this library would be
      // undefined wherever the increment happened: incrementing synchronously
      // would yield correct file contents, but out-of-order writes could
      // still let read streams scan positions that have not been written yet.
      this._pos += chunk.length;
      this.emit("write");
      callback();
    };

    write(this._fd, chunk, 0, chunk.length, this._pos, onWritten);
  }

  /**
   * Marks the stream as released: no further read streams may be created, and
   * the stream destroys itself as soon as no read streams remain attached.
   */
  release() {
    this._released = true;
    if (this._readStreams.size === 0) {
      this.destroy();
    }
  }

  /**
   * Destroys every attached read stream and removes the backing file.
   *
   * @param {Error | null} error
   * @param {(error?: Error | null) => void} callback
   */
  _destroy(error, callback) {
    // Destroy all attached read streams.
    this._readStreams.forEach((reader) => {
      reader.destroy(error || undefined);
    });

    const initialized =
      typeof this._fd === "number" && typeof this._path === "string";

    if (!initialized) {
      // Initialization has not finished yet; if it does complete, clean up
      // immediately afterwards. By then `callback` has already been invoked,
      // so a cleanup failure is reported as an "error" event instead.
      this.once("ready", () => {
        this._cleanup((cleanupError) => {
          if (cleanupError) {
            this.emit("error", cleanupError);
          }
        });
      });
      callback(error);
      return;
    }

    // Fully initialized: clean up first, then report the cleanup error in
    // preference to the original one.
    this._cleanup((cleanupError) => {
      callback(cleanupError != null ? cleanupError : error);
    });
  }

  /**
   * Creates a read stream over the data written to this stream.
   *
   * @param {object} [options] - Passed to the `ReadStream` constructor.
   * @returns {ReadStream}
   * @throws {ReadAfterDestroyedError} If this stream has been destroyed.
   * @throws {ReadAfterReleasedError} If this stream has been released.
   */
  createReadStream(options) {
    if (this.destroyed) {
      throw new ReadAfterDestroyedError("A ReadStream cannot be created from a destroyed WriteStream.");
    }
    if (this._released) {
      throw new ReadAfterReleasedError("A ReadStream cannot be created from a released WriteStream.");
    }

    const reader = new ReadStream(this, options);
    this._readStreams.add(reader);

    // Stop tracking the reader once it closes; the last reader to close after
    // a release also destroys this stream.
    reader.once("close", () => {
      this._readStreams.delete(reader);
      if (this._released && this._readStreams.size === 0) {
        this.destroy();
      }
    });

    return reader;
  }
}
// Default export: a single object bundling the two stream classes and the two
// error classes.
const defaultExport = {
  WriteStream,
  ReadStream,
  ReadAfterDestroyedError,
  ReadAfterReleasedError,
};
export default defaultExport;
217//# sourceMappingURL=index.js.map
\No newline at end of file