/**
 * Created by Rodey on 2018/3/23.
 */
const util = require('../utils'),
  extend = require('extend'),
  FTP = require('ftp'),
  vfs = require('vinyl-fs'),
  mps = require('map-stream'),
  parents = require('parents'),
  crypto = require('crypto'),
  EventEmitter = require('events').EventEmitter,
  T = require('../tools');
13
/**
 * Uploads a local file tree to a remote server over FTP.
 *
 * The upload runs in two passes over `localPath`: the first creates every
 * missing remote directory, the second transfers the files. In `increment`
 * mode a file that already exists remotely is downloaded and compared by MD5,
 * and is only re-uploaded when the content differs.
 *
 * Events emitted:
 *  - `start`            `{ message, uploader }` right before connecting
 *  - `uploaded`         one per created directory / processed file
 *  - `upload_progress`  `{ file, realPath, total, uploaded }`
 *  - `done`             `{ fileCount, modCount, status, duration, uploader }`
 *  - `error`            an `Error` (only emitted when a listener is attached)
 */
class UploaderFTP extends EventEmitter {
  /**
   * @param {string|string[]} localPath  glob(s) handed to `vinyl-fs`
   * @param {string} remotePath          base directory on the server
   * @param {Object} authentications     connection options for the `ftp` client
   *   (`host`, `port`, `user`/`username`, `password`/`pass`, `timeout`, ...)
   */
  constructor(localPath, remotePath, authentications) {
    super();
    this.localPath = localPath;
    this.remotePath = remotePath;
    // Work on a copy so the caller's options object is never mutated.
    this.auths = Object.assign({}, authentications);
    // 21 is only the default; an explicitly configured port is respected.
    if (this.auths.port == null) {
      this.auths.port = 21;
    }
    // The `ftp` client reads `user` / `password`; accept the common aliases too.
    if (this.auths.user == null && this.auths.username != null) {
      this.auths.user = this.auths.username;
    }
    if (this.auths.password == null && this.auths.pass != null) {
      this.auths.password = this.auths.pass;
    }
    // A single `timeout` fans out to the client's individual timeouts, but an
    // absent `timeout` must not wipe values the caller supplied directly.
    if (this.auths.timeout != null) {
      this.auths.connTimeout = this.auths.pasvTimeout = this.auths.keepalive = this.auths.timeout;
    }
    this.ftp = new FTP();
    this.mkDirCache = {};
    this.finished = false;
    this.fileCount = 0;
    this.modCount = 0;
    this.platform = 'unix';
    this.host = this.auths.host;
    // Upload mode: 'increment' (changed files only) or 'full' (everything).
    this.type = 'full';
    // Derive the mode flags now so uploading works without an explicit setType().
    this.setType();
    // True between the client's 'ready' and 'end'/'close' events.
    this._ready = false;
    // Guards against registering the client listeners more than once.
    this._bound = false;
  }

  init() {}

  /**
   * Selects the upload mode and refreshes the derived flags.
   * @param {string} [type] 'increment' or 'full'; a falsy value keeps the current mode
   */
  setType(type) {
    type && (this.type = type);
    // Incremental upload?
    this._isIncrementType = this.type === 'increment';
    // Full upload?
    this._isFullType = this.type === 'full';
  }

  /**
   * Validates the credentials, connects and kicks off the upload. When the
   * connection is already established the upload starts immediately.
   */
  start() {
    if (!this.host || !util.isString(this.host)) {
      return T.log.error(`× server host is undefined`);
    }
    if (!this.auths.user) {
      return T.log.error(`× server connect username is undefined`);
    }
    if (!this.auths.password) {
      return T.log.error(`× server connect password is undefined`);
    }
    if (this._ready) return this.startUpload();

    const message = T.msg.gray(`→ [${this.host}] Authenticating with password.`);
    this.emit('start', { message, uploader: this });

    if (!this._bound) {
      this._bound = true;
      this.ftp.on('ready', this._onReady.bind(this));
      this.ftp.on('error', this._onError.bind(this));
      this.ftp.on('end', this._onEnd.bind(this));
      this.ftp.on('close', this._onClose.bind(this));
    }
    this.ftp.connect(this.auths);
  }

  /** Closes the FTP connection. */
  stop() {
    this._ready = false;
    this.ftp && this.ftp.end();
  }

  /**
   * Runs the two upload passes and emits `done` with a summary once the
   * second pass has closed.
   */
  startUpload() {
    this.time = Date.now();
    // Pass 1: walk the local tree and create every missing remote directory.
    const stream = vfs.src(this.localPath).pipe(mps(this.checkDir.bind(this)));

    stream.on('close', () => {
      vfs
        .src(this.localPath)
        // Pass 2: upload the files.
        .pipe(mps(this.onUpload.bind(this)))
        .on('close', () => {
          const duration = Date.now() - this.time;
          const status = `
    server info: host: ${this.host}; user: ${this.auths.user}
    local path: ${this.localPath}
    remote path: ${this.remotePath}
    total files: ${this.fileCount}
    modified files: ${this.fileCount - this.modCount}
    uploaded files: ${this.fileCount - this.modCount}\n`;
          this.emit('done', { fileCount: this.fileCount, modCount: this.modCount, status, duration, uploader: this });
          this.fileCount = 0;
          this.modCount = 0;
        });
    });
  }

