UNPKG

13.5 kBJavaScriptView Raw
1var Crypto = require('crypto');
2var Events = require('events');
3var Net = require('net');
4var tls = require('tls');
5var ConnectionConfig = require('./ConnectionConfig');
6var Protocol = require('./protocol/Protocol');
7var BlobData = require('./protocol/sequences/BlobData');
8var Util = require('util');
9
module.exports = Connection;
Util.inherits(Connection, Events.EventEmitter);

/**
 * A client connection that pairs a network socket with a Protocol instance.
 *
 * @constructor
 * @param {Object} options
 * @param {Object} options.config   Connection settings, stored as `this.config`.
 * @param {Object} [options.socket] Value stored as the initial `this._socket`.
 */
function Connection(options) {
  Events.EventEmitter.call(this);

  var config = options.config;

  this.config = config;
  this._socket = options.socket;

  // The protocol gets a back-reference to this connection.
  this._protocol = new Protocol({config: config, connection: this});

  // Initial bookkeeping: nothing has been connected yet.
  this._connectCalled = false;
  this.state = 'disconnected';
  this.threadId = null;
}
23
/**
 * Turn the given arguments into a BlobData sequence.
 *
 * Accepted call shapes, as handled below:
 *  - createQuery(blobData)           -> the same instance is returned.
 *  - createQuery(fn)                 -> `fn` is used both as the payload and as the callback.
 *  - createQuery(object, [callback]) -> enumerable properties are copied into an options object.
 *  - createQuery(other, [callback])  -> `other` is passed as both payload and options.
 *
 * @param {*} values
 * @param {Function} [callback]
 * @returns {BlobData}
 */
Connection.createQuery = function createQuery(values, callback) {
  // An existing sequence is passed straight through.
  if (values instanceof BlobData) {
    return values;
  }

  var wrappedCallback = wrapCallbackInDomain(null, callback);
  var kind = typeof values;

  if (kind === 'function') {
    // The only argument is the callback itself.
    return new BlobData(values, {}, wrapCallbackInDomain(null, values));
  }

  if (kind === 'object') {
    // Shallow copy, including inherited enumerable properties (for...in).
    var copied = {};
    for (var key in values) {
      copied[key] = values[key];
    }

    return new BlobData(values, copied, wrappedCallback);
  }

  // NOTE(review): wrapCallbackInDomain returns non-function input unchanged, so
  // the wrapped value can only be undefined when `callback` is undefined too;
  // this guard therefore looks unreachable. Confirm the intended validation.
  if (wrappedCallback === undefined && callback !== undefined) {
    throw new TypeError('argument callback must be a function when provided');
  }

  return new BlobData(values, values, wrappedCallback);
};
53
/**
 * Open the underlying socket (first call only) and wire it to the protocol.
 *
 * The callback, when given, is invoked synchronously with no arguments on
 * every call; it does not wait for the socket to connect. The protocol
 * handshake is intentionally not started here.
 *
 * @param {Object|Function} [options] Currently unused; may be the callback instead.
 * @param {Function} [callback]
 */
