1 | // Copyright (c) 2015, 2023, Oracle and/or its affiliates.
|
2 |
|
3 | //-----------------------------------------------------------------------------
|
4 | //
|
5 | // This software is dual-licensed to you under the Universal Permissive License
|
6 | // (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
|
7 | // 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
|
8 | // either license.
|
9 | //
|
10 | // If you elect to accept the software under the Apache License, Version 2.0,
|
11 | // the following applies:
|
12 | //
|
13 | // Licensed under the Apache License, Version 2.0 (the "License");
|
14 | // you may not use this file except in compliance with the License.
|
15 | // You may obtain a copy of the License at
|
16 | //
|
17 | // https://www.apache.org/licenses/LICENSE-2.0
|
18 | //
|
19 | // Unless required by applicable law or agreed to in writing, software
|
20 | // distributed under the License is distributed on an "AS IS" BASIS,
|
21 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
22 | // See the License for the specific language governing permissions and
|
23 | // limitations under the License.
|
24 | //
|
25 | //-----------------------------------------------------------------------------
|
26 |
|
27 | ;
|
28 |
|
29 | const process = require('process');
|
30 | const { Readable } = require('stream');
|
31 |
|
32 | class QueryStream extends Readable {
|
33 |
|
34 | constructor(rs) {
|
35 | super({ objectMode: true });
|
36 | this._fetching = false;
|
37 | this._numRows = 0;
|
38 |
|
39 | // calling open via process.nextTick to allow event handlers to be
|
40 | // registered prior to the events being emitted
|
41 | if (rs) {
|
42 | process.nextTick(() => {
|
43 | this._open(rs);
|
44 | });
|
45 | }
|
46 | }
|
47 |
|
48 | // called by readable.destroy() and ensures that the result set is closed if
|
49 | // it has not already been closed (never called directly)
|
50 | async _destroy(err, cb) {
|
51 | if (this._resultSet) {
|
52 | const rs = this._resultSet;
|
53 | this._resultSet = null;
|
54 | if (this._fetching) {
|
55 | await new Promise(resolve =>
|
56 | this.once('_doneFetching', resolve));
|
57 | }
|
58 | try {
|
59 | await rs._impl.close();
|
60 | } catch (closeErr) {
|
61 | cb(closeErr);
|
62 | return;
|
63 | }
|
64 | }
|
65 | cb(err);
|
66 | }
|
67 |
|
68 | // called when the query stream is to be associated with a result set; this
|
69 | // takes place when the query stream if constructed (if a result set is known
|
70 | // at that point) or by Connection.execute() when the result set is ready
|
71 | _open(rs) {
|
72 | this._resultSet = rs;
|
73 |
|
74 | // trigger the event listener that may have been added in _read() now that
|
75 | // the result set is ready
|
76 | this.emit('open');
|
77 |
|
78 | // emit a metadata event as a convenience to users
|
79 | this.emit('metadata', rs.metaData);
|
80 | }
|
81 |
|
82 | // called by readable.read() and pushes rows to the internal queue maintained
|
83 | // by the stream implementation (never called directly) appropriate
|
84 | async _read() {
|
85 |
|
86 | // still waiting on the result set to be added via _open() so add an event
|
87 | // listener to retry when ready
|
88 | if (!this._resultSet) {
|
89 | this.once('open', this._read);
|
90 | return;
|
91 | }
|
92 |
|
93 | // using the JS getRow() to leverage the JS row cache; the result set's
|
94 | // _allowGetRowCall is set to true to allow the call for query streams
|
95 | // created via ResultSet.toQueryStream()
|
96 | try {
|
97 | this._fetching = true;
|
98 | this._resultSet._allowGetRowCall = true;
|
99 | const row = await this._resultSet.getRow();
|
100 | if (row) {
|
101 | this.push(row);
|
102 | } else {
|
103 | this.push(null);
|
104 | }
|
105 | } catch (err) {
|
106 | this.destroy(err);
|
107 | } finally {
|
108 | this._fetching = false;
|
109 | if (this._resultSet) {
|
110 | this._resultSet._allowGetRowCall = false;
|
111 | } else {
|
112 | this.emit('_doneFetching');
|
113 | }
|
114 | }
|
115 | }
|
116 |
|
117 | }
|
118 |
|
119 | module.exports = QueryStream;
|