// ali-oss managed multipart-upload helpers (browser/Node hybrid build).
// NOTE: retrieved from a raw package viewer; viewer chrome and fused
// line-number artifacts have been stripped from this copy.
1
const fs = require('fs');
const is = require('is-type-of');
const util = require('util');
const path = require('path');
const mime = require('mime');
const { isFile } = require('./common/utils/isFile');
const { isArray } = require('./common/utils/isArray');
const { isBuffer } = require('./common/utils/isBuffer');

const proto = exports;
12
/**
 * Multipart operations
 */

/**
 * Upload a file to OSS using multipart uploads. Payloads smaller than the
 * minimum part size fall back to a single putStream call.
 * @param {String} name object key to store the data under
 * @param {String|File|Buffer} file local path, Web File, or Buffer
 * @param {Object} options
 * {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
 * {String} options.callback.url the OSS sends a callback request to this URL
 * {String} options.callback.host The host header value for initiating callback requests
 * {String} options.callback.body The value of the request body when a callback is initiated
 * {String} options.callback.contentType The Content-Type of the callback requests initiatiated
 * {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
 * customValue = {
 * key1: 'value1',
 * key2: 'value2'
 * }
 */
proto.multipartUpload = async function multipartUpload(name, file, options) {
  this.resetCancelFlag();
  options = options || {};

  // A checkpoint that already carries an uploadId means we are resuming:
  // skip initialization and go straight to uploading the remaining parts.
  if (options.checkpoint && options.checkpoint.uploadId) {
    return await this._resumeMultipart(options.checkpoint, options);
  }

  const minPartSize = 100 * 1024;

  if (!options.mime) {
    if (isFile(file)) {
      options.mime = mime.getType(path.extname(file.name));
    } else if (isBuffer(file)) {
      options.mime = '';
    } else {
      options.mime = mime.getType(path.extname(file));
    }
  }

  options.headers = options.headers || {};
  this._convertMetaToHeaders(options.meta, options.headers);

  const fileSize = await this._getFileSize(file);

  // Small payloads: one simple put instead of a multipart session.
  if (fileSize < minPartSize) {
    options.contentLength = fileSize;
    const stream = this._createStream(file, 0, fileSize);
    const result = await this.putStream(name, stream, options);

    if (options && options.progress) {
      await options.progress(1);
    }

    const ret = {
      res: result.res,
      bucket: this.options.bucket,
      name,
      etag: result.res.headers.etag
    };

    // Callback responses carry a body the caller needs to see.
    if ((options.headers && options.headers['x-oss-callback']) || options.callback) {
      ret.data = result.data;
    }

    return ret;
  }

  if (options.partSize && !(parseInt(options.partSize, 10) === options.partSize)) {
    throw new Error('partSize must be int number');
  }
  if (options.partSize && options.partSize < minPartSize) {
    throw new Error(`partSize must not be smaller than ${minPartSize}`);
  }

  const initResult = await this.initMultipartUpload(name, options);
  const { uploadId } = initResult;
  const partSize = this._getPartSize(fileSize, options.partSize);

  // Fresh checkpoint; _resumeMultipart fills doneParts as parts complete.
  const checkpoint = {
    file,
    name,
    fileSize,
    partSize,
    uploadId,
    doneParts: []
  };

  if (options && options.progress) {
    await options.progress(0, checkpoint, initResult.res);
  }

  return await this._resumeMultipart(checkpoint, options);
};
104
/*
 * Resume multipart upload from checkpoint. The checkpoint will be
 * updated after each successful part upload, so callers may persist it
 * and resume again later.
 * @param {Object} checkpoint the checkpoint
 *   ({ file, name, fileSize, partSize, uploadId, doneParts })
 * @param {Object} options upload options (progress, parallel, ...)
 */
proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }
  const {
    file, fileSize, partSize, uploadId, doneParts, name
  } = checkpoint;

  const partOffs = this._divideParts(fileSize, partSize);
  const numParts = partOffs.length;

  // Fix: a plain async function replaces the former
  // `new Promise(async (resolve, reject) => ...)` anti-pattern — an async
  // promise executor can lose synchronous throws and double-settles the
  // promise; an async function propagates rejections naturally while
  // keeping the same external contract (a promise that rejects with
  // err.partNum attached).
  let uploadPartJob = async function uploadPartJob(self, partNo) {
    try {
      if (self.isCancel()) {
        return;
      }

      const pi = partOffs[partNo - 1];
      const stream = self._createStream(file, pi.start, pi.end);
      const data = {
        stream,
        size: pi.end - pi.start
      };

      // Track live part streams so a cancel can destroy them all.
      if (isArray(self.multipartUploadStreams)) {
        self.multipartUploadStreams.push(data.stream);
      } else {
        self.multipartUploadStreams = [data.stream];
      }

      const removeStreamFromMultipartUploadStreams = function () {
        if (!stream.destroyed) {
          stream.destroy();
        }
        const index = self.multipartUploadStreams.indexOf(stream);
        if (index !== -1) {
          self.multipartUploadStreams.splice(index, 1);
        }
      };

      stream.on('close', removeStreamFromMultipartUploadStreams);
      stream.on('error', removeStreamFromMultipartUploadStreams);

      let result;
      try {
        result = await self._uploadPart(name, uploadId, partNo, data);
      } catch (error) {
        removeStreamFromMultipartUploadStreams();
        // 404 means the upload session no longer exists server-side:
        // surface it as an abort event rather than a plain error.
        if (error.status === 404) {
          throw self._makeAbortEvent();
        }
        throw error;
      }

      if (!self.isCancel()) {
        doneParts.push({
          number: partNo,
          etag: result.res.headers.etag
        });
        checkpoint.doneParts = doneParts;

        if (options.progress) {
          await options.progress(doneParts.length / numParts, checkpoint, result.res);
        }
      }
    } catch (err) {
      // Tag the failing part so callers can report which part broke.
      err.partNum = partNo;
      throw err;
    }
  };

  const all = Array.from(new Array(numParts), (x, i) => i + 1);
  const done = doneParts.map(p => p.number);
  const todo = all.filter(p => done.indexOf(p) < 0);

  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;

  if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
    // Sequential upload (IE10 or caller-requested parallel === 1).
    for (let i = 0; i < todo.length; i++) {
      if (this.isCancel()) {
        throw this._makeCancelEvent();
      }
      /* eslint no-await-in-loop: [0] */
      await uploadPartJob(this, todo[i]);
    }
  } else {
    // Upload in parallel; _parallelNode collects per-part errors.
    const jobErr = await this._parallelNode(todo, parallel, uploadPartJob);

    const abortEvent = jobErr.find(err => err.name === 'abort');
    if (abortEvent) throw abortEvent;

    if (this.isCancel()) {
      uploadPartJob = null;
      throw this._makeCancelEvent();
    }

    if (jobErr && jobErr.length > 0) {
      jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${jobErr[0].partNum}`;
      throw jobErr[0];
    }
  }

  return await this.completeMultipartUpload(name, uploadId, doneParts, options);
};
218
/**
 * Resolve the byte size of an upload source.
 * @param {Buffer|File|String} file buffer, Web File, or local file path
 * @return {Number} size in bytes
 * @throws {Error} when the argument is none of the supported types
 */
proto._getFileSize = async function _getFileSize(file) {
  if (isBuffer(file)) {
    return file.length;
  }
  if (isFile(file)) {
    return file.size;
  }
  if (is.string(file)) {
    // Local path: stat it to learn the size.
    const stat = await this._statFile(file);
    return stat.size;
  }
  throw new Error('_getFileSize requires Buffer/File/String.');
};
234
235/*
236 * Readable stream for Web File
237 */
238const { Readable } = require('stream');
239
240function WebFileReadStream(file, options) {
241 if (!(this instanceof WebFileReadStream)) {
242 return new WebFileReadStream(file, options);
243 }
244
245 Readable.call(this, options);
246
247 this.file = file;
248 this.reader = new FileReader();
249 this.start = 0;
250 this.finish = false;
251 this.fileBuffer = null;
252}
253util.inherits(WebFileReadStream, Readable);
254
// Push up to `size`-byte chunks from the buffered contents until the
// stream applies backpressure or the buffer is exhausted.
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (!this.fileBuffer) {
    return;
  }
  let keepPushing = true;
  while (keepPushing && this.fileBuffer && this.start < this.fileBuffer.length) {
    const from = this.start;
    const to = Math.min(from + size, this.fileBuffer.length);
    this.start = to;
    keepPushing = this.push(this.fileBuffer.slice(from, to));
  }
};
267
WebFileReadStream.prototype._read = function _read(size) {
  // End-of-stream cases: file fully consumed, buffer drained, already
  // finished, or no file supplied at all.
  const exhausted =
    (this.file && this.start >= this.file.size) ||
    (this.fileBuffer && this.start >= this.fileBuffer.length) ||
    this.finish ||
    (this.start === 0 && !this.file);

  if (exhausted) {
    if (!this.finish) {
      this.fileBuffer = null;
      this.finish = true;
    }
    this.push(null);
    return;
  }

  const defaultReadSize = 16 * 1024;
  size = size || defaultReadSize;

  const self = this;
  this.reader.onload = function (e) {
    // Buffer the whole file once; subsequent reads stream from memory.
    self.fileBuffer = Buffer.from(new Uint8Array(e.target.result));
    self.file = null;
    self.readFileAndPush(size);
  };
  this.reader.onerror = function onload(e) {
    const error = e.srcElement && e.srcElement.error;
    if (error) {
      throw error;
    }
    throw e;
  };

  if (this.start === 0) {
    // First read triggers the async FileReader load.
    this.reader.readAsArrayBuffer(this.file);
  } else {
    this.readFileAndPush(size);
  }
};
303
/**
 * Build a readable stream over the byte range [start, end) of the source.
 * @param {Stream|File|Buffer|String} file source data
 * @param {Number} start inclusive byte offset
 * @param {Number} end exclusive byte offset
 * @throws {Error} for unsupported source types
 */
proto._createStream = function _createStream(file, start, end) {
  if (is.readableStream(file)) {
    return file;
  }
  if (isFile(file)) {
    return new WebFileReadStream(file.slice(start, end));
  }
  if (isBuffer(file)) {
    const chunk = file.subarray(start, end);
    // we can't use Readable.from() since it is only support in Node v10
    return new Readable({
      read() {
        this.push(chunk);
        this.push(null);
      }
    });
  }
  if (is.string(file)) {
    // fs.createReadStream's `end` option is inclusive, hence end - 1.
    return fs.createReadStream(file, { start, end: end - 1 });
  }
  throw new Error('_createStream requires Buffer/File/String.');
};
326
/**
 * Pick an effective part size: the caller's choice (or 1 MiB default),
 * raised if necessary so the part count stays within the 10,000 limit.
 * @param {Number} fileSize total upload size in bytes
 * @param {Number} partSize requested part size (optional)
 * @return {Number} part size to use
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  const maxNumParts = 10 * 1000;
  const defaultPartSize = 1 * 1024 * 1024;

  let size = partSize || defaultPartSize;
  // Smallest part size that keeps partNumber <= maxNumParts.
  const safeSize = Math.ceil(fileSize / maxNumParts);

  if (size < safeSize) {
    size = safeSize;
    console.warn(`partSize has been set to ${size}, because the partSize you provided causes partNumber to be greater than 10,000`);
  }
  return size;
};
340
/**
 * Split a file of `fileSize` bytes into `{ start, end }` offset pairs of
 * at most `partSize` bytes each (end is exclusive; last part may be short).
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize size of each part in bytes
 * @return {Array<{start: Number, end: Number}>}
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  const numParts = Math.ceil(fileSize / partSize);
  return Array.from({ length: numParts }, (_, i) => {
    const start = partSize * i;
    return {
      start,
      end: Math.min(start + partSize, fileSize)
    };
  });
};