UNPKG

15.8 kBJavaScriptView Raw
1'use strict'
/**
 * Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
 * All rights reserved.
 *
 * This source code is licensed under the MIT license found in the
 * README.md file in the root directory of this source tree.
 */
9
10var EventEmitter = require('events').EventEmitter
11var util = require('util')
12var utils = require('./utils')
13var sasl = require('./sasl')
14var pgPass = require('pgpass')
15var TypeOverrides = require('./type-overrides')
16
17var ConnectionParameters = require('./connection-parameters')
18var Query = require('./query')
19var defaults = require('./defaults')
20var Connection = require('./connection')
21
/**
 * Client constructor. Inherits from EventEmitter.
 *
 * Connection settings are resolved by passing `config` to ConnectionParameters;
 * the resolved user, database, port, host, replication and ssl values are
 * copied onto the instance.
 *
 * Fields read directly from `config` (all optional):
 *  - Promise: promise constructor used by this client (falls back to global.Promise)
 *  - types: passed to TypeOverrides
 *  - connection: pre-built connection object used instead of creating a Connection
 *  - stream, keepAlive, keepAliveInitialDelayMillis: forwarded to the Connection constructor
 *  - binary: falls back to defaults.binary when falsy
 *  - connectionTimeoutMillis: falls back to 0 when falsy
 *
 * @param {Object|string} [config] - forwarded as-is to ConnectionParameters
 */
const Client = function (config) {
  EventEmitter.call(this)

  const params = new ConnectionParameters(config)
  this.connectionParameters = params
  this.user = params.user
  this.database = params.database
  this.port = params.port
  this.host = params.host

  // "hiding" the password so it doesn't show up in stack traces
  // or if the client is console.logged
  Object.defineProperty(this, 'password', {
    configurable: true,
    enumerable: false,
    writable: true,
    value: params.password,
  })

  this.replication = params.replication

  const c = config || {}

  this._Promise = c.Promise || global.Promise
  this._types = new TypeOverrides(c.types)

  // lifecycle flags
  this._ending = false
  this._connecting = false
  this._connected = false
  this._connectionError = false
  this._queryable = true

  this.connection =
    c.connection ||
    new Connection({
      stream: c.stream,
      ssl: params.ssl,
      keepAlive: c.keepAlive || false,
      keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
      encoding: params.client_encoding || 'utf8',
    })
  this.queryQueue = []
  this.binary = c.binary || defaults.binary
  this.processID = null
  this.secretKey = null
  this.ssl = params.ssl || false
  this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
}

util.inherits(Client, EventEmitter)
70
/**
 * Fails the active query (if any) and every queued query with `err`.
 *
 * Each query's handleError(err, connection) is invoked on a later tick via
 * process.nextTick; the active query reference is cleared and the queue is
 * emptied in place before this function returns.
 *
 * @param {Error} err - error handed to every pending query
 */
Client.prototype._errorAllQueries = function (err) {
  const failLater = (query) => {
    process.nextTick(() => {
      query.handleError(err, this.connection)
    })
  }

  if (this.activeQuery) {
    failLater(this.activeQuery)
    this.activeQuery = null
  }

  this.queryQueue.forEach(failLater)
  // truncate in place so existing references to the queue array stay valid
  this.queryQueue.length = 0
}
86
/**
 * Starts the connection and wires up the handlers that drive startup,
 * authentication, error reporting and query dispatch.
 *
 * Flow, as implemented below:
 *  - refuses (asynchronously, via callback) if this client is already
 *    connecting or connected;
 *  - optionally arms a timer that destroys the connection's stream after
 *    `_connectionTimeoutMillis`;
 *  - calls con.connect() with either a unix-socket style path (host starting
 *    with '/') or (port, host);
 *  - answers the authentication events (cleartext, MD5, SASL);
 *  - until the first 'readyForQuery', errors go to `callback` (or are emitted
 *    when no callback is set); afterwards they fail pending queries and are
 *    emitted as 'error' on the client.
 *
 * Anything thrown while answering an authentication message is reported
 * through the connection's 'error' event rather than thrown into the emitter
 * that delivered the message.
 *
 * @param {Function} [callback] - invoked as callback(err) on failure or
 *   callback(null, client) once the connection is ready for queries
 */