Connection.prototype.connect = function connect(options, callback) {
  var done = callback;

  if (!done && typeof options === 'function') {
    done = options;
  }

  if (!this._connectCalled) {
    this._connectCalled = true;

    var self = this;
    var settings = this.config;
    var socket;

    // Connect either via a UNIX domain socket or a TCP socket.
    if (settings.socketPath) {
      socket = Net.createConnection(settings.socketPath);
    } else {
      socket = Net.createConnection(settings.port, settings.host);
    }
    this._socket = socket;

    // Connect socket to connection domain
    if (Events.usingDomains) {
      socket.domain = this.domain;
    }

    // protocol -> socket
    this._protocol.on('data', function (chunk) {
      self._socket.write(chunk);
    });
    // socket -> protocol
    socket.on('data', wrapToDomain(self, function (chunk) {
      self._protocol.write(chunk);
    }));
    // Propagate end-of-stream in both directions.
    this._protocol.on('end', function () {
      self._socket.end();
    });
    socket.on('end', wrapToDomain(self, function () {
      self._protocol.end();
    }));

    // Socket and protocol lifecycle events are routed to the private handlers.
    socket.on('error', this._handleNetworkError.bind(this));
    socket.on('connect', this._handleProtocolConnect.bind(this));
    this._protocol.on('handshake', this._handleProtocolHandshake.bind(this));
    this._protocol.on('initialize', this._handleProtocolInitialize.bind(this));
    this._protocol.on('unhandledError', this._handleProtocolError.bind(this));
    this._protocol.on('drain', this._handleProtocolDrain.bind(this));
    this._protocol.on('end', this._handleProtocolEnd.bind(this));
    this._protocol.on('enqueue', this._handleProtocolEnqueue.bind(this));

    if (settings.connectTimeout) {
      var onConnectTimeout = this._handleConnectTimeout.bind(this);

      socket.setTimeout(settings.connectTimeout, onConnectTimeout);
      // Once connected, disable the timeout again (the listener runs with the
      // socket as `this`).
      socket.once('connect', function () {
        this.setTimeout(0, onConnectTimeout);
      });
    }
  }

  if (done) {
    done();
  }

  // No handshake is needed here for now.
  //this._protocol.handshake(options, wrapCallbackInDomain(this, callback));
};
124
/**
 * Ask the protocol to switch user, falling back to the current configuration
 * for any field that is not supplied.
 *
 * @param {Object|Function} [options] user / password / database / charset / timeout,
 *                                    or the callback when it is the only argument.
 * @param {Function} [callback]
 * @returns {*} Whatever `this._protocol.changeUser` returns.
 */
Connection.prototype.changeUser = function changeUser(options, callback) {
  var opts = options;
  var done = callback;

  if (!done && typeof opts === 'function') {
    done = opts;
    opts = {};
  }

  this._implyConnect();

  var current = this.config;
  var charsetNumber;

  if (opts.charset) {
    charsetNumber = ConnectionConfig.getCharsetNumber(opts.charset);
  } else {
    charsetNumber = current.charsetNumber;
  }

  var request = {
    user          : opts.user || current.user,
    password      : opts.password || current.password,
    database      : opts.database || current.database,
    timeout       : opts.timeout,
    charsetNumber : charsetNumber,
    currentConfig : current
  };

  return this._protocol.changeUser(request, wrapCallbackInDomain(this, done));
};
146
/**
 * Build a sequence via Connection.createQuery and enqueue it on the protocol.
 *
 * @param {*} values       Forwarded to Connection.createQuery.
 * @param {Function} [cb]  Forwarded to Connection.createQuery.
 * @returns {*} Whatever `this._protocol._enqueue` returns.
 */
Connection.prototype.query = function query(values, cb) {
  var sequence = Connection.createQuery(values, cb);

  sequence._connection = this;

  // Re-wrap the callback so that it is tied to this connection.
  var pending = sequence._callback;
  if (pending) {
    sequence._callback = wrapCallbackInDomain(this, pending);
  }

  this._implyConnect();

  return this._protocol._enqueue(sequence);
};
159
/**
 * Forward a ping request to the protocol, connecting first if necessary.
 *
 * @param {Object|Function} [options] Options, or the callback when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.ping = function ping(options, callback) {
  var opts = options;
  var done = callback;

  if (!done && typeof opts === 'function') {
    done = opts;
    opts = {};
  }

  this._implyConnect();
  this._protocol.ping(opts, wrapCallbackInDomain(this, done));
};
169
/**
 * Forward a statistics request to the protocol (`stats`), connecting first if
 * necessary.
 *
 * @param {Object|Function} [options] Options, or the callback when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.statistics = function statistics(options, callback) {
  var opts = options;
  var done = callback;

  if (!done && typeof opts === 'function') {
    done = opts;
    opts = {};
  }

  this._implyConnect();
  this._protocol.stats(opts, wrapCallbackInDomain(this, done));
};
179
/**
 * Ask the protocol to quit.
 *
 * The caller's options object is never mutated: a new object that inherits
 * from it is created, and a 30 second default timeout is set on that object
 * when no timeout is visible through it.
 *
 * @param {Object|Function} [options] Options, or the callback when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.end = function end(options, callback) {
  var done = callback;
  var base = options;

  if (!callback && typeof options === 'function') {
    done = options;
    base = null;
  }

  // create custom options reference
  var quitOptions = Object.create(base || null);

  if (quitOptions.timeout === undefined) {
    // default timeout of 30 seconds
    quitOptions.timeout = 30000;
  }

  this._implyConnect();
  this._protocol.quit(quitOptions, wrapCallbackInDomain(this, done));
};
200
/**
 * Mark the connection as disconnected and destroy both the socket and the
 * protocol. `_implyConnect()` runs first, so a socket exists to be destroyed
 * even if connect() was never called.
 */
