1 | const Bluebird = require('bluebird');
|
2 |
|
3 | let PassThrough;
|
4 |
|
5 |
|
6 |
|
7 |
|
8 | function Runner(client, builder) {
|
9 | this.client = client;
|
10 | this.builder = builder;
|
11 | this.queries = [];
|
12 |
|
13 |
|
14 |
|
15 | this.connection = void 0;
|
16 | }
|
17 |
|
18 | Object.assign(Runner.prototype, {
|
19 |
|
20 |
|
21 |
|
22 | run() {
|
23 | const runner = this;
|
24 | return (
|
25 | this.ensureConnection(function(connection) {
|
26 | runner.connection = connection;
|
27 |
|
28 | runner.client.emit('start', runner.builder);
|
29 | runner.builder.emit('start', runner.builder);
|
30 | const sql = runner.builder.toSQL();
|
31 |
|
32 | if (runner.builder._debug) {
|
33 | runner.client.logger.debug(sql);
|
34 | }
|
35 |
|
36 | if (Array.isArray(sql)) {
|
37 | return runner.queryArray(sql);
|
38 | }
|
39 | return runner.query(sql);
|
40 | })
|
41 |
|
42 |
|
43 |
|
44 |
|
45 | .catch(function(err) {
|
46 | if (runner.builder._events && runner.builder._events.error) {
|
47 | runner.builder.emit('error', err);
|
48 | }
|
49 | throw err;
|
50 | })
|
51 |
|
52 |
|
53 |
|
54 | .then(function(res) {
|
55 | runner.builder.emit('end');
|
56 | return res;
|
57 | })
|
58 | );
|
59 | },
|
60 |
|
61 |
|
62 |
|
63 | stream(options, handler) {
|
64 |
|
65 | if (arguments.length === 1) {
|
66 | if (typeof options === 'function') {
|
67 | handler = options;
|
68 | options = {};
|
69 | }
|
70 | }
|
71 |
|
72 |
|
73 | const hasHandler = typeof handler === 'function';
|
74 |
|
75 |
|
76 | PassThrough = PassThrough || require('stream').PassThrough;
|
77 |
|
78 | const runner = this;
|
79 | const stream = new PassThrough({ objectMode: true });
|
80 |
|
81 | let hasConnection = false;
|
82 | const promise = this.ensureConnection(function(connection) {
|
83 | hasConnection = true;
|
84 | runner.connection = connection;
|
85 | try {
|
86 | const sql = runner.builder.toSQL();
|
87 |
|
88 | if (Array.isArray(sql) && hasHandler) {
|
89 | throw new Error(
|
90 | 'The stream may only be used with a single query statement.'
|
91 | );
|
92 | }
|
93 |
|
94 | return runner.client.stream(runner.connection, sql, stream, options);
|
95 | } catch (e) {
|
96 | stream.emit('error', e);
|
97 | throw e;
|
98 | }
|
99 | });
|
100 |
|
101 |
|
102 |
|
103 |
|
104 | if (hasHandler) {
|
105 | handler(stream);
|
106 | return Bluebird.resolve(promise);
|
107 | }
|
108 |
|
109 |
|
110 |
|
111 |
|
112 |
|
113 | promise.catch(function(err) {
|
114 | if (!hasConnection) stream.emit('error', err);
|
115 | });
|
116 | return stream;
|
117 | },
|
118 |
|
119 |
|
120 | pipe(writable, options) {
|
121 | return this.stream(options).pipe(writable);
|
122 | },
|
123 |
|
124 |
|
125 |
|
126 |
|
127 | query: async function(obj) {
|
128 | const { __knexUid, __knexTxId } = this.connection;
|
129 |
|
130 | this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
|
131 |
|
132 | const runner = this;
|
133 | let queryPromise = this.client.query(this.connection, obj);
|
134 |
|
135 | if (obj.timeout) {
|
136 | queryPromise = queryPromise.timeout(obj.timeout);
|
137 | }
|
138 |
|
139 |
|
140 |
|
141 |
|
142 | return queryPromise
|
143 | .then((resp) => this.client.processResponse(resp, runner))
|
144 | .then((processedResponse) => {
|
145 | const queryContext = this.builder.queryContext();
|
146 | const postProcessedResponse = this.client.postProcessResponse(
|
147 | processedResponse,
|
148 | queryContext
|
149 | );
|
150 |
|
151 | this.builder.emit(
|
152 | 'query-response',
|
153 | postProcessedResponse,
|
154 | Object.assign({ __knexUid: this.connection.__knexUid }, obj),
|
155 | this.builder
|
156 | );
|
157 |
|
158 | this.client.emit(
|
159 | 'query-response',
|
160 | postProcessedResponse,
|
161 | Object.assign({ __knexUid: this.connection.__knexUid }, obj),
|
162 | this.builder
|
163 | );
|
164 |
|
165 | return postProcessedResponse;
|
166 | })
|
167 | .catch(Bluebird.TimeoutError, (error) => {
|
168 | const { timeout, sql, bindings } = obj;
|
169 |
|
170 | let cancelQuery;
|
171 | if (obj.cancelOnTimeout) {
|
172 | cancelQuery = this.client.cancelQuery(this.connection);
|
173 | } else {
|
174 |
|
175 |
|
176 |
|
177 |
|
178 | this.connection.__knex__disposed = error;
|
179 | cancelQuery = Bluebird.resolve();
|
180 | }
|
181 |
|
182 | return cancelQuery
|
183 | .catch((cancelError) => {
|
184 |
|
185 |
|
186 |
|
187 |
|
188 | this.connection.__knex__disposed = error;
|
189 |
|
190 |
|
191 | throw Object.assign(cancelError, {
|
192 | message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
|
193 | sql,
|
194 | bindings,
|
195 | timeout,
|
196 | });
|
197 | })
|
198 | .then(() => {
|
199 |
|
200 | throw Object.assign(error, {
|
201 | message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
|
202 | sql,
|
203 | bindings,
|
204 | timeout,
|
205 | });
|
206 | });
|
207 | })
|
208 | .catch((error) => {
|
209 | this.builder.emit(
|
210 | 'query-error',
|
211 | error,
|
212 | Object.assign({ __knexUid: this.connection.__knexUid }, obj)
|
213 | );
|
214 | throw error;
|
215 | });
|
216 | },
|
217 |
|
218 |
|
219 |
|
220 | async queryArray(queries) {
|
221 | if (queries.length === 1) {
|
222 | return this.query(queries[0]);
|
223 | }
|
224 |
|
225 | const results = [];
|
226 | for (const query of queries) {
|
227 | results.push(await this.query(query));
|
228 | }
|
229 | return results;
|
230 | },
|
231 |
|
232 |
|
233 | async ensureConnection(cb) {
|
234 |
|
235 | if (this.builder._connection) {
|
236 | return cb(this.builder._connection);
|
237 | }
|
238 |
|
239 | if (this.connection) {
|
240 | return cb(this.connection);
|
241 | }
|
242 | return this.client
|
243 | .acquireConnection()
|
244 | .catch(Bluebird.TimeoutError, (error) => {
|
245 | if (this.builder) {
|
246 | error.sql = this.builder.sql;
|
247 | error.bindings = this.builder.bindings;
|
248 | }
|
249 | throw error;
|
250 | })
|
251 | .then(async (connection) => {
|
252 | try {
|
253 | return await cb(connection);
|
254 | } finally {
|
255 | await this.client.releaseConnection(this.connection);
|
256 | }
|
257 | });
|
258 | },
|
259 | });
|
260 |
|
261 | module.exports = Runner;
|