UNPKG

24.5 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7var _tls = require('tls');
8
9var _tls2 = _interopRequireDefault(_tls);
10
11var _net = require('net');
12
13var _net2 = _interopRequireDefault(_net);
14
15var _winston = require('winston');
16
17var _winston2 = _interopRequireDefault(_winston);
18
19var _genericPool = require('generic-pool');
20
21var _genericPool2 = _interopRequireDefault(_genericPool);
22
23var _events = require('events');
24
25function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
26
27class ConnectionInterface extends _events.EventEmitter {
28 constructor(socket, id) {
29 super();
30 this.id = id;
31 this.socket = socket;
32 this.socket.on('error', e => this.emit('error', e));
33 this.socket.on('close', e => this.emit('close', e));
34 }
35
36 send(buffer) {
37 this.socket.write(buffer);
38 }
39
40 destroy() {
41 this.socket.destroy();
42 }
43
44 end() {
45 this.socket.end();
46 }
47
48 get readyState() {
49 return this.socket.readyState;
50 }
51}
52
53class TcpPool {
54 constructor(interfaceConstructor, options) {
55 this.Parser = interfaceConstructor;
56 this.options = Object.assign({}, options);
57 this.name = this.options.name || `${interfaceConstructor.name} Connection Pool`;
58 this.connectionCount = 0;
59
60 const factory = {
61 create: () => this.connect(),
62 destroy: client => this.disconnect(client),
63 validate: client => this.validate(client)
64 };
65 const config = {
66 max: this.options.max || 10,
67 min: this.options.min || 1,
68 acquireTimeoutMillis: this.options.acquireTimeoutMillis || 15000,
69 idleTimeoutMillis: this.options.idleTimeoutMillis || 30000,
70 testOnBorrow: true
71 };
72 this.pool = _genericPool2.default.createPool(factory, config);
73 }
74
75 async acquire(context) {
76 const logger = this.loggerForContext(context);
77 logger.info(`Acquiring connection from ${this.name} pool`);
78 try {
79 const conn = await this.pool.acquire();
80 logger.info(`Returning connection #${conn.id} from ${this.name} pool`);
81 conn.context = context;
82 return conn;
83 } catch (error) {
84 logger.error(`Failed to acquire connection from ${this.name} pool`, {
85 error: error.message || error
86 });
87 throw error;
88 }
89 }
90
91 release(conn) {
92 const logger = this.loggerForContext(conn.context);
93 logger.info(`Releasing connection #${conn.id} into ${this.name} pool`);
94 this.reset(conn);
95 // eslint-disable-next-line no-param-reassign
96 delete conn.context;
97 this.pool.release(conn);
98 }
99
100 destroy(conn) {
101 const logger = this.loggerForContext(conn.context);
102 logger.info(`Destroying connection #${conn.id} of ${this.name} pool`);
103 this.reset(conn);
104 // eslint-disable-next-line no-param-reassign
105 delete conn.context;
106 this.pool.destroy(conn);
107 }
108
109 async destroyAllNow() {
110 _winston2.default.debug(`Pool ${this.name} shutting down`);
111 await this.pool.drain();
112 _winston2.default.debug(`Pool ${this.name} drained`);
113 await this.pool.clear();
114 _winston2.default.debug(`Pool ${this.name} cleared`);
115 }
116
117 async connect() {
118 this.connectionCount += 1;
119 const myId = this.connectionCount;
120 _winston2.default.info(`Pool ${this.name} socket #${myId} connecting`);
121 let attemptCompleted = false;
122 let socket;
123
124 return new Promise((accept, reject) => {
125 let resolved = false;
126 const connectionHandler = async () => {
127 if (!attemptCompleted) {
128 _winston2.default.info(`Pool ${this.name} socket #${myId} connected`);
129 attemptCompleted = true;
130 socket.removeAllListeners();
131 const connectionParser = new this.Parser(socket, myId);
132 if (typeof connectionParser.initializeConnection === 'function') {
133 try {
134 await connectionParser.initializeConnection();
135 } catch (error) {
136 reject(error);
137 return;
138 }
139 }
140 this.reset(connectionParser);
141 resolved = true;
142 accept(connectionParser);
143 }
144 };
145
146 try {
147 if (this.options.insecure === true) {
148 socket = _net2.default.connect({
149 host: this.options.host,
150 port: this.options.port
151 }, connectionHandler);
152 } else {
153 const tlsOptions = Object.assign({
154 secureProtocol: 'TLSv1_2_client_method',
155 host: this.options.host,
156 port: this.options.port
157 }, this.options.tlsOptions);
158 socket = _tls2.default.connect(tlsOptions, connectionHandler);
159 }
160
161 socket.once('error', error => {
162 _winston2.default.error(`Error on Pool ${this.name} socket #${myId}`, {
163 message: error.message,
164 stack: error.stack
165 });
166 if (!attemptCompleted) {
167 attemptCompleted = true;
168 socket.end();
169 // Reject after a second to give some backoff time
170 if (!resolved) {
171 setTimeout(() => reject(error), 1000);
172 resolved = true;
173 }
174 }
175 });
176 } catch (error) {
177 _winston2.default.error(`Error on Pool ${this.name}`, {
178 message: error.message,
179 stack: error.stack
180 });
181 if (!resolved) {
182 reject(error);
183 }
184 }
185 });
186 }
187
188 loggerForContext(context) {
189 if (this.options.loggerFromContext) {
190 return this.options.loggerFromContext(context) || _winston2.default;
191 }
192 return _winston2.default;
193 }
194
195 reset(conn) {
196 conn.removeAllListeners();
197 conn.on('error', error => this.onError(conn, error));
198 conn.on('close', error => this.onClose(conn, error));
199 }
200
201 onError(conn, error) {
202 const logger = this.loggerForContext(conn.context);
203 logger.error(`Error on Pool ${this.name} socket #${conn.id}`, {
204 message: error.message,
205 stack: error.stack
206 });
207 conn.end();
208 this.pool.destroy(conn);
209 }
210
211 onClose(conn) {
212 const logger = this.loggerForContext(conn.context);
213 logger.info(`Pool ${this.name} socket #${conn.id} closed`);
214 }
215
216 validate(conn) {
217 return new Promise(accept => {
218 if (typeof conn.validate === 'function') {
219 Promise.resolve(conn.validate()).then(isValid => accept(isValid));
220 } else {
221 if (conn.readyState === 'open') {
222 accept(true);
223 return;
224 }
225 _winston2.default.error(`Invalid connection in Pool ${this.name} socket #${conn.id}`);
226 accept(false);
227 }
228 });
229 }
230
231 disconnect(conn) {
232 return new Promise((accept, reject) => {
233 try {
234 _winston2.default.debug(`Pool ${this.name} socket #${conn.id} closing`);
235 conn.destroy();
236 accept();
237 } catch (error) {
238 reject(error);
239 }
240 });
241 }
242}
243
244exports.default = TcpPool;
245TcpPool.ConnectionInterface = ConnectionInterface;
246//# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../src/index.js"],"names":["ConnectionInterface","constructor","socket","id","on","e","emit","send","buffer","write","destroy","end","readyState","TcpPool","interfaceConstructor","options","Parser","Object","assign","name","connectionCount","factory","create","connect","client","disconnect","validate","config","max","min","acquireTimeoutMillis","idleTimeoutMillis","testOnBorrow","pool","createPool","acquire","context","logger","loggerForContext","info","conn","error","message","release","reset","destroyAllNow","debug","drain","clear","myId","attemptCompleted","Promise","accept","reject","resolved","connectionHandler","removeAllListeners","connectionParser","initializeConnection","insecure","host","port","tlsOptions","secureProtocol","once","stack","setTimeout","loggerFromContext","onError","onClose","resolve","then","isValid"],"mappings":";;;;;;AAAA;;;;AACA;;;;AACA;;;;AACA;;;;AACA;;;;AAEA,MAAMA,mBAAN,8BAA+C;AAC7CC,cAAYC,MAAZ,EAAoBC,EAApB,EAAwB;AACtB;AACA,SAAKA,EAAL,GAAUA,EAAV;AACA,SAAKD,MAAL,GAAcA,MAAd;AACA,SAAKA,MAAL,CAAYE,EAAZ,CAAe,OAAf,EAAwBC,KAAK,KAAKC,IAAL,CAAU,OAAV,EAAmBD,CAAnB,CAA7B;AACA,SAAKH,MAAL,CAAYE,EAAZ,CAAe,OAAf,EAAwBC,KAAK,KAAKC,IAAL,CAAU,OAAV,EAAmBD,CAAnB,CAA7B;AACD;;AAEDE,OAAKC,MAAL,EAAa;AACX,SAAKN,MAAL,CAAYO,KAAZ,CAAkBD,MAAlB;AACD;;AAEDE,YAAU;AACR,SAAKR,MAAL,CAAYQ,OAAZ;AACD;;AAEDC,QAAM;AACJ,SAAKT,MAAL,CAAYS,GAAZ;AACD;;AAED,MAAIC,UAAJ,GAAiB;AACf,WAAO,KAAKV,MAAL,CAAYU,UAAnB;AACD;AAvB4C;;AA0BhC,MAAMC,OAAN,CAAc;AAC3BZ,cAAYa,oBAAZ,EAAkCC,OAAlC,EAA2C;AACzC,SAAKC,MAAL,GAAcF,oBAAd;AACA,SAAKC,OAAL,GAAeE,OAAOC,MAAP,CAAc,EAAd,EAAkBH,OAAlB,CAAf;AACA,SAAKI,IAAL,GAAY,KAAKJ,OAAL,CAAaI,IAAb,IAAsB,GAAEL,qBAAqBK,IAAK,kBAA9D;AACA,SAAKC,eAAL,GAAuB,CAAvB;;AAEA,UAAMC,UAAU;AACdC,cAAQ,MAAM,KAAKC,OAAL,EADA;AAEdb,eAASc,UAAU,KAAKC,UAAL,CAAgBD,MAAhB,CAFL;AAGdE,gBAAUF,UAAU,KAAKE,QAAL,CAAcF,MAAd;AAHN,KAAhB;AAKA,UAAMG,SAAS;AACbC,WAAK,KAAKb,OAAL,CAAaa,GAAb,IAAoB,EADZ;AAEbC,WAAK,KAAKd,OAAL,CAAac,GAAb,IAAoB,CAFZ;AAGbC,4BAAsB,KAAKf,OAAL,CAAae,oBAAb,IAAqC,KAH9C;AAIbC,yBAAmB,KAAKhB,OAAL,CAAagB,iBAAb,IAAkC,KAJxC;AAKbC,oBAAc;AALD,KAAf;AAOA,SAAKC,IAAL,GAAY,sBAAKC,UAAL,CAAgBb,OAAhB,EAAyBM,MAAzB,CAAZ;AACD;;AAED,QAAMQ,OAAN,CAAcC,OAAd,EAAuB;AACrB,UAAMC,SAAS,KAAKC,gBAAL,CAAsBF,OAAtB,CAAf;AACAC,WAAOE,IAAP,CAAa,6BAA4B,KAAKpB,IAAK,OAAnD;AACA,QAAI;AACF,YAAMqB,OAAO,MAAM,KAAKP,IAAL,CAAUE,OAAV,EAAnB;AACAE,aAAOE,IAAP,CAAa,yBAAwBC,KAAKrC,EAAG,SAAQ,KAAKgB,IAAK,OAA/D;AACAqB,WAAKJ,OAAL,GAAeA,OAAf;AACA,aAAOI,IAAP;AACD,KALD,CAKE,OAAOC,KAAP,EAAc;AACdJ,aAAOI,KAAP,CAAc,qCAAoC,KAAKtB,IAAK,OAA5D,EAAoE;AAClEsB,eAAOA,MAAMC,OAAN,IAAiBD;AAD0C,OAApE;AAGA,YAAMA,KAAN;AACD;AACF;;AAEDE,UAAQH,IAAR,EAAc;AACZ,UAAMH,SAAS,KAAKC,gBAAL,CAAsBE,KAAKJ,OAA3B,CAAf;AACAC,WAAOE,IAAP,CAAa,yBAAwBC,KAAKrC,EAAG,SAAQ,KAAKgB,IAAK,OAA/D;AACA,SAAKyB,KAAL,CAAWJ,IAAX;AACA;AACA,WAAOA,KAAKJ,OAAZ;AACA,SAAKH,IAAL,CAAUU,OAAV,CAAkBH,IAAlB;AACD;;AAED9B,UAAQ8B,IAAR,EAAc;AACZ,UAAMH,SAAS,KAAKC,gBAAL,CAAsBE,KAAKJ,OAA3B,CAAf;AACAC,WAAOE,IAAP,CAAa,0BAAyBC,KAAKrC,EAAG,OAAM,KAAKgB,IAAK,OAA9D;AACA,SAAKyB,KAAL,CAAWJ,IAAX;AACA;AACA,WAAOA,KAAKJ,OAAZ;AACA,SAAKH,IAAL,CAAUvB,OAAV,CAAkB8B,IAAlB;AACD;;AAED,QAAMK,aAAN,GAAsB;AACpB,sBAAQC,KAAR,CAAe,QAAO,KAAK3B,IAAK,gBAAhC;AACA,UAAM,KAAKc,IAAL,CAAUc,KAAV,EAAN;AACA,sBAAQD,KAAR,CAAe,QAAO,KAAK3B,IAAK,UAAhC;AACA,UAAM,KAAKc,IAAL,CAAUe,KAAV,EAAN;AACA,sBAAQF,KAAR,CAAe,QAAO,KAAK3B,IAAK,UAAhC;AACD;;AAED,QAAMI,OAAN,GAAgB;AACd,SAAKH,eAAL,IAAwB,CAAxB;AACA,UAAM6B,OAAO,KAAK7B,eAAlB;AACA,sBAAQmB,IAAR,CAAc,QAAO,KAAKpB,IAAK,YAAW8B,IAAK,aAA/C;AACA,QAAIC,mBAAmB,KAAvB;AACA,QAAIhD,MAAJ;;AAEA,WAAO,IAAIiD,OAAJ,CAAY,CAACC,MAAD,EAASC,MAAT,KAAoB;AACrC,UAAIC,WAAW,KAAf;AACA,YAAMC,oBAAoB,YAAY;AACpC,YAAI,CAACL,gBAAL,EAAuB;AACrB,4BAAQX,IAAR,CAAc,QAAO,KAAKpB,IAAK,YAAW8B,IAAK,YAA/C;AACAC,6BAAmB,IAAnB;AACAhD,iBAAOsD,kBAAP;AACA,gBAAMC,mBAAmB,IAAK,KAAKzC,MAAV,CAAkBd,MAAlB,EAA0B+C,IAA1B,CAAzB;AACA,cAAI,OAAOQ,iBAAiBC,oBAAxB,KAAiD,UAArD,EAAiE;AAC/D,gBAAI;AACF,oBAAMD,iBAAiBC,oBAAjB,EAAN;AACD,aAFD,CAEE,OAAOjB,KAAP,EAAc;AACdY,qBAAOZ,KAAP;AACA;AACD;AACF;AACD,eAAKG,KAAL,CAAWa,gBAAX;AACAH,qBAAW,IAAX;AACAF,iBAAOK,gBAAP;AACD;AACF,OAlBD;;AAoBA,UAAI;AACF,YAAI,KAAK1C,OAAL,CAAa4C,QAAb,KAA0B,IAA9B,EAAoC;AAClCzD,mBAAS,cAAIqB,OAAJ,CAAY;AACnBqC,kBAAM,KAAK7C,OAAL,CAAa6C,IADA;AAEnBC,kBAAM,KAAK9C,OAAL,CAAa8C;AAFA,WAAZ,EAGNN,iBAHM,CAAT;AAID,SALD,MAKO;AACL,gBAAMO,aAAa7C,OAAOC,MAAP,CAAc;AAC/B6C,4BAAgB,uBADe;AAE/BH,kBAAM,KAAK7C,OAAL,CAAa6C,IAFY;AAG/BC,kBAAM,KAAK9C,OAAL,CAAa8C;AAHY,WAAd,EAIhB,KAAK9C,OAAL,CAAa+C,UAJG,CAAnB;AAKA5D,mBAAS,cAAIqB,OAAJ,CAAYuC,UAAZ,EAAwBP,iBAAxB,CAAT;AACD;;AAEDrD,eAAO8D,IAAP,CAAY,OAAZ,EAAsBvB,KAAD,IAAW;AAC9B,4BAAQA,KAAR,CAAe,iBAAgB,KAAKtB,IAAK,YAAW8B,IAAK,EAAzD,EAA4D;AAC1DP,qBAASD,MAAMC,OAD2C;AAE1DuB,mBAAOxB,MAAMwB;AAF6C,WAA5D;AAIA,cAAI,CAACf,gBAAL,EAAuB;AACrBA,+BAAmB,IAAnB;AACAhD,mBAAOS,GAAP;AACA;AACA,gBAAI,CAAC2C,QAAL,EAAe;AACbY,yBAAW,MAAMb,OAAOZ,KAAP,CAAjB,EAAgC,IAAhC;AACAa,yBAAW,IAAX;AACD;AACF;AACF,SAdD;AAeD,OA9BD,CA8BE,OAAOb,KAAP,EAAc;AACd,0BAAQA,KAAR,CAAe,iBAAgB,KAAKtB,IAAK,EAAzC,EAA4C;AAC1CuB,mBAASD,MAAMC,OAD2B;AAE1CuB,iBAAOxB,MAAMwB;AAF6B,SAA5C;AAIA,YAAI,CAACX,QAAL,EAAe;AACbD,iBAAOZ,KAAP;AACD;AACF;AACF,KA7DM,CAAP;AA8DD;;AAEDH,mBAAiBF,OAAjB,EAA0B;AACxB,QAAI,KAAKrB,OAAL,CAAaoD,iBAAjB,EAAoC;AAClC,aAAO,KAAKpD,OAAL,CAAaoD,iBAAb,CAA+B/B,OAA/B,sBAAP;AACD;AACD;AACD;;AAEDQ,QAAMJ,IAAN,EAAY;AACVA,SAAKgB,kBAAL;AACAhB,SAAKpC,EAAL,CAAQ,OAAR,EAAiBqC,SAAS,KAAK2B,OAAL,CAAa5B,IAAb,EAAmBC,KAAnB,CAA1B;AACAD,SAAKpC,EAAL,CAAQ,OAAR,EAAiBqC,SAAS,KAAK4B,OAAL,CAAa7B,IAAb,EAAmBC,KAAnB,CAA1B;AACD;;AAED2B,UAAQ5B,IAAR,EAAcC,KAAd,EAAqB;AACnB,UAAMJ,SAAS,KAAKC,gBAAL,CAAsBE,KAAKJ,OAA3B,CAAf;AACAC,WAAOI,KAAP,CAAc,iBAAgB,KAAKtB,IAAK,YAAWqB,KAAKrC,EAAG,EAA3D,EAA8D;AAC5DuC,eAASD,MAAMC,OAD6C;AAE5DuB,aAAOxB,MAAMwB;AAF+C,KAA9D;AAIAzB,SAAK7B,GAAL;AACA,SAAKsB,IAAL,CAAUvB,OAAV,CAAkB8B,IAAlB;AACD;;AAED6B,UAAQ7B,IAAR,EAAc;AACZ,UAAMH,SAAS,KAAKC,gBAAL,CAAsBE,KAAKJ,OAA3B,CAAf;AACAC,WAAOE,IAAP,CAAa,QAAO,KAAKpB,IAAK,YAAWqB,KAAKrC,EAAG,SAAjD;AACD;;AAEDuB,WAASc,IAAT,EAAe;AACb,WAAO,IAAIW,OAAJ,CAAaC,MAAD,IAAY;AAC7B,UAAI,OAAOZ,KAAKd,QAAZ,KAAyB,UAA7B,EAAyC;AACvCyB,gBAAQmB,OAAR,CAAgB9B,KAAKd,QAAL,EAAhB,EACG6C,IADH,CACQC,WAAWpB,OAAOoB,OAAP,CADnB;AAED,OAHD,MAGO;AACL,YAAIhC,KAAK5B,UAAL,KAAoB,MAAxB,EAAgC;AAC9BwC,iBAAO,IAAP;AACA;AACD;AACD,0BAAQX,KAAR,CAAe,8BAA6B,KAAKtB,IAAK,YAAWqB,KAAKrC,EAAG,EAAzE;AACAiD,eAAO,KAAP;AACD;AACF,KAZM,CAAP;AAaD;;AAED3B,aAAWe,IAAX,EAAiB;AACf,WAAO,IAAIW,OAAJ,CAAY,CAACC,MAAD,EAASC,MAAT,KAAoB;AACrC,UAAI;AACF,0BAAQP,KAAR,CAAe,QAAO,KAAK3B,IAAK,YAAWqB,KAAKrC,EAAG,UAAnD;AACAqC,aAAK9B,OAAL;AACA0C;AACD,OAJD,CAIE,OAAOX,KAAP,EAAc;AACdY,eAAOZ,KAAP;AACD;AACF,KARM,CAAP;AASD;AA7L0B;;kBAAR5B,O;AAgMrBA,QAAQb,mBAAR,GAA8BA,mBAA9B","file":"index.js","sourcesContent":["import tls from 'tls';\nimport net from 'net';\nimport winston from 'winston';\nimport pool from 'generic-pool';\nimport { EventEmitter } from 'events';\n\nclass ConnectionInterface extends EventEmitter {\n  constructor(socket, id) {\n    super();\n    this.id = id;\n    this.socket = socket;\n    this.socket.on('error', e => this.emit('error', e));\n    this.socket.on('close', e => this.emit('close', e));\n  }\n\n  send(buffer) {\n    this.socket.write(buffer);\n  }\n\n  destroy() {\n    this.socket.destroy();\n  }\n\n  end() {\n    this.socket.end();\n  }\n\n  get readyState() {\n    return this.socket.readyState;\n  }\n}\n\nexport default class TcpPool {\n  constructor(interfaceConstructor, options) {\n    this.Parser = interfaceConstructor;\n    this.options = Object.assign({}, options);\n    this.name = this.options.name || `${interfaceConstructor.name} Connection Pool`;\n    this.connectionCount = 0;\n\n    const factory = {\n      create: () => this.connect(),\n      destroy: client => this.disconnect(client),\n      validate: client => this.validate(client),\n    };\n    const config = {\n      max: this.options.max || 10,\n      min: this.options.min || 1,\n      acquireTimeoutMillis: this.options.acquireTimeoutMillis || 15000,\n      idleTimeoutMillis: this.options.idleTimeoutMillis || 30000,\n      testOnBorrow: true,\n    };\n    this.pool = pool.createPool(factory, config);\n  }\n\n  async acquire(context) {\n    const logger = this.loggerForContext(context);\n    logger.info(`Acquiring connection from ${this.name} pool`);\n    try {\n      const conn = await this.pool.acquire();\n      logger.info(`Returning connection #${conn.id} from ${this.name} pool`);\n      conn.context = context;\n      return conn;\n    } catch (error) {\n      logger.error(`Failed to acquire connection from ${this.name} pool`, {\n        error: error.message || error,\n      });\n      throw error;\n    }\n  }\n\n  release(conn) {\n    const logger = this.loggerForContext(conn.context);\n    logger.info(`Releasing connection #${conn.id} into ${this.name} pool`);\n    this.reset(conn);\n    // eslint-disable-next-line no-param-reassign\n    delete conn.context;\n    this.pool.release(conn);\n  }\n\n  destroy(conn) {\n    const logger = this.loggerForContext(conn.context);\n    logger.info(`Destroying connection #${conn.id} of ${this.name} pool`);\n    this.reset(conn);\n    // eslint-disable-next-line no-param-reassign\n    delete conn.context;\n    this.pool.destroy(conn);\n  }\n\n  async destroyAllNow() {\n    winston.debug(`Pool ${this.name} shutting down`);\n    await this.pool.drain();\n    winston.debug(`Pool ${this.name} drained`);\n    await this.pool.clear();\n    winston.debug(`Pool ${this.name} cleared`);\n  }\n\n  async connect() {\n    this.connectionCount += 1;\n    const myId = this.connectionCount;\n    winston.info(`Pool ${this.name} socket #${myId} connecting`);\n    let attemptCompleted = false;\n    let socket;\n\n    return new Promise((accept, reject) => {\n      let resolved = false;\n      const connectionHandler = async () => {\n        if (!attemptCompleted) {\n          winston.info(`Pool ${this.name} socket #${myId} connected`);\n          attemptCompleted = true;\n          socket.removeAllListeners();\n          const connectionParser = new (this.Parser)(socket, myId);\n          if (typeof connectionParser.initializeConnection === 'function') {\n            try {\n              await connectionParser.initializeConnection();\n            } catch (error) {\n              reject(error);\n              return;\n            }\n          }\n          this.reset(connectionParser);\n          resolved = true;\n          accept(connectionParser);\n        }\n      };\n\n      try {\n        if (this.options.insecure === true) {\n          socket = net.connect({\n            host: this.options.host,\n            port: this.options.port,\n          }, connectionHandler);\n        } else {\n          const tlsOptions = Object.assign({\n            secureProtocol: 'TLSv1_2_client_method',\n            host: this.options.host,\n            port: this.options.port,\n          }, this.options.tlsOptions);\n          socket = tls.connect(tlsOptions, connectionHandler);\n        }\n\n        socket.once('error', (error) => {\n          winston.error(`Error on Pool ${this.name} socket #${myId}`, {\n            message: error.message,\n            stack: error.stack,\n          });\n          if (!attemptCompleted) {\n            attemptCompleted = true;\n            socket.end();\n            // Reject after a second to give some backoff time\n            if (!resolved) {\n              setTimeout(() => reject(error), 1000);\n              resolved = true;\n            }\n          }\n        });\n      } catch (error) {\n        winston.error(`Error on Pool ${this.name}`, {\n          message: error.message,\n          stack: error.stack,\n        });\n        if (!resolved) {\n          reject(error);\n        }\n      }\n    });\n  }\n\n  loggerForContext(context) {\n    if (this.options.loggerFromContext) {\n      return this.options.loggerFromContext(context) || winston;\n    }\n    return winston;\n  }\n\n  reset(conn) {\n    conn.removeAllListeners();\n    conn.on('error', error => this.onError(conn, error));\n    conn.on('close', error => this.onClose(conn, error));\n  }\n\n  onError(conn, error) {\n    const logger = this.loggerForContext(conn.context);\n    logger.error(`Error on Pool ${this.name} socket #${conn.id}`, {\n      message: error.message,\n      stack: error.stack,\n    });\n    conn.end();\n    this.pool.destroy(conn);\n  }\n\n  onClose(conn) {\n    const logger = this.loggerForContext(conn.context);\n    logger.info(`Pool ${this.name} socket #${conn.id} closed`);\n  }\n\n  validate(conn) {\n    return new Promise((accept) => {\n      if (typeof conn.validate === 'function') {\n        Promise.resolve(conn.validate())\n          .then(isValid => accept(isValid));\n      } else {\n        if (conn.readyState === 'open') {\n          accept(true);\n          return;\n        }\n        winston.error(`Invalid connection in Pool ${this.name} socket #${conn.id}`);\n        accept(false);\n      }\n    });\n  }\n\n  disconnect(conn) {\n    return new Promise((accept, reject) => {\n      try {\n        winston.debug(`Pool ${this.name} socket #${conn.id} closing`);\n        conn.destroy();\n        accept();\n      } catch (error) {\n        reject(error);\n      }\n    });\n  }\n}\n\nTcpPool.ConnectionInterface = ConnectionInterface;\n"]}
\No newline at end of file