Client.prototype._connect = function (callback) {
  const con = this.connection

  if (this._connecting || this._connected) {
    const err = new Error('Client has already been connected. You cannot reuse a client.')
    process.nextTick(() => {
      callback(err)
    })
    return
  }
  this._connecting = true

  let connectionTimeoutHandle
  if (this._connectionTimeoutMillis > 0) {
    connectionTimeoutHandle = setTimeout(() => {
      con._ending = true
      con.stream.destroy(new Error('timeout expired'))
    }, this._connectionTimeoutMillis)
  }

  if (this.host && this.host.indexOf('/') === 0) {
    con.connect(this.host + '/.s.PGSQL.' + this.port)
  } else {
    con.connect(this.port, this.host)
  }

  // once connection is established send startup message
  con.on('connect', () => {
    if (this.ssl) {
      con.requestSsl()
    } else {
      con.startup(this.getStartupConf())
    }
  })

  con.on('sslconnect', () => {
    con.startup(this.getStartupConf())
  })

  // Runs one authentication step and reports anything it throws on the
  // connection's 'error' event, the same place the function-password path
  // below already reports failures.
  const runAuthStep = (step, msg) => {
    try {
      step(msg)
    } catch (err) {
      con.emit('error', err)
    }
  }

  // Wraps an auth handler so the password is resolved first: from a
  // password function, from the value already set, or via pgPass when null.
  const checkPgPass = (cb) => (msg) => {
    if (typeof this.password === 'function') {
      this._Promise
        .resolve()
        .then(() => this.password())
        .then((pass) => {
          if (pass !== undefined) {
            if (typeof pass !== 'string') {
              con.emit('error', new TypeError('Password must be a string'))
              return
            }
            this.connectionParameters.password = this.password = pass
          } else {
            this.connectionParameters.password = this.password = null
          }
          cb(msg)
        })
        .catch((err) => {
          con.emit('error', err)
        })
    } else if (this.password !== null) {
      runAuthStep(cb, msg)
    } else {
      pgPass(this.connectionParameters, (pass) => {
        if (undefined !== pass) {
          this.connectionParameters.password = this.password = pass
        }
        runAuthStep(cb, msg)
      })
    }
  }

  // password request handling
  con.on(
    'authenticationCleartextPassword',
    checkPgPass(() => {
      con.password(this.password)
    })
  )

  // password request handling
  con.on(
    'authenticationMD5Password',
    checkPgPass((msg) => {
      con.password(utils.postgresMd5PasswordHash(this.user, this.password, msg.salt))
    })
  )

  // password request handling (SASL)
  let saslSession
  con.on(
    'authenticationSASL',
    checkPgPass((msg) => {
      saslSession = sasl.startSession(msg.mechanisms)

      con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response)
    })
  )

  // password request handling (SASL)
  con.on('authenticationSASLContinue', (msg) => {
    runAuthStep(() => {
      sasl.continueSession(saslSession, this.password, msg.data)

      con.sendSCRAMClientFinalMessage(saslSession.response)
    })
  })

  // password request handling (SASL)
  con.on('authenticationSASLFinal', (msg) => {
    runAuthStep(() => {
      sasl.finalizeSession(saslSession, msg.data)

      saslSession = null
    })
  })

  con.once('backendKeyData', (msg) => {
    this.processID = msg.processID
    this.secretKey = msg.secretKey
  })

  // error handling while the connection is still being established:
  // only the first error is reported
  const connectingErrorHandler = (err) => {
    if (this._connectionError) {
      return
    }
    this._connectionError = true
    clearTimeout(connectionTimeoutHandle)
    if (callback) {
      return callback(err)
    }
    this.emit('error', err)
  }

  // connection-level error after the client became ready
  const connectedErrorHandler = (err) => {
    this._queryable = false
    this._errorAllQueries(err)
    this.emit('error', err)
  }

  // error message after the client became ready: goes to the active query
  // when there is one, otherwise it is treated as a connection-level error
  const connectedErrorMessageHandler = (msg) => {
    const activeQuery = this.activeQuery

    if (!activeQuery) {
      connectedErrorHandler(msg)
      return
    }

    this.activeQuery = null
    activeQuery.handleError(msg, con)
  }

  con.on('error', connectingErrorHandler)
  con.on('errorMessage', connectingErrorHandler)

  // hook up query handling events to connection
  // after the connection initially becomes ready for queries
  con.once('readyForQuery', () => {
    this._connecting = false
    this._connected = true
    this._attachListeners(con)
    con.removeListener('error', connectingErrorHandler)
    con.removeListener('errorMessage', connectingErrorHandler)
    con.on('error', connectedErrorHandler)
    con.on('errorMessage', connectedErrorMessageHandler)
    clearTimeout(connectionTimeoutHandle)

    // process possible callback argument to Client#connect
    if (callback) {
      callback(null, this)
      // remove callback for proper error handling
      // after the connect event
      callback = null
    }
    this.emit('connect')
  })

  con.on('readyForQuery', () => {
    const activeQuery = this.activeQuery
    this.activeQuery = null
    this.readyForQuery = true
    if (activeQuery) {
      activeQuery.handleReadyForQuery(con)
    }
    this._pulseQueryQueue()
  })

  con.once('end', () => {
    const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly')

    clearTimeout(connectionTimeoutHandle)
    this._errorAllQueries(error)

    if (!this._ending) {
      // if the connection is ended without us calling .end()
      // on this client then we have an unexpected disconnection
      // treat this as an error unless we've already emitted an error
      // during connection.
      if (this._connecting && !this._connectionError) {
        if (callback) {
          callback(error)
        } else {
          connectedErrorHandler(error)
        }
      } else if (!this._connectionError) {
        connectedErrorHandler(error)
      }
    }

    process.nextTick(() => {
      this.emit('end')
    })
  })

  con.on('notice', (msg) => {
    this.emit('notice', msg)
  })
}
302
/**
 * Connects the client.
 *
 * With a callback, the callback is handed straight to _connect and nothing is
 * returned. Without one, returns a promise (built with this client's promise
 * constructor) that resolves with no value on success and rejects with the
 * connection error.
 *
 * @param {Function} [callback] - node-style callback forwarded to _connect
 * @returns {Promise<void>|undefined}
 */
