1 | 'use strict'
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 | var net = require('net')
|
11 | var EventEmitter = require('events').EventEmitter
|
12 | var util = require('util')
|
13 |
|
14 | const { parse, serialize } = require('pg-protocol')
|
15 |
|
16 |
|
/**
 * Event-emitting wrapper around the stream used to talk to the server.
 *
 * @param {Object} [config] - optional settings; a missing value is treated as `{}`
 * @param {Object} [config.stream] - stream to use instead of a fresh `net.Socket`
 * @param {boolean} [config.keepAlive] - when truthy, `connect()` enables keep-alive once the socket connects
 * @param {number} [config.keepAliveInitialDelayMillis] - initial delay passed to `setKeepAlive`
 * @param {Object|boolean} [config.ssl] - falsy skips the SSL negotiation in `connect()`;
 *   otherwise the value is merged into the options handed to `tls.connect`
 */
const Connection = function (config) {
  EventEmitter.call(this)
  config = config || {}

  this.stream = config.stream || new net.Socket()
  this._keepAlive = config.keepAlive
  this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
  this.lastBuffer = false
  this.parsedStatements = {}
  this.ssl = config.ssl || false

  // connect() flips this to true and end() reads it; give it a defined value
  // up front so every instance has the same set of state fields from birth.
  this._connecting = false
  this._ending = false

  // The generic 'message' event is only emitted once somebody has subscribed
  // to it; the flag is raised the first time such a listener is registered.
  this._emitMessage = false
  this.on('newListener', (eventName) => {
    if (eventName === 'message') {
      this._emitMessage = true
    }
  })
}

util.inherits(Connection, EventEmitter)
|
37 |
|
/**
 * Connects the underlying stream to `host:port` and wires its events to this
 * emitter.
 *
 * Emits 'connect' once the socket is connected, 'error' for stream errors and
 * 'end' when the stream closes. When `this.ssl` is falsy the protocol parser
 * is attached immediately. Otherwise the first chunk received is treated as
 * the server's reply to an SSL request: on 'S' the stream is replaced by a TLS
 * stream and 'sslconnect' is emitted; any other reply ends the stream and
 * emits 'error'.
 *
 * @param {number} port
 * @param {string} host - also used as the TLS `servername` when it is not an IP literal
 */
Connection.prototype.connect = function (port, host) {
  const socket = this.stream

  this._connecting = true
  socket.setNoDelay(true)
  socket.connect(port, host)

  socket.once('connect', () => {
    if (this._keepAlive) {
      this.stream.setKeepAlive(true, this._keepAliveInitialDelayMillis)
    }
    this.emit('connect')
  })

  // Connection-reset / broken-pipe errors are dropped while the connection is
  // flagged as ending; every other stream error is re-emitted.
  const reportStreamError = (error) => {
    const ignorable = this._ending && ['ECONNRESET', 'EPIPE'].includes(error.code)
    if (!ignorable) {
      this.emit('error', error)
    }
  }
  socket.on('error', reportStreamError)

  socket.on('close', () => {
    this.emit('end')
  })

  if (!this.ssl) {
    return this.attachListeners(this.stream)
  }

  this.stream.once('data', (buffer) => {
    const responseCode = buffer.toString('utf8')

    // Only 'S' lets the handshake continue. 'N' gets its own message; any
    // other reply is reported as a generic SSL error.
    if (responseCode !== 'S') {
      const reason =
        responseCode === 'N'
          ? 'The server does not support SSL connections'
          : 'There was an error establishing an SSL connection'
      this.stream.end()
      return this.emit('error', new Error(reason))
    }

    // Loaded lazily so plain-text connections never pull in the tls module.
    const tls = require('tls')
    const options = Object.assign({ socket: this.stream }, this.ssl)
    if (net.isIP(host) === 0) {
      // host is a name rather than an IP address, so pass it as servername
      options.servername = host
    }

    // From here on all traffic goes through the TLS stream.
    this.stream = tls.connect(options)
    this.attachListeners(this.stream)
    this.stream.on('error', reportStreamError)

    this.emit('sslconnect')
  })
}
|
99 |
|
/**
 * Hooks the protocol parser up to `stream` and re-emits what it produces.
 *
 * Each parsed message is emitted under its own name, except that a message
 * named 'error' is emitted as 'errorMessage'. When a 'message' listener has
 * been registered, the generic 'message' event is emitted first. The stream's
 * 'end' event is forwarded as 'end'.
 *
 * @param {Object} stream - stream to parse incoming messages from
 */
Connection.prototype.attachListeners = function (stream) {
  const conn = this

  stream.on('end', function () {
    conn.emit('end')
  })

  parse(stream, function (msg) {
    let eventName = msg.name
    if (eventName === 'error') {
      eventName = 'errorMessage'
    }
    if (conn._emitMessage) {
      conn.emit('message', msg)
    }
    conn.emit(eventName, msg)
  })
}
|
112 |
|
/**
 * Writes the serialized SSL request straight to the stream
 * (no writability check, unlike `_send`).
 */
Connection.prototype.requestSsl = function () {
  const { stream } = this
  stream.write(serialize.requestSsl())
}
|
116 |
|
/**
 * Writes the serialized startup message straight to the stream
 * (no writability check, unlike `_send`).
 *
 * @param {Object} config - passed through to `serialize.startup`
 */
