UNPKG

37.4 kBJavaScriptView Raw
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
 * Compiler-emitted helper that writes a down-levelled private class member.
 *
 * @param receiver The object (or class, for statics) whose member is written.
 * @param state The WeakMap/WeakSet tracking instances, or the class itself
 *     for static members.
 * @param value The value to store.
 * @param kind "f" for a field, "a" for an accessor, "m" for a method.
 * @param f The setter (kind "a") or the `{ value }` holder of a static field.
 * @returns The value that was written.
 * @throws {TypeError} If the member is a method, the accessor has no setter,
 *     or `receiver` was not registered by the declaring class.
 */
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") {
        throw new TypeError("Private method is not writable");
    }
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a setter");
    }
    // Static members are keyed on the class itself; instance members on a
    // WeakMap/WeakSet that the constructor registered the receiver with.
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot write private member to an object whose class did not declare it");
    }
    if (kind === "a") {
        f.call(receiver, value);
    }
    else if (f) {
        f.value = value;
    }
    else {
        state.set(receiver, value);
    }
    return value;
};
/**
 * Compiler-emitted helper that reads a down-levelled private class member.
 *
 * @param receiver The object (or class, for statics) whose member is read.
 * @param state The WeakMap/WeakSet tracking instances, or the class itself
 *     for static members.
 * @param kind "f" for a field, "a" for an accessor, "m" for a method.
 * @param f The method (kind "m"), getter (kind "a"), or `{ value }` holder
 *     of a static field.
 * @returns The method itself, the getter's result, or the stored field value.
 * @throws {TypeError} If the accessor has no getter, or `receiver` was not
 *     registered by the declaring class.
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a getter");
    }
    // Static members are keyed on the class itself; instance members on a
    // WeakMap/WeakSet that the constructor registered the receiver with.
    const isDeclared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!isDeclared) {
        throw new TypeError("Cannot read private member from an object whose class did not declare it");
    }
    if (kind === "m") {
        return f;
    }
    if (kind === "a") {
        return f.call(receiver);
    }
    return f ? f.value : state.get(receiver);
};
// Module-level slots backing the `Upload` class's down-levelled private
// members. They are populated by the assignment that follows the class body.
var _Upload_instances;
var _Upload_gcclGcsCmd;
var _Upload_resetLocalBuffersCache;
var _Upload_addLocalBufferCache;
import AbortController from 'abort-controller';
import { createHash } from 'crypto';
import * as gaxios from 'gaxios';
import { DEFAULT_UNIVERSE, GoogleAuth, } from 'google-auth-library';
import { Readable, Writable } from 'stream';
import AsyncRetry from 'async-retry';
import * as uuid from 'uuid';
import { getRuntimeTrackingString, getModuleFormat, getUserAgentString, } from './util.js';
import { GCCL_GCS_CMD_KEY } from './nodejs-common/util.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import { getPackageJSON } from './package-json-helper.cjs';
// HTTP 404: compared against the response status when deciding how to retry.
const NOT_FOUND_STATUS_CODE = 404;
// HTTP 308: the status a resumable session returns while it is incomplete.
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
// Supplies the package version embedded in the `x-goog-api-client` header.
const packageJson = getPackageJSON();
/** Matches a leading `scheme://` prefix on a URL-like string. */
export const PROTOCOL_REGEX = /^(\w*):\/\//;
/**
 * A Writable stream that sends the bytes written to it to a resumable upload
 * session, either as one streamed request or (when `chunkSize` is set) as a
 * series of fixed-size chunk requests.
 *
 * Events emitted by this class: 'writing', 'uri', 'progress', 'response',
 * 'metadata', 'uploadFinished', plus the internal coordination events
 * 'wroteToChunkBuffer', 'readFromChunkBuffer' and 'upstreamFinished'.
 */
