UNPKG

6.35 kBJavaScriptView Raw
1import tls from 'tls';
2import net from 'net';
3import winston from 'winston';
4import pool from 'generic-pool';
5import { EventEmitter } from 'events';
6
7class ConnectionInterface extends EventEmitter {
8 constructor(socket, id) {
9 super();
10 this.id = id;
11 this.socket = socket;
12 this.socket.on('error', e => this.emit('error', e));
13 this.socket.on('close', e => this.emit('close', e));
14 }
15
16 send(buffer) {
17 this.socket.write(buffer);
18 }
19
20 destroy() {
21 this.socket.destroy();
22 }
23
24 end() {
25 this.socket.end();
26 }
27
28 get readyState() {
29 return this.socket.readyState;
30 }
31}
32
33export default class TcpPool {
34 constructor(interfaceConstructor, options) {
35 this.Parser = interfaceConstructor;
36 this.options = Object.assign({}, options);
37 this.name = this.options.name || `${interfaceConstructor.name} Connection Pool`;
38 this.connectionCount = 0;
39
40 const factory = {
41 create: () => this.connect(),
42 destroy: client => this.disconnect(client),
43 validate: client => this.validate(client),
44 };
45 const config = {
46 max: this.options.max || 10,
47 min: this.options.min || 1,
48 acquireTimeoutMillis: this.options.acquireTimeoutMillis || 15000,
49 idleTimeoutMillis: this.options.idleTimeoutMillis || 30000,
50 testOnBorrow: true,
51 };
52 this.pool = pool.createPool(factory, config);
53 }
54
55 async acquire(context) {
56 const logger = this.loggerForContext(context);
57 logger.info(`Acquiring connection from ${this.name} pool`);
58 try {
59 const conn = await this.pool.acquire();
60 logger.info(`Returning connection #${conn.id} from ${this.name} pool`);
61 conn.context = context;
62 return conn;
63 } catch (error) {
64 logger.error(`Failed to acquire connection from ${this.name} pool`, {
65 error: error.message || error,
66 });
67 throw error;
68 }
69 }
70
71 release(conn) {
72 const logger = this.loggerForContext(conn.context);
73 logger.info(`Releasing connection #${conn.id} into ${this.name} pool`);
74 this.reset(conn);
75 // eslint-disable-next-line no-param-reassign
76 delete conn.context;
77 this.pool.release(conn);
78 }
79
80 destroy(conn) {
81 const logger = this.loggerForContext(conn.context);
82 logger.info(`Destroying connection #${conn.id} of ${this.name} pool`);
83 this.reset(conn);
84 // eslint-disable-next-line no-param-reassign
85 delete conn.context;
86 this.pool.destroy(conn);
87 }
88
89 async destroyAllNow() {
90 winston.debug(`Pool ${this.name} shutting down`);
91 await this.pool.drain();
92 winston.debug(`Pool ${this.name} drained`);
93 await this.pool.clear();
94 winston.debug(`Pool ${this.name} cleared`);
95 }
96
97 async connect() {
98 this.connectionCount += 1;
99 const myId = this.connectionCount;
100 winston.info(`Pool ${this.name} socket #${myId} connecting`);
101 let attemptCompleted = false;
102 let socket;
103
104 return new Promise((accept, reject) => {
105 let resolved = false;
106 const connectionHandler = async () => {
107 if (!attemptCompleted) {
108 winston.info(`Pool ${this.name} socket #${myId} connected`);
109 attemptCompleted = true;
110 socket.removeAllListeners();
111 const connectionParser = new (this.Parser)(socket, myId);
112 if (typeof connectionParser.initializeConnection === 'function') {
113 try {
114 await connectionParser.initializeConnection();
115 } catch (error) {
116 reject(error);
117 return;
118 }
119 }
120 this.reset(connectionParser);
121 resolved = true;
122 accept(connectionParser);
123 }
124 };
125
126 try {
127 if (this.options.insecure === true) {
128 socket = net.connect({
129 host: this.options.host,
130 port: this.options.port,
131 }, connectionHandler);
132 } else {
133 const tlsOptions = Object.assign({
134 secureProtocol: 'TLSv1_2_client_method',
135 host: this.options.host,
136 port: this.options.port,
137 }, this.options.tlsOptions);
138 socket = tls.connect(tlsOptions, connectionHandler);
139 }
140
141 socket.once('error', (error) => {
142 winston.error(`Error on Pool ${this.name} socket #${myId}`, {
143 message: error.message,
144 stack: error.stack,
145 });
146 if (!attemptCompleted) {
147 attemptCompleted = true;
148 socket.end();
149 // Reject after a second to give some backoff time
150 if (!resolved) {
151 setTimeout(() => reject(error), 1000);
152 resolved = true;
153 }
154 }
155 });
156 } catch (error) {
157 winston.error(`Error on Pool ${this.name}`, {
158 message: error.message,
159 stack: error.stack,
160 });
161 if (!resolved) {
162 reject(error);
163 }
164 }
165 });
166 }
167
168 loggerForContext(context) {
169 if (this.options.loggerFromContext) {
170 return this.options.loggerFromContext(context) || winston;
171 }
172 return winston;
173 }
174
175 reset(conn) {
176 conn.removeAllListeners();
177 conn.on('error', error => this.onError(conn, error));
178 conn.on('close', error => this.onClose(conn, error));
179 }
180
181 onError(conn, error) {
182 const logger = this.loggerForContext(conn.context);
183 logger.error(`Error on Pool ${this.name} socket #${conn.id}`, {
184 message: error.message,
185 stack: error.stack,
186 });
187 conn.end();
188 this.pool.destroy(conn);
189 }
190
191 onClose(conn) {
192 const logger = this.loggerForContext(conn.context);
193 logger.info(`Pool ${this.name} socket #${conn.id} closed`);
194 }
195
196 validate(conn) {
197 return new Promise((accept) => {
198 if (typeof conn.validate === 'function') {
199 Promise.resolve(conn.validate())
200 .then(isValid => accept(isValid));
201 } else {
202 if (conn.readyState === 'open') {
203 accept(true);
204 return;
205 }
206 winston.error(`Invalid connection in Pool ${this.name} socket #${conn.id}`);
207 accept(false);
208 }
209 });
210 }
211
212 disconnect(conn) {
213 return new Promise((accept, reject) => {
214 try {
215 winston.debug(`Pool ${this.name} socket #${conn.id} closing`);
216 conn.destroy();
217 accept();
218 } catch (error) {
219 reject(error);
220 }
221 });
222 }
223}
224
225TcpPool.ConnectionInterface = ConnectionInterface;