1 |
|
2 |
|
3 |
|
4 | const util = require('../utils'),
|
5 | extend = require('extend'),
|
6 | FTP = require('ftp'),
|
7 | vfs = require('vinyl-fs'),
|
8 | mps = require('map-stream'),
|
9 | parents = require('parents'),
|
10 | crypto = require('crypto'),
|
11 | EventEmitter = require('events').EventEmitter,
|
12 | T = require('../tools');
|
13 |
|
/**
 * Uploads a local file tree to a remote directory over FTP.
 *
 * Work is done in two passes over `localPath`: the first pass creates the
 * remote directories, the second pass transfers the files.
 *
 * Events emitted:
 *  - 'start'           `{ message, uploader }` right before connecting
 *  - 'uploaded'        one per ensured directory and per checked/transferred file
 *  - 'upload_progress' `{ file, realPath, total, uploaded }`
 *  - 'error'           an Error, only when at least one listener is registered
 *  - 'done'            `{ fileCount, modCount, status, duration, uploader }`
 */
class UploaderFTP extends EventEmitter {
  /**
   * @param {string|string[]} localPath Glob(s) handed to `vfs.src`.
   * @param {string} remotePath Remote directory that receives the files.
   * @param {Object} authentications Connection settings: `host`, `user`,
   *   `pass` (or `password`), optional `port` and `timeout`.
   */
  constructor(localPath, remotePath, authentications) {
    super();
    this.localPath = localPath;
    this.remotePath = remotePath;

    // Work on a copy so the caller's settings object is never mutated.
    this.auths = Object.assign({}, authentications);
    // 21 is only a default; a port chosen by the caller must survive.
    if (this.auths.port == null) {
      this.auths.port = 21;
    }
    // One generic `timeout` drives all client timers, but only when it is
    // actually provided; otherwise explicit per-timer settings are kept.
    if (this.auths.timeout != null) {
      this.auths.connTimeout = this.auths.pasvTimeout = this.auths.keepalive = this.auths.timeout;
    }
    // The `ftp` client reads the secret from `password`; `pass` is an alias.
    if (this.auths.password == null && this.auths.pass != null) {
      this.auths.password = this.auths.pass;
    }

    this.ftp = new FTP();
    this.connected = false;
    this._connecting = false;
    this._listening = false;
    this.mkDirCache = {};
    this.finished = false;
    // fileCount: files handled; modCount: files skipped because unchanged.
    this.fileCount = 0;
    this.modCount = 0;
    this.platform = 'unix';
    this.host = this.auths.host;

    // Goes through setType so the derived mode flags are initialised too.
    this.setType('full');
  }

  /** Hook kept for interface compatibility; intentionally does nothing. */
  init() {}

  /**
   * Selects the upload mode and refreshes the derived flags.
   *
   * @param {string} [type] 'full' uploads every file, 'increment' uploads
   *   only files whose content differs from the remote copy. A falsy value
   *   keeps the current mode.
   */
  setType(type) {
    if (type) {
      this.type = type;
    }
    this._isIncrementType = this.type === 'increment';
    this._isFullType = this.type === 'full';
  }

  /**
   * Validates the settings, then connects. When the connection is already
   * established the upload is started right away instead.
   */
  start() {
    if (!this.host || typeof this.host !== 'string') {
      return T.log.error(`× server host is undefined`);
    }
    if (!this.auths.user) {
      return T.log.error(`× server connect username is undefined`);
    }
    if (!this.auths.password) {
      return T.log.error(`× server connect password is undefined`);
    }
    if (this.connected) {
      return this.startUpload();
    }
    // A connection attempt is already in flight; 'ready' will start the upload.
    if (this._connecting) {
      return undefined;
    }

    const message = T.msg.gray(`→ [${this.host}] Authenticating with password.`);
    this.emit('start', { message, uploader: this });

    // Register the client listeners once, even if start() is called again.
    if (!this._listening) {
      this._listening = true;
      this.ftp.on('ready', this._onReady.bind(this));
      this.ftp.on('error', this._onError.bind(this));
      this.ftp.on('end', this._onEnd.bind(this));
      this.ftp.on('close', this._onClose.bind(this));
    }

    this._connecting = true;
    this.ftp.connect(this.auths);
    return undefined;
  }

  /** Closes the FTP connection. */
  stop() {
    this.ftp && this.ftp.end();
  }

