UNPKG

11.5 kBJavaScriptView Raw
1/**
2 * Created by Rodey on 2018/3/23.
3 */
4const util = require('../utils'),
5 extend = require('extend'),
6 SSH2 = require('ssh2').Client,
7 vfs = require('vinyl-fs'),
8 mps = require('map-stream'),
9 parents = require('parents'),
10 crypto = require('crypto'),
11 EventEmitter = require('events').EventEmitter,
12 T = require('../tools');
13
14class Uploader extends EventEmitter {
15 constructor(localPath, remotePath, authentications) {
16 super();
17 this.localPath = localPath;
18 this.remotePath = remotePath;
19 this.auths = authentications;
20 this.ssh2 = new SSH2();
21 this.sftp = null;
22 this.mkDirCache = {};
23 this.finished = false;
24 this.fileCount = 0;
25 this.modCount = 0;
26 this.platform = 'unix';
27 this.host = this.auths.host;
28 // 上传方式 increment: 增量; full: 全量
29 this.type = 'full';
30 this.errCaches = {};
31 }
32
33 init() {}
34
35 setType(type) {
36 type && (this.type = type);
37 // 是否增量
38 this._isIncrementType = this.type === 'increment';
39 // 是否全量
40 this._isFullType = this.type === 'full';
41 }
42
43 start() {
44 if (!this.host && !util.isString(this.host)) {
45 return T.log.error(`× server host is undefined`);
46 }
47 // if (!this.auths.user) {
48 // return T.log.error(`× server connect username is undefined`);
49 // }
50 // if (!this.auths.pass) {
51 // return T.log.error(`× server connect password is undefined`);
52 // }
53 if (this.sftp) return this.startUpload();
54
55 let message;
56 if (this.auths.password) {
57 message = T.msg.gray(`→ [${this.host}] Authenticating with password.`);
58 } else if (this.auths.key) {
59 message = T.msg.gray(`→ [${this.host}] Authenticating with private key.`);
60 }
61
62 this.emit('start', { message, uploader: this });
63
64 this.ssh2.on('ready', this._onReady.bind(this));
65 this.ssh2.on('error', this._onError.bind(this));
66 this.ssh2.on('end', this._onEnd.bind(this));
67 this.ssh2.on('close', this._onClose.bind(this));
68 this.ssh2.connect(this.auths);
69 }
70
71 stop() {
72 this.sftp && this.sftp.end();
73 this.ssh2 && this.ssh2.end();
74 }
75
76 startUpload() {
77 this.time = Date.now();
78 // 读取路径
79 let stream = vfs
80 .src(this.localPath)
81 // 1 获取所有目录,并判断远程服务器是否存在,不存在则创建
82 .pipe(mps(this.checkDir.bind(this)));
83
84 stream.on('close', () => {
85 vfs
86 .src(this.localPath)
87 // 2 开始上传文件
88 .pipe(mps(this.onUpload.bind(this)))
89 .on('close', () => {
90 const duration = Date.now() - this.time;
91 const status = `
92 server info: host: ${this.host}; user: ${this.auths.username}
93 local path: ${this.localPath}
94 remote path: ${this.remotePath}
95 total files: ${this.fileCount}
96 modified files: ${this.fileCount - this.modCount}
97 uploaded files: ${this.fileCount - this.modCount}\n`;
98 this.emit('done', { fileCount: this.fileCount, modCount: this.modCount, status, duration, uploader: this });
99 this.fileCount = 0;
100 this.modCount = 0;
101 this.errCaches = {};
102 });
103 });
104 }
105
106 // 检查路径
107 checkDir(file, next) {
108 let stat = T.fs.statSync(file.path);
109 if (!stat.isDirectory()) {
110 return next(null, file);
111 }
112 let remotePath = this._getRemotePath(file);
113 let dirname = Uploader._normalizePath(T.Path.join(remotePath, ''));
114
115 let directories = parents(dirname)
116 .map(d => {
117 return d.replace(/^\/~/, '~');
118 })
119 .map(Uploader._normalizePath);
120
121 if (dirname.search(/^\//) === 0) {
122 directories = directories.map(d => {
123 if (d.search(/^\//) === 0) {
124 return d;
125 }
126 return '/' + d;
127 });
128 }
129
130 directories = directories.filter(d => d !== this._getRelativePath(this.remotePath));
131 directories = directories.filter(d => {
132 return d.length >= this.remotePath.length && !this.mkDirCache[d];
133 });
134
135 while (directories.length >= 0) {
136 if (directories.length === 0) return next(null, file);
137 let dir = directories.pop();
138 if (dir) {
139 this.mkDirCache[dir] = true;
140 this.hasExists(dir, isExists => {
141 !isExists && this.mkdirectory(dir);
142 });
143 }
144 }
145 }
146
147 hasExists(dir, cb) {
148 this.sftp.exists(dir, isExists => {
149 cb && util.isFunction(cb) && cb.call(this, isExists, dir);
150 });
151 }
152
153 mkdirectory(dir, cb) {
154 this.sftp.mkdir(dir, { mode: '0755' }, err => {
155 let message;
156 if (err) {
157 message = T.msg.red(`× [${T.getTime()}] '${dir}' mkdir Failed`);
158 this.emit('error', new Error(message, err));
159 T.log.error(message);
160 } else {
161 message = T.msg.green(`√ [${T.getTime()}] '${dir}' mkdir Successfully`);
162 this.emit('uploaded', { message, directory: dir });
163 cb && util.isFunction(cb) && cb.call(this, dir);
164 }
165 });
166 }
167
168 onUpload(file, next) {
169 let stat = T.fs.statSync(file.path);
170 if (stat.isDirectory()) {
171 return next(null, file);
172 }
173
174 let remotePath = this._getRemotePath(file);
175
176 // 全量上传
177 if (this._isFullType) {
178 return this._uploader(file, next, remotePath);
179 }
180
181 // 增量上传
182 this._isIncrementType &&
183 this.sftp.exists(remotePath, isExists => {
184 if (!isExists) {
185 this._uploader(file, next, remotePath);
186 } else {
187 this._downloader(file, next, remotePath);
188 }
189 });
190 }
191
192 _downloader(file, next, realPath) {
193 const stat = T.fs.statSync(file.path);
194 const fileContent = T.getFileContent(file.path);
195 const filename = T.Path.basename(realPath);
196 const localHash = crypto
197 .createHash('md5')
198 .update(fileContent)
199 .digest('hex');
200
201 const stream = this.sftp.createReadStream(realPath, {
202 flags: 'r',
203 encoding: null,
204 mode: '0666',
205 autoClose: true
206 });
207
208 let downloadBytes = '';
209
210 stream.on('data', chunk => {
211 downloadBytes += chunk;
212 });
213
214 stream.on('close', err => {
215 const remoteHash = crypto
216 .createHash('md5')
217 .update(downloadBytes)
218 .digest('hex');
219
220 let message;
221
222 if (localHash !== remoteHash) {
223 message = T.msg.green(`√ [${T.getTime()}] check hashContent ( Modified ), filename: '${filename}' `);
224 this.emit('uploaded', { message, file, realPath, size: file.stat.size });
225 this._uploader(file, next, realPath);
226 } else {
227 this.fileCount++;
228 this.modCount++;
229 message = T.msg.green(`√ [${T.getTime()}] check hashContent ( No Modify ), filename: '${filename}' `);
230 this.emit('uploaded', { message, file, realPath, size: file.stat.size });
231 next(null, file);
232 }
233 });
234 }
235
236 _uploader(file, next, realPath) {
237 let stream = this.sftp.createWriteStream(realPath, {
238 flags: 'w',
239 encoding: null,
240 mode: '0666',
241 autoClose: true
242 });
243
244 let uploadedBytes = 0;
245 let highWaterMark = stream.highWaterMark || 16 * 1000;
246 let size = file.stat.size;
247 this.errCaches[realPath] = false;
248
249 file.pipe(stream);
250
251 stream.on('drain', () => {
252 uploadedBytes += highWaterMark;
253 this.emit('upload_progress', { file, realPath, total: size, uploaded: uploadedBytes });
254 });
255
256 stream.on('error', err => {
257 this.errCaches[realPath] = true;
258 const message = T.msg.red(`× '${realPath}' upload error: ${err.message}`);
259 this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(${message}: ${err.message}`) });
260 });
261
262 stream.on('close', err => {
263 if (err || this.errCaches[realPath]) {
264 const message = `${realPath} is upload fail`;
265 this.emit('error', new Error(message, err));
266 this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(${message}: ${err.message}`) });
267 } else {
268 this.fileCount++;
269 this.emit('uploaded', { message: this._uploadedText(realPath, size), file, realPath, size });
270 }
271 next(null, file);
272 });
273 }
274
275 _execMkdir(dir, cb) {
276 cb = (util.isFunction(cb) && cb) || function() {};
277
278 const dirs = parents(dir)
279 .filter(dir => ['.', '..'].indexOf(dir) < 0)
280 .filter(dir => !this.mkDirCache[dir])
281 .filter(dir => !!dir)
282 .map(d => this._getRelativePath(d));
283
284 while (dirs && dirs.length >= 0) {
285 if (dirs.length === 0) return cb.call(this);
286 const d = dirs.pop();
287 this.mkDirCache[d] = true;
288 this.hasExists(d, isD => {
289 !isD && this.mkdirectory(d);
290 });
291 }
292 }
293
294 _onReady() {
295 this.ssh2.sftp((err, sftp) => {
296 if (err) T.log.error(err.message);
297 // T.log.gray(`√ [${this.host}] SFTP Ready Successfully`);
298 sftp.on('end', () => {});
299
300 this.sftp = sftp;
301 const remotePath = this._getRelativePath(this.remotePath);
302 this._execMkdir(remotePath, this.startUpload);
303 });
304 }
305
306 _onError(err) {
307 T.log.error(err.message);
308 }
309
310 _onEnd() {
311 // T.log.gray(`^ [${this.host}] Connection end `);
312 }
313
314 _onClose() {
315 // T.log.gray(`√ [${this.host}] Connection Closed`);
316 }
317
318 _uploadedText(realPath, size) {
319 return `√ [${T.getTime()}] uploaded '${realPath}', ${T.msg.yellow(size / 1000 + ' kb')}`;
320 }
321
322 _getRemotePath(file) {
323 let remotePath = T.Path.join(this.remotePath, file.relative);
324 return this._getRelativePath(remotePath);
325 }
326
327 _getRelativePath(path) {
328 if (this.platform.toLowerCase() === 'win') {
329 path = path.replace(/\//gi, '\\');
330 } else {
331 path = path.replace(/(\\{1,2}|\/)/gi, '/');
332 }
333 return path;
334 }
335
336 static _normalizePath(path) {
337 return path.replace(/\\/g, '/');
338 }
339}
340
341module.exports = Uploader;