Connection.prototype.destroy = function() {
  this.state = 'disconnected';

  this._implyConnect();

  this._socket.destroy();
  this._protocol.destroy();
};
207
/**
 * Pause the socket first, then the protocol.
 */
Connection.prototype.pause = function() {
  this._socket.pause();
  this._protocol.pause();
};
212
/**
 * Resume the socket first, then the protocol.
 */
Connection.prototype.resume = function() {
  this._socket.resume();
  this._protocol.resume();
};
217
/**
 * Identity function: the value is returned as-is and NO escaping is applied.
 * The SqlString-based implementation is disabled:
 *   SqlString.escape(value, false, this.config.timezone)
 *
 * @param {*} value
 * @returns {*} The same `value`.
 */
Connection.prototype.escape = function(value) {
  return value;
};
221
/**
 * Identity function: the identifier is returned as-is and NO escaping is
 * applied. The SqlString-based implementation is disabled:
 *   SqlString.escapeId(value, false)
 *
 * @param {*} value
 * @returns {*} The same `value`.
 */
Connection.prototype.escapeId = function escapeId(value) {
  return value;
};
225
/**
 * Format a statement with the user-supplied `config.queryFormat`, if any.
 *
 * Without a custom formatter `sql` is returned unchanged and `values` is
 * ignored. The SqlString-based default is disabled:
 *   SqlString.format(sql, values, this.config.stringifyObjects, this.config.timezone)
 *
 * @param {*} sql
 * @param {*} values
 * @returns {*} The custom formatter's result, or `sql` itself.
 */
Connection.prototype.format = function(sql, values) {
  var customFormat = this.config.queryFormat;

  if (typeof customFormat !== 'function') {
    return sql;
  }

  // The formatter runs with the connection as `this`.
  return customFormat.call(this, sql, values, this.config.timezone);
};
232
if (tls.TLSSocket) {
  // 0.11+ environment

  /**
   * Upgrade the current socket to TLS by wrapping it in a tls.TLSSocket.
   *
   * @param {Function} onSecure Called with an error (context creation failure,
   *   TLS error before the session is established, or the verification error
   *   when `ssl.rejectUnauthorized` is truthy), or with `null` when
   *   verification is not requested.
   */
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    var self = this;

    createSecureContext(this.config, function (contextError, secureContext) {
      if (contextError) {
        onSecure(contextError);
        return;
      }

      // "unpipe": drop the plaintext data listeners on both ends
      self._socket.removeAllListeners('data');
      self._protocol.removeAllListeners('data');

      // socket <-> encrypted
      var mustVerify = self.config.ssl.rejectUnauthorized;
      var isSecure = false;
      var tlsOptions = {
        rejectUnauthorized : mustVerify,
        requestCert        : true,
        secureContext      : secureContext,
        isServer           : false
      };
      var secureSocket = new tls.TLSSocket(self._socket, tlsOptions);

      // Before the session is established errors go to onSecure; afterwards
      // they are treated as network errors.
      secureSocket.on('_tlsError', function (tlsError) {
        if (!isSecure) {
          onSecure(tlsError);
          return;
        }

        self._handleNetworkError(tlsError);
      });

      // cleartext <-> protocol
      secureSocket.pipe(self._protocol);
      self._protocol.on('data', function (chunk) {
        secureSocket.write(chunk);
      });

      secureSocket.on('secure', function () {
        isSecure = true;

        if (mustVerify) {
          onSecure(this.ssl.verifyError());
        } else {
          onSecure(null);
        }
      });

      // start TLS communications
      secureSocket._start();
    });
  };
} else {
  // pre-0.11 environment

  /**
   * Upgrade the current socket to TLS using tls.createSecurePair.
   *
   * before TLS:
   *  _socket <-> _protocol
   * after:
   *  _socket <-> securePair.encrypted <-> securePair.cleartext <-> _protocol
   *
   * @param {Function} onSecure Called with no arguments when verification is
   *   not requested, otherwise with the verification result; also called with
   *   any secure-pair error raised before the session is established.
   */
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    var self = this;
    var ssl = this.config.ssl;

    var credentials = Crypto.createCredentials({
      ca         : ssl.ca,
      cert       : ssl.cert,
      ciphers    : ssl.ciphers,
      key        : ssl.key,
      passphrase : ssl.passphrase
    });

    var mustVerify = this.config.ssl.rejectUnauthorized;
    var isSecure = false;
    var pair = tls.createSecurePair(credentials, false, true, mustVerify);

    // Before the session is established errors go to onSecure; afterwards
    // they are treated as network errors.
    pair.on('error', function (pairError) {
      if (!isSecure) {
        onSecure(pairError);
        return;
      }

      self._handleNetworkError(pairError);
    });

    // "unpipe": drop the plaintext data listeners on both ends
    this._socket.removeAllListeners('data');
    this._protocol.removeAllListeners('data');

    // socket <-> encrypted
    pair.encrypted.pipe(this._socket);
    this._socket.on('data', function (chunk) {
      pair.encrypted.write(chunk);
    });

    // cleartext <-> protocol
    pair.cleartext.pipe(this._protocol);
    this._protocol.on('data', function (chunk) {
      pair.cleartext.write(chunk);
    });

    // secure established
    pair.on('secure', function () {
      isSecure = true;

      if (!mustVerify) {
        onSecure();
        return;
      }

      var reason = this.ssl.verifyError();

      if (typeof reason !== 'string') {
        onSecure(reason);
        return;
      }

      // node.js 0.6 support: a string reason is promoted to an Error
      var wrapped = new Error(reason);
      wrapped.code = reason;

      onSecure(wrapped);
    });

    // node.js 0.8 bug: raise a pending ssl error before delegating to the
    // original cycle implementation
    pair._cycle = pair.cycle;
    pair.cycle = function cycle() {
      if (this.ssl && this.ssl.error) {
        this.error();
      }

      return this._cycle.apply(this, arguments);
    };
  };
}
361
/**
 * Socket timeout handler used while connecting: disables the timeout, destroys
 * the socket (when there is one) and reports a synthetic ETIMEDOUT error
 * through the network error path.
 */
