1 | 'use strict'
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 | var EventEmitter = require('events').EventEmitter
|
11 | var util = require('util')
|
12 | var utils = require('./utils')
|
13 | var sasl = require('./sasl')
|
14 | var pgPass = require('pgpass')
|
15 | var TypeOverrides = require('./type-overrides')
|
16 |
|
17 | var ConnectionParameters = require('./connection-parameters')
|
18 | var Query = require('./query')
|
19 | var defaults = require('./defaults')
|
20 | var Connection = require('./connection')
|
21 |
|
22 | var Client = function (config) {
|
23 | EventEmitter.call(this)
|
24 |
|
25 | this.connectionParameters = new ConnectionParameters(config)
|
26 | this.user = this.connectionParameters.user
|
27 | this.database = this.connectionParameters.database
|
28 | this.port = this.connectionParameters.port
|
29 | this.host = this.connectionParameters.host
|
30 |
|
31 |
|
32 |
|
33 | Object.defineProperty(this, 'password', {
|
34 | configurable: true,
|
35 | enumerable: false,
|
36 | writable: true,
|
37 | value: this.connectionParameters.password,
|
38 | })
|
39 |
|
40 | this.replication = this.connectionParameters.replication
|
41 |
|
42 | var c = config || {}
|
43 |
|
44 | this._Promise = c.Promise || global.Promise
|
45 | this._types = new TypeOverrides(c.types)
|
46 | this._ending = false
|
47 | this._connecting = false
|
48 | this._connected = false
|
49 | this._connectionError = false
|
50 | this._queryable = true
|
51 |
|
52 | this.connection =
|
53 | c.connection ||
|
54 | new Connection({
|
55 | stream: c.stream,
|
56 | ssl: this.connectionParameters.ssl,
|
57 | keepAlive: c.keepAlive || false,
|
58 | keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
|
59 | encoding: this.connectionParameters.client_encoding || 'utf8',
|
60 | })
|
61 | this.queryQueue = []
|
62 | this.binary = c.binary || defaults.binary
|
63 | this.processID = null
|
64 | this.secretKey = null
|
65 | this.ssl = this.connectionParameters.ssl || false
|
66 | this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
|
67 | }
|
68 |
|
69 | util.inherits(Client, EventEmitter)
|
70 |
|
71 | Client.prototype._errorAllQueries = function (err) {
|
72 | const enqueueError = (query) => {
|
73 | process.nextTick(() => {
|
74 | query.handleError(err, this.connection)
|
75 | })
|
76 | }
|
77 |
|
78 | if (this.activeQuery) {
|
79 | enqueueError(this.activeQuery)
|
80 | this.activeQuery = null
|
81 | }
|
82 |
|
83 | this.queryQueue.forEach(enqueueError)
|
84 | this.queryQueue.length = 0
|
85 | }
|
86 |
|
87 | Client.prototype._connect = function (callback) {
|
88 | var self = this
|
89 | var con = this.connection
|
90 | if (this._connecting || this._connected) {
|
91 | const err = new Error('Client has already been connected. You cannot reuse a client.')
|
92 | process.nextTick(() => {
|
93 | callback(err)
|
94 | })
|
95 | return
|
96 | }
|
97 | this._connecting = true
|
98 |
|
99 | var connectionTimeoutHandle
|
100 | if (this._connectionTimeoutMillis > 0) {
|
101 | connectionTimeoutHandle = setTimeout(() => {
|
102 | con._ending = true
|
103 | con.stream.destroy(new Error('timeout expired'))
|
104 | }, this._connectionTimeoutMillis)
|
105 | }
|
106 |
|
107 | if (this.host && this.host.indexOf('/') === 0) {
|
108 | con.connect(this.host + '/.s.PGSQL.' + this.port)
|
109 | } else {
|
110 | con.connect(this.port, this.host)
|
111 | }
|
112 |
|
113 |
|
114 | con.on('connect', function () {
|
115 | if (self.ssl) {
|
116 | con.requestSsl()
|
117 | } else {
|
118 | con.startup(self.getStartupConf())
|
119 | }
|
120 | })
|
121 |
|
122 | con.on('sslconnect', function () {
|
123 | con.startup(self.getStartupConf())
|
124 | })
|
125 |
|
126 | function checkPgPass(cb) {
|
127 | return function (msg) {
|
128 | if (typeof self.password === 'function') {
|
129 | self._Promise
|
130 | .resolve()
|
131 | .then(() => self.password())
|
132 | .then((pass) => {
|
133 | if (pass !== undefined) {
|
134 | if (typeof pass !== 'string') {
|
135 | con.emit('error', new TypeError('Password must be a string'))
|
136 | return
|
137 | }
|
138 | self.connectionParameters.password = self.password = pass
|
139 | } else {
|
140 | self.connectionParameters.password = self.password = null
|
141 | }
|
142 | cb(msg)
|
143 | })
|
144 | .catch((err) => {
|
145 | con.emit('error', err)
|
146 | })
|
147 | } else if (self.password !== null) {
|
148 | cb(msg)
|
149 | } else {
|
150 | pgPass(self.connectionParameters, function (pass) {
|
151 | if (undefined !== pass) {
|
152 | self.connectionParameters.password = self.password = pass
|
153 | }
|
154 | cb(msg)
|
155 | })
|
156 | }
|
157 | }
|
158 | }
|
159 |
|
160 |
|
161 | con.on(
|
162 | 'authenticationCleartextPassword',
|
163 | checkPgPass(function () {
|
164 | con.password(self.password)
|
165 | })
|
166 | )
|
167 |
|
168 |
|
169 | con.on(
|
170 | 'authenticationMD5Password',
|
171 | checkPgPass(function (msg) {
|
172 | con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt))
|
173 | })
|
174 | )
|
175 |
|
176 |
|
177 | var saslSession
|
178 | con.on(
|
179 | 'authenticationSASL',
|
180 | checkPgPass(function (msg) {
|
181 | saslSession = sasl.startSession(msg.mechanisms)
|
182 |
|
183 | con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response)
|
184 | })
|
185 | )
|
186 |
|
187 |
|
188 | con.on('authenticationSASLContinue', function (msg) {
|
189 | sasl.continueSession(saslSession, self.password, msg.data)
|
190 |
|
191 | con.sendSCRAMClientFinalMessage(saslSession.response)
|
192 | })
|
193 |
|
194 |
|
195 | con.on('authenticationSASLFinal', function (msg) {
|
196 | sasl.finalizeSession(saslSession, msg.data)
|
197 |
|
198 | saslSession = null
|
199 | })
|
200 |
|
201 | con.once('backendKeyData', function (msg) {
|
202 | self.processID = msg.processID
|
203 | self.secretKey = msg.secretKey
|
204 | })
|
205 |
|
206 | const connectingErrorHandler = (err) => {
|
207 | if (this._connectionError) {
|
208 | return
|
209 | }
|
210 | this._connectionError = true
|
211 | clearTimeout(connectionTimeoutHandle)
|
212 | if (callback) {
|
213 | return callback(err)
|
214 | }
|
215 | this.emit('error', err)
|
216 | }
|
217 |
|
218 | const connectedErrorHandler = (err) => {
|
219 | this._queryable = false
|
220 | this._errorAllQueries(err)
|
221 | this.emit('error', err)
|
222 | }
|
223 |
|
224 | const connectedErrorMessageHandler = (msg) => {
|
225 | const activeQuery = this.activeQuery
|
226 |
|
227 | if (!activeQuery) {
|
228 | connectedErrorHandler(msg)
|
229 | return
|
230 | }
|
231 |
|
232 | this.activeQuery = null
|
233 | activeQuery.handleError(msg, con)
|
234 | }
|
235 |
|
236 | con.on('error', connectingErrorHandler)
|
237 | con.on('errorMessage', connectingErrorHandler)
|
238 |
|
239 |
|
240 |
|
241 | con.once('readyForQuery', function () {
|
242 | self._connecting = false
|
243 | self._connected = true
|
244 | self._attachListeners(con)
|
245 | con.removeListener('error', connectingErrorHandler)
|
246 | con.removeListener('errorMessage', connectingErrorHandler)
|
247 | con.on('error', connectedErrorHandler)
|
248 | con.on('errorMessage', connectedErrorMessageHandler)
|
249 | clearTimeout(connectionTimeoutHandle)
|
250 |
|
251 |
|
252 | if (callback) {
|
253 | callback(null, self)
|
254 |
|
255 |
|
256 | callback = null
|
257 | }
|
258 | self.emit('connect')
|
259 | })
|
260 |
|
261 | con.on('readyForQuery', function () {
|
262 | var activeQuery = self.activeQuery
|
263 | self.activeQuery = null
|
264 | self.readyForQuery = true
|
265 | if (activeQuery) {
|
266 | activeQuery.handleReadyForQuery(con)
|
267 | }
|
268 | self._pulseQueryQueue()
|
269 | })
|
270 |
|
271 | con.once('end', () => {
|
272 | const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly')
|
273 |
|
274 | clearTimeout(connectionTimeoutHandle)
|
275 | this._errorAllQueries(error)
|
276 |
|
277 | if (!this._ending) {
|
278 |
|
279 |
|
280 |
|
281 |
|
282 | if (this._connecting && !this._connectionError) {
|
283 | if (callback) {
|
284 | callback(error)
|
285 | } else {
|
286 | connectedErrorHandler(error)
|
287 | }
|
288 | } else if (!this._connectionError) {
|
289 | connectedErrorHandler(error)
|
290 | }
|
291 | }
|
292 |
|
293 | process.nextTick(() => {
|
294 | this.emit('end')
|
295 | })
|
296 | })
|
297 |
|
298 | con.on('notice', function (msg) {
|
299 | self.emit('notice', msg)
|
300 | })
|
301 | }
|
302 |
|
303 | Client.prototype.connect = function (callback) {
|
304 | if (callback) {
|
305 | this._connect(callback)
|
306 | return
|
307 | }
|
308 |
|
309 | return new this._Promise((resolve, reject) => {
|
310 | this._connect((error) => {
|
311 | if (error) {
|
312 | reject(error)
|
313 | } else {
|
314 | resolve()
|
315 | }
|
316 | })
|
317 | })
|
318 | }
|
319 |
|
320 | Client.prototype._attachListeners = function (con) {
|
321 | const self = this
|
322 |
|
323 | con.on('rowDescription', function (msg) {
|
324 | self.activeQuery.handleRowDescription(msg)
|
325 | })
|
326 |
|
327 |
|
328 | con.on('dataRow', function (msg) {
|
329 | self.activeQuery.handleDataRow(msg)
|
330 | })
|
331 |
|
332 |
|
333 |
|
334 | con.on('portalSuspended', function (msg) {
|
335 | self.activeQuery.handlePortalSuspended(con)
|
336 | })
|
337 |
|
338 |
|
339 |
|
340 | con.on('emptyQuery', function (msg) {
|
341 | self.activeQuery.handleEmptyQuery(con)
|
342 | })
|
343 |
|
344 |
|
345 | con.on('commandComplete', function (msg) {
|
346 | self.activeQuery.handleCommandComplete(msg, con)
|
347 | })
|
348 |
|
349 |
|
350 |
|
351 |
|
352 |
|
353 | con.on('parseComplete', function (msg) {
|
354 | if (self.activeQuery.name) {
|
355 | con.parsedStatements[self.activeQuery.name] = self.activeQuery.text
|
356 | }
|
357 | })
|
358 |
|
359 |
|
360 | con.on('copyInResponse', function (msg) {
|
361 | self.activeQuery.handleCopyInResponse(self.connection)
|
362 | })
|
363 |
|
364 | con.on('copyData', function (msg) {
|
365 | self.activeQuery.handleCopyData(msg, self.connection)
|
366 | })
|
367 |
|
368 | con.on('notification', function (msg) {
|
369 | self.emit('notification', msg)
|
370 | })
|
371 | }
|
372 |
|
373 | Client.prototype.getStartupConf = function () {
|
374 | var params = this.connectionParameters
|
375 |
|
376 | var data = {
|
377 | user: params.user,
|
378 | database: params.database,
|
379 | }
|
380 |
|
381 | var appName = params.application_name || params.fallback_application_name
|
382 | if (appName) {
|
383 | data.application_name = appName
|
384 | }
|
385 | if (params.replication) {
|
386 | data.replication = '' + params.replication
|
387 | }
|
388 | if (params.statement_timeout) {
|
389 | data.statement_timeout = String(parseInt(params.statement_timeout, 10))
|
390 | }
|
391 | if (params.idle_in_transaction_session_timeout) {
|
392 | data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10))
|
393 | }
|
394 |
|
395 | return data
|
396 | }
|
397 |
|
398 | Client.prototype.cancel = function (client, query) {
|
399 | if (client.activeQuery === query) {
|
400 | var con = this.connection
|
401 |
|
402 | if (this.host && this.host.indexOf('/') === 0) {
|
403 | con.connect(this.host + '/.s.PGSQL.' + this.port)
|
404 | } else {
|
405 | con.connect(this.port, this.host)
|
406 | }
|
407 |
|
408 |
|
409 | con.on('connect', function () {
|
410 | con.cancel(client.processID, client.secretKey)
|
411 | })
|
412 | } else if (client.queryQueue.indexOf(query) !== -1) {
|
413 | client.queryQueue.splice(client.queryQueue.indexOf(query), 1)
|
414 | }
|
415 | }
|
416 |
|
417 | Client.prototype.setTypeParser = function (oid, format, parseFn) {
|
418 | return this._types.setTypeParser(oid, format, parseFn)
|
419 | }
|
420 |
|
421 | Client.prototype.getTypeParser = function (oid, format) {
|
422 | return this._types.getTypeParser(oid, format)
|
423 | }
|
424 |
|
425 |
|
426 | Client.prototype.escapeIdentifier = function (str) {
|
427 | return '"' + str.replace(/"/g, '""') + '"'
|
428 | }
|
429 |
|
430 |
|
431 | Client.prototype.escapeLiteral = function (str) {
|
432 | var hasBackslash = false
|
433 | var escaped = "'"
|
434 |
|
435 | for (var i = 0; i < str.length; i++) {
|
436 | var c = str[i]
|
437 | if (c === "'") {
|
438 | escaped += c + c
|
439 | } else if (c === '\\') {
|
440 | escaped += c + c
|
441 | hasBackslash = true
|
442 | } else {
|
443 | escaped += c
|
444 | }
|
445 | }
|
446 |
|
447 | escaped += "'"
|
448 |
|
449 | if (hasBackslash === true) {
|
450 | escaped = ' E' + escaped
|
451 | }
|
452 |
|
453 | return escaped
|
454 | }
|
455 |
|
456 | Client.prototype._pulseQueryQueue = function () {
|
457 | if (this.readyForQuery === true) {
|
458 | this.activeQuery = this.queryQueue.shift()
|
459 | if (this.activeQuery) {
|
460 | this.readyForQuery = false
|
461 | this.hasExecuted = true
|
462 |
|
463 | const queryError = this.activeQuery.submit(this.connection)
|
464 | if (queryError) {
|
465 | process.nextTick(() => {
|
466 | this.activeQuery.handleError(queryError, this.connection)
|
467 | this.readyForQuery = true
|
468 | this._pulseQueryQueue()
|
469 | })
|
470 | }
|
471 | } else if (this.hasExecuted) {
|
472 | this.activeQuery = null
|
473 | this.emit('drain')
|
474 | }
|
475 | }
|
476 | }
|
477 |
|
478 | Client.prototype.query = function (config, values, callback) {
|
479 |
|
480 | var query
|
481 | var result
|
482 | var readTimeout
|
483 | var readTimeoutTimer
|
484 | var queryCallback
|
485 |
|
486 | if (config === null || config === undefined) {
|
487 | throw new TypeError('Client was passed a null or undefined query')
|
488 | } else if (typeof config.submit === 'function') {
|
489 | readTimeout = config.query_timeout || this.connectionParameters.query_timeout
|
490 | result = query = config
|
491 | if (typeof values === 'function') {
|
492 | query.callback = query.callback || values
|
493 | }
|
494 | } else {
|
495 | readTimeout = this.connectionParameters.query_timeout
|
496 | query = new Query(config, values, callback)
|
497 | if (!query.callback) {
|
498 | result = new this._Promise((resolve, reject) => {
|
499 | query.callback = (err, res) => (err ? reject(err) : resolve(res))
|
500 | })
|
501 | }
|
502 | }
|
503 |
|
504 | if (readTimeout) {
|
505 | queryCallback = query.callback
|
506 |
|
507 | readTimeoutTimer = setTimeout(() => {
|
508 | var error = new Error('Query read timeout')
|
509 |
|
510 | process.nextTick(() => {
|
511 | query.handleError(error, this.connection)
|
512 | })
|
513 |
|
514 | queryCallback(error)
|
515 |
|
516 |
|
517 |
|
518 | query.callback = () => {}
|
519 |
|
520 |
|
521 | var index = this.queryQueue.indexOf(query)
|
522 | if (index > -1) {
|
523 | this.queryQueue.splice(index, 1)
|
524 | }
|
525 |
|
526 | this._pulseQueryQueue()
|
527 | }, readTimeout)
|
528 |
|
529 | query.callback = (err, res) => {
|
530 | clearTimeout(readTimeoutTimer)
|
531 | queryCallback(err, res)
|
532 | }
|
533 | }
|
534 |
|
535 | if (this.binary && !query.binary) {
|
536 | query.binary = true
|
537 | }
|
538 |
|
539 | if (query._result && !query._result._types) {
|
540 | query._result._types = this._types
|
541 | }
|
542 |
|
543 | if (!this._queryable) {
|
544 | process.nextTick(() => {
|
545 | query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
|
546 | })
|
547 | return result
|
548 | }
|
549 |
|
550 | if (this._ending) {
|
551 | process.nextTick(() => {
|
552 | query.handleError(new Error('Client was closed and is not queryable'), this.connection)
|
553 | })
|
554 | return result
|
555 | }
|
556 |
|
557 | this.queryQueue.push(query)
|
558 | this._pulseQueryQueue()
|
559 | return result
|
560 | }
|
561 |
|
562 | Client.prototype.end = function (cb) {
|
563 | this._ending = true
|
564 |
|
565 |
|
566 | if (!this.connection._connecting) {
|
567 | if (cb) {
|
568 | cb()
|
569 | } else {
|
570 | return this._Promise.resolve()
|
571 | }
|
572 | }
|
573 |
|
574 | if (this.activeQuery || !this._queryable) {
|
575 |
|
576 |
|
577 | this.connection.stream.destroy()
|
578 | } else {
|
579 | this.connection.end()
|
580 | }
|
581 |
|
582 | if (cb) {
|
583 | this.connection.once('end', cb)
|
584 | } else {
|
585 | return new this._Promise((resolve) => {
|
586 | this.connection.once('end', resolve)
|
587 | })
|
588 | }
|
589 | }
|
590 |
|
591 |
|
592 | Client.Query = Query
|
593 |
|
594 | module.exports = Client
|