UNPKG

2.92 kBJavaScriptView Raw
1var FTPClient = require ('node-ftp/ftp'),
2 util = require ('util'),
3 fs = require ('fs'),
4 ftpManager = require ('./ftp/model-manager');
5
/**
 * Byte counters for a transfer between a readable and/or a writable stream.
 *
 * All counters start at zero; every property of `config` is then copied onto
 * the instance, so `config` may override the counters or add extra fields.
 * `watch()` reads the following fields:
 *
 *   reader / readerWatch - emitter and event name whose payloads add to `bytesRead`
 *   writer / writerWatch - emitter and event name whose payloads add to `bytesWritten`
 *
 * `bytesToRead` and `lastLogged` are only initialised here; nothing in this
 * block updates them.
 *
 * NOTE(review): `util.extend` is not part of Node's core `util` module; this
 * presumably relies on a project-level patch that adds it — confirm.
 *
 * @param {Object} config - fields to copy onto the new instance
 */
var pipeProgress = function (config) {
	this.bytesToRead = 0;
	this.bytesRead = 0;
	this.bytesWritten = 0;
	this.lastLogged = 0;
	util.extend (this, config);
};

/**
 * Start counting bytes on the configured reader and writer.
 *
 * The reader and the writer are handled independently: when both are
 * configured, both counters are updated. Each call attaches new listeners,
 * so call it once per instance.
 */
pipeProgress.prototype.watch = function () {
	var self = this;

	// Some events fire without a payload; count those as zero bytes instead
	// of throwing from inside the emitter.
	var sizeOf = function (chunk) {
		return (chunk && typeof chunk.length === 'number') ? chunk.length : 0;
	};

	if (this.reader && this.readerWatch) {
		this.reader.on (this.readerWatch, function (chunk) {
			self.bytesRead += sizeOf (chunk);
		});
	}

	if (this.writer && this.writerWatch) {
		this.writer.on (this.writerWatch, function (chunk) {
			self.bytesWritten += sizeOf (chunk);
		});
	}
};
26
/**
 * FTP transport model; this constructor is the module's export.
 *
 * @param {Object} modelBase - owning model. Its `url` is copied onto this
 *   instance, and the object itself is kept so the prototype methods can
 *   call `modelBase.emit(...)`.
 */
var ftpModel = module.exports = function (modelBase) {
	// Destination URL comes from the owner; keep a direct reference for convenience.
	this.url = modelBase.url;
	this.modelBase = modelBase;
};
33
/**
 * Upload behaviour shared by all ftpModel instances.
 *
 * NOTE(review): `util.extend` is not part of Node's core `util` module; this
 * presumably relies on a project-level patch that adds it — confirm.
 */
util.extend(ftpModel.prototype, {

	/**
	 * Prepare the upload of `source.from` to the server named by `this.url`.
	 *
	 * Nothing is sent until `run()` opens the connection. After connecting,
	 * the sequence is: authenticate, change into the URL's path, then `put`
	 * the stream under `source.originalFileName`.
	 *
	 * Events emitted on `this.modelBase`:
	 *   'data'  - for every chunk read from the source stream
	 *   'error' - on a connection, timeout, read, auth, cwd or put failure
	 *   'end'   - after the upload completed and the connection was closed
	 *
	 * @param {Object} source
	 * @param {fs.ReadStream} source.from - stream to upload
	 * @param {number} source.size - stored on the progress object as `totalBytes`
	 * @param {string} source.originalFileName - remote file name
	 * @returns {pipeProgress|undefined} progress counters, or undefined when
	 *   `source.from` is not an fs.ReadStream (an 'error' is emitted instead)
	 */
	store: function (source) {

		var self = this;

		if (!source || !(source.from instanceof fs.ReadStream)) {
			self.emitError('Source is not ReadStream');
			return;
		}

		var progress = new pipeProgress ({
			reader: source.from,
			readerWatch: 'data',
			totalBytes: source.size
		});

		// The cwd callback below starts the counters through self.progress,
		// so the instance has to be reachable from the model.
		self.progress = progress;

		self.ftp = new FTPClient({ host: self.url.hostname});

		// Report a failure to the owner and drop the connection.
		// Returns true when `e` was an actual error.
		var closeOnError = function (e) {
			if (self.emitError(e)) {
				self.ftp.end();
				return true;
			}
			return false;
		};

		self.ftp.on ('error', function (e) {
			closeOnError(e);
		});

		self.ftp.on ('timeout', function () {
			closeOnError('connTimeout is over');
		});

		self.readStream = source.from;

		self.readStream.on ('data', function (chunk) {
			self.modelBase.emit('data', chunk);
		});

		self.readStream.on ('error', function (err) {
			console.log ('readStream error');
			closeOnError(err);
		});

		self.ftp.on('connect', function() {

			// Split at the first colon only, so passwords may contain ':'.
			// A URL without credentials falls back to the conventional
			// anonymous FTP login instead of throwing on a null `auth`.
			var credentials = self.url.auth || 'anonymous:anonymous@';
			var separatorAt = credentials.indexOf(':');
			var user = separatorAt === -1 ? credentials : credentials.substring(0, separatorAt);
			var password = separatorAt === -1 ? undefined : credentials.substring(separatorAt + 1);

			self.ftp.auth(user, password, function(e) {

				if (closeOnError(e)) {
					return;
				}

				// Drop the leading '/' of the URL path.
				var cwdTarget = self.url.pathname.substring(1);

				self.ftp.cwd (cwdTarget, function (e) {

					// Report the failure like every other step does; otherwise
					// the owner never learns that the upload was abandoned.
					if (closeOnError(e)) {
						return;
					}

					if (self.progress) {
						self.progress.watch ();
					}

					self.readStream.resume ();

					self.ftp.put(self.readStream, source.originalFileName, function(e) {

						if (closeOnError(e)) {
							return;
						}

						self.ftp.end();

						self.modelBase.emit('end');

					});

				});

			});

		});

		// Register this upload with the manager.
		// NOTE(review): `project` is not defined in this file; presumably a
		// global set up elsewhere — confirm.
		project.ftpModelManager.add(self, source);

		return progress;
	},

	/**
	 * Open the FTP connection; the upload prepared by `store()` starts once
	 * the client emits 'connect'. `store()` must have been called first.
	 */
	run: function () {
		this.ftp.connect();
	},

	/**
	 * Close the FTP connection. Does nothing when `store()` has not created
	 * a client yet.
	 */
	stop: function () {
		if (this.ftp) {
			this.ftp.end();
		}
	},

	/**
	 * Forward `e` to the owner as an 'error' event when it is truthy.
	 *
	 * @param {*} e - error value, or a falsy value for "no error"
	 * @returns {boolean} true when an error was emitted, false otherwise
	 */
	emitError: function (e) {
		if (!e) {
			return false;
		}

		this.modelBase.emit('error', e);
		return true;
	}

});