Client.prototype.connect = function (callback) {
  if (callback) {
    this._connect(callback)
    return
  }

  return new this._Promise((resolve, reject) => {
    this._connect((error) => {
      if (error) {
        reject(error)
        return
      }
      resolve()
    })
  })
}
319
/**
 * Registers the per-query message handlers on `con`.
 *
 * Each handler looks up `this.activeQuery` at the time the event fires and
 * forwards the message to it; 'notification' is re-emitted on the client.
 * Note the copy handlers pass `this.connection` while the other handlers
 * pass the `con` argument.
 *
 * @param {Object} con - connection object exposing on() and parsedStatements
 */
Client.prototype._attachListeners = function (con) {
  // delegate rowDescription to active query
  con.on('rowDescription', (msg) => {
    this.activeQuery.handleRowDescription(msg)
  })

  // delegate dataRow to active query
  con.on('dataRow', (msg) => {
    this.activeQuery.handleDataRow(msg)
  })

  // delegate portalSuspended to active query
  con.on('portalSuspended', () => {
    this.activeQuery.handlePortalSuspended(con)
  })

  // delegate emptyQuery to active query
  con.on('emptyQuery', () => {
    this.activeQuery.handleEmptyQuery(con)
  })

  // delegate commandComplete to active query
  con.on('commandComplete', (msg) => {
    this.activeQuery.handleCommandComplete(msg, con)
  })

  // if a prepared statement has a name and properly parses
  // we track that its already been executed so we don't parse
  // it again on the same client
  con.on('parseComplete', () => {
    const active = this.activeQuery
    if (active.name) {
      con.parsedStatements[active.name] = active.text
    }
  })

  con.on('copyInResponse', () => {
    this.activeQuery.handleCopyInResponse(this.connection)
  })

  con.on('copyData', (msg) => {
    this.activeQuery.handleCopyData(msg, this.connection)
  })

  con.on('notification', (msg) => {
    this.emit('notification', msg)
  })
}
372
/**
 * Builds the key/value object passed to the connection's startup() call.
 *
 * Always contains `user` and `database`. The remaining keys are added only
 * when the corresponding connection parameter is truthy:
 *  - application_name (falling back to fallback_application_name)
 *  - replication, converted to a string
 *  - statement_timeout and idle_in_transaction_session_timeout, parsed as
 *    base-10 integers and converted to strings
 *
 * @returns {Object} startup parameters
 */
