UNPKG

5.63 kBJavaScriptView Raw
1'use strict'
/**
 * Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
 * All rights reserved.
 *
 * This source code is licensed under the MIT license found in the
 * README.md file in the root directory of this source tree.
 */
9
10var net = require('net')
11var EventEmitter = require('events').EventEmitter
12var util = require('util')
13
14const { parse, serialize } = require('pg-protocol')
15
// TODO(bmc) support binary mode at some point
/**
 * Event-emitting wrapper around a socket-like stream.
 *
 * @param {object} [config] - Optional settings; a falsy value is treated as `{}`.
 * @param {object} [config.stream] - Stream to use; a new `net.Socket` is created when absent.
 * @param {*} [config.keepAlive] - When truthy, `connect()` enables keep-alive once the stream connects.
 * @param {*} [config.keepAliveInitialDelayMillis] - Forwarded to `stream.setKeepAlive` as the delay argument.
 * @param {*} [config.ssl] - Falsy values are stored as `false`; a truthy value makes `connect()`
 *   wait for an SSL response and is merged into the options given to `tls.connect`.
 */
const Connection = function (config) {
  EventEmitter.call(this)
  const settings = config || {}
  this.stream = settings.stream || new net.Socket()
  this._keepAlive = settings.keepAlive
  this._keepAliveInitialDelayMillis = settings.keepAliveInitialDelayMillis
  this.lastBuffer = false
  this.parsedStatements = {}
  this.ssl = settings.ssl || false
  this._ending = false
  this._emitMessage = false
  // Flip the flag the first time somebody subscribes to 'message';
  // attachListeners() checks it before emitting that catch-all event.
  this.on('newListener', (eventName) => {
    if (eventName === 'message') {
      this._emitMessage = true
    }
  })
}

util.inherits(Connection, EventEmitter)
37
/**
 * Connects the underlying stream and forwards its lifecycle events.
 *
 * Emits:
 *  - 'connect' when the stream connects (keep-alive is enabled first if configured),
 *  - 'error' for stream errors, except ECONNRESET/EPIPE once `_ending` is set,
 *  - 'end' when the stream closes,
 *  - 'sslconnect' after the stream has been wrapped with `tls.connect`.
 *
 * Without `this.ssl` the protocol listeners are attached right away. With it,
 * the first data chunk is treated as the server's one-character SSL answer.
 * This method does not send the SSL request itself; see `requestSsl()`.
 *
 * @param {number} port - Forwarded to `stream.connect`.
 * @param {string} host - Forwarded to `stream.connect`; also used as the TLS
 *   `servername` when it is not an IP literal.
 * @returns {undefined}
 */
Connection.prototype.connect = function (port, host) {
  this._connecting = true
  this.stream.setNoDelay(true)
  this.stream.connect(port, host)

  this.stream.once('connect', () => {
    if (this._keepAlive) {
      this.stream.setKeepAlive(true, this._keepAliveInitialDelayMillis)
    }
    this.emit('connect')
  })

  const reportStreamError = (error) => {
    // errors about disconnections should be ignored during disconnect
    if (this._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
      return
    }
    this.emit('error', error)
  }
  this.stream.on('error', reportStreamError)

  this.stream.on('close', () => {
    this.emit('end')
  })

  if (!this.ssl) {
    return this.attachListeners(this.stream)
  }

  this.stream.once('data', (buffer) => {
    const responseCode = buffer.toString('utf8')
    switch (responseCode) {
      case 'S': // Server supports SSL connections, continue with a secure connection
        break
      case 'N': // Server does not support SSL connections
        this.stream.end()
        return this.emit('error', new Error('The server does not support SSL connections'))
      default:
        // Any other response byte, including 'E' (ErrorResponse) indicating a server error
        this.stream.end()
        return this.emit('error', new Error('There was an error establishing an SSL connection'))
    }
    // Loaded lazily so plaintext connections never require the tls module.
    const tls = require('tls')
    const options = Object.assign(
      {
        socket: this.stream,
      },
      this.ssl
    )
    // net.isIP() returns 0 for anything that is not an IPv4/IPv6 literal.
    // This assignment runs after the merge, so it replaces any `servername`
    // supplied through the ssl config.
    if (net.isIP(host) === 0) {
      options.servername = host
    }
    // From here on the TLS socket replaces the raw stream.
    this.stream = tls.connect(options)
    this.attachListeners(this.stream)
    this.stream.on('error', reportStreamError)

    this.emit('sslconnect')
  })
}
99
/**
 * Hooks protocol parsing onto `stream`.
 *
 * Re-emits the stream's 'end' event and hands the stream to `parse` from
 * pg-protocol; every message it delivers is emitted under its own name.
 *
 * @param {object} stream - Stream to listen on and to feed to `parse`.
 */
