/*
	MIT License http://www.opensource.org/licenses/mit-license.php
*/

"use strict";

const { constants } = require("buffer");
const { pipeline } = require("stream");
const {
	createBrotliCompress,
	createBrotliDecompress,
	createGzip,
	createGunzip,
	constants: zConstants
} = require("zlib");
const createHash = require("../util/createHash");
const { dirname, join, mkdirp } = require("../util/fs");
const memoize = require("../util/memoize");
const SerializerMiddleware = require("./SerializerMiddleware");

/** @typedef {typeof import("../util/Hash")} Hash */
/** @typedef {import("../util/fs").IntermediateFileSystem} IntermediateFileSystem */
/** @typedef {import("./types").BufferSerializableType} BufferSerializableType */

/*
Format:

File -> Header Section*

Version -> u32
AmountOfSections -> u32
SectionSize -> i32 (if less than zero represents lazy value)

Header -> Version AmountOfSections SectionSize*

Buffer -> n bytes
Section -> Buffer

*/

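// A minimal sketch of reading the header fields with Node's Buffer API
// (illustrative only; the real deserializer below streams through chunked
// file contents instead of one contiguous buffer):
//
//   const version = header.readUInt32LE(0);
//   const sectionCount = header.readUInt32LE(4);
//   const sectionSizes = [];
//   for (let i = 0; i < sectionCount; i++) {
//     sectionSizes.push(header.readInt32LE(8 + i * 4));
//   }
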
41// "wpc" + 1 in little-endian
42const VERSION = 0x01637077;
43
44/**
45 * @param {Buffer[]} buffers buffers
46 * @param {string | Hash} hashFunction hash function to use
47 * @returns {string} hash
48 */
49const hashForName = (buffers, hashFunction) => {
50 const hash = createHash(hashFunction);
51 for (const buf of buffers) hash.update(buf);
52 return /** @type {string} */ (hash.digest("hex"));
53};
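// hashForName derives a stable, content-addressed base name for lazily
// written section files (used when serialize is called with `name === true`).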

const COMPRESSION_CHUNK_SIZE = 100 * 1024 * 1024;
const DECOMPRESSION_CHUNK_SIZE = 100 * 1024 * 1024;

const writeUInt64LE = Buffer.prototype.writeBigUInt64LE
	? (buf, value, offset) => {
			buf.writeBigUInt64LE(BigInt(value), offset);
		}
	: (buf, value, offset) => {
			const low = value % 0x100000000;
			const high = (value - low) / 0x100000000;
			buf.writeUInt32LE(low, offset);
			buf.writeUInt32LE(high, offset + 4);
		};

const readUInt64LE = Buffer.prototype.readBigUInt64LE
	? (buf, offset) => {
			return Number(buf.readBigUInt64LE(offset));
		}
	: (buf, offset) => {
			const low = buf.readUInt32LE(offset);
			const high = buf.readUInt32LE(offset + 4);
			return high * 0x100000000 + low;
		};
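// Fallback path for Node versions without BigInt buffer methods: the 64-bit
// value is split into two 32-bit words, e.g. 0x100000001 becomes
// low = 0x00000001 and high = 0x00000001.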

/**
 * @typedef {Object} SerializeResult
 * @property {string | false} name
 * @property {number} size
 * @property {Promise=} backgroundJob
 */

/**
 * @param {FileMiddleware} middleware this
 * @param {BufferSerializableType[] | Promise<BufferSerializableType[]>} data data to be serialized
 * @param {string | boolean} name file base name
 * @param {function(string | false, Buffer[]): Promise<void>} writeFile writes a file
 * @param {string | Hash} hashFunction hash function to use
 * @returns {Promise<SerializeResult>} resulting file pointer and promise
 */
const serialize = async (
	middleware,
	data,
	name,
	writeFile,
	hashFunction = "md4"
) => {
	/** @type {(Buffer[] | Buffer | SerializeResult | Promise<SerializeResult>)[]} */
	const processedData = [];
	/** @type {WeakMap<SerializeResult, function(): any | Promise<any>>} */
	const resultToLazy = new WeakMap();
	/** @type {Buffer[]} */
	let lastBuffers = undefined;
	for (const item of await data) {
		if (typeof item === "function") {
			if (!SerializerMiddleware.isLazy(item))
				throw new Error("Unexpected function");
			if (!SerializerMiddleware.isLazy(item, middleware)) {
				throw new Error(
					"Unexpected lazy value with non-this target (can't pass through lazy values)"
				);
			}
			lastBuffers = undefined;
			const serializedInfo = SerializerMiddleware.getLazySerializedValue(item);
			if (serializedInfo) {
				if (typeof serializedInfo === "function") {
					throw new Error(
						"Unexpected lazy value with non-this target (can't pass through lazy values)"
					);
				} else {
					processedData.push(serializedInfo);
				}
			} else {
				const content = item();
				if (content) {
					const options = SerializerMiddleware.getLazyOptions(item);
					processedData.push(
						serialize(
							middleware,
							content,
							(options && options.name) || true,
							writeFile,
							hashFunction
						).then(result => {
							/** @type {any} */ (item).options.size = result.size;
							resultToLazy.set(result, item);
							return result;
						})
					);
				} else {
					throw new Error(
						"Unexpected falsy value returned by lazy value function"
					);
				}
			}
		} else if (item) {
			if (lastBuffers) {
				lastBuffers.push(item);
			} else {
				lastBuffers = [item];
				processedData.push(lastBuffers);
			}
		} else {
			throw new Error("Unexpected falsy value in items array");
		}
	}
	/** @type {Promise<any>[]} */
	const backgroundJobs = [];
	const resolvedData = (
		await Promise.all(
			/** @type {Promise<Buffer[] | Buffer | SerializeResult>[]} */ (
				processedData
			)
		)
	).map(item => {
		if (Array.isArray(item) || Buffer.isBuffer(item)) return item;

		backgroundJobs.push(item.backgroundJob);
		// create pointer buffer from size and name
		const name = /** @type {string} */ (item.name);
		const nameBuffer = Buffer.from(name);
		const buf = Buffer.allocUnsafe(8 + nameBuffer.length);
		writeUInt64LE(buf, item.size, 0);
		nameBuffer.copy(buf, 8, 0);
		const lazy = resultToLazy.get(item);
		SerializerMiddleware.setLazySerializedValue(lazy, buf);
		return buf;
	});
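	// Each pointer buffer created above encodes `u64 size` followed by the
	// UTF-8 bytes of the referenced file's name; deserialize reads the size
	// from the first 8 bytes and the name from the rest.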
	const lengths = [];
	for (const item of resolvedData) {
		if (Array.isArray(item)) {
			let l = 0;
			for (const b of item) l += b.length;
			while (l > 0x7fffffff) {
				lengths.push(0x7fffffff);
				l -= 0x7fffffff;
			}
			lengths.push(l);
		} else if (item) {
			lengths.push(-item.length);
		} else {
			throw new Error("Unexpected falsy value in resolved data " + item);
		}
	}
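	// Section sizes are stored as i32 values in the header, so a section
	// larger than 0x7fffffff bytes is split into several consecutive positive
	// entries here; deserialize merges them back into a single section.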
	const header = Buffer.allocUnsafe(8 + lengths.length * 4);
	header.writeUInt32LE(VERSION, 0);
	header.writeUInt32LE(lengths.length, 4);
	for (let i = 0; i < lengths.length; i++) {
		header.writeInt32LE(lengths[i], 8 + i * 4);
	}
	const buf = [header];
	for (const item of resolvedData) {
		if (Array.isArray(item)) {
			for (const b of item) buf.push(b);
		} else if (item) {
			buf.push(item);
		}
	}
	if (name === true) {
		name = hashForName(buf, hashFunction);
	}
	backgroundJobs.push(writeFile(name, buf));
	let size = 0;
	for (const b of buf) size += b.length;
	return {
		size,
		name,
		backgroundJob:
			backgroundJobs.length === 1
				? backgroundJobs[0]
				: Promise.all(backgroundJobs)
	};
};

/**
 * @param {FileMiddleware} middleware this
 * @param {string | false} name filename
 * @param {function(string | false): Promise<Buffer[]>} readFile read content of a file
 * @returns {Promise<BufferSerializableType[]>} deserialized data
 */
const deserialize = async (middleware, name, readFile) => {
	const contents = await readFile(name);
	if (contents.length === 0) throw new Error("Empty file " + name);
	let contentsIndex = 0;
	let contentItem = contents[0];
	let contentItemLength = contentItem.length;
	let contentPosition = 0;
	if (contentItemLength === 0) throw new Error("Empty file " + name);
	const nextContent = () => {
		contentsIndex++;
		contentItem = contents[contentsIndex];
		contentItemLength = contentItem.length;
		contentPosition = 0;
	};
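	// Ensure at least n bytes are readable from the current position,
	// concatenating following chunks when a value crosses a chunk boundary.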
	const ensureData = n => {
		if (contentPosition === contentItemLength) {
			nextContent();
		}
		while (contentItemLength - contentPosition < n) {
			const remaining = contentItem.slice(contentPosition);
			let lengthFromNext = n - remaining.length;
			const buffers = [remaining];
			for (let i = contentsIndex + 1; i < contents.length; i++) {
				const l = contents[i].length;
				if (l > lengthFromNext) {
					buffers.push(contents[i].slice(0, lengthFromNext));
					contents[i] = contents[i].slice(lengthFromNext);
					lengthFromNext = 0;
					break;
				} else {
					buffers.push(contents[i]);
					contentsIndex = i;
					lengthFromNext -= l;
				}
			}
			if (lengthFromNext > 0) throw new Error("Unexpected end of data");
			contentItem = Buffer.concat(buffers, n);
			contentItemLength = n;
			contentPosition = 0;
		}
	};
	const readUInt32LE = () => {
		ensureData(4);
		const value = contentItem.readUInt32LE(contentPosition);
		contentPosition += 4;
		return value;
	};
	const readInt32LE = () => {
		ensureData(4);
		const value = contentItem.readInt32LE(contentPosition);
		contentPosition += 4;
		return value;
	};
	const readSlice = l => {
		ensureData(l);
		if (contentPosition === 0 && contentItemLength === l) {
			const result = contentItem;
			if (contentsIndex + 1 < contents.length) {
				nextContent();
			} else {
				contentPosition = l;
			}
			return result;
		}
		const result = contentItem.slice(contentPosition, contentPosition + l);
		contentPosition += l;
		// we clone the buffer here to allow the original content to be garbage collected
		return l * 2 < contentItem.buffer.byteLength ? Buffer.from(result) : result;
	};
	const version = readUInt32LE();
	if (version !== VERSION) {
		throw new Error("Invalid file version");
	}
	const sectionCount = readUInt32LE();
	const lengths = [];
	let lastLengthPositive = false;
	for (let i = 0; i < sectionCount; i++) {
		const value = readInt32LE();
		const valuePositive = value >= 0;
		if (lastLengthPositive && valuePositive) {
			lengths[lengths.length - 1] += value;
		} else {
			lengths.push(value);
			lastLengthPositive = valuePositive;
		}
	}
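	// A negative length marks a lazy section (a pointer buffer), a positive
	// length a plain buffer section sliced directly out of the file contents.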
	const result = [];
	for (let length of lengths) {
		if (length < 0) {
			const slice = readSlice(-length);
			const size = Number(readUInt64LE(slice, 0));
			const nameBuffer = slice.slice(8);
			const name = nameBuffer.toString();
			result.push(
				SerializerMiddleware.createLazy(
					memoize(() => deserialize(middleware, name, readFile)),
					middleware,
					{
						name,
						size
					},
					slice
				)
			);
		} else {
			if (contentPosition === contentItemLength) {
				nextContent();
			} else if (contentPosition !== 0) {
				if (length <= contentItemLength - contentPosition) {
					result.push(
						Buffer.from(
							contentItem.buffer,
							contentItem.byteOffset + contentPosition,
							length
						)
					);
					contentPosition += length;
					length = 0;
				} else {
					const l = contentItemLength - contentPosition;
					result.push(
						Buffer.from(
							contentItem.buffer,
							contentItem.byteOffset + contentPosition,
							l
						)
					);
					length -= l;
					contentPosition = contentItemLength;
				}
			} else {
				if (length >= contentItemLength) {
					result.push(contentItem);
					length -= contentItemLength;
					contentPosition = contentItemLength;
				} else {
					result.push(
						Buffer.from(contentItem.buffer, contentItem.byteOffset, length)
					);
					contentPosition += length;
					length = 0;
				}
			}
			while (length > 0) {
				nextContent();
				if (length >= contentItemLength) {
					result.push(contentItem);
					length -= contentItemLength;
					contentPosition = contentItemLength;
				} else {
					result.push(
						Buffer.from(contentItem.buffer, contentItem.byteOffset, length)
					);
					contentPosition += length;
					length = 0;
				}
			}
		}
	}
	return result;
};

/**
 * @typedef {BufferSerializableType[]} DeserializedType
 * @typedef {true} SerializedType
 * @extends {SerializerMiddleware<DeserializedType, SerializedType>}
 */
class FileMiddleware extends SerializerMiddleware {
	/**
	 * @param {IntermediateFileSystem} fs filesystem
	 * @param {string | Hash} hashFunction hash function to use
	 */
	constructor(fs, hashFunction = "md4") {
		super();
		this.fs = fs;
		this._hashFunction = hashFunction;
	}

	/**
	 * @param {DeserializedType} data data
	 * @param {Object} context context object
	 * @returns {SerializedType|Promise<SerializedType>} serialized data
	 */
	serialize(data, context) {
		const { filename, extension = "" } = context;
		return new Promise((resolve, reject) => {
			mkdirp(this.fs, dirname(this.fs, filename), err => {
				if (err) return reject(err);

				// It's important that we don't touch existing files during serialization
				// because serialize may read existing files (when deserializing)
				const allWrittenFiles = new Set();
				const writeFile = async (name, content) => {
					const file = name
						? join(this.fs, filename, `../${name}${extension}`)
						: filename;
					await new Promise((resolve, reject) => {
						let stream = this.fs.createWriteStream(file + "_");
						let compression;
						if (file.endsWith(".gz")) {
							compression = createGzip({
								chunkSize: COMPRESSION_CHUNK_SIZE,
								level: zConstants.Z_BEST_SPEED
							});
						} else if (file.endsWith(".br")) {
							compression = createBrotliCompress({
								chunkSize: COMPRESSION_CHUNK_SIZE,
								params: {
									[zConstants.BROTLI_PARAM_MODE]: zConstants.BROTLI_MODE_TEXT,
									[zConstants.BROTLI_PARAM_QUALITY]: 2,
									[zConstants.BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING]: true,
									[zConstants.BROTLI_PARAM_SIZE_HINT]: content.reduce(
										(size, b) => size + b.length,
										0
									)
								}
							});
						}
						if (compression) {
							pipeline(compression, stream, reject);
							stream = compression;
							stream.on("finish", () => resolve());
						} else {
							stream.on("error", err => reject(err));
							stream.on("finish", () => resolve());
						}
						for (const b of content) stream.write(b);
						stream.end();
					});
					if (name) allWrittenFiles.add(file);
				};
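
				// Every file is written with a trailing "_" first and only renamed
				// into place after all writes have finished, so a crash during
				// serialization never leaves a partially written pack under the
				// final name.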
				resolve(
					serialize(this, data, false, writeFile, this._hashFunction).then(
						async ({ backgroundJob }) => {
							await backgroundJob;

							// Rename the index file to disallow access during inconsistent file state
							await new Promise(resolve =>
								this.fs.rename(filename, filename + ".old", err => {
									resolve();
								})
							);

							// update all written files
							await Promise.all(
								Array.from(
									allWrittenFiles,
									file =>
										new Promise((resolve, reject) => {
											this.fs.rename(file + "_", file, err => {
												if (err) return reject(err);
												resolve();
											});
										})
								)
							);

							// As final step automatically update the index file to have a consistent pack again
							await new Promise((resolve, reject) => {
								this.fs.rename(filename + "_", filename, err => {
									if (err) return reject(err);
									resolve();
								});
							});
							return /** @type {true} */ (true);
						}
					)
				);
			});
		});
	}

	/**
	 * @param {SerializedType} data data
	 * @param {Object} context context object
	 * @returns {DeserializedType|Promise<DeserializedType>} deserialized data
	 */
	deserialize(data, context) {
		const { filename, extension = "" } = context;
		const readFile = name =>
			new Promise((resolve, reject) => {
				const file = name
					? join(this.fs, filename, `../${name}${extension}`)
					: filename;
				this.fs.stat(file, (err, stats) => {
					if (err) {
						reject(err);
						return;
					}
					let remaining = /** @type {number} */ (stats.size);
					let currentBuffer;
					let currentBufferUsed;
					const buf = [];
					let decompression;
					if (file.endsWith(".gz")) {
						decompression = createGunzip({
							chunkSize: DECOMPRESSION_CHUNK_SIZE
						});
					} else if (file.endsWith(".br")) {
						decompression = createBrotliDecompress({
							chunkSize: DECOMPRESSION_CHUNK_SIZE
						});
					}
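					// For compressed files, swap out resolve/reject so the promise
					// returned to the caller settles only after both the raw file read
					// below and the decompression stream have finished filling `buf`.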
					if (decompression) {
						let newResolve, newReject;
						resolve(
							Promise.all([
								new Promise((rs, rj) => {
									newResolve = rs;
									newReject = rj;
								}),
								new Promise((resolve, reject) => {
									decompression.on("data", chunk => buf.push(chunk));
									decompression.on("end", () => resolve());
									decompression.on("error", err => reject(err));
								})
							]).then(() => buf)
						);
						resolve = newResolve;
						reject = newReject;
					}
					this.fs.open(file, "r", (err, fd) => {
						if (err) {
							reject(err);
							return;
						}
						const read = () => {
							if (currentBuffer === undefined) {
								currentBuffer = Buffer.allocUnsafeSlow(
									Math.min(
										constants.MAX_LENGTH,
										remaining,
										decompression ? DECOMPRESSION_CHUNK_SIZE : Infinity
									)
								);
								currentBufferUsed = 0;
							}
							let readBuffer = currentBuffer;
							let readOffset = currentBufferUsed;
							let readLength = currentBuffer.length - currentBufferUsed;
							// values passed to fs.read must be valid int32 values
							if (readOffset > 0x7fffffff) {
								readBuffer = currentBuffer.slice(readOffset);
								readOffset = 0;
							}
							if (readLength > 0x7fffffff) {
								readLength = 0x7fffffff;
							}
							this.fs.read(
								fd,
								readBuffer,
								readOffset,
								readLength,
								null,
								(err, bytesRead) => {
									if (err) {
										this.fs.close(fd, () => {
											reject(err);
										});
										return;
									}
									currentBufferUsed += bytesRead;
									remaining -= bytesRead;
									if (currentBufferUsed === currentBuffer.length) {
										if (decompression) {
											decompression.write(currentBuffer);
										} else {
											buf.push(currentBuffer);
										}
										currentBuffer = undefined;
										if (remaining === 0) {
											if (decompression) {
												decompression.end();
											}
											this.fs.close(fd, err => {
												if (err) {
													reject(err);
													return;
												}
												resolve(buf);
											});
											return;
										}
									}
									read();
								}
							);
						};
						read();
					});
				});
			});
		return deserialize(this, false, readFile);
	}
}

module.exports = FileMiddleware;