UNPKG

11.5 kBJavaScriptView Raw
1const { dir } = require('console');
2
3/**
4 * Created by Rodey on 2018/3/23.
5 */
6const util = require('../utils'),
7 extend = require('extend'),
8 SSH2 = require('ssh2').Client,
9 vfs = require('vinyl-fs'),
10 mps = require('map-stream'),
11 parents = require('parents'),
12 crypto = require('crypto'),
13 EventEmitter = require('events').EventEmitter,
14 T = require('../tools');
15
/**
 * Uploads a local file tree (a vinyl-fs glob) to a remote server over SFTP,
 * using an ssh2 client connection.
 *
 * Upload runs in two passes over `localPath`:
 *   1. `checkDir`  - make sure every remote directory exists;
 *   2. `onUpload`  - transfer the files (all of them in "full" mode, only the
 *                    changed ones in "increment" mode).
 *
 * Events emitted by an instance:
 *   - 'start'           { message, uploader }
 *   - 'uploaded'        { message, file?, realPath?, size?, directory?, error? }
 *   - 'upload_progress' { file, realPath, total, uploaded }
 *   - 'done'            { fileCount, modCount, status, duration, uploader }
 *   - 'error'           Error (only emitted while an 'error' listener is attached)
 */
class Uploader extends EventEmitter {
  /**
   * @param {string|string[]} localPath - glob(s) handed to `vfs.src`.
   * @param {string} remotePath - remote base directory.
   * @param {Object} authentications - options object passed verbatim to
   *   `ssh2.connect`; `host` is read from it for validation and messages.
   */
  constructor(localPath, remotePath, authentications) {
    super();
    this.localPath = localPath;
    this.remotePath = remotePath;
    this.auths = authentications;
    this.ssh2 = new SSH2();
    this.sftp = null;
    // Remote directories that have already been scheduled for creation.
    this.mkDirCache = {};
    this.finished = false;
    // Files processed in the current run (uploaded + found unchanged).
    this.fileCount = 0;
    // Files found unchanged on the server (incremental mode only).
    this.modCount = 0;
    this.platform = 'unix';
    this.host = this.auths.host;
    // Upload mode: 'increment' (only changed files) or 'full' (everything).
    this.type = 'full';
    // realPath -> true when the write stream for that path reported an error.
    this.errCaches = {};
    // Derive the mode flags right away so the default 'full' mode works even
    // when the caller never invokes setType().
    this.setType(this.type);
  }

  /** No-op initialisation hook. */
  init() {}

  /**
   * Select the upload mode and refresh the derived flags.
   *
   * @param {string} [type] - 'increment' or 'full'; a falsy value keeps the
   *   current mode. Any other value is stored as-is and is treated as a full
   *   upload by `onUpload`.
   */
  setType(type) {
    if (type) {
      this.type = type;
    }
    // Incremental upload?
    this._isIncrementType = this.type === 'increment';
    // Full upload?
    this._isFullType = this.type === 'full';
  }

  /**
   * Connect to the server (or reuse the open SFTP session) and start uploading.
   * Logs an error and returns without connecting when the host is missing or
   * is not a string.
   */
  start() {
    // The host must be present AND be a string; the former `&&` let an empty
    // string or a non-string value through.
    if (!this.host || !util.isString(this.host)) {
      return T.log.error(`× server host is undefined`);
    }
    if (this.sftp) return this.startUpload();

    let message;
    if (this.auths.password) {
      message = T.msg.gray(`→ [${this.host}] Authenticating with password.`);
    } else if (this.auths.key || this.auths.privateKey) {
      // `privateKey` is the option name ssh2 itself reads; `key` is kept for
      // callers that already rely on it.
      message = T.msg.gray(`→ [${this.host}] Authenticating with private key.`);
    }

    this.emit('start', { message, uploader: this });

    // Bind the connection handlers once; a repeated start() would otherwise
    // stack duplicates and run the whole upload several times on 'ready'.
    if (!this._listenersBound) {
      this.ssh2.on('ready', this._onReady.bind(this));
      this.ssh2.on('error', this._onError.bind(this));
      this.ssh2.on('end', this._onEnd.bind(this));
      this.ssh2.on('close', this._onClose.bind(this));
      this._listenersBound = true;
    }
    this.ssh2.connect(this.auths);
  }

  /** Close the SFTP session and the underlying SSH connection. */
  stop() {
    this.sftp && this.sftp.end();
    this.ssh2 && this.ssh2.end();
  }

