1 | const { dir } = require('console');
|
2 |
|
3 |
|
4 |
|
5 |
|
6 | const util = require('../utils'),
|
7 | extend = require('extend'),
|
8 | SSH2 = require('ssh2').Client,
|
9 | vfs = require('vinyl-fs'),
|
10 | mps = require('map-stream'),
|
11 | parents = require('parents'),
|
12 | crypto = require('crypto'),
|
13 | EventEmitter = require('events').EventEmitter,
|
14 | T = require('../tools');
|
15 |
|
/**
 * Uploads a local file tree to a remote server over SFTP.
 *
 * Work happens in two passes over `localPath` (both via `vfs.src`): the first
 * pass creates the remote directories, the second transfers the files.
 *
 * Events emitted:
 *  - `start`            `{ message, uploader }` before connecting.
 *  - `uploaded`         per file / directory result (`message` always set,
 *                       `error` set on failure).
 *  - `upload_progress`  `{ file, realPath, total, uploaded }` while writing.
 *  - `done`             `{ fileCount, modCount, status, duration, uploader }`.
 *  - `error`            on mkdir / upload failure. As with any EventEmitter,
 *                       an `error` event without a listener throws.
 */
class Uploader extends EventEmitter {
  /**
   * @param {string|string[]} localPath - Passed unchanged to `vfs.src`.
   * @param {string} remotePath - Remote root directory for the upload.
   * @param {Object} authentications - Options handed to `ssh2.connect`;
   *   `host`, `username`, `password` and `key` are read by this class.
   */
  constructor(localPath, remotePath, authentications) {
    super();
    this.localPath = localPath;
    this.remotePath = remotePath;
    // Fall back to an empty object so a missing config is reported by
    // start() as an undefined host instead of throwing here.
    this.auths = authentications || {};
    this.ssh2 = new SSH2();
    this.sftp = null;
    this.mkDirCache = {};
    this.finished = false;
    this.fileCount = 0;
    this.modCount = 0;
    this.platform = 'unix';
    this.host = this.auths.host;

    this.type = 'full';
    this.errCaches = {};

    // Derive the mode flags for the default type. Without this, onUpload()
    // matched neither mode and never called `next` unless setType() had
    // been called explicitly.
    this.setType(this.type);
  }

  /** Placeholder hook; intentionally empty. */
  init() {}

  /**
   * Selects the upload mode and refreshes the derived mode flags.
   *
   * @param {string} [type] - `'full'` or `'increment'`; a falsy value keeps
   *   the current type.
   */
  setType(type) {
    if (type) {
      this.type = type;
    }

    this._isIncrementType = this.type === 'increment';
    this._isFullType = this.type === 'full';
  }

  /**
   * Connects to the server, or starts uploading right away when an SFTP
   * session is already open.
   *
   * @returns {*} The result of `T.log.error` when the host is invalid,
   *   otherwise whatever `startUpload()` returns or `undefined`.
   */
  start() {
    // Reject missing, empty and non-string hosts alike.
    if (!this.host || !util.isString(this.host)) {
      return T.log.error(`× server host is undefined`);
    }

    if (this.sftp) return this.startUpload();

    let message;
    if (this.auths.password) {
      message = T.msg.gray(`→ [${this.host}] Authenticating with password.`);
    } else if (this.auths.key) {
      message = T.msg.gray(`→ [${this.host}] Authenticating with private key.`);
    }

    this.emit('start', { message, uploader: this });

    this.ssh2.on('ready', this._onReady.bind(this));
    this.ssh2.on('error', this._onError.bind(this));
    this.ssh2.on('end', this._onEnd.bind(this));
    this.ssh2.on('close', this._onClose.bind(this));
    this.ssh2.connect(this.auths);
  }

  /** Ends the SFTP session and the SSH connection, when present. */
  stop() {
    if (this.sftp) this.sftp.end();
    if (this.ssh2) this.ssh2.end();
  }