Client.prototype.getStartupConf = function () {
  const params = this.connectionParameters

  const data = {
    user: params.user,
    database: params.database,
  }

  const appName = params.application_name || params.fallback_application_name
  if (appName) {
    data.application_name = appName
  }
  if (params.replication) {
    data.replication = '' + params.replication
  }
  if (params.statement_timeout) {
    data.statement_timeout = String(parseInt(params.statement_timeout, 10))
  }
  if (params.idle_in_transaction_session_timeout) {
    data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10))
  }

  return data
}
397
/**
 * Cancels `query` on behalf of `client`.
 *
 * If `query` is the client's active query, this client's own connection is
 * connected (same host/port rules as _connect) and, once it reports
 * 'connect', con.cancel() is called with the target's processID and
 * secretKey. Otherwise, if the query is still waiting in the target's queue,
 * it is removed from the queue. Unknown queries are ignored.
 *
 * @param {Object} client - the client that owns the query
 * @param {Object} query - the query to cancel
 */
Client.prototype.cancel = function (client, query) {
  if (client.activeQuery === query) {
    const con = this.connection

    if (this.host && this.host.indexOf('/') === 0) {
      con.connect(this.host + '/.s.PGSQL.' + this.port)
    } else {
      con.connect(this.port, this.host)
    }

    // once connection is established send cancel message
    con.on('connect', () => {
      con.cancel(client.processID, client.secretKey)
    })
    return
  }

  const queuedAt = client.queryQueue.indexOf(query)
  if (queuedAt !== -1) {
    client.queryQueue.splice(queuedAt, 1)
  }
}
416
/**
 * Forwards to setTypeParser on this client's type overrides (`this._types`)
 * and returns whatever that call returns.
 *
 * @param {*} oid - forwarded unchanged
 * @param {*} format - forwarded unchanged
 * @param {*} parseFn - forwarded unchanged
 */
Client.prototype.setTypeParser = function (oid, format, parseFn) {
  return this._types.setTypeParser(oid, format, parseFn)
}
420
/**
 * Forwards to getTypeParser on this client's type overrides (`this._types`)
 * and returns whatever that call returns.
 *
 * @param {*} oid - forwarded unchanged
 * @param {*} format - forwarded unchanged
 */
Client.prototype.getTypeParser = function (oid, format) {
  return this._types.getTypeParser(oid, format)
}
424
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
/**
 * Wraps `str` in double quotes, doubling every double quote inside it.
 *
 * @param {string} str - identifier text
 * @returns {string} the quoted identifier
 */