  /**
   * Run the two upload passes and emit 'done' with a summary once the second
   * pass has closed. The per-run counters are reset afterwards.
   */
  startUpload() {
    this.time = Date.now();
    // Pass 1: walk the local paths and create the missing remote directories.
    const dirPass = vfs.src(this.localPath).pipe(mps(this.checkDir.bind(this)));

    dirPass.on('close', () => {
      // Pass 2: upload the files.
      vfs
        .src(this.localPath)
        .pipe(mps(this.onUpload.bind(this)))
        .on('close', () => {
          const duration = Date.now() - this.time;
          // modCount counts UNCHANGED files, so the difference is what was sent.
          const changed = this.fileCount - this.modCount;
          const status = [
            '',
            ` server info: host: ${this.host}; user: ${this.auths.username}`,
            ` local path: ${this.localPath}`,
            ` remote path: ${this.remotePath}`,
            ` total files: ${this.fileCount}`,
            ` modified files: ${changed}`,
            ` uploaded files: ${changed}`,
            ''
          ].join('\n');
          this.emit('done', { fileCount: this.fileCount, modCount: this.modCount, status, duration, uploader: this });
          this.fileCount = 0;
          this.modCount = 0;
          this.errCaches = {};
        });
    });
  }

  /**
   * map-stream mapper for pass 1: for a local directory, make sure the matching
   * remote directory (and any not-yet-seen parent below `remotePath`) exists.
   * `next` is called only after every exists/mkdir round trip has completed, so
   * pass 2 cannot start writing into a directory that is still being created.
   *
   * @param {Object} file - vinyl file produced by `vfs.src`.
   * @param {Function} next - map-stream callback `(err, file)`.
   */
  checkDir(file, next) {
    const stat = T.fs.statSync(file.path);
    if (!stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);
    const dirname = Uploader._normalizePath(T.Path.join(remotePath, ''));
    const remoteRoot = this._getRelativePath(this.remotePath);

    let directories = parents(dirname)
      .map(d => d.replace(/^\/~/, '~'))
      .map(d => Uploader._normalizePath(d));

    if (/^\//.test(dirname)) {
      directories = this._fixPrefix(directories);
    }

    // Skip the remote root itself, anything above it, and directories that
    // were already handled.
    directories = directories.filter(
      d => d !== remoteRoot && d.length >= this.remotePath.length && !this.mkDirCache[d]
    );

    this._ensureDirectories(directories, () => next(null, file));
  }

  /**
   * Check whether a remote path exists.
   *
   * @param {string} dir - remote path.
   * @param {Function} [cb] - called as `cb(isExists, dir)` with `this` bound to
   *   the uploader.
   */
  hasExists(dir, cb) {
    this.sftp.exists(dir, isExists => {
      cb && util.isFunction(cb) && cb.call(this, isExists, dir);
    });
  }

  /**
   * Create a remote directory. Emits 'uploaded' on success; on failure the
   * error is logged and emitted as 'error'.
   *
   * @param {string} dir - remote directory to create.
   * @param {Function} [cb] - called as `cb(dir)` on success only.
   */
  mkdirectory(dir, cb) {
    this._mkdir(dir, err => {
      if (!err && cb && util.isFunction(cb)) {
        cb.call(this, dir);
      }
    });
  }

  /**
   * map-stream mapper for pass 2: upload one file. In incremental mode an
   * existing remote file is first compared with the local one; in every other
   * mode the file is uploaded unconditionally.
   *
   * @param {Object} file - vinyl file produced by `vfs.src`.
   * @param {Function} next - map-stream callback `(err, file)`.
   */
  onUpload(file, next) {
    const stat = T.fs.statSync(file.path);
    if (stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);

    // Full upload. Anything that is not explicitly incremental lands here, so
    // an unknown mode can no longer leave `next` uncalled.
    if (!this._isIncrementType) {
      return this._uploader(file, next, remotePath);
    }

    // Incremental upload.
    this.sftp.exists(remotePath, isExists => {
      if (isExists) {
        this._downloader(file, next, remotePath);
      } else {
        this._uploader(file, next, remotePath);
      }
    });
  }