Connection.prototype.attachListeners = function (stream) {
  const forwardEnd = () => {
    this.emit('end')
  }
  stream.on('end', forwardEnd)

  parse(stream, (message) => {
    // Messages named 'error' are emitted as 'errorMessage' instead
    // (presumably to keep them apart from stream-level 'error' events).
    let eventName = message.name
    if (eventName === 'error') {
      eventName = 'errorMessage'
    }
    // The catch-all 'message' event fires first, and only once a listener
    // for it has been registered.
    if (this._emitMessage) {
      this.emit('message', message)
    }
    this.emit(eventName, message)
  })
}
112
/**
 * Writes the packet produced by `serialize.requestSsl()` to the stream.
 * The write goes directly to the stream, bypassing the `writable` check
 * that `_send` performs.
 */
Connection.prototype.requestSsl = function () {
  const packet = serialize.requestSsl()
  this.stream.write(packet)
}
116
/**
 * Writes the packet produced by `serialize.startup(config)` to the stream.
 * The write goes directly to the stream, bypassing the `writable` check
 * that `_send` performs.
 *
 * @param {*} config - Passed unchanged to `serialize.startup`.
 */
Connection.prototype.startup = function (config) {
  const packet = serialize.startup(config)
  this.stream.write(packet)
}
120
/**
 * Sends the packet produced by `serialize.cancel(processID, secretKey)`.
 *
 * @param {*} processID - Passed unchanged to `serialize.cancel`.
 * @param {*} secretKey - Passed unchanged to `serialize.cancel`.
 */
Connection.prototype.cancel = function (processID, secretKey) {
  const packet = serialize.cancel(processID, secretKey)
  this._send(packet)
}
124
/**
 * Sends the packet produced by `serialize.password(password)`.
 *
 * @param {*} password - Passed unchanged to `serialize.password`.
 */
Connection.prototype.password = function (password) {
  const packet = serialize.password(password)
  this._send(packet)
}
128
/**
 * Sends the packet produced by
 * `serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)`.
 *
 * @param {*} mechanism - Passed unchanged to the serializer.
 * @param {*} initialResponse - Passed unchanged to the serializer.
 */
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
  const packet = serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)
  this._send(packet)
}
132
/**
 * Sends the packet produced by
 * `serialize.sendSCRAMClientFinalMessage(additionalData)`.
 *
 * @param {*} additionalData - Passed unchanged to the serializer.
 */
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
  const packet = serialize.sendSCRAMClientFinalMessage(additionalData)
  this._send(packet)
}
136
/**
 * Writes `buffer` to the stream, but only while the stream reports itself
 * as writable.
 *
 * @param {*} buffer - Data handed to `stream.write`.
 * @returns {*} `false` when the stream is not writable, otherwise whatever
 *   `stream.write` returns.
 */
Connection.prototype._send = function (buffer) {
  const { stream } = this
  return stream.writable ? stream.write(buffer) : false
}
143
/**
 * Sends the packet produced by `serialize.query(text)`.
 *
 * @param {*} text - Passed unchanged to `serialize.query`.
 */
