/*!
 * Copyright 2022 Google LLC. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
    return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var _XMLMultiPartUploadHelper_instances, _XMLMultiPartUploadHelper_setGoogApiClientHeaders, _XMLMultiPartUploadHelper_handleErrorResponse;
import { FileExceptionMessages, RequestError } from './file.js';
import pLimit from 'p-limit';
import * as path from 'path';
import { createReadStream, promises as fsp } from 'fs';
import { CRC32C } from './crc32c.js';
import { GoogleAuth } from 'google-auth-library';
import { XMLParser, XMLBuilder } from 'fast-xml-parser';
import AsyncRetry from 'async-retry';
import { createHash } from 'crypto';
import { GCCL_GCS_CMD_KEY } from './nodejs-common/util.js';
import { getRuntimeTrackingString, getUserAgentString } from './util.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import { getPackageJSON } from './package-json-helper.cjs';
const packageJson = getPackageJSON();
/**
 * Default number of concurrently executing promises to use when calling uploadManyFiles.
 */
const DEFAULT_PARALLEL_UPLOAD_LIMIT = 5;
/**
 * Default number of concurrently executing promises to use when calling downloadManyFiles.
 */
const DEFAULT_PARALLEL_DOWNLOAD_LIMIT = 5;
/**
 * Default number of concurrently executing promises to use when calling downloadFileInChunks.
 */
const DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT = 5;
/**
 * The minimum size threshold in bytes at which to apply a chunked download strategy when calling downloadFileInChunks.
 */
const DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD = 32 * 1024 * 1024;
/**
 * The chunk size in bytes to use when calling downloadFileInChunks.
 */
const DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
/**
 * The chunk size in bytes to use when calling uploadFileInChunks.
 */
const UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
/**
 * Default number of concurrently executing promises to use when calling uploadFileInChunks.
 */
const DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT = 5;
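// `(?:)` is the empty, non-capturing regular expression pattern. It matches only
// the empty string, so a RegExp built from it below makes the stripPrefix
// replacement a no-op when no stripPrefix is supplied.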
const EMPTY_REGEX = '(?:)';
/**
 * The `gccl-gcs-cmd` value for the `X-Goog-API-Client` header.
 * Example: `gccl-gcs-cmd/tm.upload_many`
 *
 * @see {@link GCCL_GCS_CMD}.
 * @see {@link GCCL_GCS_CMD_KEY}.
 */
const GCCL_GCS_CMD_FEATURE = {
    UPLOAD_MANY: 'tm.upload_many',
    DOWNLOAD_MANY: 'tm.download_many',
    UPLOAD_SHARDED: 'tm.upload_sharded',
    DOWNLOAD_SHARDED: 'tm.download_sharded',
};
const defaultMultiPartGenerator = (bucket, fileName, uploadId, partsMap) => {
    return new XMLMultiPartUploadHelper(bucket, fileName, uploadId, partsMap);
};
export class MultiPartUploadError extends Error {
    constructor(message, uploadId, partsMap) {
        super(message);
        this.uploadId = uploadId;
        this.partsMap = partsMap;
    }
}
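// For illustration only (not part of the library): the uploadId and partsMap
// carried by a MultiPartUploadError can be passed back to
// TransferManager#uploadFileInChunks to resume a failed upload (see the
// UploadFileInChunksOptions typedef below). File and variable names here are
// hypothetical.
//
//   try {
//     await transferManager.uploadFileInChunks('large-file.txt');
//   } catch (e) {
//     if (e instanceof MultiPartUploadError) {
//       await transferManager.uploadFileInChunks('large-file.txt', {
//         uploadId: e.uploadId,
//         partsMap: e.partsMap,
//       });
//     }
//   }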
/**
 * Class representing an implementation of a multipart upload (MPU) in the XML API. This class is not meant for public usage.
 *
 * @private
 */
class XMLMultiPartUploadHelper {
    constructor(bucket, fileName, uploadId, partsMap) {
        _XMLMultiPartUploadHelper_instances.add(this);
        this.authClient = bucket.storage.authClient || new GoogleAuth();
        this.uploadId = uploadId || '';
        this.bucket = bucket;
        this.fileName = fileName;
        this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`;
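        // For example (assuming the default apiEndpoint of
        // https://storage.googleapis.com), a bucket named "my-bucket" and a file
        // named "file.txt" yield the virtual-hosted-style URL
        // https://my-bucket.storage.googleapis.com/file.txt.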
        this.xmlBuilder = new XMLBuilder({ arrayNodeName: 'Part' });
        this.xmlParser = new XMLParser();
        this.partsMap = partsMap || new Map();
        this.retryOptions = {
            retries: this.bucket.storage.retryOptions.maxRetries,
            factor: this.bucket.storage.retryOptions.retryDelayMultiplier,
            maxTimeout: this.bucket.storage.retryOptions.maxRetryDelay * 1000,
            maxRetryTime: this.bucket.storage.retryOptions.totalTimeout * 1000,
        };
    }
    /**
     * Initiates a multipart upload (MPU) to the XML API and stores the resultant upload id.
     *
     * @returns {Promise<void>}
     */
    async initiateUpload(headers = {}) {
        const url = `${this.baseUrl}?uploads`;
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    headers: __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this, headers),
                    method: 'POST',
                    url,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                const parsedXML = this.xmlParser.parse(res.data);
                this.uploadId = parsedXML.InitiateMultipartUploadResult.UploadId;
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
            }
        }, this.retryOptions);
    }
    /**
     * Uploads the provided chunk of data to the XML API using the previously created upload id.
     *
     * @param {number} partNumber the sequence number of this chunk.
     * @param {Buffer} chunk the chunk of data to be uploaded.
     * @param {string | false} validation whether or not to include the md5 hash in the headers to cause the server
     * to validate that the chunk was not corrupted.
     * @returns {Promise<void>}
     */
    async uploadPart(partNumber, chunk, validation) {
        const url = `${this.baseUrl}?partNumber=${partNumber}&uploadId=${this.uploadId}`;
        const headers = __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this);
        if (validation === 'md5') {
            const hash = createHash('md5').update(chunk).digest('base64');
            // Merge rather than replace so the API client headers set above are preserved.
            headers['Content-MD5'] = hash;
        }
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    url,
                    method: 'PUT',
                    body: chunk,
                    headers,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                this.partsMap.set(partNumber, res.headers['etag']);
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
            }
        }, this.retryOptions);
    }
    /**
     * Sends the final request of the MPU to tell GCS the upload is now complete.
     *
     * @returns {Promise<void>}
     */
    async completeUpload() {
        const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
        const sortedMap = new Map([...this.partsMap.entries()].sort((a, b) => a[0] - b[0]));
        const parts = [];
        for (const entry of sortedMap.entries()) {
            parts.push({ PartNumber: entry[0], ETag: entry[1] });
        }
        const body = `<CompleteMultipartUpload>${this.xmlBuilder.build(parts)}</CompleteMultipartUpload>`;
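        // For illustration: with two uploaded parts, the request body built above
        // takes roughly this shape (ETag values hypothetical):
        //   <CompleteMultipartUpload>
        //     <Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part>
        //     <Part><PartNumber>2</PartNumber><ETag>"etag-2"</ETag></Part>
        //   </CompleteMultipartUpload>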
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    headers: __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_setGoogApiClientHeaders).call(this),
                    url,
                    method: 'POST',
                    body,
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
                return res;
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
                return;
            }
        }, this.retryOptions);
    }
    /**
     * Aborts a multipart upload that is in progress. Once aborted, any parts in the process of being uploaded fail,
     * and future requests using the upload ID fail.
     *
     * @returns {Promise<void>}
     */
    async abortUpload() {
        const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
        return AsyncRetry(async (bail) => {
            try {
                const res = await this.authClient.request({
                    url,
                    method: 'DELETE',
                });
                if (res.data && res.data.error) {
                    throw res.data.error;
                }
            }
            catch (e) {
                __classPrivateFieldGet(this, _XMLMultiPartUploadHelper_instances, "m", _XMLMultiPartUploadHelper_handleErrorResponse).call(this, e, bail);
                return;
            }
        }, this.retryOptions);
    }
}
_XMLMultiPartUploadHelper_instances = new WeakSet(), _XMLMultiPartUploadHelper_setGoogApiClientHeaders = function _XMLMultiPartUploadHelper_setGoogApiClientHeaders(headers = {}) {
    let headerFound = false;
    let userAgentFound = false;
    for (const [key, value] of Object.entries(headers)) {
        if (key.toLocaleLowerCase().trim() === 'x-goog-api-client') {
            headerFound = true;
            // Append the command feature to the value, if it is not already there.
            if (!value.includes(GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED)) {
                headers[key] = `${value} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
            }
        }
        else if (key.toLocaleLowerCase().trim() === 'user-agent') {
            userAgentFound = true;
        }
    }
    // If the header isn't present, add it.
    if (!headerFound) {
        headers['x-goog-api-client'] = `${getRuntimeTrackingString()} gccl/${packageJson.version} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
    }
    // If the User-Agent isn't present, add it.
    if (!userAgentFound) {
        headers['User-Agent'] = getUserAgentString();
    }
    return headers;
}, _XMLMultiPartUploadHelper_handleErrorResponse = function _XMLMultiPartUploadHelper_handleErrorResponse(err, bail) {
    if (this.bucket.storage.retryOptions.autoRetry &&
        this.bucket.storage.retryOptions.retryableErrorFn(err)) {
        throw err;
    }
    else {
        bail(err);
    }
};
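// For illustration: with no caller-supplied headers, the x-goog-api-client
// header assembled above looks roughly like the following (runtime and version
// strings hypothetical):
//   x-goog-api-client: gl-node/20.11.0 gccl/7.9.0 gccl-gcs-cmd/tm.upload_sharded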
/**
 * Create a TransferManager object to perform parallel transfer operations on a Cloud Storage bucket.
 *
 * @class
 * @hideconstructor
 *
 * @param {Bucket} bucket A {@link Bucket} instance.
 */
export class TransferManager {
    constructor(bucket) {
        this.bucket = bucket;
    }
    /**
     * @typedef {object} UploadManyFilesOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when uploading the files.
     * @property {Function} [customDestinationBuilder] A function that will take the current path of a local file
     * and return a string representing a custom path to be used to upload the file to GCS.
     * @property {boolean} [skipIfExists] Do not upload the file if it already exists in
     * the bucket. This will set the precondition ifGenerationMatch = 0.
     * @property {string} [prefix] A prefix to prepend to all of the uploaded files.
     * @property {object} [passthroughOptions] {@link UploadOptions} Options to be passed through
     * to each individual upload operation.
     */
    /**
     * Upload multiple files in parallel to the bucket. This is a convenience method
     * that utilizes {@link Bucket#upload} to perform the upload.
     *
     * @param {array | string} [filePathsOrDirectory] An array of fully qualified paths to the files to be uploaded
     * to the bucket, or a directory name. If a directory name is provided, the directory will be recursively walked
     * and all files within it will be added to the upload list.
     * @param {UploadManyFilesOptions} [options] Configuration options.
     * @returns {Promise<UploadResponse[]>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Upload multiple files in parallel.
     * //-
     * const response = await transferManager.uploadManyFiles(['/local/path/file1.txt', '/local/path/file2.txt']);
     * // Your bucket now contains:
     * // - "local/path/file1.txt" (with the contents of '/local/path/file1.txt')
     * // - "local/path/file2.txt" (with the contents of '/local/path/file2.txt')
     * const response2 = await transferManager.uploadManyFiles('/local/directory');
     * // Your bucket will now contain all files contained in '/local/directory' maintaining the subdirectory structure.
     * ```
     *
     */
    async uploadManyFiles(filePathsOrDirectory, options = {}) {
        var _a;
        if (options.skipIfExists && ((_a = options.passthroughOptions) === null || _a === void 0 ? void 0 : _a.preconditionOpts)) {
            options.passthroughOptions.preconditionOpts.ifGenerationMatch = 0;
        }
        else if (options.skipIfExists &&
            options.passthroughOptions === undefined) {
            options.passthroughOptions = {
                preconditionOpts: {
                    ifGenerationMatch: 0,
                },
            };
        }
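        // With ifGenerationMatch set to 0, GCS accepts the write only when no live
        // object already exists under that name; uploads that would overwrite an
        // existing object fail with a 412 Precondition Failed response.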
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_UPLOAD_LIMIT);
        const promises = [];
        let allPaths = [];
        if (!Array.isArray(filePathsOrDirectory)) {
            for await (const curPath of this.getPathsFromDirectory(filePathsOrDirectory)) {
                allPaths.push(curPath);
            }
        }
        else {
            allPaths = filePathsOrDirectory;
        }
        for (const filePath of allPaths) {
            const stat = await fsp.lstat(filePath);
            if (stat.isDirectory()) {
                continue;
            }
            const passThroughOptionsCopy = {
                ...options.passthroughOptions,
                [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.UPLOAD_MANY,
            };
            passThroughOptionsCopy.destination = options.customDestinationBuilder
                ? options.customDestinationBuilder(filePath, options)
                : filePath.split(path.sep).join(path.posix.sep);
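            // For example, on Windows (where path.sep is '\\') a local path of
            // 'photos\\2024\\cat.jpg' becomes the object name 'photos/2024/cat.jpg'.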
            if (options.prefix) {
                passThroughOptionsCopy.destination = path.posix.join(...options.prefix.split(path.sep), passThroughOptionsCopy.destination);
            }
            promises.push(limit(() => this.bucket.upload(filePath, passThroughOptionsCopy)));
        }
        return Promise.all(promises);
    }
    /**
     * @typedef {object} DownloadManyFilesOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when downloading the files.
     * @property {string} [prefix] A prefix to prepend to all of the downloaded files.
     * @property {string} [stripPrefix] A prefix to remove from all of the downloaded files.
     * @property {object} [passthroughOptions] {@link DownloadOptions} Options to be passed through
     * to each individual download operation.
     */
    /**
     * Download multiple files in parallel to the local filesystem. This is a convenience method
     * that utilizes {@link File#download} to perform the download.
     *
     * @param {array | string} [filesOrFolder] An array of file name strings or file objects to be downloaded. If
     * a string is provided this will be treated as a GCS prefix and all files with that prefix will be downloaded.
     * @param {DownloadManyFilesOptions} [options] Configuration options. Setting options.prefix or options.stripPrefix
     * or options.passthroughOptions.destination will cause the downloaded files to be written to the file system
     * instead of being returned as a buffer.
     * @returns {Promise<DownloadResponse[]>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Download multiple files in parallel.
     * //-
     * const response = await transferManager.downloadManyFiles(['file1.txt', 'file2.txt']);
     * // The following files have been downloaded:
     * // - "file1.txt" (with the contents from my-bucket.file1.txt)
     * // - "file2.txt" (with the contents from my-bucket.file2.txt)
     * const response2 = await transferManager.downloadManyFiles([bucket.file('file1.txt'), bucket.file('file2.txt')]);
     * // The following files have been downloaded:
     * // - "file1.txt" (with the contents from my-bucket.file1.txt)
     * // - "file2.txt" (with the contents from my-bucket.file2.txt)
     * const response3 = await transferManager.downloadManyFiles('test-folder');
     * // All files with the GCS prefix of 'test-folder' have been downloaded.
     * ```
     *
     */
    async downloadManyFiles(filesOrFolder, options = {}) {
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_DOWNLOAD_LIMIT);
        const promises = [];
        let files = [];
        if (!Array.isArray(filesOrFolder)) {
            const directoryFiles = await this.bucket.getFiles({
                prefix: filesOrFolder,
            });
            files = directoryFiles[0];
        }
        else {
            files = filesOrFolder.map(curFile => {
                if (typeof curFile === 'string') {
                    return this.bucket.file(curFile);
                }
                return curFile;
            });
        }
        const stripRegexString = options.stripPrefix
            ? `^${options.stripPrefix}`
            : EMPTY_REGEX;
        const regex = new RegExp(stripRegexString, 'g');
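        // For example, with stripPrefix 'data/' an object named 'data/2024/file.txt'
        // is written locally as '2024/file.txt'.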
        for (const file of files) {
            const passThroughOptionsCopy = {
                ...options.passthroughOptions,
                [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_MANY,
            };
            if (options.prefix || passThroughOptionsCopy.destination) {
                passThroughOptionsCopy.destination = path.join(options.prefix || '', passThroughOptionsCopy.destination || '', file.name);
            }
            if (options.stripPrefix) {
                passThroughOptionsCopy.destination = file.name.replace(regex, '');
            }
            promises.push(limit(async () => {
                const destination = passThroughOptionsCopy.destination;
                if (destination && destination.endsWith(path.sep)) {
                    await fsp.mkdir(destination, { recursive: true });
                    return Promise.resolve([
                        Buffer.alloc(0),
                    ]);
                }
                return file.download(passThroughOptionsCopy);
            }));
        }
        return Promise.all(promises);
    }
    /**
     * @typedef {object} DownloadFileInChunksOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when downloading the file.
     * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be downloaded.
     * @property {string | boolean} [validation] Whether or not to perform a CRC32C validation check when the download is complete.
     * @property {boolean} [noReturnData] Whether or not to return the downloaded data. A `true` value here would be useful for files with a size that will not fit into memory.
     */
    /**
     * Download a large file in chunks utilizing parallel download operations. This is a convenience method
     * that utilizes {@link File#download} to perform the download.
     *
     * @param {File | string} fileOrName {@link File} to download.
     * @param {DownloadFileInChunksOptions} [options] Configuration options.
     * @returns {Promise<void | DownloadResponse>}
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Download a large file in chunks utilizing parallel operations.
     * //-
     * const response = await transferManager.downloadFileInChunks(bucket.file('large-file.txt'));
     * // Your local directory now contains:
     * // - "large-file.txt" (with the contents from my-bucket.large-file.txt)
     * ```
     *
     */
    async downloadFileInChunks(fileOrName, options = {}) {
        let chunkSize = options.chunkSizeBytes || DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE;
        let limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT);
        const noReturnData = Boolean(options.noReturnData);
        const promises = [];
        const file = typeof fileOrName === 'string'
            ? this.bucket.file(fileOrName)
            : fileOrName;
        const fileInfo = await file.get();
        const size = parseInt(fileInfo[0].metadata.size.toString());
        // If the file size does not meet the threshold, download it as a single chunk.
        if (size < DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD) {
            limit = pLimit(1);
            chunkSize = size;
        }
        let start = 0;
        const filePath = options.destination || path.basename(file.name);
        const fileToWrite = await fsp.open(filePath, 'w');
        while (start < size) {
            const chunkStart = start;
            let chunkEnd = start + chunkSize - 1;
            chunkEnd = chunkEnd > size ? size : chunkEnd;
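            // chunkStart and chunkEnd are inclusive byte offsets forwarded to
            // File#download below, which issues a ranged read for that slice.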
            promises.push(limit(async () => {
                const resp = await file.download({
                    start: chunkStart,
                    end: chunkEnd,
                    [GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_SHARDED,
                });
                const result = await fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
                if (noReturnData)
                    return;
                return result.buffer;
            }));
            start += chunkSize;
        }
        let chunks;
        try {
            chunks = await Promise.all(promises);
        }
        finally {
            await fileToWrite.close();
        }
        if (options.validation === 'crc32c' && fileInfo[0].metadata.crc32c) {
            const downloadedCrc32C = await CRC32C.fromFile(filePath);
            if (!downloadedCrc32C.validate(fileInfo[0].metadata.crc32c)) {
                const mismatchError = new RequestError(FileExceptionMessages.DOWNLOAD_MISMATCH);
                mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';
                throw mismatchError;
            }
        }
        if (noReturnData)
            return;
        return [Buffer.concat(chunks, size)];
    }
    /**
     * @typedef {object} UploadFileInChunksOptions
     * @property {number} [concurrencyLimit] The number of concurrently executing promises
     * to use when uploading the file.
     * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be uploaded.
     * @property {string} [uploadName] Name of the file when saving to GCS. If omitted, the name is taken from the file path.
     * @property {number} [maxQueueSize] The number of chunks to be uploaded to hold in memory concurrently. If not specified,
     * defaults to the specified concurrency limit.
     * @property {string} [uploadId] If specified, attempts to resume a previous upload.
     * @property {Map} [partsMap] If specified alongside uploadId, attempts to resume a previous upload from the last chunk
     * specified in partsMap.
     * @property {object} [headers] Headers to be sent when initiating the multipart upload.
     * See {@link https://cloud.google.com/storage/docs/xml-api/post-object-multipart#request_headers| Request Headers: Initiate a Multipart Upload}
     * @property {boolean} [autoAbortFailure] Whether an in-progress upload session will be automatically aborted upon failure.
     * If not set, failed uploads are automatically aborted.
     */
    /**
     * Upload a large file in chunks utilizing parallel upload operations. If the upload fails, an uploadId and a
     * map containing all the successfully uploaded parts will be returned to the caller. These arguments can be used to
     * resume the upload.
     *
     * @param {string} [filePath] The path of the file to be uploaded.
     * @param {UploadFileInChunksOptions} [options] Configuration options.
     * @param {MultiPartHelperGenerator} [generator] A function that will return a type that implements the MPU interface. Most users will not need to use this.
     * @returns {Promise<void>} If successful, a promise resolving to void; otherwise an error containing the message, uploadId, and parts map.
     *
     * @example
     * ```
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const bucket = storage.bucket('my-bucket');
     * const transferManager = new TransferManager(bucket);
     *
     * //-
     * // Upload a large file in chunks utilizing parallel operations.
     * //-
     * const response = await transferManager.uploadFileInChunks('large-file.txt');
     * // Your bucket now contains:
     * // - "large-file.txt"
     * ```
     *
     */
    async uploadFileInChunks(filePath, options = {}, generator = defaultMultiPartGenerator) {
        const chunkSize = options.chunkSizeBytes || UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE;
        const limit = pLimit(options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT);
        const maxQueueSize = options.maxQueueSize ||
            options.concurrencyLimit ||
            DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT;
        const fileName = options.uploadName || path.basename(filePath);
        const mpuHelper = generator(this.bucket, fileName, options.uploadId, options.partsMap);
        // Part numbering continues after any parts recorded from a previous attempt,
        // so a resumed upload does not overwrite already-uploaded parts.
        let partNumber = mpuHelper.partsMap.size + 1;
        let promises = [];
        try {
            if (options.uploadId === undefined) {
                await mpuHelper.initiateUpload(options.headers);
            }
            const startOrResumptionByte = mpuHelper.partsMap.size * chunkSize;
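            // For example, when resuming with three parts already recorded in
            // partsMap and the default 32 MiB chunk size, reading restarts at
            // byte 3 * 33554432 = 100663296.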
            const readStream = createReadStream(filePath, {
                highWaterMark: chunkSize,
                start: startOrResumptionByte,
            });
            // p-limit only limits the number of running promises. We do not want to hold an entire
            // large file in memory at once, so promises acts as a queue that holds only maxQueueSize chunks in memory.
            for await (const curChunk of readStream) {
                if (promises.length >= maxQueueSize) {
                    await Promise.all(promises);
                    promises = [];
                }
                promises.push(limit(() => mpuHelper.uploadPart(partNumber++, curChunk, options.validation)));
            }
            await Promise.all(promises);
            return await mpuHelper.completeUpload();
        }
        catch (e) {
            if ((options.autoAbortFailure === undefined || options.autoAbortFailure) &&
                mpuHelper.uploadId) {
                try {
                    await mpuHelper.abortUpload();
                    return;
                }
                catch (e) {
                    throw new MultiPartUploadError(e.message, mpuHelper.uploadId, mpuHelper.partsMap);
                }
            }
            throw new MultiPartUploadError(e.message, mpuHelper.uploadId, mpuHelper.partsMap);
        }
    }
    async *getPathsFromDirectory(directory) {
        const filesAndSubdirectories = await fsp.readdir(directory, {
            withFileTypes: true,
        });
        for (const curFileOrDirectory of filesAndSubdirectories) {
            const fullPath = path.join(directory, curFileOrDirectory.name);
            curFileOrDirectory.isDirectory()
                ? yield* this.getPathsFromDirectory(fullPath)
                : yield fullPath;
        }
    }
}