  /**
   * Runs the directory pass, then the file pass, and emits `done` with a
   * summary once the file pass closes. Counters and the error cache are
   * reset after `done` has been emitted.
   */
  startUpload() {
    this.time = Date.now();

    const directoryPass = vfs.src(this.localPath).pipe(mps(this.checkDir.bind(this)));

    directoryPass.on('close', () => {
      vfs
        .src(this.localPath)
        .pipe(mps(this.onUpload.bind(this)))
        .on('close', () => {
          const duration = Date.now() - this.time;
          const transferred = this.fileCount - this.modCount;
          const status = [
            '',
            `server info: host: ${this.host}; user: ${this.auths.username}`,
            `local path: ${this.localPath}`,
            `remote path: ${this.remotePath}`,
            `total files: ${this.fileCount}`,
            `modified files: ${transferred}`,
            `uploaded files: ${transferred}\n`
          ].join('\n');

          this.emit('done', {
            fileCount: this.fileCount,
            modCount: this.modCount,
            status,
            duration,
            uploader: this
          });
          this.fileCount = 0;
          this.modCount = 0;
          this.errCaches = {};
        });
    });
  }

  /**
   * Stream mapper for the directory pass. For a local directory, creates the
   * matching remote directory (and any uncached ancestors below the remote
   * root) and only then calls `next`, so later work never races a pending
   * mkdir. Non-directories are passed through untouched.
   *
   * @param {Object} file - Entry emitted by `vfs.src`; `path` and `relative`
   *   are read.
   * @param {Function} next - Node-style stream callback `(err, file)`.
   */
  checkDir(file, next) {
    const stat = T.fs.statSync(file.path);
    if (!stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);
    const dirname = Uploader._normalizePath(T.Path.join(remotePath, ''));
    const remoteRoot = this._getRelativePath(this.remotePath);

    const pending = parents(dirname)
      .map(d => d.replace(/^\/~/, '~'))
      .map(d => Uploader._normalizePath(d))
      // The remote root itself is handled by _execMkdir() on connect.
      .filter(d => d !== remoteRoot)
      .filter(d => d.length >= this.remotePath.length && !this.mkDirCache[d]);

    this._createDirectories(pending, () => next(null, file));
  }

  /**
   * Checks whether a remote path exists.
   *
   * @param {string} dir - Remote path.
   * @param {Function} [cb] - Called as `cb(isExists, dir)` with `this` bound
   *   to the uploader.
   */
  hasExists(dir, cb) {
    this.sftp.exists(dir, isExists => {
      if (cb && util.isFunction(cb)) {
        cb.call(this, isExists, dir);
      }
    });
  }

  /**
   * Creates a remote directory. Emits `uploaded` on success, or emits
   * `error` and logs on failure.
   *
   * @param {string} dir - Remote directory to create.
   * @param {Function} [cb] - Called as `cb(dir)` on success only.
   */
  mkdirectory(dir, cb) {
    this._mkdir(dir, err => {
      if (!err && cb && util.isFunction(cb)) {
        cb.call(this, dir);
      }
    });
  }

  /**
   * Shared mkdir implementation that always reports completion, so callers
   * can chain further work regardless of the outcome.
   *
   * @param {string} dir - Remote directory to create.
   * @param {Function} done - Called as `done(err)`; `err` is null on success.
   */
  _mkdir(dir, done) {
    this.sftp.mkdir(dir, { mode: '0777' }, err => {
      let message;
      if (err) {
        message = T.msg.red(`× [${T.getTime()}] '${dir}' mkdir Failed`);
        // Pass the underlying error as `cause`; the Error constructor's
        // second argument is an options object, not an error.
        this.emit('error', new Error(message, { cause: err }));
        T.log.error(message);
        return done(err);
      }

      message = T.msg.green(`√ [${T.getTime()}] '${dir}' mkdir Successfully`);
      this.emit('uploaded', { message, directory: dir });
      done(null);
    });
  }

  /**
   * Creates the given remote directories one at a time, taking entries from
   * the END of the list first (`parents()` output is popped so that
   * ancestors are handled before their children), then calls `done`.
   * Directories that already exist are skipped; a failed mkdir does not
   * stop the remaining ones.
   *
   * @param {string[]} dirs - Remote directories; not mutated.
   * @param {Function} done - Called with `this` bound to the uploader.
   */
  _createDirectories(dirs, done) {
    const queue = dirs.slice();

    const step = () => {
      if (queue.length === 0) {
        return done.call(this);
      }

      const dir = queue.pop();
      if (!dir) {
        return step();
      }

      this.mkDirCache[dir] = true;
      this.hasExists(dir, isExists => {
        if (isExists) {
          return step();
        }
        this._mkdir(dir, step);
      });
    };

    step();
  }