export class Upload extends Writable {
    /**
     * @param cfg Upload configuration. `bucket` and `file` are required and
     *     `retryOptions` is dereferenced unconditionally. Note that
     *     `cfg.authConfig` and `cfg.retryOptions` are modified in place.
     * @throws {Error} If `bucket` or `file` is missing.
     * @throws {RangeError} If `offset` is given without `uri`, or
     *     `isPartialUpload` is set without `chunkSize`.
     */
    constructor(cfg) {
        super(cfg);
        _Upload_instances.add(this);
        this.numBytesWritten = 0;
        // Counter for the number of retries executed so far.
        this.numRetries = 0;
        // One invocation id per request kind; each is regenerated after the
        // corresponding request succeeds.
        this.currentInvocationId = {
            checkUploadStatus: uuid.v4(),
            chunk: uuid.v4(),
            uri: uuid.v4(),
        };
        /**
         * A cache of buffers written to this instance, ready for consuming
         */
        this.writeBuffers = [];
        this.numChunksReadInRequest = 0;
        /**
         * An array of buffers used for caching the most recent upload chunk.
         * We should not assume that the server received all bytes sent in the request.
         *  - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
         */
        this.localWriteCache = [];
        this.localWriteCacheByteLength = 0;
        this.upstreamEnded = false;
        _Upload_gcclGcsCmd.set(this, void 0);
        cfg = cfg || {};
        if (!cfg.bucket || !cfg.file) {
            throw new Error('A bucket and file name are required');
        }
        if (cfg.offset && !cfg.uri) {
            throw new RangeError('Cannot provide an `offset` without providing a `uri`');
        }
        if (cfg.isPartialUpload && !cfg.chunkSize) {
            throw new RangeError('Cannot set `isPartialUpload` without providing a `chunkSize`');
        }
        cfg.authConfig = cfg.authConfig || {};
        cfg.authConfig.scopes = [
            'https://www.googleapis.com/auth/devstorage.full_control',
        ];
        this.authClient = cfg.authClient || new GoogleAuth(cfg.authConfig);
        const universe = cfg.universeDomain || DEFAULT_UNIVERSE;
        this.apiEndpoint = `https://storage.${universe}`;
        if (cfg.apiEndpoint && cfg.apiEndpoint !== this.apiEndpoint) {
            this.apiEndpoint = this.sanitizeEndpoint(cfg.apiEndpoint);
            const hostname = new URL(this.apiEndpoint).hostname;
            // A host "belongs" to a universe when it is that domain itself or
            // any subdomain of it.
            const belongsTo = (domain) => hostname === domain || hostname.endsWith(`.${domain}`);
            if (!belongsTo(universe) && !belongsTo(DEFAULT_UNIVERSE)) {
                // a custom, non-universe domain,
                // use gaxios
                this.authClient = gaxios;
            }
        }
        this.baseURI = `${this.apiEndpoint}/upload/storage/v1/b`;
        this.bucket = cfg.bucket;
        const cacheKeyElements = [cfg.bucket, cfg.file];
        if (typeof cfg.generation === 'number') {
            cacheKeyElements.push(`${cfg.generation}`);
        }
        this.cacheKey = cacheKeyElements.join('/');
        this.customRequestOptions = cfg.customRequestOptions || {};
        this.file = cfg.file;
        this.generation = cfg.generation;
        this.kmsKeyName = cfg.kmsKeyName;
        this.metadata = cfg.metadata || {};
        this.offset = cfg.offset;
        this.origin = cfg.origin;
        this.params = cfg.params || {};
        this.userProject = cfg.userProject;
        this.chunkSize = cfg.chunkSize;
        this.retryOptions = cfg.retryOptions;
        this.isPartialUpload =
            cfg.isPartialUpload !== null && cfg.isPartialUpload !== undefined
                ? cfg.isPartialUpload
                : false;
        if (cfg.key) {
            // Customer-supplied encryption key: sent base64-encoded together
            // with the base64 SHA-256 digest of the raw key.
            const base64Key = Buffer.from(cfg.key).toString('base64');
            this.encryption = {
                key: base64Key,
                hash: createHash('sha256').update(cfg.key).digest('base64'),
            };
        }
        // `public` wins over `private`, which wins over `predefinedAcl`.
        this.predefinedAcl = cfg.predefinedAcl;
        if (cfg.private) {
            this.predefinedAcl = 'private';
        }
        if (cfg.public) {
            this.predefinedAcl = 'publicRead';
        }
        const autoRetry = cfg.retryOptions.autoRetry;
        this.uriProvidedManually = !!cfg.uri;
        this.uri = cfg.uri;
        if (this.offset) {
            // we're resuming an incomplete upload
            this.numBytesWritten = this.offset;
        }
        if (!autoRetry) {
            // `this.retryOptions` aliases `cfg.retryOptions`, so this turns
            // retries off for every retry check in this instance.
            cfg.retryOptions.maxRetries = 0;
        }
        this.timeOfFirstRequest = Date.now();
        const contentLength = cfg.metadata
            ? Number(cfg.metadata.contentLength)
            : NaN;
        // '*' is what gets interpolated into `Content-Range` when the total
        // size is not known.
        this.contentLength = isNaN(contentLength) ? '*' : contentLength;
        __classPrivateFieldSet(this, _Upload_gcclGcsCmd, cfg[GCCL_GCS_CMD_KEY], "f");
        // The first 'writing' event (from `_write` or `_final`) kicks off the
        // upload: resume against a known URI, or create a session first.
        this.once('writing', () => {
            if (this.uri) {
                this.continueUploading();
            }
            else {
                this.createURI(err => {
                    if (err) {
                        return this.destroy(err);
                    }
                    this.startUploading();
                    return;
                });
            }
        });
    }
    /**
     * Prevent 'finish' event until the upload has succeeded.
     *
     * @param fireFinishEvent The finish callback
     */
    _final(fireFinishEvent = () => { }) {
        this.upstreamEnded = true;
        this.once('uploadFinished', fireFinishEvent);
        process.nextTick(() => {
            this.emit('upstreamFinished');
            // it's possible `_write` may not be called - namely for empty object uploads
            this.emit('writing');
        });
    }
    /**
     * Handles incoming data from upstream
     *
     * @param chunk The chunk to append to the buffer
     * @param encoding The encoding of the chunk
     * @param readCallback A callback for when the buffer has been read downstream
     */
    _write(chunk, encoding, readCallback = () => { }) {
        // Backwards-compatible event
        this.emit('writing');
        this.writeBuffers.push(typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk);
        // The write is acknowledged only once a consumer pulls from the buffer.
        this.once('readFromChunkBuffer', readCallback);
        process.nextTick(() => this.emit('wroteToChunkBuffer'));
    }
    /**
     * Prepends the local buffer to write buffer and resets it.
     *
     * @param keepLastBytes number of bytes to keep from the end of the local buffer.
     *     When falsy, the whole local cache is prepended.
     */
    prependLocalBufferToUpstream(keepLastBytes) {
        // Typically, the upstream write buffers should be smaller than the local
        // cache, so we can save time by setting the local cache as the new
        // upstream write buffer array and appending the old array to it
        let initialBuffers = [];
        if (keepLastBytes) {
            // we only want the last X bytes
            let bytesKept = 0;
            while (keepLastBytes > bytesKept) {
                // load backwards because we want the last X bytes
                // note: `localWriteCacheByteLength` is reset below
                let buf = this.localWriteCache.pop();
                if (!buf) {
                    break;
                }
                bytesKept += buf.byteLength;
                if (bytesKept > keepLastBytes) {
                    // we have gone over the amount desired, let's keep the last X bytes
                    // of this buffer
                    const diff = bytesKept - keepLastBytes;
                    buf = buf.subarray(diff);
                    bytesKept -= diff;
                }
                initialBuffers.unshift(buf);
            }
        }
        else {
            // we're keeping all of the local cache, simply use it as the initial buffer
            initialBuffers = this.localWriteCache;
        }
        // Append the old upstream to the new
        const append = this.writeBuffers;
        this.writeBuffers = initialBuffers;
        for (const buf of append) {
            this.writeBuffers.push(buf);
        }
        // reset last buffers sent
        __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this);
    }
    /**
     * Retrieves data from upstream's buffer.
     *
     * Emits 'readFromChunkBuffer' after each yielded buffer.
     *
     * @param limit The maximum amount to return from the buffer.
     */
    *pullFromChunkBuffer(limit) {
        while (limit) {
            const buf = this.writeBuffers.shift();
            if (!buf) {
                break;
            }
            let bufToYield = buf;
            if (buf.byteLength > limit) {
                // Split the buffer: yield up to the limit and put the rest back
                // at the front of the queue.
                bufToYield = buf.subarray(0, limit);
                this.writeBuffers.unshift(buf.subarray(limit));
                limit = 0;
            }
            else {
                limit -= buf.byteLength;
            }
            yield bufToYield;
            // Notify upstream we've read from the buffer and we're able to consume
            // more. It can also potentially send more data down as we're currently
            // iterating.
            this.emit('readFromChunkBuffer');
        }
    }
    /**
     * A handler for determining if data is ready to be read from upstream.
     *
     * @returns If there will be more chunks to read in the future
     */
    async waitForNextChunk() {
        const willBeMoreChunks = await new Promise(resolve => {
            // There's data available - it should be digested
            if (this.writeBuffers.length) {
                return resolve(true);
            }
            // The upstream writable ended, we shouldn't expect any more data.
            if (this.upstreamEnded) {
                return resolve(false);
            }
            // Nothing immediate seems to be determined. We need to prepare some
            // listeners to determine next steps...
            const wroteToChunkBufferCallback = () => {
                removeListeners();
                return resolve(true);
            };
            const upstreamFinishedCallback = () => {
                removeListeners();
                // this should be the last chunk, if there's anything there
                return resolve(this.writeBuffers.length > 0);
            };
            // Remove listeners when we're ready to callback.
            const removeListeners = () => {
                this.removeListener('wroteToChunkBuffer', wroteToChunkBufferCallback);
                this.removeListener('upstreamFinished', upstreamFinishedCallback);
            };
            // If there's data recently written it should be digested
            this.once('wroteToChunkBuffer', wroteToChunkBufferCallback);
            // If the upstream finishes let's see if there's anything to grab
            this.once('upstreamFinished', upstreamFinishedCallback);
        });
        return willBeMoreChunks;
    }
    /**
     * Reads data from upstream up to the provided `limit`.
     * Ends when the limit has reached or no data is expected to be pushed from upstream.
     *
     * @param limit The most amount of data this iterator should return. `Infinity` by default.
     */
    async *upstreamIterator(limit = Infinity) {
        // read from upstream chunk buffer
        while (limit && (await this.waitForNextChunk())) {
            // read until end or limit has been reached
            for (const chunk of this.pullFromChunkBuffer(limit)) {
                limit -= chunk.byteLength;
                yield chunk;
            }
        }
    }
    /**
     * Creates the resumable session URI.
     *
     * @param callback Optional node-style callback `(err, uri)`.
     * @returns A promise for the URI when no callback is given; otherwise
     *     nothing.
     */
    createURI(callback) {
        if (!callback) {
            return this.createURIAsync();
        }
        this.createURIAsync().then(r => callback(null, r), callback);
    }
    /**
     * POSTs to the bucket's upload endpoint to start a resumable session,
     * retrying through `async-retry` according to `retryOptions`.
     *
     * On success stores the URI on `this.uri`, resets `this.offset` to 0 and
     * emits 'uri'.
     *
     * @returns The session URI taken from the response's `location` header.
     */
    async createURIAsync() {
        const metadata = { ...this.metadata };
        const headers = {};
        // Delete content length and content type from metadata if they exist.
        // These are headers and should not be sent as part of the metadata.
        if (metadata.contentLength) {
            headers['X-Upload-Content-Length'] = metadata.contentLength.toString();
            delete metadata.contentLength;
        }
        if (metadata.contentType) {
            headers['X-Upload-Content-Type'] = metadata.contentType;
            delete metadata.contentType;
        }
        let googAPIClient = `${getRuntimeTrackingString()} gccl/${packageJson.version}-${getModuleFormat()} gccl-invocation-id/${this.currentInvocationId.uri}`;
        if (__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")) {
            googAPIClient += ` gccl-gcs-cmd/${__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")}`;
        }
        const reqOpts = {
            method: 'POST',
            url: [this.baseURI, this.bucket, 'o'].join('/'),
            params: Object.assign({
                name: this.file,
                uploadType: 'resumable',
            }, this.params),
            data: metadata,
            headers: {
                'User-Agent': getUserAgentString(),
                'x-goog-api-client': googAPIClient,
                ...headers,
            },
        };
        if (typeof this.generation !== 'undefined') {
            reqOpts.params.ifGenerationMatch = this.generation;
        }
        if (this.kmsKeyName) {
            reqOpts.params.kmsKeyName = this.kmsKeyName;
        }
        if (this.predefinedAcl) {
            reqOpts.params.predefinedAcl = this.predefinedAcl;
        }
        if (this.origin) {
            reqOpts.headers.Origin = this.origin;
        }
        const uri = await AsyncRetry(async (bail) => {
            try {
                const res = await this.makeRequest(reqOpts);
                // We have successfully got a URI we can now create a new invocation id
                this.currentInvocationId.uri = uuid.v4();
                return res.headers.location;
            }
            catch (err) {
                const e = err;
                const hasResponse = e.response !== null && e.response !== undefined;
                // Shape the failure the way `retryableErrorFn` is called with.
                const apiError = {
                    code: hasResponse ? e.response.status : undefined,
                    name: hasResponse ? e.response.statusText : undefined,
                    message: hasResponse ? e.response.statusText : undefined,
                    errors: [
                        {
                            reason: e.code,
                        },
                    ],
                };
                if (this.retryOptions.maxRetries > 0 &&
                    this.retryOptions.retryableErrorFn(apiError)) {
                    // Throwing lets async-retry schedule another attempt.
                    throw e;
                }
                else {
                    // Not retryable: hand the error to `bail` to stop retrying.
                    return bail(e);
                }
            }
        }, {
            retries: this.retryOptions.maxRetries,
            factor: this.retryOptions.retryDelayMultiplier,
            maxTimeout: this.retryOptions.maxRetryDelay * 1000, //convert to milliseconds
            maxRetryTime: this.retryOptions.totalTimeout * 1000, //convert to milliseconds
        });
        this.uri = uri;
        this.offset = 0;
        // emit the newly generated URI for future reuse, if necessary.
        this.emit('uri', uri);
        return uri;
    }
    /**
     * Resumes uploading against the existing session URI, first asking the
     * server for the current offset when it is not known locally.
     */
    async continueUploading() {
        if (this.offset === undefined || this.offset === null) {
            await this.getAndSetOffset();
            if (this.offset === undefined || this.offset === null) {
                // `getAndSetOffset` always stores a number on success. Still
                // having no offset means it failed and has already scheduled
                // a delayed retry or destroyed the stream, so starting a
                // request here would race with (or outlive) that handling.
                return;
            }
        }
        return this.startUploading();
    }
    /**
     * Sends one PUT request to the session URI: the whole remaining stream in
     * single-request mode, or at most `chunkSize` bytes in multi-chunk mode.
     * The response is passed to `responseHandler`; request failures are
     * retried or destroy the stream.
     */
    async startUploading() {
        const multiChunkMode = !!this.chunkSize;
        let responseReceived = false;
        this.numChunksReadInRequest = 0;
        if (!this.offset) {
            this.offset = 0;
        }
        // Check if the offset (server) is too far behind the current stream
        if (this.offset < this.numBytesWritten) {
            const delta = this.numBytesWritten - this.offset;
            const message = `The offset is lower than the number of bytes written. The server has ${this.offset} bytes and while ${this.numBytesWritten} bytes has been uploaded - thus ${delta} bytes are missing. Stopping as this could result in data loss. Initiate a new upload to continue.`;
            this.emit('error', new RangeError(message));
            return;
        }
        // Check if we should 'fast-forward' to the relevant data to upload
        if (this.numBytesWritten < this.offset) {
            // 'fast-forward' to the byte where we need to upload.
            // only push data from the byte after the one we left off on
            const fastForwardBytes = this.offset - this.numBytesWritten;
            for await (const _chunk of this.upstreamIterator(fastForwardBytes)) {
                _chunk; // discard the data up until the point we want
            }
            this.numBytesWritten = this.offset;
        }
        let expectedUploadSize = undefined;
        // Set `expectedUploadSize` to `contentLength - this.numBytesWritten`, if available
        if (typeof this.contentLength === 'number') {
            expectedUploadSize = this.contentLength - this.numBytesWritten;
        }
        // `expectedUploadSize` should be no more than the `chunkSize`.
        // It's possible this is the last chunk request for a multiple
        // chunk upload, thus smaller than the chunk size.
        if (this.chunkSize) {
            expectedUploadSize = expectedUploadSize
                ? Math.min(this.chunkSize, expectedUploadSize)
                : this.chunkSize;
        }
        // A queue for the upstream data
        const upstreamQueue = this.upstreamIterator(expectedUploadSize);
        // The primary read stream for this request. This stream retrieves no more
        // than the exact requested amount from upstream.
        const requestStream = new Readable({
            read: async () => {
                // Don't attempt to retrieve data upstream if we already have a
                // response: end the body and stop. Continuing would consume
                // data meant for the next request and push after EOF.
                if (responseReceived) {
                    requestStream.push(null);
                    return;
                }
                const result = await upstreamQueue.next();
                if (result.value) {
                    this.numChunksReadInRequest++;
                    if (multiChunkMode) {
                        // save every buffer used in the request in multi-chunk mode
                        __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, result.value);
                    }
                    else {
                        // single-request mode only keeps the most recent buffer
                        __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this);
                        __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, result.value);
                    }
                    this.numBytesWritten += result.value.byteLength;
                    this.emit('progress', {
                        bytesWritten: this.numBytesWritten,
                        contentLength: this.contentLength,
                    });
                    requestStream.push(result.value);
                }
                if (result.done) {
                    requestStream.push(null);
                }
            },
        });
        let googAPIClient = `${getRuntimeTrackingString()} gccl/${packageJson.version}-${getModuleFormat()} gccl-invocation-id/${this.currentInvocationId.chunk}`;
        if (__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")) {
            googAPIClient += ` gccl-gcs-cmd/${__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")}`;
        }
        const headers = {
            'User-Agent': getUserAgentString(),
            'x-goog-api-client': googAPIClient,
        };
        // If using multiple chunk upload, set appropriate header
        if (multiChunkMode) {
            // We need to know how much data is available upstream to set the `Content-Range` header.
            // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
            for await (const chunk of this.upstreamIterator(expectedUploadSize)) {
                // This will conveniently track and keep the size of the buffers.
                // We will reach either the expected upload size or the remainder of the stream.
                __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_addLocalBufferCache).call(this, chunk);
            }
            // This is the sum from the `#addLocalBufferCache` calls
            const bytesToUpload = this.localWriteCacheByteLength;
            // Important: we want to know if the upstream has ended and the queue is empty before
            // unshifting data back into the queue. This way we will know if this is the last request or not.
            const isLastChunkOfUpload = !(await this.waitForNextChunk());
            // Important: put the data back in the queue for the actual upload
            this.prependLocalBufferToUpstream();
            let totalObjectSize = this.contentLength;
            if (typeof this.contentLength !== 'number' &&
                isLastChunkOfUpload &&
                !this.isPartialUpload) {
                // Let's let the server know this is the last chunk of the object since we didn't set it before.
                totalObjectSize = bytesToUpload + this.numBytesWritten;
            }
            // `- 1` as the ending byte is inclusive in the request.
            const endingByte = bytesToUpload + this.numBytesWritten - 1;
            // `Content-Length` for multiple chunk uploads is the size of the chunk,
            // not the overall object
            headers['Content-Length'] = bytesToUpload;
            headers['Content-Range'] =
                `bytes ${this.offset}-${endingByte}/${totalObjectSize}`;
        }
        else {
            headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;
        }
        const reqOpts = {
            method: 'PUT',
            url: this.uri,
            headers,
            body: requestStream,
        };
        try {
            const resp = await this.makeRequestStream(reqOpts);
            // `makeRequestStream` resolves to null when it has already handed
            // the response to the retry logic.
            if (resp) {
                responseReceived = true;
                await this.responseHandler(resp);
            }
        }
        catch (e) {
            const err = e;
            if (this.retryOptions.retryableErrorFn(err)) {
                this.attemptDelayedRetry({
                    status: NaN,
                    data: err,
                });
                return;
            }
            this.destroy(err);
        }
    }
    /**
     * Process the API response to look for errors that came in the response
     * body, then either continue with the next chunk, fail the upload, or
     * emit 'metadata' and 'uploadFinished'.
     *
     * @param resp The response to the chunk/stream PUT request.
     */
    async responseHandler(resp) {
        // A response may carry no body at all, so guard before reading `error`.
        if (resp.data && resp.data.error) {
            this.destroy(resp.data.error);
            return;
        }
        // At this point we can safely create a new id for the chunk
        this.currentInvocationId.chunk = uuid.v4();
        const moreDataToUpload = await this.waitForNextChunk();
        const shouldContinueWithNextMultiChunkRequest = this.chunkSize &&
            resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE &&
            resp.headers.range &&
            moreDataToUpload;
        /**
         * This is true when we're expecting to upload more data in a future request,
         * yet the upstream for the upload session has been exhausted.
         */
        const shouldContinueUploadInAnotherRequest = this.isPartialUpload &&
            resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE &&
            !moreDataToUpload;
        if (shouldContinueWithNextMultiChunkRequest) {
            // Use the upper value in this header to determine where to start the next chunk.
            // We should not assume that the server received all bytes sent in the request.
            // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
            const range = resp.headers.range;
            this.offset = Number(range.split('-')[1]) + 1;
            // We should not assume that the server received all bytes sent in the request.
            // - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
            const missingBytes = this.numBytesWritten - this.offset;
            if (missingBytes) {
                // As multi-chunk uploads send one chunk per request and pulls one
                // chunk into the pipeline, prepending the missing bytes back should
                // be fine for the next request.
                this.prependLocalBufferToUpstream(missingBytes);
                this.numBytesWritten -= missingBytes;
            }
            else {
                // No bytes missing - no need to keep the local cache
                __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this);
            }
            // continue uploading next chunk
            this.continueUploading();
        }
        else if (!this.isSuccessfulResponse(resp.status) &&
            !shouldContinueUploadInAnotherRequest) {
            const err = new Error('Upload failed');
            err.code = resp.status;
            err.name = 'Upload failed';
            if (resp.data) {
                err.errors = [resp.data];
            }
            this.destroy(err);
        }
        else {
            // no need to keep the cache
            __classPrivateFieldGet(this, _Upload_instances, "m", _Upload_resetLocalBuffersCache).call(this);
            if (resp.data) {
                resp.data.size = Number(resp.data.size);
            }
            this.emit('metadata', resp.data);
            // Allow the object (Upload) to continue naturally so the user's
            // "finish" event fires.
            this.emit('uploadFinished');
        }
    }
    /**
     * Check the status of an existing resumable upload.
     *
     * @param config Options for this call. `retry: false` disables the
     *     built-in retry so the caller can handle failures itself.
     * @returns the current upload status
     */
    async checkUploadStatus(config = {}) {
        let googAPIClient = `${getRuntimeTrackingString()} gccl/${packageJson.version}-${getModuleFormat()} gccl-invocation-id/${this.currentInvocationId.checkUploadStatus}`;
        if (__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")) {
            googAPIClient += ` gccl-gcs-cmd/${__classPrivateFieldGet(this, _Upload_gcclGcsCmd, "f")}`;
        }
        const opts = {
            method: 'PUT',
            url: this.uri,
            headers: {
                'Content-Length': 0,
                'Content-Range': 'bytes */*',
                'User-Agent': getUserAgentString(),
                'x-goog-api-client': googAPIClient,
            },
        };
        try {
            const resp = await this.makeRequest(opts);
            // Successfully got the offset we can now create a new offset invocation id
            this.currentInvocationId.checkUploadStatus = uuid.v4();
            return resp;
        }
        catch (e) {
            if (config.retry === false ||
                !(e instanceof Error) ||
                !this.retryOptions.retryableErrorFn(e)) {
                throw e;
            }
            const retryDelay = this.getRetryDelay();
            if (retryDelay <= 0) {
                // No time budget left for another attempt.
                throw e;
            }
            await new Promise(res => setTimeout(res, retryDelay));
            return this.checkUploadStatus(config);
        }
    }
    /**
     * Queries the upload status and stores the next byte to send on
     * `this.offset`. On failure the offset is left untouched and the error is
     * either routed to `attemptDelayedRetry` or destroys the stream.
     */
    async getAndSetOffset() {
        try {
            // we want to handle retries in this method.
            const resp = await this.checkUploadStatus({ retry: false });
            if (resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE) {
                if (typeof resp.headers.range === 'string') {
                    // Next offset is one past the upper bound of the range.
                    this.offset = Number(resp.headers.range.split('-')[1]) + 1;
                    return;
                }
            }
            this.offset = 0;
        }
        catch (e) {
            const err = e;
            if (this.retryOptions.retryableErrorFn(err)) {
                this.attemptDelayedRetry({
                    status: NaN,
                    data: err,
                });
                return;
            }
            this.destroy(err);
        }
    }
    /**
     * Sends a non-streaming request through `authClient`, adding encryption
     * headers and `userProject` when configured. `reqOpts` is modified in
     * place.
     *
     * @param reqOpts Request options.
     * @returns The response.
     * @throws The `error` member of the response body, when present, or
     *     whatever `authClient.request` rejects with.
     */
    async makeRequest(reqOpts) {
        if (this.encryption) {
            reqOpts.headers = reqOpts.headers || {};
            reqOpts.headers['x-goog-encryption-algorithm'] = 'AES256';
            reqOpts.headers['x-goog-encryption-key'] = this.encryption.key.toString();
            reqOpts.headers['x-goog-encryption-key-sha256'] =
                this.encryption.hash.toString();
        }
        if (this.userProject) {
            reqOpts.params = reqOpts.params || {};
            reqOpts.params.userProject = this.userProject;
        }
        // Let gaxios know we will handle a 308 error code ourselves.
        reqOpts.validateStatus = (status) => {
            return (this.isSuccessfulResponse(status) ||
                status === RESUMABLE_INCOMPLETE_STATUS_CODE);
        };
        // Request-specific options and headers win over the custom ones.
        const combinedReqOpts = {
            ...this.customRequestOptions,
            ...reqOpts,
            headers: {
                ...this.customRequestOptions.headers,
                ...reqOpts.headers,
            },
        };
        const res = await this.authClient.request(combinedReqOpts);
        if (res.data && res.data.error) {
            throw res.data.error;
        }
        return res;
    }
    /**
     * Sends a streaming request through `authClient`. The request is aborted
     * if this stream emits 'error' while it is in flight. Every status code
     * is accepted here; `onResponse` decides whether to retry.
     *
     * @param reqOpts Request options; modified in place.
     * @returns The response, or null when `onResponse` reported that it has
     *     handed the response to the retry logic.
     */
    async makeRequestStream(reqOpts) {
        const controller = new AbortController();
        const errorCallback = () => controller.abort();
        this.once('error', errorCallback);
        try {
            if (this.userProject) {
                reqOpts.params = reqOpts.params || {};
                reqOpts.params.userProject = this.userProject;
            }
            reqOpts.signal = controller.signal;
            reqOpts.validateStatus = () => true;
            // Request-specific options and headers win over the custom ones.
            const combinedReqOpts = {
                ...this.customRequestOptions,
                ...reqOpts,
                headers: {
                    ...this.customRequestOptions.headers,
                    ...reqOpts.headers,
                },
            };
            const res = await this.authClient.request(combinedReqOpts);
            const successfulRequest = this.onResponse(res);
            return successfulRequest ? res : null;
        }
        finally {
            // Detach even when the request rejects, so listeners do not pile
            // up across retries.
            this.removeListener('error', errorCallback);
        }
    }
    /**
     * Routes a non-200 response that `retryableErrorFn` accepts to the retry
     * logic; otherwise emits 'response'.
     *
     * @return {bool} is the request good?
     */
    onResponse(resp) {
        if (resp.status !== 200 &&
            this.retryOptions.retryableErrorFn({
                code: resp.status,
                message: resp.statusText,
                name: resp.statusText,
            })) {
            this.attemptDelayedRetry(resp);
            return false;
        }
        this.emit('response', resp);
        return true;
    }
    /**
     * Retries the upload, or destroys the stream when the retry count or the
     * total time budget is exhausted.
     *
     * @param resp GaxiosResponse object from previous attempt
     */
    attemptDelayedRetry(resp) {
        if (this.numRetries < this.retryOptions.maxRetries) {
            if (resp.status === NOT_FOUND_STATUS_CODE &&
                this.numChunksReadInRequest === 0) {
                // 404 before any data was read for this request: restart it
                // immediately.
                this.startUploading();
            }
            else {
                const retryDelay = this.getRetryDelay();
                if (retryDelay <= 0) {
                    this.destroy(new Error(`Retry total time limit exceeded - ${JSON.stringify(resp.data)}`));
                    return;
                }
                // Unshift the local cache back in case it's needed for the next request.
                this.numBytesWritten -= this.localWriteCacheByteLength;
                this.prependLocalBufferToUpstream();
                // We don't know how much data has been received by the server.
                // `continueUploading` will recheck the offset via `getAndSetOffset`.
                // If `offset` < `numberBytesReceived` then we will raise a RangeError
                // as we've streamed too much data that has been missed - this should
                // not be the case for multi-chunk uploads as `lastChunkSent` is the
                // body of the entire request.
                this.offset = undefined;
                setTimeout(this.continueUploading.bind(this), retryDelay);
            }
            this.numRetries++;
        }
        else {
            this.destroy(new Error(`Retry limit exceeded - ${JSON.stringify(resp.data)}`));
        }
    }
    /**
     * The amount of time to wait before retrying the request, in milliseconds.
     * If negative, do not retry.
     *
     * The delay is `retryDelayMultiplier ^ numRetries` seconds plus up to one
     * second of random jitter, capped by `maxRetryDelay` and by the time left
     * in `totalTimeout` since the instance was constructed.
     *
     * @returns the amount of time to wait, in milliseconds.
     */
    getRetryDelay() {
        const randomMs = Math.round(Math.random() * 1000);
        const waitTime = Math.pow(this.retryOptions.retryDelayMultiplier, this.numRetries) *
            1000 +
            randomMs;
        const maxAllowableDelayMs = this.retryOptions.totalTimeout * 1000 -
            (Date.now() - this.timeOfFirstRequest);
        const maxRetryDelayMs = this.retryOptions.maxRetryDelay * 1000;
        return Math.min(waitTime, maxRetryDelayMs, maxAllowableDelayMs);
    }
    /**
     * Prepare user-defined API endpoint for compatibility with our API.
     *
     * @param url The endpoint as supplied by the user.
     * @returns The endpoint with a protocol (`https://` when none was given)
     *     and without trailing slashes.
     */
    sanitizeEndpoint(url) {
        if (!PROTOCOL_REGEX.test(url)) {
            url = `https://${url}`;
        }
        return url.replace(/\/+$/, ''); // Remove trailing slashes
    }
    /**
     * Check if a given status code is 2xx
     *
     * @param status The status code to check
     * @returns if the status is 2xx
     */
    isSuccessfulResponse(status) {
        return status >= 200 && status < 300;
    }
}
// Backing stores for Upload's down-levelled private members: a WeakMap for
// the private field and a WeakSet marking instances that own the methods.
_Upload_gcclGcsCmd = new WeakMap();
_Upload_instances = new WeakSet();
/**
 * Private method body: drops every cached buffer and zeroes the byte count.
 * Invoked with the Upload instance as `this`.
 */