Connection.prototype.startup = function (config) {
  const { stream } = this
  stream.write(serialize.startup(config))
}
|
120 |
|
/**
 * Sends a serialized cancel message.
 *
 * @param {*} processID - passed through to `serialize.cancel`
 * @param {*} secretKey - passed through to `serialize.cancel`
 */
Connection.prototype.cancel = function (processID, secretKey) {
  const packet = serialize.cancel(processID, secretKey)
  this._send(packet)
}
|
124 |
|
/**
 * Sends a serialized password message.
 *
 * @param {*} password - passed through to `serialize.password`
 */
Connection.prototype.password = function (password) {
  const packet = serialize.password(password)
  this._send(packet)
}
|
128 |
|
/**
 * Sends a serialized SASL initial-response message.
 *
 * @param {*} mechanism - passed through to the serializer
 * @param {*} initialResponse - passed through to the serializer
 */
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
  const packet = serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)
  this._send(packet)
}
|
132 |
|
/**
 * Sends a serialized SCRAM client-final message.
 *
 * @param {*} additionalData - passed through to the serializer
 */
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
  const packet = serialize.sendSCRAMClientFinalMessage(additionalData)
  this._send(packet)
}
|
136 |
|
/**
 * Writes `buffer` to the stream, but only while the stream is writable.
 *
 * @param {*} buffer - data handed to `stream.write`
 * @returns {*} `false` when the stream is not writable, otherwise whatever
 *   `stream.write` returns
 */
Connection.prototype._send = function (buffer) {
  const { stream } = this
  return stream.writable ? stream.write(buffer) : false
}
|
143 |
|
/**
 * Sends a serialized query message.
 *
 * @param {*} text - passed through to `serialize.query`
 */
Connection.prototype.query = function (text) {
  const packet = serialize.query(text)
  this._send(packet)
}
|
147 |
|
148 |
|
/**
 * Sends a serialized parse message.
 *
 * @param {*} query - passed through to `serialize.parse`
 */
Connection.prototype.parse = function (query) {
  const packet = serialize.parse(query)
  this._send(packet)
}
|
152 |
|
153 |
|
154 |
|
/**
 * Sends a serialized bind message.
 *
 * @param {*} config - passed through to `serialize.bind`
 */
Connection.prototype.bind = function (config) {
  const packet = serialize.bind(config)
  this._send(packet)
}
|
158 |
|
159 |
|
160 |
|
/**
 * Sends a serialized execute message.
 *
 * @param {*} config - passed through to `serialize.execute`
 */
Connection.prototype.execute = function (config) {
  const packet = serialize.execute(config)
  this._send(packet)
}
|
164 |
|
// The flush message is serialized once at load time and reused for every call.
const flushBuffer = serialize.flush()

/**
 * Writes the pre-serialized flush message; does nothing when the stream is
 * not writable.
 */
Connection.prototype.flush = function () {
  const { stream } = this
  if (!stream.writable) {
    return
  }
  stream.write(flushBuffer)
}
|
171 |
|
// The sync message is serialized once at load time and reused for every call.
const syncBuffer = serialize.sync()

/**
 * Sends the pre-serialized sync message followed by the flush message.
 *
 * NOTE(review): this also sets `_ending`, which makes the stream error handler
 * installed by `connect()` ignore ECONNRESET/EPIPE from then on. Confirm that
 * is intended for a plain sync.
 */
Connection.prototype.sync = function () {
  this._ending = true
  for (const packet of [syncBuffer, flushBuffer]) {
    this._send(packet)
  }
}
|
178 |
|
// Serialized once at load time. Presumably the protocol's terminate message;
// confirm against pg-protocol's serializer.
const endBuffer = serialize.end()

/**
 * Shuts the connection down.
 *
 * Marks the connection as ending. If `connect()` was called and the stream is
 * still writable, the end message is written first and the stream is ended
 * from the write callback; otherwise the stream is ended right away.
 *
 * @returns {*} the result of `stream.write` when the end message was written,
 *   otherwise `undefined`
 */
Connection.prototype.end = function () {
  this._ending = true

  const canWriteEndMessage = this._connecting && this.stream.writable
  if (canWriteEndMessage) {
    return this.stream.write(endBuffer, () => {
      this.stream.end()
    })
  }

  this.stream.end()
}
|
192 |
|
/**
 * Sends a serialized close message.
 *
 * @param {*} msg - passed through to `serialize.close`
 */
Connection.prototype.close = function (msg) {
  const packet = serialize.close(msg)
  this._send(packet)
}
|
196 |
|
/**
 * Sends a serialized describe message.
 *
 * @param {*} msg - passed through to `serialize.describe`
 */
Connection.prototype.describe = function (msg) {
  const packet = serialize.describe(msg)
  this._send(packet)
}
|
200 |
|
/**
 * Sends `chunk` wrapped in a serialized copy-data message.
 *
 * @param {*} chunk - passed through to `serialize.copyData`
 */
Connection.prototype.sendCopyFromChunk = function (chunk) {
  const packet = serialize.copyData(chunk)
  this._send(packet)
}
|
204 |
|
/**
 * Sends a serialized copy-done message.
 */
Connection.prototype.endCopyFrom = function () {
  const packet = serialize.copyDone()
  this._send(packet)
}
|
208 |
|
/**
 * Sends a serialized copy-fail message.
 *
 * @param {*} msg - passed through to `serialize.copyFail`
 */
Connection.prototype.sendCopyFail = function (msg) {
  const packet = serialize.copyFail(msg)
  this._send(packet)
}
|
212 |
|
213 | module.exports = Connection
|