1 | import tls from 'tls';
|
2 | import net from 'net';
|
3 | import winston from 'winston';
|
4 | import pool from 'generic-pool';
|
5 | import { EventEmitter } from 'events';
|
6 |
|
/**
 * Minimal connection wrapper around a socket.
 *
 * Re-emits the socket's `error` and `close` events from the wrapper itself and
 * delegates `send`, `end`, `destroy` and `readyState` straight to the socket.
 */
class ConnectionInterface extends EventEmitter {
  /**
   * @param {Object} socket - Socket-like object; must provide `on`, `write`,
   *   `end`, `destroy` and `readyState`.
   * @param {*} id - Identifier stored on the instance as `id`.
   */
  constructor(socket, id) {
    super();
    this.id = id;
    this.socket = socket;

    // Forward the socket's lifecycle events, passing the first argument along.
    ['error', 'close'].forEach((eventName) => {
      this.socket.on(eventName, (payload) => {
        this.emit(eventName, payload);
      });
    });
  }

  /**
   * Writes `buffer` to the wrapped socket. Nothing is returned.
   *
   * @param {*} buffer - Data handed to `socket.write`.
   */
  send(buffer) {
    const { socket } = this;
    socket.write(buffer);
  }

  /** Calls `destroy()` on the wrapped socket. */
  destroy() {
    const { socket } = this;
    socket.destroy();
  }

  /** Calls `end()` on the wrapped socket. */
  end() {
    const { socket } = this;
    socket.end();
  }

  /** The wrapped socket's current `readyState` value. */
  get readyState() {
    const { socket } = this;
    return socket.readyState;
  }
}
|
32 |
|
/**
 * Pool of TCP or TLS connections managed by `generic-pool`.
 *
 * Every pooled resource is an instance of the constructor passed in, built as
 * `new Parser(socket, id)` around a freshly connected socket. Pooled
 * connections are expected to behave like event emitters (`on`,
 * `removeAllListeners`) and to expose `end()`, `destroy()` and either a
 * `validate()` method or a `readyState` property.
 */
export default class TcpPool {
  /**
   * @param {Function} interfaceConstructor - Constructor invoked as
   *   `new interfaceConstructor(socket, id)` for each new connection. An
   *   optional `initializeConnection()` method on the instance is awaited
   *   before the connection is handed to the pool.
   * @param {Object} [options]
   * @param {string} [options.name] - Pool name used in log messages; defaults
   *   to `"<constructor name> Connection Pool"`.
   * @param {number} [options.max=10] - Passed to generic-pool as `max`.
   * @param {number} [options.min=1] - Passed to generic-pool as `min`; `0` is
   *   honoured.
   * @param {number} [options.acquireTimeoutMillis=15000]
   * @param {number} [options.idleTimeoutMillis=30000]
   * @param {boolean} [options.insecure] - When exactly `true`, plain TCP is
   *   used via `net.connect`; otherwise `tls.connect` is used.
   * @param {string} [options.host]
   * @param {number} [options.port]
   * @param {Object} [options.tlsOptions] - Merged over the default TLS options.
   * @param {Function} [options.loggerFromContext] - Maps an acquire context to
   *   a logger; falls back to winston when it returns a falsy value.
   */
  constructor(interfaceConstructor, options) {
    this.Parser = interfaceConstructor;
    this.options = Object.assign({}, options);
    this.name = this.options.name || `${interfaceConstructor.name} Connection Pool`;
    this.connectionCount = 0;

    const factory = {
      create: () => this.connect(),
      destroy: client => this.disconnect(client),
      validate: client => this.validate(client),
    };
    const config = {
      max: this.options.max || 10,
      // An explicit 0 must survive: a bare `|| 1` would silently force at
      // least one connection even when the caller asked for none.
      min: this.options.min === 0 ? 0 : (this.options.min || 1),
      acquireTimeoutMillis: this.options.acquireTimeoutMillis || 15000,
      idleTimeoutMillis: this.options.idleTimeoutMillis || 30000,
      testOnBorrow: true,
    };
    this.pool = pool.createPool(factory, config);
  }

