UNPKG

3.69 kBJavaScriptView Raw
1// Copyright (c) 2015, 2023, Oracle and/or its affiliates.
2
3//-----------------------------------------------------------------------------
4//
5// This software is dual-licensed to you under the Universal Permissive License
6// (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
7// 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
8// either license.
9//
10// If you elect to accept the software under the Apache License, Version 2.0,
11// the following applies:
12//
13// Licensed under the Apache License, Version 2.0 (the "License");
14// you may not use this file except in compliance with the License.
15// You may obtain a copy of the License at
16//
17// https://www.apache.org/licenses/LICENSE-2.0
18//
19// Unless required by applicable law or agreed to in writing, software
20// distributed under the License is distributed on an "AS IS" BASIS,
21// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22// See the License for the specific language governing permissions and
23// limitations under the License.
24//
25//-----------------------------------------------------------------------------
26
27'use strict';
28
29const process = require('process');
30const { Readable } = require('stream');
31
32class QueryStream extends Readable {
33
34 constructor(rs) {
35 super({ objectMode: true });
36 this._fetching = false;
37 this._numRows = 0;
38
39 // calling open via process.nextTick to allow event handlers to be
40 // registered prior to the events being emitted
41 if (rs) {
42 process.nextTick(() => {
43 this._open(rs);
44 });
45 }
46 }
47
48 // called by readable.destroy() and ensures that the result set is closed if
49 // it has not already been closed (never called directly)
50 async _destroy(err, cb) {
51 if (this._resultSet) {
52 const rs = this._resultSet;
53 this._resultSet = null;
54 if (this._fetching) {
55 await new Promise(resolve =>
56 this.once('_doneFetching', resolve));
57 }
58 try {
59 await rs._impl.close();
60 } catch (closeErr) {
61 cb(closeErr);
62 return;
63 }
64 }
65 cb(err);
66 }
67
68 // called when the query stream is to be associated with a result set; this
69 // takes place when the query stream if constructed (if a result set is known
70 // at that point) or by Connection.execute() when the result set is ready
71 _open(rs) {
72 this._resultSet = rs;
73
74 // trigger the event listener that may have been added in _read() now that
75 // the result set is ready
76 this.emit('open');
77
78 // emit a metadata event as a convenience to users
79 this.emit('metadata', rs.metaData);
80 }
81
82 // called by readable.read() and pushes rows to the internal queue maintained
83 // by the stream implementation (never called directly) appropriate
84 async _read() {
85
86 // still waiting on the result set to be added via _open() so add an event
87 // listener to retry when ready
88 if (!this._resultSet) {
89 this.once('open', this._read);
90 return;
91 }
92
93 // using the JS getRow() to leverage the JS row cache; the result set's
94 // _allowGetRowCall is set to true to allow the call for query streams
95 // created via ResultSet.toQueryStream()
96 try {
97 this._fetching = true;
98 this._resultSet._allowGetRowCall = true;
99 const row = await this._resultSet.getRow();
100 if (row) {
101 this.push(row);
102 } else {
103 this.push(null);
104 }
105 } catch (err) {
106 this.destroy(err);
107 } finally {
108 this._fetching = false;
109 if (this._resultSet) {
110 this._resultSet._allowGetRowCall = false;
111 } else {
112 this.emit('_doneFetching');
113 }
114 }
115 }
116
117}
118
119module.exports = QueryStream;