Client.prototype.escapeIdentifier = function (str) {
  return '"' + str.replace(/"/g, '""') + '"'
}
429
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
/**
 * Wraps `str` in single quotes, doubling every single quote and every
 * backslash inside it. When at least one backslash was present the result is
 * prefixed with ' E' (a space followed by E).
 *
 * @param {string} str - literal text
 * @returns {string} the quoted literal
 */
Client.prototype.escapeLiteral = function (str) {
  let sawBackslash = false
  let escaped = "'"

  for (let i = 0; i < str.length; i++) {
    const ch = str[i]
    if (ch === "'") {
      escaped += ch + ch
    } else if (ch === '\\') {
      escaped += ch + ch
      sawBackslash = true
    } else {
      escaped += ch
    }
  }

  escaped += "'"

  return sawBackslash ? ' E' + escaped : escaped
}
455
/**
 * Dispatches the next queued query when the client is ready for one.
 *
 * Does nothing unless `this.readyForQuery === true`. Otherwise the head of
 * the queue becomes the active query and is submitted on the connection. If
 * submit() returns a truthy value it is treated as an error: on the next
 * tick the active query's handleError is called with it, the client is marked
 * ready again and the queue is pulsed once more. When the queue is empty and
 * at least one query has been dispatched before, 'drain' is emitted.
 */
Client.prototype._pulseQueryQueue = function () {
  if (this.readyForQuery !== true) {
    return
  }

  this.activeQuery = this.queryQueue.shift()

  if (!this.activeQuery) {
    if (this.hasExecuted) {
      this.activeQuery = null
      this.emit('drain')
    }
    return
  }

  this.readyForQuery = false
  this.hasExecuted = true

  const queryError = this.activeQuery.submit(this.connection)
  if (queryError) {
    process.nextTick(() => {
      this.activeQuery.handleError(queryError, this.connection)
      this.readyForQuery = true
      this._pulseQueryQueue()
    })
  }
}
477
/**
 * Queues a query on this client.
 *
 * `config` may be a query text, a config object, or an object that has its
 * own submit() function. In the last case that object is used as the query
 * and is also the return value; a function passed as `values` becomes its
 * callback when it has none. Otherwise a Query is built from the arguments
 * and, when it ends up without a callback, a promise is returned.
 *
 * When a query timeout is configured (the object's own `query_timeout` or the
 * connection parameter), the callback is wrapped: if the timer fires first
 * the original callback receives a 'Query read timeout' error, later
 * completion is ignored and the query is dropped from the queue.
 *
 * If the client is no longer queryable or is ending, the query is failed on
 * the next tick instead of being queued.
 *
 * @param {string|Object} config - query text, query config, or submittable object
 * @param {Array|Function} [values] - parameter values, or the callback
 * @param {Function} [callback] - node-style completion callback
 * @returns {Promise|Object|undefined} promise, the submittable, or undefined
 *   when a callback was supplied
 * @throws {TypeError} when `config` is null or undefined
 */
Client.prototype.query = function (config, values, callback) {
  // can take in strings, config object or query object
  let query
  let result
  let readTimeout

  if (config === null || config === undefined) {
    throw new TypeError('Client was passed a null or undefined query')
  }

  if (typeof config.submit === 'function') {
    readTimeout = config.query_timeout || this.connectionParameters.query_timeout
    result = query = config
    if (typeof values === 'function') {
      query.callback = query.callback || values
    }
  } else {
    readTimeout = this.connectionParameters.query_timeout
    query = new Query(config, values, callback)
    if (!query.callback) {
      result = new this._Promise((resolve, reject) => {
        query.callback = (err, res) => (err ? reject(err) : resolve(res))
      })
    }
  }

  if (readTimeout) {
    const queryCallback = query.callback

    const readTimeoutTimer = setTimeout(() => {
      const error = new Error('Query read timeout')

      process.nextTick(() => {
        query.handleError(error, this.connection)
      })

      queryCallback(error)

      // we already returned an error,
      // just do nothing if query completes
      query.callback = () => {}

      // Remove from queue
      const index = this.queryQueue.indexOf(query)
      if (index > -1) {
        this.queryQueue.splice(index, 1)
      }

      this._pulseQueryQueue()
    }, readTimeout)

    query.callback = (err, res) => {
      clearTimeout(readTimeoutTimer)
      queryCallback(err, res)
    }
  }

  if (this.binary && !query.binary) {
    query.binary = true
  }

  if (query._result && !query._result._types) {
    query._result._types = this._types
  }

  if (!this._queryable) {
    process.nextTick(() => {
      query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
    })
    return result
  }

  if (this._ending) {
    process.nextTick(() => {
      query.handleError(new Error('Client was closed and is not queryable'), this.connection)
    })
    return result
  }

  this.queryQueue.push(query)
  this._pulseQueryQueue()
  return result
}
561
/**
 * Ends the client's connection.
 *
 * Marks the client as ending. If the connection object was never asked to
 * connect (`connection._connecting` is falsy) this is a no-op: the callback
 * is invoked immediately, or an already-resolved promise is returned, and the
 * connection is not touched in either case.
 *
 * Otherwise the connection's stream is destroyed when a query is active or
 * the client is no longer queryable; in all other cases connection.end() is
 * called. Completion is reported when the connection emits 'end'.
 *
 * @param {Function} [cb] - called with the connection's 'end' event (or
 *   immediately when never connected)
 * @returns {Promise|undefined} a promise when no callback is supplied
 */
Client.prototype.end = function (cb) {
  this._ending = true

  // if we have never connected, then end is a noop, callback immediately
  if (!this.connection._connecting) {
    if (cb) {
      cb()
      // stop here: falling through would call connection.end() on a
      // connection that was never opened and register cb a second time
      return
    }
    return this._Promise.resolve()
  }

  if (this.activeQuery || !this._queryable) {
    // if we have an active query we need to force a disconnect
    // on the socket - otherwise a hung query could block end forever
    this.connection.stream.destroy()
  } else {
    this.connection.end()
  }

  if (cb) {
    this.connection.once('end', cb)
    return
  }

  return new this._Promise((resolve) => {
    this.connection.once('end', resolve)
  })
}
590
// expose a Query constructor
Client.Query = Query

module.exports = Client