  /**
   * map-stream handler for pass 1: for a local directory, makes sure the
   * matching remote directory (and any missing ancestors below `remotePath`)
   * exists. `next` is called only after all of them have been handled, so the
   * upload pass never races directory creation.
   * @param {Object} file    vinyl file
   * @param {Function} next  map-stream continuation `(err, file)`
   */
  checkDir(file, next) {
    const stat = T.fs.statSync(file.path);
    if (!stat.isDirectory()) {
      return next(null, file);
    }
    const remotePath = this._getRemotePath(file);
    const dirname = UploaderFTP._normalizePath(T.Path.join(remotePath, ''));

    let directories = parents(dirname)
      .map(d => d.replace(/^\/~/, '~'))
      .map(UploaderFTP._normalizePath);

    // Keep the ancestors absolute when the target itself is absolute.
    if (dirname.search(/^\//) === 0) {
      directories = directories.map(d => (d.search(/^\//) === 0 ? d : '/' + d));
    }

    const base = this._getRelativePath(this.remotePath);
    directories = directories.filter(d => {
      // Skip the base directory itself, anything above it and anything already handled.
      return d !== base && d.length >= this.remotePath.length && !this.mkDirCache[d];
    });

    // `parents` lists deepest first, so popping handles the shallowest first.
    const ensureNext = () => {
      if (directories.length === 0) return next(null, file);
      const dir = directories.pop();
      if (!dir) return ensureNext();
      this.mkDirCache[dir] = true;
      this._ensureDir(dir, ensureNext);
    };
    ensureNext();
  }

  /**
   * Checks whether a remote path exists by listing its parent directory.
   * @param {string} dir   remote path (file or directory)
   * @param {Function} cb  called as `cb(isExists, dir)` with `this` bound to the uploader
   */
  hasExists(dir, cb) {
    const done = isExists => {
      cb && util.isFunction(cb) && cb.call(this, isExists, dir);
    };
    const target = UploaderFTP._normalizePath(String(dir)).replace(/\/+$/, '');
    const slash = target.lastIndexOf('/');
    const name = target.slice(slash + 1);
    // The root / current directory always exists.
    if (!name) return done(true);

    let parent = '.';
    if (slash === 0) {
      parent = '/';
    } else if (slash > 0) {
      parent = target.slice(0, slash);
    }

    this.ftp.list(parent, (err, list) => {
      // A parent that cannot be listed means the child is not there either.
      if (err) return done(false);
      done((list || []).some(entry => Boolean(entry) && entry.name === name));
    });
  }

  /**
   * Creates a remote directory (including missing ancestors).
   * @param {string} dir     remote directory
   * @param {Function} [cb]  called as `cb(dir)` on success only
   */
  mkdirectory(dir, cb) {
    this._makeDir(dir, err => {
      !err && cb && util.isFunction(cb) && cb.call(this, dir);
    });
  }

  /**
   * map-stream handler for pass 2: uploads one file, honouring the upload mode.
   * @param {Object} file    vinyl file
   * @param {Function} next  map-stream continuation `(err, file)`
   */
  onUpload(file, next) {
    const stat = T.fs.statSync(file.path);
    if (stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);

    // Incremental upload: only transfer files that are new or changed.
    if (this._isIncrementType) {
      return this.hasExists(remotePath, isExists => {
        if (!isExists) {
          this._uploader(file, next, remotePath);
        } else {
          this._downloader(file, next, remotePath);
        }
      });
    }

    // Full upload. Also the fallback for an unknown mode, so `next` is always reached.
    return this._uploader(file, next, remotePath);
  }

  /**
   * Downloads the remote copy, compares its MD5 with the local file and
   * re-uploads only when they differ. If the remote copy cannot be read the
   * file is treated as modified and uploaded.
   * @param {Object} file      vinyl file
   * @param {Function} next    map-stream continuation
   * @param {string} realPath  remote file path
   */
  _downloader(file, next, realPath) {
    const fileContent = T.getFileContent(file.path);
    const filename = T.Path.basename(realPath);
    const localHash = crypto
      .createHash('md5')
      .update(fileContent)
      .digest('hex');

    this.ftp.get(realPath, (err, stream) => {
      if (err) {
        T.log.error(err.message);
        return this._uploader(file, next, realPath);
      }

      // Collect raw buffers; string concatenation would corrupt binary data.
      const chunks = [];
      let settled = false;

      const finish = failed => {
        if (settled) return;
        settled = true;
        if (failed) return this._uploader(file, next, realPath);

        const remoteHash = crypto
          .createHash('md5')
          .update(Buffer.concat(chunks))
          .digest('hex');

        let message;
        if (localHash !== remoteHash) {
          message = T.msg.green(`√ [${T.getTime()}] check hashContent ( Modified ), filename: '${filename}' `);
          this.emit('uploaded', { message, file, realPath, size: file.stat.size });
          this._uploader(file, next, realPath);
        } else {
          this.fileCount++;
          this.modCount++;
          message = T.msg.green(`√ [${T.getTime()}] check hashContent ( No Modify ), filename: '${filename}' `);
          this.emit('uploaded', { message, file, realPath, size: file.stat.size });
          next(null, file);
        }
      };

      stream.on('data', chunk => {
        chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      });
      stream.once('error', () => finish(true));
      stream.once('close', hadError => finish(Boolean(hadError)));
      // The client hands the data socket over paused; a 'data' listener alone
      // does not restart an explicitly paused stream.
      stream.resume();
    });
  }

  /**
   * Uploads one file and reports the outcome through `uploaded`. A failed
   * upload is reported but does not stop the pipeline.
   * @param {Object} file      vinyl file
   * @param {Function} next    map-stream continuation
   * @param {string} realPath  remote file path
   */
  _uploader(file, next, realPath) {
    const size = file.stat.size;
    // Buffer or stream contents when available, otherwise let the client read the local path.
    const input = file.contents || file.path;

    this.ftp.put(input, realPath, err => {
      if (err) {
        const message = `${realPath} is upload fail`;
        this._reportError(message, err);
        this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(`${message}: ${err.message}`) });
      } else {
        this.fileCount++;
        this.emit('upload_progress', { file, realPath, total: size, uploaded: size });
        this.emit('uploaded', { message: this._uploadedText(realPath, size), file, realPath, size });
      }
      next(null, file);
    });
  }

  /**
   * Makes sure `dir` and all of its ancestors exist, then invokes `cb` with
   * `this` bound to the uploader.
   * @param {string} dir     remote directory
   * @param {Function} [cb]  continuation, called once every directory was handled
   */
  _execMkdir(dir, cb) {
    const done = (util.isFunction(cb) && cb) || function() {};

    const dirs = parents(dir)
      .filter(d => Boolean(d) && d !== '.' && d !== '..' && !this.mkDirCache[d])
      .map(d => this._getRelativePath(d));

    // Shallowest first, one at a time, so a parent exists before its child is created.
    const step = () => {
      if (dirs.length === 0) return done.call(this);
      const d = dirs.pop();
      this.mkDirCache[d] = true;
      this._ensureDir(d, step);
    };
    step();
  }

  /**
   * Creates `dir` unless it already exists; `done` is always called.
   * @param {string} dir     remote directory
   * @param {Function} done  continuation `(err)`
   */
  _ensureDir(dir, done) {
    this.hasExists(dir, isExists => {
      if (isExists) return done();
      this._makeDir(dir, done);
    });
  }

  /**
   * Issues the recursive mkdir and reports the result; `done` is always called.
   * @param {string} dir     remote directory
   * @param {Function} done  continuation `(err)`
   */
  _makeDir(dir, done) {
    this.ftp.mkdir(dir, true, err => {
      if (err) {
        this._reportError(T.msg.red(`× [${T.getTime()}] '${dir}' mkdir Failed`), err);
      } else {
        const message = T.msg.green(`√ [${T.getTime()}] '${dir}' mkdir Successfully`);
        this.emit('uploaded', { message, directory: dir });
      }
      done(err);
    });
  }

  /**
   * Logs a failure and forwards it as an `error` event. The event is only
   * emitted when someone listens, because an unhandled `error` event would
   * throw from inside a client callback.
   * @param {string} message  human readable description
   * @param {Error} [cause]   underlying error
   */
  _reportError(message, cause) {
    const error = new Error(message);
    if (cause) {
      error.cause = cause;
    }
    T.log.error(message);
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /** Connection is ready: create the remote base directory, then upload. */
  _onReady() {
    this._ready = true;
    const remotePath = this._getRelativePath(this.remotePath);
    this._execMkdir(remotePath, this.startUpload);
  }

  _onError(err) {
    T.log.error(err.message);
  }

  _onEnd() {
    this._ready = false;
  }

  _onClose() {
    this._ready = false;
  }

  _uploadedText(realPath, size) {
    return `√ [${T.getTime()}] uploaded '${realPath}', ${T.msg.yellow(size / 1000 + ' kb')}`;
  }

  /** Maps a vinyl file to its destination path below `remotePath`. */
  _getRemotePath(file) {
    const remotePath = T.Path.join(this.remotePath, file.relative);
    return this._getRelativePath(remotePath);
  }

  /** Converts path separators to the ones used by the target platform. */
  _getRelativePath(path) {
    if (this.platform.toLowerCase() === 'win') {
      path = path.replace(/\//gi, '\\');
    } else {
      path = path.replace(/(\\{1,2}|\/)/gi, '/');
    }
    return path;
  }

  /** Replaces every backslash with a forward slash. */
  static _normalizePath(path) {
    return path.replace(/\\/g, '/');
  }
}
335
// Public entry point of this module: the FTP uploader class.
module.exports = UploaderFTP;