  /**
   * Borrows a connection from the pool and tags it with `context`.
   *
   * @param {*} context - Stored on the connection as `conn.context` and handed
   *   to `options.loggerFromContext` to pick a logger.
   * @returns {Promise<Object>} The borrowed connection.
   * @throws Re-throws whatever `pool.acquire()` rejects with, after logging it.
   */
  async acquire(context) {
    const logger = this.loggerForContext(context);
    logger.info(`Acquiring connection from ${this.name} pool`);
    try {
      const conn = await this.pool.acquire();
      logger.info(`Returning connection #${conn.id} from ${this.name} pool`);
      conn.context = context;
      return conn;
    } catch (error) {
      logger.error(`Failed to acquire connection from ${this.name} pool`, {
        error: error.message || error,
      });
      throw error;
    }
  }

  /**
   * Returns a borrowed connection to the pool after restoring the pool's own
   * listeners and clearing its context.
   *
   * @param {Object} conn - Connection previously obtained from `acquire`.
   * @returns {*} Whatever `pool.release(conn)` returns, so callers can await
   *   it and handle a failed release instead of it being dropped.
   */
  release(conn) {
    const logger = this.loggerForContext(conn.context);
    logger.info(`Releasing connection #${conn.id} into ${this.name} pool`);
    this.reset(conn);

    delete conn.context;
    return this.pool.release(conn);
  }

  /**
   * Asks the pool to destroy a borrowed connection instead of reusing it.
   *
   * @param {Object} conn - Connection previously obtained from `acquire`.
   * @returns {*} Whatever `pool.destroy(conn)` returns, so callers can await
   *   it and handle a failed destroy instead of it being dropped.
   */
  destroy(conn) {
    const logger = this.loggerForContext(conn.context);
    logger.info(`Destroying connection #${conn.id} of ${this.name} pool`);
    this.reset(conn);

    delete conn.context;
    return this.pool.destroy(conn);
  }

  /**
   * Drains the pool and then clears it, logging each step at debug level.
   *
   * @returns {Promise<void>}
   */
  async destroyAllNow() {
    winston.debug(`Pool ${this.name} shutting down`);
    await this.pool.drain();
    winston.debug(`Pool ${this.name} drained`);
    await this.pool.clear();
    winston.debug(`Pool ${this.name} cleared`);
  }

  /**
   * Opens a new socket and wraps it in a `Parser` instance. Used as the pool
   * factory's `create` function.
   *
   * @returns {Promise<Object>} Resolves with the initialized connection.
   *   Rejects when the socket errors before connecting (after a one second
   *   delay), when the socket cannot be created, when the Parser constructor
   *   throws, or when `initializeConnection()` fails. In the last two cases
   *   the socket is destroyed first so it does not leak.
   */
  async connect() {
    this.connectionCount += 1;
    const myId = this.connectionCount;
    winston.info(`Pool ${this.name} socket #${myId} connecting`);

    return new Promise((accept, reject) => {
      // Flipped by whichever of "connected" or "errored" happens first, so the
      // promise is settled by exactly one of the two paths.
      let attemptCompleted = false;
      let socket;

      const connectionHandler = async () => {
        if (attemptCompleted) {
          return;
        }
        attemptCompleted = true;
        winston.info(`Pool ${this.name} socket #${myId} connected`);
        try {
          // Drop the connect-phase listeners; the Parser attaches its own.
          socket.removeAllListeners();
          const connectionParser = new (this.Parser)(socket, myId);
          if (typeof connectionParser.initializeConnection === 'function') {
            await connectionParser.initializeConnection();
          }
          this.reset(connectionParser);
          accept(connectionParser);
        } catch (error) {
          // The socket is already open here. Without destroying it a failed
          // setup would leak the connection, and without rejecting the pool
          // would wait on this promise forever.
          socket.destroy();
          reject(error);
        }
      };

      try {
        if (this.options.insecure === true) {
          socket = net.connect({
            host: this.options.host,
            port: this.options.port,
          }, connectionHandler);
        } else {
          const tlsOptions = Object.assign({
            secureProtocol: 'TLSv1_2_client_method',
            host: this.options.host,
            port: this.options.port,
          }, this.options.tlsOptions);
          socket = tls.connect(tlsOptions, connectionHandler);
        }

        // `on` rather than `once`: a second error on a failed socket must be
        // logged, not surface as an uncaught 'error' event.
        socket.on('error', (error) => {
          winston.error(`Error on Pool ${this.name} socket #${myId}`, {
            message: error.message,
            stack: error.stack,
          });
          if (attemptCompleted) {
            return;
          }
          attemptCompleted = true;
          socket.destroy();
          // NOTE(review): the delay presumably throttles the pool's retry of
          // failed creates - confirm before changing.
          setTimeout(() => reject(error), 1000);
        });
      } catch (error) {
        winston.error(`Error on Pool ${this.name}`, {
          message: error.message,
          stack: error.stack,
        });
        attemptCompleted = true;
        if (socket) {
          socket.destroy();
        }
        reject(error);
      }
    });
  }