  /**
   * Stream mapper for the file pass. Directories are passed through. In
   * `full` mode every file is uploaded; in `increment` mode a file is
   * uploaded when it is missing remotely, otherwise it is compared against
   * the remote copy first.
   *
   * @param {Object} file - Entry emitted by `vfs.src`.
   * @param {Function} next - Node-style stream callback `(err, file)`.
   */
  onUpload(file, next) {
    const stat = T.fs.statSync(file.path);
    if (stat.isDirectory()) {
      return next(null, file);
    }

    const remotePath = this._getRemotePath(file);

    if (this._isFullType) {
      return this._uploader(file, next, remotePath);
    }

    if (!this._isIncrementType) {
      // Unknown mode: nothing is transferred, but the stream must still be
      // released or the whole pipeline stalls.
      return next(null, file);
    }

    this.sftp.exists(remotePath, isExists => {
      if (isExists) {
        this._downloader(file, next, remotePath);
      } else {
        this._uploader(file, next, remotePath);
      }
    });
  }

  /**
   * Compares the MD5 of the local file with the MD5 of the remote file and
   * uploads only when they differ. When the remote file cannot be read the
   * file is uploaded, since its state cannot be verified.
   *
   * @param {Object} file - Entry emitted by `vfs.src`.
   * @param {Function} next - Node-style stream callback `(err, file)`.
   * @param {string} realPath - Remote path of the file.
   */
  _downloader(file, next, realPath) {
    const filename = T.Path.basename(realPath);

    // Hash raw bytes on both sides. Prefer the in-memory buffer when the
    // file object carries one (assumed to hold the same bytes as file.path —
    // TODO confirm against the vfs.src options used by callers).
    const localContent = Buffer.isBuffer(file.contents) ? file.contents : T.getFileContent(file.path);
    const localHash = crypto
      .createHash('md5')
      .update(localContent)
      .digest('hex');

    const stream = this.sftp.createReadStream(realPath, {
      flags: 'r',
      encoding: null,
      mode: '0666',
      autoClose: true
    });

    // Feed chunks straight into the hash: concatenating them into a string
    // corrupts multi-byte characters split across chunks and binary data.
    const remoteHasher = crypto.createHash('md5');
    let settled = false;

    const finish = readFailed => {
      // 'error' may be followed by 'close'; act on the first one only so
      // `next` is never called twice.
      if (settled) return;
      settled = true;

      if (readFailed) {
        return this._uploader(file, next, realPath);
      }

      const remoteHash = remoteHasher.digest('hex');
      let message;

      if (localHash !== remoteHash) {
        message = T.msg.green(`√ [${T.getTime()}] check hashContent ( Modified ), filename: '${filename}' `);
        this.emit('uploaded', { message, file, realPath, size: file.stat.size });
        this._uploader(file, next, realPath);
      } else {
        this.fileCount++;
        this.modCount++;
        message = T.msg.green(`√ [${T.getTime()}] check hashContent ( No Modify ), filename: '${filename}' `);
        this.emit('uploaded', { message, file, realPath, size: file.stat.size });
        next(null, file);
      }
    };

    stream.on('data', chunk => {
      remoteHasher.update(chunk);
    });

    stream.on('error', err => {
      T.log.error(err.message);
      finish(true);
    });

    stream.on('close', () => finish(false));
  }