  /**
   * Incremental mode: read the remote file, compare its MD5 with the local
   * content and upload only when they differ. MD5 is used purely for change
   * detection. If the remote file cannot be read it is uploaded anyway.
   *
   * @param {Object} file - vinyl file.
   * @param {Function} next - map-stream callback `(err, file)`.
   * @param {string} realPath - remote file path.
   */
  _downloader(file, next, realPath) {
    const fileContent = T.getFileContent(file.path);
    const filename = T.Path.basename(realPath);
    const localHash = crypto
      .createHash('md5')
      .update(fileContent)
      .digest('hex');

    const stream = this.sftp.createReadStream(realPath, {
      flags: 'r',
      encoding: null,
      mode: '0666',
      autoClose: true
    });

    // Keep raw chunks: turning each chunk into a string on its own breaks
    // multi-byte characters that straddle a chunk boundary and makes
    // identical files look modified.
    const chunks = [];
    // Guards against handling both 'error' and the 'close' that may follow it.
    let settled = false;

    stream.on('data', chunk => {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
    });

    stream.on('error', err => {
      if (settled) return;
      settled = true;
      T.log.error(T.msg.red(`× '${realPath}' download error: ${err.message}`));
      // The comparison is impossible, so fall back to a plain upload.
      this._uploader(file, next, realPath);
    });

    stream.on('close', () => {
      if (settled) return;
      settled = true;

      const remoteBytes = Buffer.concat(chunks);
      // Hash the remote data in the same representation as the local content
      // (string vs. raw bytes) so equal files always yield equal digests.
      const remoteHash = crypto
        .createHash('md5')
        .update(typeof fileContent === 'string' ? remoteBytes.toString() : remoteBytes)
        .digest('hex');

      let message;

      if (localHash !== remoteHash) {
        message = T.msg.green(`√ [${T.getTime()}] check hashContent ( Modified ), filename: '${filename}' `);
        this.emit('uploaded', { message, file, realPath, size: file.stat.size });
        this._uploader(file, next, realPath);
      } else {
        this.fileCount++;
        this.modCount++;
        message = T.msg.green(`√ [${T.getTime()}] check hashContent ( No Modify ), filename: '${filename}' `);
        this.emit('uploaded', { message, file, realPath, size: file.stat.size });
        next(null, file);
      }
    });
  }