Connection.prototype._handleConnectTimeout = function() {
  var socket = this._socket;

  if (socket) {
    socket.setTimeout(0);
    socket.destroy();
  }

  var timeoutError = new Error('connect ETIMEDOUT');
  timeoutError.errorno = 'ETIMEDOUT';
  timeoutError.code    = 'ETIMEDOUT';
  timeoutError.syscall = 'connect';

  this._handleNetworkError(timeoutError);
};
375
/**
 * Hand a socket-level error over to the protocol.
 *
 * @param {Error} err
 */
Connection.prototype._handleNetworkError = function(err) {
  var protocol = this._protocol;

  protocol.handleNetworkError(err);
};
379
/**
 * Handler for the protocol's 'unhandledError' event: flags the connection as
 * being in the 'protocol_error' state and re-emits the error on the connection.
 *
 * @param {Error} err
 */
Connection.prototype._handleProtocolError = function(err) {
  this.state = 'protocol_error';

  this.emit('error', err);
};
384
/**
 * Re-emit the protocol's 'drain' event on the connection.
 */
Connection.prototype._handleProtocolDrain = function() {
  this.emit('drain');
};
388
/**
 * Marks the connection as 'connected' and emits 'connect'.
 */
Connection.prototype._handleProtocolConnect = function() {
  this.state = 'connected';

  this.emit('connect');
};
393
/**
 * Marks the connection as 'authenticated'.
 */
Connection.prototype._handleProtocolHandshake = function _handleProtocolHandshake() {
  this.state = 'authenticated';
};
397
/**
 * Records the thread id carried by the given packet.
 *
 * @param {Object} packet Object exposing a `threadId` property.
 */
Connection.prototype._handleProtocolInitialize = function _handleProtocolInitialize(packet) {
  var threadId = packet.threadId;

  this.threadId = threadId;
};
401
/**
 * Handler for the protocol's 'end' event: marks the connection as
 * 'disconnected' and re-emits 'end' with the same argument.
 *
 * @param {Error} [err]
 */