_Upload_resetLocalBuffersCache = function _Upload_resetLocalBuffersCache() {
    this.localWriteCache = [];
    this.localWriteCacheByteLength = 0;
};
/**
 * Private method body: appends `buf` to the local cache and adds its size to
 * the running byte count. Invoked with the Upload instance as `this`.
 *
 * @param buf The buffer to remember.
 */
_Upload_addLocalBufferCache = function _Upload_addLocalBufferCache(buf) {
    this.localWriteCache.push(buf);
    this.localWriteCacheByteLength += buf.byteLength;
};
/**
 * Convenience factory for an `Upload` stream.
 *
 * @param cfg Configuration passed straight to the `Upload` constructor.
 * @returns The new `Upload` instance.
 */
export function upload(cfg) {
    const uploadStream = new Upload(cfg);
    return uploadStream;
}
/**
 * Creates a resumable session URI using a throwaway `Upload` instance.
 *
 * @param cfg Configuration passed to the `Upload` constructor.
 * @param callback Optional node-style callback `(err, uri)`.
 * @returns A promise for the URI when no callback is given; otherwise
 *     nothing.
 */
export function createURI(cfg, callback) {
    const pendingUri = new Upload(cfg).createURI();
    if (callback) {
        pendingUri.then(uri => callback(null, uri), callback);
        return;
    }
    return pendingUri;
}
/**
 * Check the status of an existing resumable upload.
 *
 * Builds a throwaway `Upload` from `cfg` and delegates to its
 * `checkUploadStatus` method.
 *
 * @param cfg A configuration to use. `uri` is required.
 * @returns the current upload status
 */
export function checkUploadStatus(cfg) {
    return new Upload(cfg).checkUploadStatus();
}