/*!
 * Copyright 2022 Google LLC. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
    return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var _XMLMultiPartUploadHelper_instances, _XMLMultiPartUploadHelper_setGoogApiClientHeaders, _XMLMultiPartUploadHelper_handleErrorResponse;
import { FileExceptionMessages, RequestError, } from './file.js';
import pLimit from 'p-limit';
import * as path from 'path';
import { createReadStream, promises as fsp } from 'fs';
import { CRC32C } from './crc32c.js';
import { GoogleAuth } from 'google-auth-library';
import { XMLParser, XMLBuilder } from 'fast-xml-parser';
import AsyncRetry from 'async-retry';
import { createHash } from 'crypto';
import { GCCL_GCS_CMD_KEY } from './nodejs-common/util.js';
import { getRuntimeTrackingString, getUserAgentString } from './util.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import { getPackageJSON } from './package-json-helper.cjs';
const packageJson = getPackageJSON();
/**
 * Default number of concurrently executing promises to use when calling uploadManyFiles.
 */
const DEFAULT_PARALLEL_UPLOAD_LIMIT = 5;
/**
 * Default number of concurrently executing promises to use when calling downloadManyFiles.
 */
const DEFAULT_PARALLEL_DOWNLOAD_LIMIT = 5;
/**
 * Default number of concurrently executing promises to use when calling downloadFileInChunks.
 */
const DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT = 5;
/**
 * The minimum size threshold in bytes at which to apply a chunked download strategy when calling downloadFileInChunks.
 */
const DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD = 32 * 1024 * 1024;
/**
 * The chunk size in bytes to use when calling downloadFileInChunks.
 */
const DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
/**
 * The chunk size in bytes to use when calling uploadFileInChunks.
 */
const UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
/**
 * Default number of concurrently executing promises to use when calling uploadFileInChunks.
 */
const DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT = 5;
const EMPTY_REGEX = '(?:)';
/**
 * The `gccl-gcs-cmd` value for the `X-Goog-API-Client` header.
 * Example: `gccl-gcs-cmd/tm.upload_many`
 *
 * @see {@link GCCL_GCS_CMD}.
 * @see {@link GCCL_GCS_CMD_KEY}.
 */
const GCCL_GCS_CMD_FEATURE = {
    UPLOAD_MANY: 'tm.upload_many',
    DOWNLOAD_MANY: 'tm.download_many',
    UPLOAD_SHARDED: 'tm.upload_sharded',
    DOWNLOAD_SHARDED: 'tm.download_sharded',
};
const defaultMultiPartGenerator = (bucket, fileName, uploadId, partsMap) => {
    return new XMLMultiPartUploadHelper(bucket, fileName, uploadId, partsMap);
};
export class MultiPartUploadError extends Error {
    constructor(message, uploadId, partsMap) {
        super(message);
        this.uploadId = uploadId;
        this.partsMap = partsMap;
    }
}
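/**
 * A minimal sketch of catching this error; the uploadId and partsMap it
 * carries (assigned in the constructor above) are what a caller would persist
 * to resume the upload later. The bucket name and file path are illustrative.
 *
 * @example
 * ```
 * const {Storage} = require('@google-cloud/storage');
 * const transferManager = new TransferManager(new Storage().bucket('my-bucket'));
 * try {
 *   await transferManager.uploadFileInChunks('/local/path/large-file.txt');
 * } catch (e) {
 *   if (e instanceof MultiPartUploadError) {
 *     // Persist these two values to resume via UploadFileInChunksOptions later.
 *     console.log(e.uploadId, [...e.partsMap.entries()]);
 *   }
 * }
 * ```
 */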
/**
 * Class representing an implementation of MPU in the XML API. This class is not meant for public usage.
 *
 * @private
 */
class XMLMultiPartUploadHelper {
    constructor(bucket, fileName, uploadId, partsMap) {
        _XMLMultiPartUploadHelper_instances.add(this);
        this.authClient = bucket.storage.authClient || new GoogleAuth();
        this.uploadId = uploadId || '';
        this.bucket = bucket;
        this.fileName = fileName;
        this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`;
        this.xmlBuilder = new XMLBuilder({ arrayNodeName: 'Part' });
        this.xmlParser = new XMLParser();
        this.partsMap = partsMap || new Map();
        this.retryOptions = {
            retries: this.bucket.storage.retryOptions.maxRetries,
            factor: this.bucket.storage.retryOptions.retryDelayMultiplier,
            maxTimeout: this.bucket.storage.retryOptions.maxRetryDelay * 1000,
            maxRetryTime: this.bucket.storage.retryOptions.totalTimeout * 1000,
        };
    }
    /**
     * Initiates a multipart upload (MPU) to the XML API and stores the resultant upload id.
     *
     * @returns {Promise<void>}
     */
    async initiateUpload(headers = {}) {
        const url = `${this.baseUrl}?uploads`;
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    headers: __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this, headers),
                    method: 'POST',
                    url,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                const parsedXML = this.xmlParser.parse(res.data);
                this.uploadId = parsedXML.InitiateMultipartUploadResult.UploadId;
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
            }
        }, this.retryOptions);
    }
    /**
     * Uploads the provided chunk of data to the XML API using the previously created upload id.
     *
     * @param {number} partNumber the sequence number of this chunk.
     * @param {Buffer} chunk the chunk of data to be uploaded.
     * @param {string | false} validation whether or not to include the md5 hash in the headers to cause the server
     * to validate the chunk was not corrupted.
     * @returns {Promise<void>}
     */
    async uploadPart(partNumber, chunk, validation) {
        const url = `${this.baseUrl}?partNumber=${partNumber}&uploadId=${this.uploadId}`;
        let headers = __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this);
        if (validation === 'md5') {
            const hash = createHash('md5').update(chunk).digest('base64');
            headers = {
                'Content-MD5': hash,
            };
        }
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    url,
                    method: 'PUT',
                    body: chunk,
                    headers,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                this.partsMap.set(partNumber, res.headers['etag']);
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
            }
        }, this.retryOptions);
    }
    /**
     * Sends the final request of the MPU to tell GCS the upload is now complete.
     *
     * @returns {Promise<void>}
     */
    async completeUpload() {
        const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
        const sortedMap = new Map([...this.partsMap.entries()].sort((a, b) => a[0] - b[0]));
        const parts = [];
        for (const entry of sortedMap.entries()) {
            parts.push({ PartNumber: entry[0], ETag: entry[1] });
        }
        const body = `<CompleteMultipartUpload>${this.xmlBuilder.build(parts)}</CompleteMultipartUpload>`;
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    headers: __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this),
                    url,
                    method: 'POST',
                    body,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                return res;
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
                return;
            }
        }, this.retryOptions);
    }
    /**
     * Aborts a multipart upload that is in progress. Once aborted, any parts in the process of being uploaded fail,
     * and future requests using the upload ID fail.
     *
     * @returns {Promise<void>}
     */
    async abortUpload() {
        const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    url,
                    method: 'DELETE',
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
                return;
            }
        }, this.retryOptions);
    }
}
_XMLMultiPartUploadHelper_instances = new WeakSet(), _XMLMultiPartUploadHelper_setGoogApiClientHeaders = function _XMLMultiPartUploadHelper_setGoogApiClientHeaders(headers = {}) {
    let headerFound = false;
    let userAgentFound = false;
    for (const [key, value] of Object.entries(headers)) {
        if (key.toLocaleLowerCase().trim() === 'x-goog-api-client') {
            headerFound = true;
            // Prepend command feature to value, if not already there
            if (!value.includes(GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED)) {
                headers[key] =
                    `${value} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
            }
        }
        else if (key.toLocaleLowerCase().trim() === 'user-agent') {
            userAgentFound = true;
        }
    }
    // If the header isn't present, add it
    if (!headerFound) {
        headers['x-goog-api-client'] = `${getRuntimeTrackingString()} gccl/${packageJson.version} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
    }
    // If the User-Agent isn't present, add it
    if (!userAgentFound) {
        headers['User-Agent'] = getUserAgentString();
    }
    return headers;
}, _XMLMultiPartUploadHelper_handleErrorResponse = function _XMLMultiPartUploadHelper_handleErrorResponse(err, bail) {
    if (this.bucket.storage.retryOptions.autoRetry &&
        this.bucket.storage.retryOptions.retryableErrorFn(err)) {
        throw err;
    }
    else {
        bail(err);
    }
};
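/**
 * A rough sketch of the lifecycle the private helper above implements, in the
 * order uploadFileInChunks drives it; the buffers shown are illustrative. Each
 * step wraps one XML API request (initiate, per-part PUT, complete or abort).
 *
 * @example
 * ```
 * const helper = new XMLMultiPartUploadHelper(bucket, 'file.txt');
 * await helper.initiateUpload();                        // POST ?uploads
 * await helper.uploadPart(1, Buffer.from('part one'));  // PUT ?partNumber=1&uploadId=...
 * await helper.uploadPart(2, Buffer.from('part two'));  // PUT ?partNumber=2&uploadId=...
 * await helper.completeUpload();                        // POST ?uploadId=...
 * // On failure, abortUpload() issues DELETE ?uploadId=... instead.
 * ```
 */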
/**
 * Create a TransferManager object to perform parallel transfer operations on a Cloud Storage bucket.
 *
 * @class
 * @hideconstructor
 *
 * @param {Bucket} bucket A {@link Bucket} instance
 */
export class TransferManager {
    constructor(bucket) {
        this.bucket = bucket;
    }
    /**
     * @typedef {object} UploadManyFilesOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when uploading the files.
     * @property {Function} [customDestinationBuilder] A function that will take the current path of a local file
     * and return a string representing a custom path to be used to upload the file to GCS.
     * @property {boolean} [skipIfExists] Do not upload the file if it already exists in
     * the bucket. This will set the precondition ifGenerationMatch = 0.
     * @property {string} [prefix] A prefix to prepend to all of the uploaded files.
     * @property {object} [passthroughOptions] {@link UploadOptions} Options to be passed through
     * to each individual upload operation.
     */
    /**
     * Upload multiple files in parallel to the bucket. This is a convenience method
     * that utilizes {@link Bucket#upload} to perform the upload.
     *
     * @param {array | string} [filePathsOrDirectory] An array of fully qualified paths to the files to be uploaded
     * to the bucket, or a directory name. If a directory name is provided, the directory is recursively walked
     * and all files found are added to the upload list.
     * @param {UploadManyFilesOptions} [options] Configuration options.
     * @returns {Promise<UploadResponse[]>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Upload multiple files in parallel.
     * //-
     * const response = await transferManager.uploadManyFiles(['/local/path/file1.txt', '/local/path/file2.txt']);
     * // Your bucket now contains:
     * // - "local/path/file1.txt" (with the contents of '/local/path/file1.txt')
     * // - "local/path/file2.txt" (with the contents of '/local/path/file2.txt')
     * const response = await transferManager.uploadManyFiles('/local/directory');
     * // Your bucket will now contain all files contained in '/local/directory' maintaining the subdirectory structure.
     * ```
     */
    async uploadManyFiles(filePathsOrDirectory, options = {}) {
        var _a;
        if (options.skipIfExists && ((_a = options.passthroughOptions) === null || _a === void 0 ? void 0 : _a.preconditionOpts)) {
            options.passthroughOptions.preconditionOpts.ifGenerationMatch = 0;
        }
        else if (options.skipIfExists &&
            options.passthroughOptions === undefined) {
            options.passthroughOptions = {
                preconditionOpts: {
                    ifGenerationMatch: 0,
                },
            };
        }
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_UPLOAD_LIMIT);
        const promises = [];
        let allPaths = [];
        if (!Array.isArray(filePathsOrDirectory)) {
            for await (const curPath of this.getPathsFromDirectory(filePathsOrDirectory)) {
                allPaths.push(curPath);
            }
        }
        else {
            allPaths = filePathsOrDirectory;
        }
        for (const filePath of allPaths) {
            const stat = await fsp.lstat(filePath);
            if (stat.isDirectory()) {
                continue;
            }
            const passThroughOptionsCopy = {
                ...options.passthroughOptions,
                [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.UPLOAD_MANY,
            };
            passThroughOptionsCopy.destination = options.customDestinationBuilder
                ? options.customDestinationBuilder(filePath, options)
                : filePath.split(path.sep).join(path.posix.sep);
            if (options.prefix) {
                passThroughOptionsCopy.destination = path.posix.join(...options.prefix.split(path.sep), passThroughOptionsCopy.destination);
            }
            promises.push(limit(() => this.bucket.upload(filePath, passThroughOptionsCopy)));
        }
        return Promise.all(promises);
    }
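    /**
     * A sketch of how the destination controls above interact: the builder (a
     * hypothetical one here) replaces the default path-derived object name, and
     * prefix is joined in front of whichever name results. The paths are
     * illustrative.
     *
     * @example
     * ```
     * await transferManager.uploadManyFiles('/local/directory', {
     *   prefix: 'backups/2024', // objects land under 'backups/2024/...'
     *   customDestinationBuilder: filePath => path.basename(filePath), // flatten subdirectories
     * });
     * ```
     */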
    /**
     * @typedef {object} DownloadManyFilesOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when downloading the files.
     * @property {string} [prefix] A prefix to prepend to all of the downloaded files.
     * @property {string} [stripPrefix] A prefix to remove from all of the downloaded files.
     * @property {object} [passthroughOptions] {@link DownloadOptions} Options to be passed through
     * to each individual download operation.
     */
    /**
     * Download multiple files in parallel to the local filesystem. This is a convenience method
     * that utilizes {@link File#download} to perform the download.
     *
     * @param {array | string} [filesOrFolder] An array of file name strings or file objects to be downloaded. If
     * a string is provided this will be treated as a GCS prefix and all files with that prefix will be downloaded.
     * @param {DownloadManyFilesOptions} [options] Configuration options. Setting options.prefix or options.stripPrefix
     * or options.passthroughOptions.destination will cause the downloaded files to be written to the file system
     * instead of being returned as a buffer.
     * @returns {Promise<DownloadResponse[]>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Download multiple files in parallel.
     * //-
     * const response = await transferManager.downloadManyFiles(['file1.txt', 'file2.txt']);
     * // The following files have been downloaded:
     * // - "file1.txt" (with the contents from my-bucket.file1.txt)
     * // - "file2.txt" (with the contents from my-bucket.file2.txt)
     * const response = await transferManager.downloadManyFiles([bucket.file('file1.txt'), bucket.file('file2.txt')]);
     * // The following files have been downloaded:
     * // - "file1.txt" (with the contents from my-bucket.file1.txt)
     * // - "file2.txt" (with the contents from my-bucket.file2.txt)
     * const response = await transferManager.downloadManyFiles('test-folder');
     * // All files with GCS prefix of 'test-folder' have been downloaded.
     * ```
     */
    async downloadManyFiles(filesOrFolder, options = {}) {
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_DOWNLOAD_LIMIT);
        const promises = [];
        let files = [];
        if (!Array.isArray(filesOrFolder)) {
            const directoryFiles = await this.bucket.getFiles({
                prefix: filesOrFolder,
            });
            files = directoryFiles[0];
        }
        else {
            files = filesOrFolder.map(curFile => {
                if (typeof curFile === 'string') {
                    return this.bucket.file(curFile);
                }
                return curFile;
            });
        }
        const stripRegexString = options.stripPrefix
            ? `^${options.stripPrefix}`
            : EMPTY_REGEX;
        const regex = new RegExp(stripRegexString, 'g');
        for (const file of files) {
            const passThroughOptionsCopy = {
                ...options.passthroughOptions,
                [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_MANY,
            };
            if (options.prefix || passThroughOptionsCopy.destination) {
                passThroughOptionsCopy.destination = path.join(options.prefix || '', passThroughOptionsCopy.destination || '', file.name);
            }
            if (options.stripPrefix) {
                passThroughOptionsCopy.destination = file.name.replace(regex, '');
            }
            promises.push(limit(async () => {
                const destination = passThroughOptionsCopy.destination;
                if (destination && destination.endsWith(path.sep)) {
                    await fsp.mkdir(destination, { recursive: true });
                    return Promise.resolve([
                        Buffer.alloc(0),
                    ]);
                }
                return file.download(passThroughOptionsCopy);
            }));
        }
        return Promise.all(promises);
    }
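    /**
     * A sketch of the two renaming options above. Note that, as implemented,
     * stripPrefix overwrites any destination computed from prefix, so the two
     * are shown separately; the object names are illustrative.
     *
     * @example
     * ```
     * await transferManager.downloadManyFiles('data/2024/', {
     *   stripPrefix: 'data/2024/', // 'data/2024/a.csv' is written locally as 'a.csv'
     * });
     * await transferManager.downloadManyFiles('data/2024/', {
     *   prefix: 'local-copy', // written under 'local-copy/data/2024/...'
     * });
     * ```
     */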
    /**
     * @typedef {object} DownloadFileInChunksOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when downloading the file.
     * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be downloaded.
     * @property {string | boolean} [validation] Whether or not to perform a CRC32C validation check when download is complete.
     * @property {boolean} [noReturnData] Whether or not to return the downloaded data. A `true` value here is useful for files too large to fit into memory.
     */
    /**
     * Download a large file in chunks utilizing parallel download operations. This is a convenience method
     * that utilizes {@link File#download} to perform the download.
     *
     * @param {File | string} fileOrName {@link File} to download.
     * @param {DownloadFileInChunksOptions} [options] Configuration options.
     * @returns {Promise<void | DownloadResponse>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Download a large file in chunks utilizing parallel operations.
     * //-
     * const response = await transferManager.downloadFileInChunks(bucket.file('large-file.txt'));
     * // Your local directory now contains:
     * // - "large-file.txt" (with the contents from my-bucket.large-file.txt)
     * ```
     */
    async downloadFileInChunks(fileOrName, options = {}) {
        let chunkSize = options.chunkSizeBytes || DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE;
        let limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT);
        const noReturnData = Boolean(options.noReturnData);
        const promises = [];
        const file = typeof fileOrName === 'string'
            ? this.bucket.file(fileOrName)
            : fileOrName;
        const fileInfo = await file.get();
        const size = parseInt(fileInfo[0].metadata.size.toString());
        // If the file size does not meet the threshold download it as a single chunk.
        if (size < DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD) {
            limit = pLimit(1);
            chunkSize = size;
        }
        let start = 0;
        const filePath = options.destination || path.basename(file.name);
        const fileToWrite = await fsp.open(filePath, 'w');
        while (start < size) {
            const chunkStart = start;
            let chunkEnd = start + chunkSize - 1;
            chunkEnd = chunkEnd > size ? size : chunkEnd;
            promises.push(limit(async () => {
                const resp = await file.download({
                    start: chunkStart,
                    end: chunkEnd,
                    [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_SHARDED,
                });
                const result = await fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
                if (noReturnData)
                    return;
                return result.buffer;
            }));
            start += chunkSize;
        }
        let chunks;
        try {
            chunks = await Promise.all(promises);
        }
        finally {
            await fileToWrite.close();
        }
        if (options.validation === 'crc32c' && fileInfo[0].metadata.crc32c) {
            const downloadedCrc32C = await CRC32C.fromFile(filePath);
            if (!downloadedCrc32C.validate(fileInfo[0].metadata.crc32c)) {
                const mismatchError = new RequestError(FileExceptionMessages.DOWNLOAD_MISMATCH);
                mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';
                throw mismatchError;
            }
        }
        if (noReturnData)
            return;
        return [Buffer.concat(chunks, size)];
    }
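    /**
     * A sketch of the chunked download above with its tuning options set
     * explicitly; the file name, sizes, and destination are illustrative. With
     * noReturnData, the data is only written to disk and the promise resolves
     * to undefined.
     *
     * @example
     * ```
     * await transferManager.downloadFileInChunks(bucket.file('large-file.bin'), {
     *   chunkSizeBytes: 64 * 1024 * 1024, // 64 MiB chunks
     *   concurrencyLimit: 10,
     *   destination: '/tmp/large-file.bin',
     *   validation: 'crc32c', // verify the reassembled file against object metadata
     *   noReturnData: true,
     * });
     * ```
     */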
    /**
     * @typedef {object} UploadFileInChunksOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when uploading the file.
     * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be uploaded.
     * @property {string} [uploadName] Name of the file when saving to GCS. If omitted the name is taken from the file path.
     * @property {number} [maxQueueSize] The number of chunks to be uploaded to hold in memory concurrently. If not specified
     * defaults to the specified concurrency limit.
     * @property {string} [uploadId] If specified attempts to resume a previous upload.
     * @property {Map} [partsMap] If specified alongside uploadId, attempts to resume a previous upload from the last chunk
     * specified in partsMap.
     * @property {object} [headers] Headers to be sent when initiating the multipart upload.
     * See {@link https://cloud.google.com/storage/docs/xml-api/post-object-multipart#request_headers| Request Headers: Initiate a Multipart Upload}
     * @property {boolean} [autoAbortFailure] Whether an in-progress upload session should be automatically aborted upon failure.
     * If not set, failed uploads are automatically aborted.
     */
    /**
     * Upload a large file in chunks utilizing parallel upload operations. If the upload fails, an uploadId and
     * a map containing all the successfully uploaded parts will be returned to the caller. These arguments can be used to
     * resume the upload.
     *
     * @param {string} [filePath] The path of the file to be uploaded.
     * @param {UploadFileInChunksOptions} [options] Configuration options.
     * @param {MultiPartHelperGenerator} [generator] A function that will return a type that implements the MPU interface. Most users will not need to use this.
     * @returns {Promise<void>} If successful, a promise resolving to void; otherwise an error containing the message, uploadId, and parts map.
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Upload a large file in chunks utilizing parallel operations.
     * //-
     * const response = await transferManager.uploadFileInChunks('large-file.txt');
     * // Your bucket now contains:
     * // - "large-file.txt"
     * ```
     */
    async uploadFileInChunks(filePath, options = {}, generator = defaultMultiPartGenerator) {
        const chunkSize = options.chunkSizeBytes || UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE;
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT);
        const maxQueueSize = options.maxQueueSize ||
            options.concurrencyLimit ||
            DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT;
        const fileName = options.uploadName || path.basename(filePath);
        const mpuHelper = generator(this.bucket, fileName, options.uploadId, options.partsMap);
        let partNumber = 1;
        let promises = [];
        try {
            if (options.uploadId === undefined) {
                await mpuHelper.initiateUpload(options.headers);
            }
            const startOrResumptionByte = mpuHelper.partsMap.size * chunkSize;
            const readStream = createReadStream(filePath, {
                highWaterMark: chunkSize,
                start: startOrResumptionByte,
            });
            // p-limit only limits the number of running promises. We do not want to hold an entire
            // large file in memory at once, so promises acts as a queue holding at most maxQueueSize chunks in memory.
            for await (const curChunk of readStream) {
                if (promises.length >= maxQueueSize) {
                    await Promise.all(promises);
                    promises = [];
                }
                promises.push(limit(() => mpuHelper.uploadPart(partNumber++, curChunk, options.validation)));
            }
            await Promise.all(promises);
            return await mpuHelper.completeUpload();
        }
        catch (e) {
            if ((options.autoAbortFailure === undefined || options.autoAbortFailure) &&
                mpuHelper.uploadId) {
                try {
                    await mpuHelper.abortUpload();
                    return;
                }
                catch (e) {
                    throw new MultiPartUploadError(e.message, mpuHelper.uploadId, mpuHelper.partsMap);
                }
            }
            throw new MultiPartUploadError(e.message, mpuHelper.uploadId, mpuHelper.partsMap);
        }
    }
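    /**
     * A sketch of resuming a failed upload with the state carried by
     * MultiPartUploadError. autoAbortFailure is set to false on the first
     * attempt so the session is kept alive; the file path is illustrative.
     *
     * @example
     * ```
     * const localPath = '/local/path/large-file.txt';
     * try {
     *   await transferManager.uploadFileInChunks(localPath, { autoAbortFailure: false });
     * } catch (e) {
     *   // e is a MultiPartUploadError carrying uploadId and partsMap.
     *   await transferManager.uploadFileInChunks(localPath, {
     *     uploadId: e.uploadId,
     *     partsMap: e.partsMap,
     *   });
     * }
     * ```
     */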
    /**
     * Recursively walks the provided directory, yielding the full path of every
     * file found; subdirectories are descended into, not yielded.
     *
     * @param {string} directory The directory to walk.
     */
    async *getPathsFromDirectory(directory) {
        const filesAndSubdirectories = await fsp.readdir(directory, {
            withFileTypes: true,
        });
        for (const curFileOrDirectory of filesAndSubdirectories) {
            const fullPath = path.join(directory, curFileOrDirectory.name);
            curFileOrDirectory.isDirectory()
                ? yield* this.getPathsFromDirectory(fullPath)
                : yield fullPath;
        }
    }
}