  /**
   * Writes one file to the remote path, emitting `upload_progress` while
   * writing and `uploaded` (plus `error` on failure) when the stream closes.
   * `next` is always called once the stream has closed.
   *
   * @param {Object} file - Entry emitted by `vfs.src`; `stat.size` and
   *   `pipe()` are used.
   * @param {Function} next - Node-style stream callback `(err, file)`.
   * @param {string} realPath - Remote path to write.
   */
  _uploader(file, next, realPath) {
    const stream = this.sftp.createWriteStream(realPath, {
      flags: 'w',
      encoding: null,
      mode: '0666',
      autoClose: true
    });

    const highWaterMark = stream.highWaterMark || 16 * 1000;
    const size = file.stat.size;
    let uploadedBytes = 0;
    let failure = null;
    this.errCaches[realPath] = false;

    stream.on('drain', () => {
      // Progress is estimated per drained buffer; never report more than
      // the file actually holds.
      uploadedBytes = Math.min(uploadedBytes + highWaterMark, size);
      this.emit('upload_progress', { file, realPath, total: size, uploaded: uploadedBytes });
    });

    stream.on('error', err => {
      failure = err;
      this.errCaches[realPath] = true;
      const message = T.msg.red(`× '${realPath}' upload error: ${err.message}`);
      this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(`× ${message}: ${err.message}`) });
    });

    stream.on('close', err => {
      // 'close' usually carries no argument, so fall back to the error
      // captured by the 'error' handler.
      const cause = err || failure;

      if (cause || this.errCaches[realPath]) {
        const message = `${realPath} is upload fail`;
        const detail = cause instanceof Error ? cause.message : String(cause);
        this.emit('error', new Error(message, { cause }));
        this.emit('uploaded', { message, file, realPath, size, error: T.msg.red(`× ${message}: ${detail}`) });
      } else {
        this.fileCount++;
        this.emit('uploaded', { message: this._uploadedText(realPath, size), file, realPath, size });
      }
      next(null, file);
    });

    file.pipe(stream);
  }

  /**
   * Ensures `dir` and its uncached ancestors exist remotely, then calls
   * `cb` with `this` bound to the uploader.
   *
   * @param {string} dir - Remote directory.
   * @param {Function} [cb] - Called once every directory has been handled.
   */
  _execMkdir(dir, cb) {
    const done = util.isFunction(cb) ? cb : function() {};

    const dirs = parents(dir)
      .filter(d => d && d !== '.' && d !== '..' && !this.mkDirCache[d])
      .map(d => this._getRelativePath(d));

    this._createDirectories(dirs, done);
  }

  /**
   * `ready` handler of the SSH client: opens the SFTP session, ensures the
   * remote root exists and starts the upload.
   */
  _onReady() {
    this.ssh2.sftp((err, sftp) => {
      if (err) {
        // Without a session there is nothing to upload with.
        T.log.error(err.message);
        return;
      }

      sftp.on('end', () => {});

      this.sftp = sftp;
      const remotePath = this._getRelativePath(this.remotePath);
      this._execMkdir(remotePath, this.startUpload);
    });
  }

  /**
   * `error` handler of the SSH client: logs the message.
   *
   * @param {Error} err
   */
  _onError(err) {
    T.log.error(err.message);
  }

  /** `end` handler of the SSH client; intentionally empty. */
  _onEnd() {}

  /** `close` handler of the SSH client; intentionally empty. */
  _onClose() {}

  /**
   * Builds the success message for an uploaded file.
   *
   * @param {string} realPath - Remote path of the file.
   * @param {number} size - File size in bytes (shown divided by 1000).
   * @returns {string}
   */
  _uploadedText(realPath, size) {
    return `√ [${T.getTime()}] uploaded '${realPath}', ${T.msg.yellow(size / 1000 + ' kb')}`;
  }

  /**
   * Maps a local file to its remote path under `remotePath`.
   *
   * @param {Object} file - Entry emitted by `vfs.src`; `relative` is read.
   * @returns {string}
   */
  _getRemotePath(file) {
    const remotePath = T.Path.join(this.remotePath, file.relative);
    return this._getRelativePath(remotePath);
  }

  /**
   * Converts path separators to the style of the target platform:
   * backslashes when `platform` is `'win'`, forward slashes otherwise.
   *
   * @param {string} path
   * @returns {string}
   */
  _getRelativePath(path) {
    if (this.platform.toLowerCase() === 'win') {
      return path.replace(/\//gi, '\\');
    }
    return path.replace(/(\\{1,2}|\/)/gi, '/');
  }

  /**
   * Prefixes every entry that does not start with `/` with one.
   *
   * @param {string[]} dirs
   * @returns {string[]} A new array.
   */
  _fixPrefix(dirs) {
    return dirs.map(d => (/^\//.test(d) ? d : '/' + d));
  }

  /**
   * Replaces every backslash with a forward slash.
   *
   * @param {string} path
   * @returns {string}
   */
  static _normalizePath(path) {
    return path.replace(/\\/g, '/');
  }
}
|
342 |
|
343 | module.exports = Uploader;
|