UNPKG

24.5 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7var _tls = require('tls');
8
9var _tls2 = _interopRequireDefault(_tls);
10
11var _net = require('net');
12
13var _net2 = _interopRequireDefault(_net);
14
15var _winston = require('winston');
16
17var _winston2 = _interopRequireDefault(_winston);
18
19var _genericPool = require('generic-pool');
20
21var _genericPool2 = _interopRequireDefault(_genericPool);
22
23var _events = require('events');
24
25function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
26
27class ConnectionInterface extends _events.EventEmitter {
28 constructor(socket, id) {
29 super();
30 this.id = id;
31 this.socket = socket;
32 this.socket.on('error', e => this.emit('error', e));
33 this.socket.on('close', e => this.emit('close', e));
34 }
35
36 send(buffer) {
37 this.socket.write(buffer);
38 }
39
40 destroy() {
41 this.socket.destroy();
42 }
43
44 end() {
45 this.socket.end();
46 }
47
48 get readyState() {
49 return this.socket.readyState;
50 }
51}
52
53class TcpPool {
54 constructor(interfaceConstructor, options) {
55 this.Parser = interfaceConstructor;
56 this.options = Object.assign({}, options);
57 this.name = this.options.name || `${interfaceConstructor.name} Connection Pool`;
58 this.connectionCount = 0;
59
60 const factory = {
61 create: () => this.connect(),
62 destroy: client => this.disconnect(client),
63 validate: client => this.validate(client)
64 };
65 const config = {
66 max: this.options.max || 10,
67 min: this.options.min || 1,
68 acquireTimeoutMillis: this.options.acquireTimeoutMillis || 15000,
69 idleTimeoutMillis: this.options.idleTimeoutMillis || 30000,
70 testOnBorrow: true
71 };
72 this.pool = _genericPool2.default.createPool(factory, config);
73 }
74
75 async acquire(context) {
76 const logger = this.loggerForContext(context);
77 logger.info(`Acquiring connection from ${this.name} pool`);
78 try {
79 const conn = await this.pool.acquire();
80 logger.info(`Returning connection #${conn.id} from ${this.name} pool`);
81 conn.context = context;
82 return conn;
83 } catch (error) {
84 logger.error(`Failed to acquire connection from ${this.name} pool`, {
85 error: error.message || error
86 });
87 throw error;
88 }
89 }
90
91 release(conn) {
92 const logger = this.loggerForContext(conn.context);
93 logger.info(`Releasing connection #${conn.id} into ${this.name} pool`);
94 this.reset(conn);
95 // eslint-disable-next-line no-param-reassign
96 delete conn.context;
97 this.pool.release(conn);
98 }
99
100 destroy(conn) {
101 const logger = this.loggerForContext(conn.context);
102 logger.info(`Destroying connection #${conn.id} of ${this.name} pool`);
103 this.reset(conn);
104 // eslint-disable-next-line no-param-reassign
105 delete conn.context;
106 this.pool.destroy(conn);
107 }
108
109 async destroyAllNow() {
110 _winston2.default.debug(`Pool ${this.name} shutting down`);
111 await this.pool.drain();
112 _winston2.default.debug(`Pool ${this.name} drained`);
113 await this.pool.clear();
114 _winston2.default.debug(`Pool ${this.name} cleared`);
115 }
116
117 async connect() {
118 this.connectionCount += 1;
119 const myId = this.connectionCount;
120 _winston2.default.info(`Pool ${this.name} socket #${myId} connecting`);
121 let attemptCompleted = false;
122 let socket;
123
124 return new Promise((accept, reject) => {
125 let resolved = false;
126 const connectionHandler = async () => {
127 if (!attemptCompleted) {
128 _winston2.default.info(`Pool ${this.name} socket #${myId} connected`);
129 attemptCompleted = true;
130 socket.removeAllListeners();
131 const connectionParser = new this.Parser(socket, myId);
132 if (typeof connectionParser.initializeConnection === 'function') {
133 try {
134 await connectionParser.initializeConnection();
135 } catch (error) {
136 reject(error);
137 return;
138 }
139 }
140 this.reset(connectionParser);
141 resolved = true;
142 accept(connectionParser);
143 }
144 };
145
146 try {
147 if (this.options.insecure === true) {
148 socket = _net2.default.connect({
149 host: this.options.host,
150 port: this.options.port
151 }, connectionHandler);
152 } else {
153 const tlsOptions = Object.assign({
154 secureProtocol: 'TLSv1_2_client_method',
155 host: this.options.host,
156 port: this.options.port
157 }, this.options.tlsOptions);
158 socket = _tls2.default.connect(tlsOptions, connectionHandler);
159 }
160
161 socket.once('error', error => {
162 _winston2.default.error(`Error on Pool ${this.name} socket #${myId}`, {
163 message: error.message,
164 stack: error.stack
165 });
166 if (!attemptCompleted) {
167 attemptCompleted = true;
168 socket.end();
169 // Reject after a second to give some backoff time
170 if (!resolved) {
171 setTimeout(() => reject(error), 1000);
172 resolved = true;
173 }
174 }
175 });
176 } catch (error) {
177 _winston2.default.error(`Error on Pool ${this.name}`, {
178 message: error.message,
179 stack: error.stack
180 });
181 if (!resolved) {
182 reject(error);
183 }
184 }
185 });
186 }
187
188 loggerForContext(context) {
189 if (this.options.loggerFromContext) {
190 return this.options.loggerFromContext(context) || _winston2.default;
191 }
192 return _winston2.default;
193 }
194
195 reset(conn) {
196 conn.removeAllListeners();
197 conn.on('error', error => this.onError(conn, error));
198 conn.on('close', error => this.onClose(conn, error));
199 }
200
201 onError(conn, error) {
202 const logger = this.loggerForContext(conn.context);
203 logger.error(`Error on Pool ${this.name} socket #${conn.id}`, {
204 message: error.message,
205 stack: error.stack
206 });
207 conn.end();
208 this.pool.destroy(conn);
209 }
210
211 onClose(conn) {
212 const logger = this.loggerForContext(conn.context);
213 logger.info(`Pool ${this.name} socket #${conn.id} closed`);
214 }
215
216 validate(conn) {
217 return new Promise(accept => {
218 if (typeof conn.validate === 'function') {
219 Promise.resolve(conn.validate()).then(isValid => accept(isValid));
220 } else {
221 if (conn.readyState === 'open') {
222 accept(true);
223 return;
224 }
225 _winston2.default.error(`Invalid connection in Pool ${this.name} socket #${conn.id}`);
226 accept(false);
227 }
228 });
229 }
230
231 disconnect(conn) {
232 return new Promise((accept, reject) => {
233 try {
234 _winston2.default.debug(`Pool ${this.name} socket #${conn.id} closing`);
235 conn.destroy();
236 accept();
237 } catch (error) {
238 reject(error);
239 }
240 });
241 }
242}
243
244exports.default = TcpPool;
245TcpPool.ConnectionInterface = ConnectionInterface;
246//# sourceMappingURL=data:application/json;charset=utf-8;base64,
\No newline at end of file