Connection.prototype._handleProtocolEnd = function(err) {
  this.state = 'disconnected';

  this.emit('end', err);
};
406
/**
 * Re-emit the protocol's 'enqueue' event on the connection.
 *
 * @param {Object} sequence The value passed along with the event.
 */
Connection.prototype._handleProtocolEnqueue = function _handleProtocolEnqueue(sequence) {
  this.emit('enqueue', sequence);
};
410
/**
 * Call connect() with no arguments unless connect() has already been called.
 */
Connection.prototype._implyConnect = function() {
  if (this._connectCalled) {
    return;
  }

  this.connect();
};
416
/**
 * Build a TLS secure context from `config.ssl` and report the outcome through
 * a node-style callback instead of throwing.
 *
 * @param {Object} config Object whose `ssl` property provides ca / cert /
 *                        ciphers / key / passphrase.
 * @param {Function} cb   Called exactly once as `cb(error, context)`; one of
 *                        the two arguments is `null`.
 */
function createSecureContext (config, cb) {
  var failure = null;
  var secureContext = null;

  try {
    var ssl = config.ssl;

    secureContext = tls.createSecureContext({
      ca         : ssl.ca,
      cert       : ssl.cert,
      ciphers    : ssl.ciphers,
      key        : ssl.key,
      passphrase : ssl.passphrase
    });
  } catch (caught) {
    failure = caught;
  }

  // Invoked outside the try block so that errors thrown by the callback
  // itself are not swallowed.
  cb(failure, secureContext);
}
435
/**
 * Wrap `fn` so that it runs outside of every currently active domain.
 *
 * The returned function exits all active domains, calls `fn` with the same
 * `this` and arguments, and then re-enters the domains it exited, even when
 * `fn` throws.
 *
 * @param {Function} fn
 * @returns {Function} Wrapper that returns whatever `fn` returns.
 */
function unwrapFromDomain(fn) {
  return function () {
    var domains = [];
    var ret;

    // Exit every active domain, innermost first. Each one is prepended so the
    // list ends up ordered outermost -> innermost. (The previous code used
    // `shift`, which ignores its argument and left the list empty, so nothing
    // was ever restored.)
    while (process.domain) {
      domains.unshift(process.domain);
      process.domain.exit();
    }

    try {
      ret = fn.apply(this, arguments);
    } finally {
      // Re-enter outermost first so the original nesting is rebuilt.
      for (var i = 0; i < domains.length; i++) {
        domains[i].enter();
      }
    }

    return ret;
  };
}
457
/**
 * Tie a callback to a domain.
 *
 *  - Non-functions, and functions that already carry a `domain` property, are
 *    returned unchanged.
 *  - When a domain is active, the callback is bound to it.
 *  - Otherwise, when an emitter `ee` is given, the callback is wrapped so that
 *    it runs inside the emitter's domain and outside any other domain.
 *  - Otherwise the callback is returned unchanged.
 *
 * @param {Object} [ee] Emitter whose `domain` should be used as a fallback.
 * @param {*} fn
 * @returns {*}
 */
function wrapCallbackInDomain(ee, fn) {
  var needsNoWrapping = typeof fn !== 'function' || fn.domain;

  if (needsNoWrapping) {
    return fn;
  }

  var activeDomain = process.domain;

  if (activeDomain) {
    return activeDomain.bind(fn);
  }

  if (ee) {
    return unwrapFromDomain(wrapToDomain(ee, fn));
  }

  return fn;
}
473
/**
 * Wrap `fn` so that it runs inside the domain of the emitter `ee`.
 *
 * When domains are in use and `ee.domain` is set, the wrapper enters that
 * domain, calls `fn` with the same `this` and arguments, and exits the domain.
 * Otherwise it calls `fn` directly. The wrapper never returns `fn`'s result,
 * and the domain is not exited if `fn` throws.
 *
 * @param {Object} ee
 * @param {Function} fn
 * @returns {Function}
 */
function wrapToDomain(ee, fn) {
  return function () {
    var domain = Events.usingDomains ? ee.domain : null;

    if (!domain) {
      fn.apply(this, arguments);
      return;
    }

    domain.enter();
    fn.apply(this, arguments);
    domain.exit();
  };
}