  /**
   * Runs the directory pass followed by the file pass and emits 'done' with
   * a summary once the file pass has closed. Counters are reset afterwards.
   */
  startUpload() {
    this.time = Date.now();

    const directoryPass = vfs.src(this.localPath).pipe(mps(this.checkDir.bind(this)));

    directoryPass.on('close', () => {
      vfs
        .src(this.localPath)
        .pipe(mps(this.onUpload.bind(this)))
        .on('close', () => {
          const duration = Date.now() - this.time;
          // modCount counts unchanged files, so the difference is what was sent.
          const transferred = this.fileCount - this.modCount;
          const status = [
            '',
            `server info: host: ${this.host}; user: ${this.auths.user}`,
            `local path: ${this.localPath}`,
            `remote path: ${this.remotePath}`,
            `total files: ${this.fileCount}`,
            `modified files: ${transferred}`,
            `uploaded files: ${transferred}`,
            ''
          ].join('\n');
          this.emit('done', { fileCount: this.fileCount, modCount: this.modCount, status, duration, uploader: this });
          this.fileCount = 0;
          this.modCount = 0;
        });
    });
  }

  /**
   * Directory pass: makes sure the remote counterpart of a local directory
   * exists. Non-directories pass straight through.
   *
   * @param {Object} file Entry produced by `vfs.src` (`path`, `relative`).
   * @param {Function} next Continuation `(err, file)`; always called.
   */
  checkDir(file, next) {
    const stat = T.fs.statSync(file.path);
    if (!stat.isDirectory()) {
      return next(null, file);
    }

    const dirname = UploaderFTP._normalizePath(this._getRemotePath(file));
    if (this.mkDirCache[dirname]) {
      return next(null, file);
    }

    this.mkDirCache[dirname] = true;
    // A failed mkdir is reported by _ensureDirectory; the pass keeps going.
    return this._ensureDirectory(dirname, () => next(null, file));
  }

  /**
   * Best-effort existence check based on a directory listing. An error or
   * an empty listing is reported as "missing", so an existing but empty
   * directory is reported as missing as well.
   *
   * @param {string} dir Remote path to probe.
   * @param {Function} [cb] Called as `cb(isExists, dir)` with `this` bound
   *   to the uploader.
   */
  hasExists(dir, cb) {
    this.ftp.list(dir, (err, entries) => {
      const isExists = !err && Array.isArray(entries) && entries.length > 0;
      typeof cb === 'function' && cb.call(this, isExists, dir);
    });
  }

  /**
   * Creates a remote directory, including missing parents.
   *
   * @param {string} dir Remote directory path.
   * @param {Function} [cb] Called as `cb(dir)` on success only.
   */
  mkdirectory(dir, cb) {
    this._ensureDirectory(dir, err => {
      if (!err && typeof cb === 'function') {
        cb.call(this, dir);
      }
    });
  }

  /**
   * Issues the recursive mkdir and reports the outcome.
   *
   * @param {string} dir Remote directory path.
   * @param {Function} done Called as `done(err)` on success and failure.
   */
  _ensureDirectory(dir, done) {
    this.ftp.mkdir(dir, true, err => {
      let message;
      if (err) {
        message = T.msg.red(`× [${T.getTime()}] '${dir}' mkdir Failed`);
        this._emitError(message, err);
        T.log.error(message);
      } else {
        message = T.msg.green(`√ [${T.getTime()}] '${dir}' mkdir Successfully`);
        this.emit('uploaded', { message, directory: dir });
      }
      done(err || null);
    });
  }

  /**
   * Emits 'error' only when somebody listens: an unhandled 'error' event
   * would throw from inside a client callback and abort the whole run.
   *
   * @param {string} message Error message.
   * @param {Error} [cause] Underlying error, attached as `cause`.
   * @returns {Error} The error that was built.
   */
  _emitError(message, cause) {
    const error = new Error(message);
    error.cause = cause;
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
    return error;
  }

  /**
   * File pass: uploads a single file. Directories pass straight through.
   *
   * @param {Object} file Entry produced by `vfs.src`.
   * @param {Function} next Continuation `(err, file)`.
   */
  onUpload(file, next) {
    const stat = T.fs.statSync(file.path);
    if (stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);

    if (this._isIncrementType) {
      return this._downloader(file, next, remotePath);
    }
    // 'full' and any unrecognised mode upload unconditionally, so `next` is
    // always reached and the stream can never stall.
    return this._uploader(file, next, remotePath);
  }

