1 | const Bluebird = require('bluebird');
|
2 |
|
3 | let PassThrough;
|
4 |
|
5 |
|
6 |
|
7 |
|
/**
 * Holds the state needed to execute one builder against one client.
 *
 * @param {Object} client - Client used to acquire connections and run queries.
 * @param {Object} builder - Builder whose compiled SQL will be executed.
 */
function Runner(client, builder) {
  Object.assign(this, {
    client,
    builder,
    queries: [],
    // Stays undefined until `run()` / `stream()` store the connection they
    // obtained from `ensureConnection()`.
    connection: undefined,
  });
}
|
17 |
|
18 | Object.assign(Runner.prototype, {
|
19 |
|
20 |
|
21 |
|
22 | run() {
|
23 | const runner = this;
|
24 | return (
|
25 | Bluebird.using(this.ensureConnection(), function(connection) {
|
26 | runner.connection = connection;
|
27 |
|
28 | runner.client.emit('start', runner.builder);
|
29 | runner.builder.emit('start', runner.builder);
|
30 | const sql = runner.builder.toSQL();
|
31 |
|
32 | if (runner.builder._debug) {
|
33 | runner.client.logger.debug(sql);
|
34 | }
|
35 |
|
36 | if (Array.isArray(sql)) {
|
37 | return runner.queryArray(sql);
|
38 | }
|
39 | return runner.query(sql);
|
40 | })
|
41 |
|
42 |
|
43 |
|
44 |
|
45 | .catch(function(err) {
|
46 | if (runner.builder._events && runner.builder._events.error) {
|
47 | runner.builder.emit('error', err);
|
48 | }
|
49 | throw err;
|
50 | })
|
51 |
|
52 |
|
53 |
|
54 | .then(function(res) {
|
55 | runner.builder.emit('end');
|
56 | return res;
|
57 | })
|
58 | );
|
59 | },
|
60 |
|
61 |
|
62 |
|
63 | stream(options, handler) {
|
64 |
|
65 | if (arguments.length === 1) {
|
66 | if (typeof options === 'function') {
|
67 | handler = options;
|
68 | options = {};
|
69 | }
|
70 | }
|
71 |
|
72 |
|
73 | const hasHandler = typeof handler === 'function';
|
74 |
|
75 |
|
76 | PassThrough = PassThrough || require('stream').PassThrough;
|
77 |
|
78 | const runner = this;
|
79 | const stream = new PassThrough({ objectMode: true });
|
80 |
|
81 | let hasConnection = false;
|
82 | const promise = Bluebird.using(this.ensureConnection(), function(
|
83 | connection
|
84 | ) {
|
85 | hasConnection = true;
|
86 | runner.connection = connection;
|
87 | try {
|
88 | const sql = runner.builder.toSQL();
|
89 |
|
90 | if (Array.isArray(sql) && hasHandler) {
|
91 | throw new Error(
|
92 | 'The stream may only be used with a single query statement.'
|
93 | );
|
94 | }
|
95 |
|
96 | return runner.client.stream(runner.connection, sql, stream, options);
|
97 | } catch (e) {
|
98 | stream.emit('error', e);
|
99 | throw e;
|
100 | }
|
101 | });
|
102 |
|
103 |
|
104 |
|
105 |
|
106 | if (hasHandler) {
|
107 | handler(stream);
|
108 | return promise;
|
109 | }
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
115 | promise.catch(function(err) {
|
116 | if (!hasConnection) stream.emit('error', err);
|
117 | });
|
118 | return stream;
|
119 | },
|
120 |
|
121 |
|
122 | pipe(writable, options) {
|
123 | return this.stream(options).pipe(writable);
|
124 | },
|
125 |
|
126 |
|
127 |
|
128 |
|
129 | query: async function(obj) {
|
130 | const { __knexUid, __knexTxId } = this.connection;
|
131 |
|
132 | this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
|
133 |
|
134 | const runner = this;
|
135 | let queryPromise = this.client.query(this.connection, obj);
|
136 |
|
137 | if (obj.timeout) {
|
138 | queryPromise = queryPromise.timeout(obj.timeout);
|
139 | }
|
140 |
|
141 |
|
142 |
|
143 |
|
144 | return queryPromise
|
145 | .then((resp) => this.client.processResponse(resp, runner))
|
146 | .then((processedResponse) => {
|
147 | const queryContext = this.builder.queryContext();
|
148 | const postProcessedResponse = this.client.postProcessResponse(
|
149 | processedResponse,
|
150 | queryContext
|
151 | );
|
152 |
|
153 | this.builder.emit(
|
154 | 'query-response',
|
155 | postProcessedResponse,
|
156 | Object.assign({ __knexUid: this.connection.__knexUid }, obj),
|
157 | this.builder
|
158 | );
|
159 |
|
160 | this.client.emit(
|
161 | 'query-response',
|
162 | postProcessedResponse,
|
163 | Object.assign({ __knexUid: this.connection.__knexUid }, obj),
|
164 | this.builder
|
165 | );
|
166 |
|
167 | return postProcessedResponse;
|
168 | })
|
169 | .catch(Bluebird.TimeoutError, (error) => {
|
170 | const { timeout, sql, bindings } = obj;
|
171 |
|
172 | let cancelQuery;
|
173 | if (obj.cancelOnTimeout) {
|
174 | cancelQuery = this.client.cancelQuery(this.connection);
|
175 | } else {
|
176 |
|
177 |
|
178 |
|
179 |
|
180 | this.connection.__knex__disposed = error;
|
181 | cancelQuery = Bluebird.resolve();
|
182 | }
|
183 |
|
184 | return cancelQuery
|
185 | .catch((cancelError) => {
|
186 |
|
187 |
|
188 |
|
189 |
|
190 | this.connection.__knex__disposed = error;
|
191 |
|
192 |
|
193 | throw Object.assign(cancelError, {
|
194 | message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
|
195 | sql,
|
196 | bindings,
|
197 | timeout,
|
198 | });
|
199 | })
|
200 | .then(() => {
|
201 |
|
202 | throw Object.assign(error, {
|
203 | message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
|
204 | sql,
|
205 | bindings,
|
206 | timeout,
|
207 | });
|
208 | });
|
209 | })
|
210 | .catch((error) => {
|
211 | this.builder.emit(
|
212 | 'query-error',
|
213 | error,
|
214 | Object.assign({ __knexUid: this.connection.__knexUid }, obj)
|
215 | );
|
216 | throw error;
|
217 | });
|
218 | },
|
219 |
|
220 |
|
221 |
|
222 | queryArray(queries) {
|
223 | return queries.length === 1
|
224 | ? this.query(queries[0])
|
225 | : Bluebird.bind(this)
|
226 | .return(queries)
|
227 | .reduce(function(memo, query) {
|
228 | return this.query(query).then(function(resp) {
|
229 | memo.push(resp);
|
230 | return memo;
|
231 | });
|
232 | }, []);
|
233 | },
|
234 |
|
235 |
|
236 | ensureConnection() {
|
237 |
|
238 | if (this.builder._connection) {
|
239 | return Bluebird.resolve(this.builder._connection);
|
240 | }
|
241 |
|
242 | if (this.connection) {
|
243 | return Bluebird.resolve(this.connection);
|
244 | }
|
245 | return this.client
|
246 | .acquireConnection()
|
247 | .catch(Bluebird.TimeoutError, (error) => {
|
248 | if (this.builder) {
|
249 | error.sql = this.builder.sql;
|
250 | error.bindings = this.builder.bindings;
|
251 | }
|
252 | throw error;
|
253 | })
|
254 | .disposer(() => {
|
255 |
|
256 | return this.client.releaseConnection(this.connection);
|
257 | });
|
258 | },
|
259 | });
|
260 |
|
261 | module.exports = Runner;
|