  /**
   * Picks the logger for a context.
   *
   * @param {*} context - Value given to `options.loggerFromContext`.
   * @returns {Object} The logger from `options.loggerFromContext(context)`,
   *   or winston when the option is missing or returns a falsy value.
   */
  loggerForContext(context) {
    if (this.options.loggerFromContext) {
      return this.options.loggerFromContext(context) || winston;
    }
    return winston;
  }

  /**
   * Removes every listener from the connection and installs the pool's own
   * `error` and `close` handlers.
   *
   * @param {Object} conn - Event-emitter-like connection.
   */
  reset(conn) {
    conn.removeAllListeners();
    conn.on('error', error => this.onError(conn, error));
    conn.on('close', error => this.onClose(conn, error));
  }

  /**
   * Pool-level `error` handler: logs the error, ends the connection and asks
   * the pool to destroy it.
   *
   * @param {Object} conn - Connection that emitted the error.
   * @param {Error} error - The emitted error.
   */
  onError(conn, error) {
    const logger = this.loggerForContext(conn.context);
    logger.error(`Error on Pool ${this.name} socket #${conn.id}`, {
      message: error.message,
      stack: error.stack,
    });
    conn.end();
    // pool.destroy() may fail (for example when the connection is idle rather
    // than on loan). This runs inside an event listener, so nobody can await
    // it: absorb the failure here instead of leaving an unhandled rejection.
    new Promise(resolve => resolve(this.pool.destroy(conn)))
      .catch((destroyError) => {
        logger.error(`Failed to destroy Pool ${this.name} socket #${conn.id}`, {
          message: destroyError instanceof Error ? destroyError.message : String(destroyError),
        });
      });
  }

  /**
   * Pool-level `close` handler: logs that the socket closed.
   *
   * @param {Object} conn - Connection that emitted `close`.
   */
  onClose(conn) {
    const logger = this.loggerForContext(conn.context);
    logger.info(`Pool ${this.name} socket #${conn.id} closed`);
  }

  /**
   * Checks whether a pooled connection is still usable. Used as the pool
   * factory's `validate` function.
   *
   * @param {Object} conn - Pooled connection.
   * @returns {Promise<*>} The result of `conn.validate()` when the connection
   *   defines it; otherwise `true` when `conn.readyState === 'open'` and
   *   `false` if not. A throwing or rejecting `conn.validate()` resolves to
   *   `false` so the pool never waits on a validation that cannot settle.
   */
  async validate(conn) {
    if (typeof conn.validate === 'function') {
      try {
        return await conn.validate();
      } catch (error) {
        winston.error(`Validation failed in Pool ${this.name} socket #${conn.id}`, {
          message: error instanceof Error ? error.message : String(error),
          stack: error instanceof Error ? error.stack : undefined,
        });
        return false;
      }
    }

    if (conn.readyState === 'open') {
      return true;
    }
    winston.error(`Invalid connection in Pool ${this.name} socket #${conn.id}`);
    return false;
  }

  /**
   * Destroys a connection. Used as the pool factory's `destroy` function.
   *
   * @param {Object} conn - Pooled connection.
   * @returns {Promise<void>} Rejects if `conn.destroy()` throws.
   */
  async disconnect(conn) {
    winston.debug(`Pool ${this.name} socket #${conn.id} closing`);
    conn.destroy();
  }
}
|
224 |
|
// Expose the base connection wrapper as a static property of the pool class.
Object.assign(TcpPool, { ConnectionInterface });
|