  /**
   * Incremental mode: fetches the remote copy, compares MD5 digests (change
   * detection only, not a security measure) and uploads when they differ.
   * A remote file that cannot be fetched is treated as missing and uploaded.
   *
   * @param {Object} file Local entry.
   * @param {Function} next Continuation `(err, file)`.
   * @param {string} realPath Remote file path.
   */
  _downloader(file, next, realPath) {
    const filename = T.Path.basename(realPath);
    // Hash raw bytes when the entry carries them, so binary files compare
    // correctly against the raw remote bytes.
    const localBytes = Buffer.isBuffer(file.contents) ? file.contents : T.getFileContent(file.path);
    const localHash = crypto
      .createHash('md5')
      .update(localBytes)
      .digest('hex');

    this.ftp.get(realPath, (err, stream) => {
      if (err) {
        return this._uploader(file, next, realPath);
      }

      const remote = crypto.createHash('md5');
      let settled = false;

      const settle = failed => {
        if (settled) return;
        settled = true;

        // The remote copy could not be read completely: upload to be safe.
        if (failed) {
          this._uploader(file, next, realPath);
          return;
        }

        let message;
        if (localHash !== remote.digest('hex')) {
          message = T.msg.green(`√ [${T.getTime()}] check hashContent ( Modified ), filename: '${filename}' `);
          this.emit('uploaded', { message, file, realPath, size: file.stat.size });
          this._uploader(file, next, realPath);
        } else {
          this.fileCount++;
          this.modCount++;
          message = T.msg.green(`√ [${T.getTime()}] check hashContent ( No Modify ), filename: '${filename}' `);
          this.emit('uploaded', { message, file, realPath, size: file.stat.size });
          next(null, file);
        }
      };

      // Feed chunks to the hash as Buffers; string concatenation would
      // decode them and corrupt binary content.
      stream.on('data', chunk => remote.update(chunk));
      stream.once('error', () => settle(true));
      stream.once('close', () => settle(false));
      // Make sure the stream flows even if it was handed over paused.
      stream.resume();
      return undefined;
    });
  }

  /**
   * Transfers one file and reports the outcome through 'uploaded'.
   * `next` is always called so a single failure never stalls the pass.
   *
   * @param {Object} file Local entry.
   * @param {Function} next Continuation `(err, file)`.
   * @param {string} realPath Remote file path.
   */
  _uploader(file, next, realPath) {
    const size = file.stat ? file.stat.size : 0;
    // Prefer the contents already loaded on the entry; fall back to the
    // local path so the client reads the file itself.
    const input = file.contents || file.path;

    this.ftp.put(input, realPath, err => {
      if (err) {
        const message = `${realPath} is upload fail`;
        this._emitError(message, err);
        this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(`× ${message}: ${err.message}`) });
      } else {
        this.fileCount++;
        this.emit('upload_progress', { file, realPath, total: size, uploaded: size });
        this.emit('uploaded', { message: this._uploadedText(realPath, size), file, realPath, size });
      }
      next(null, file);
    });
  }

  /**
   * Ensures `dir` exists remotely, then invokes `cb` with `this` bound to
   * the uploader. `cb` runs whether or not the directory could be created.
   *
   * @param {string} dir Remote directory path.
   * @param {Function} [cb] Continuation.
   */
  _execMkdir(dir, cb) {
    const done = typeof cb === 'function' ? cb : function() {};
    const target = dir ? this._getRelativePath(dir) : dir;
    const nothingToCreate = !target || ['.', '..', '/'].indexOf(target) >= 0 || this.mkDirCache[target];

    if (nothingToCreate) {
      return done.call(this);
    }

    this.mkDirCache[target] = true;
    return this._ensureDirectory(target, () => done.call(this));
  }

  /** 'ready' handler: creates the remote root, then starts the upload. */
  _onReady() {
    this._connecting = false;
    this.connected = true;
    const remotePath = this._getRelativePath(this.remotePath);
    this._execMkdir(remotePath, this.startUpload);
  }

  /** 'error' handler: logs the client error. */
  _onError(err) {
    this._connecting = false;
    T.log.error(err && err.message ? err.message : String(err));
  }

  /** 'end' handler: marks the connection as gone. */
  _onEnd() {
    this.connected = false;
    this._connecting = false;
  }

  /** 'close' handler: marks the connection as gone. */
  _onClose() {
    this.connected = false;
    this._connecting = false;
  }

  /**
   * @param {string} realPath Remote file path.
   * @param {number} size File size in bytes.
   * @returns {string} Message used for a successful transfer.
   */
  _uploadedText(realPath, size) {
    return `√ [${T.getTime()}] uploaded '${realPath}', ${T.msg.yellow(size / 1000 + ' kb')}`;
  }

  /**
   * @param {Object} file Entry with a `relative` path.
   * @returns {string} Remote path of the entry below `remotePath`.
   */
  _getRemotePath(file) {
    const remotePath = T.Path.join(this.remotePath, file.relative);
    return this._getRelativePath(remotePath);
  }

  /**
   * Rewrites path separators for the configured `platform`: backslashes
   * for 'win', forward slashes otherwise.
   *
   * @param {string} path Path to convert.
   * @returns {string} Converted path.
   */
  _getRelativePath(path) {
    if (this.platform.toLowerCase() === 'win') {
      path = path.replace(/\//gi, '\\');
    } else {
      path = path.replace(/(\\{1,2}|\/)/gi, '/');
    }
    return path;
  }

  /**
   * @param {string} path Path to convert.
   * @returns {string} The path with every backslash turned into '/'.
   */
  static _normalizePath(path) {
    return path.replace(/\\/g, '/');
  }
}
|
335 |
|
336 | module.exports = UploaderFTP;
|