  /**
   * Stream one local file to `realPath` on the server. Emits
   * 'upload_progress' on every drain and 'uploaded' when the stream closes;
   * `next` is always called once the stream has closed.
   *
   * @param {Object} file - vinyl file.
   * @param {Function} next - map-stream callback `(err, file)`.
   * @param {string} realPath - remote file path.
   */
  _uploader(file, next, realPath) {
    const stream = this.sftp.createWriteStream(realPath, {
      flags: 'w',
      encoding: null,
      mode: '0666',
      autoClose: true
    });

    const size = file.stat.size;
    const highWaterMark = stream.highWaterMark || 16 * 1000;
    let uploadedBytes = 0;
    // The stream error, kept so the 'close' handler can report its message.
    let failure = null;
    this.errCaches[realPath] = false;

    stream.on('drain', () => {
      // Progress is an estimate (one buffer per drain); never report more
      // than the file size.
      uploadedBytes = Math.min(uploadedBytes + highWaterMark, size);
      this.emit('upload_progress', { file, realPath, total: size, uploaded: uploadedBytes });
    });

    stream.on('error', err => {
      failure = err;
      this.errCaches[realPath] = true;
      const message = T.msg.red(`× '${realPath}' upload error: ${err.message}`);
      this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(`${message}: ${err.message}`) });
    });

    stream.on('close', err => {
      const cause = err || failure;
      if (cause || this.errCaches[realPath]) {
        const message = `${realPath} is upload fail`;
        // 'close' normally carries no argument, so the detail comes from the
        // error recorded by the 'error' handler when there is one.
        const detail = cause && cause.message ? `${message}: ${cause.message}` : message;
        this._emitError(cause ? new Error(message, { cause }) : new Error(message));
        this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(detail) });
      } else {
        this.fileCount++;
        this.emit('uploaded', { message: this._uploadedText(realPath, size), file, realPath, size });
      }
      next(null, file);
    });

    // Attach the handlers first, then start the transfer.
    file.pipe(stream);
  }

  /**
   * Make sure `dir` and all of its parents exist on the server, then call `cb`
   * (with `this` bound to the uploader).
   *
   * @param {string} dir - remote directory.
   * @param {Function} [cb] - called once every directory has been handled.
   */
  _execMkdir(dir, cb) {
    const done = (util.isFunction(cb) && cb) || function() {};

    let dirs = parents(dir)
      .filter(d => !!d && ['.', '..'].indexOf(d) < 0 && !this.mkDirCache[d])
      .map(d => this._getRelativePath(d));

    if (/^\//.test(dir)) {
      dirs = this._fixPrefix(dirs);
    }

    this._ensureDirectories(dirs, done);
  }

  /**
   * Issue an exists check (and a mkdir when needed) for every directory and
   * call `done` once ALL of them have been answered. The list is consumed
   * from its end, i.e. shallowest directory first when it comes from
   * `parents()`, and the requests are issued back to back.
   *
   * @param {string[]} dirs - remote directories, deepest first.
   * @param {Function} done - called with `this` bound to the uploader.
   */
  _ensureDirectories(dirs, done) {
    const queue = dirs.filter(Boolean);
    let pending = queue.length;
    if (pending === 0) {
      return done.call(this);
    }

    const settle = () => {
      pending -= 1;
      if (pending === 0) {
        done.call(this);
      }
    };

    while (queue.length > 0) {
      const dir = queue.pop();
      this.mkDirCache[dir] = true;
      this.hasExists(dir, isExists => {
        if (isExists) {
          settle();
        } else {
          // A failed mkdir is reported by _mkdir; the run still continues.
          this._mkdir(dir, settle);
        }
      });
    }
  }

  /**
   * Create a remote directory, report the outcome, and ALWAYS call `done`.
   *
   * @param {string} dir - remote directory to create.
   * @param {Function} done - called as `done(err)`; `err` is falsy on success.
   */
  _mkdir(dir, done) {
    this.sftp.mkdir(dir, { mode: '0777' }, err => {
      if (err) {
        const message = T.msg.red(`× [${T.getTime()}] '${dir}' mkdir Failed`);
        // Log before emitting so the failure is recorded whatever listeners do.
        T.log.error(message);
        this._emitError(new Error(message, { cause: err }));
      } else {
        const message = T.msg.green(`√ [${T.getTime()}] '${dir}' mkdir Successfully`);
        this.emit('uploaded', { message, directory: dir });
      }
      done(err);
    });
  }

  /**
   * Emit 'error' only when somebody listens for it. An unhandled 'error'
   * event throws from inside the SFTP callback, which would abort the run
   * before the pipeline callback is invoked.
   *
   * @param {Error} error
   */
  _emitError(error) {
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /**
   * ssh2 'ready' handler: open the SFTP session, create the remote base
   * directory chain and start uploading.
   */
  _onReady() {
    this.ssh2.sftp((err, sftp) => {
      if (err) {
        // Without a session there is nothing to upload with; stop here
        // instead of dereferencing the missing `sftp` object.
        T.log.error(err.message);
        return;
      }

      this.sftp = sftp;
      const remotePath = this._getRelativePath(this.remotePath);
      this._execMkdir(remotePath, () => this.startUpload());
    });
  }

  /** ssh2 'error' handler: log the connection error. */
  _onError(err) {
    T.log.error(err.message);
  }

  /** ssh2 'end' handler (intentionally empty). */
  _onEnd() {}

  /** ssh2 'close' handler (intentionally empty). */
  _onClose() {}

  /**
   * Build the success message for an uploaded file.
   *
   * @param {string} realPath - remote file path.
   * @param {number} size - file size in bytes (reported divided by 1000).
   * @returns {string}
   */
  _uploadedText(realPath, size) {
    return `√ [${T.getTime()}] uploaded '${realPath}', ${T.msg.yellow(size / 1000 + ' kb')}`;
  }

  /**
   * Remote path of a local file: `remotePath` joined with the file's relative
   * path, with separators adapted to the target platform.
   *
   * @param {Object} file - vinyl file.
   * @returns {string}
   */
  _getRemotePath(file) {
    const remotePath = T.Path.join(this.remotePath, file.relative);
    return this._getRelativePath(remotePath);
  }

  /**
   * Convert path separators for the target platform: backslashes for 'win',
   * forward slashes otherwise.
   *
   * @param {string} path
   * @returns {string}
   */
  _getRelativePath(path) {
    if (this.platform.toLowerCase() === 'win') {
      return path.replace(/\//gi, '\\');
    }
    return path.replace(/(\\{1,2}|\/)/gi, '/');
  }

  /**
   * Make sure every entry starts with '/'.
   *
   * @param {string[]} dirs
   * @returns {string[]} new array with the prefixed entries.
   */
  _fixPrefix(dirs) {
    return dirs.map(d => (/^\//.test(d) ? d : '/' + d));
  }

  /**
   * Replace every backslash with a forward slash.
   *
   * @param {string} path
   * @returns {string}
   */
  static _normalizePath(path) {
    return path.replace(/\\/g, '/');
  }
}
342
343module.exports = Uploader;