Connection.prototype.query = function (text) {
  const packet = serialize.query(text)
  this._send(packet)
}
147
/**
 * Sends a parse message: the packet produced by `serialize.parse(query)`.
 * This method is unrelated to the `parse` function imported from
 * pg-protocol, which reads incoming data.
 *
 * @param {*} query - Passed unchanged to `serialize.parse`.
 */
Connection.prototype.parse = function (query) {
  const packet = serialize.parse(query)
  this._send(packet)
}
152
/**
 * Sends a bind message: the packet produced by `serialize.bind(config)`.
 * The packet is passed to `_send` immediately.
 *
 * @param {*} config - Passed unchanged to `serialize.bind`.
 */
Connection.prototype.bind = function (config) {
  const packet = serialize.bind(config)
  this._send(packet)
}
158
/**
 * Sends an execute message: the packet produced by `serialize.execute(config)`.
 * The packet is passed to `_send` immediately.
 *
 * @param {*} config - Passed unchanged to `serialize.execute`.
 */
Connection.prototype.execute = function (config) {
  const packet = serialize.execute(config)
  this._send(packet)
}
164
// Built once at load time and reused for every flush.
const flushBuffer = serialize.flush()

/**
 * Sends the prebuilt flush packet. Nothing is written when the stream is
 * not writable; that guard lives in `_send`, which this method reuses
 * instead of repeating the check.
 *
 * @returns {undefined}
 */
Connection.prototype.flush = function () {
  this._send(flushBuffer)
}
171
// Built once at load time and reused for every sync.
const syncBuffer = serialize.sync()

/**
 * Sends the prebuilt sync packet followed by the prebuilt flush packet.
 */
Connection.prototype.sync = function () {
  // Once `_ending` is set, the stream error handler installed by connect()
  // drops ECONNRESET/EPIPE errors.
  // NOTE(review): confirm that marking the connection as ending on every
  // sync (not only in end()) is intended.
  this._ending = true
  this._send(syncBuffer)
  this._send(flushBuffer)
}
178
// Built once at load time and reused for every end() call.
const endBuffer = serialize.end()

/**
 * Shuts the connection down.
 *
 * If connect() has been called and the stream is still writable, the
 * prebuilt end packet is written first and the stream is ended from the
 * write callback. Otherwise the stream is ended straight away.
 *
 * @returns {*} The result of `stream.write` when the end packet is sent,
 *   otherwise `undefined`.
 */
Connection.prototype.end = function () {
  this._ending = true
  const canSayGoodbye = this._connecting && this.stream.writable
  if (canSayGoodbye) {
    return this.stream.write(endBuffer, () => {
      this.stream.end()
    })
  }
  this.stream.end()
}
192
/**
 * Sends the packet produced by `serialize.close(msg)`.
 * This does not close the stream; use `end()` for that.
 *
 * @param {*} msg - Passed unchanged to `serialize.close`.
 */
Connection.prototype.close = function (msg) {
  const packet = serialize.close(msg)
  this._send(packet)
}
196
/**
 * Sends the packet produced by `serialize.describe(msg)`.
 *
 * @param {*} msg - Passed unchanged to `serialize.describe`.
 */
Connection.prototype.describe = function (msg) {
  const packet = serialize.describe(msg)
  this._send(packet)
}
200
/**
 * Sends the packet produced by `serialize.copyData(chunk)`.
 *
 * @param {*} chunk - Passed unchanged to `serialize.copyData`.
 */
Connection.prototype.sendCopyFromChunk = function (chunk) {
  const packet = serialize.copyData(chunk)
  this._send(packet)
}
204
/**
 * Sends the packet produced by `serialize.copyDone()`.
 */
Connection.prototype.endCopyFrom = function () {
  const packet = serialize.copyDone()
  this._send(packet)
}
208
/**
 * Sends the packet produced by `serialize.copyFail(msg)`.
 *
 * @param {*} msg - Passed unchanged to `serialize.copyFail`.
 */
Connection.prototype.sendCopyFail = function (msg) {
  const packet = serialize.copyFail(msg)
  this._send(packet)
}
212
// The Connection constructor is this module's only export.
module.exports = Connection