UNPKG

8.44 kBJavaScriptView Raw
1const Bluebird = require('bluebird');
2
3let PassThrough;
4
5// The "Runner" constructor takes a "builder" (query, schema, or raw)
6// and runs through each of the query statements, calling any additional
7// "output" method provided alongside the query and bindings.
8function Runner(client, builder) {
9 this.client = client;
10 this.builder = builder;
11 this.queries = [];
12
13 // The "connection" object is set on the runner when
14 // "run" is called.
15 this.connection = void 0;
16}
17
18Object.assign(Runner.prototype, {
19 // "Run" the target, calling "toSQL" on the builder, returning
20 // an object or array of queries to run, each of which are run on
21 // a single connection.
22 run() {
23 const runner = this;
24 return (
25 this.ensureConnection(function(connection) {
26 runner.connection = connection;
27
28 runner.client.emit('start', runner.builder);
29 runner.builder.emit('start', runner.builder);
30 const sql = runner.builder.toSQL();
31
32 if (runner.builder._debug) {
33 runner.client.logger.debug(sql);
34 }
35
36 if (Array.isArray(sql)) {
37 return runner.queryArray(sql);
38 }
39 return runner.query(sql);
40 })
41
42 // If there are any "error" listeners, we fire an error event
43 // and then re-throw the error to be eventually handled by
44 // the promise chain. Useful if you're wrapping in a custom `Promise`.
45 .catch(function(err) {
46 if (runner.builder._events && runner.builder._events.error) {
47 runner.builder.emit('error', err);
48 }
49 throw err;
50 })
51
52 // Fire a single "end" event on the builder when
53 // all queries have successfully completed.
54 .then(function(res) {
55 runner.builder.emit('end');
56 return res;
57 })
58 );
59 },
60
61 // Stream the result set, by passing through to the dialect's streaming
62 // capabilities. If the options are
63 stream(options, handler) {
64 // If we specify stream(handler).then(...
65 if (arguments.length === 1) {
66 if (typeof options === 'function') {
67 handler = options;
68 options = {};
69 }
70 }
71
72 // Determines whether we emit an error or throw here.
73 const hasHandler = typeof handler === 'function';
74
75 // Lazy-load the "PassThrough" dependency.
76 PassThrough = PassThrough || require('stream').PassThrough;
77
78 const runner = this;
79 const stream = new PassThrough({ objectMode: true });
80
81 let hasConnection = false;
82 const promise = this.ensureConnection(function(connection) {
83 hasConnection = true;
84 runner.connection = connection;
85 try {
86 const sql = runner.builder.toSQL();
87
88 if (Array.isArray(sql) && hasHandler) {
89 throw new Error(
90 'The stream may only be used with a single query statement.'
91 );
92 }
93
94 return runner.client.stream(runner.connection, sql, stream, options);
95 } catch (e) {
96 stream.emit('error', e);
97 throw e;
98 }
99 });
100
101 // If a function is passed to handle the stream, send the stream
102 // there and return the promise, otherwise just return the stream
103 // and the promise will take care of itself.
104 if (hasHandler) {
105 handler(stream);
106 return Bluebird.resolve(promise);
107 }
108
109 // Emit errors on the stream if the error occurred before a connection
110 // could be acquired.
111 // If the connection was acquired, assume the error occurred in the client
112 // code and has already been emitted on the stream. Don't emit it twice.
113 promise.catch(function(err) {
114 if (!hasConnection) stream.emit('error', err);
115 });
116 return stream;
117 },
118
119 // Allow you to pipe the stream to a writable stream.
120 pipe(writable, options) {
121 return this.stream(options).pipe(writable);
122 },
123
124 // "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
125 // to run in sequence, and on the same connection, especially helpful when schema building
126 // and dealing with foreign key constraints, etc.
127 query: async function(obj) {
128 const { __knexUid, __knexTxId } = this.connection;
129
130 this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
131
132 const runner = this;
133 let queryPromise = this.client.query(this.connection, obj);
134
135 if (obj.timeout) {
136 queryPromise = queryPromise.timeout(obj.timeout);
137 }
138
139 // Await the return value of client.processResponse; in the case of sqlite3's
140 // dropColumn()/renameColumn(), it will be a Promise for the transaction
141 // containing the complete rename procedure.
142 return queryPromise
143 .then((resp) => this.client.processResponse(resp, runner))
144 .then((processedResponse) => {
145 const queryContext = this.builder.queryContext();
146 const postProcessedResponse = this.client.postProcessResponse(
147 processedResponse,
148 queryContext
149 );
150
151 this.builder.emit(
152 'query-response',
153 postProcessedResponse,
154 Object.assign({ __knexUid: this.connection.__knexUid }, obj),
155 this.builder
156 );
157
158 this.client.emit(
159 'query-response',
160 postProcessedResponse,
161 Object.assign({ __knexUid: this.connection.__knexUid }, obj),
162 this.builder
163 );
164
165 return postProcessedResponse;
166 })
167 .catch(Bluebird.TimeoutError, (error) => {
168 const { timeout, sql, bindings } = obj;
169
170 let cancelQuery;
171 if (obj.cancelOnTimeout) {
172 cancelQuery = this.client.cancelQuery(this.connection);
173 } else {
174 // If we don't cancel the query, we need to mark the connection as disposed so that
175 // it gets destroyed by the pool and is never used again. If we don't do this and
176 // return the connection to the pool, it will be useless until the current operation
177 // that timed out, finally finishes.
178 this.connection.__knex__disposed = error;
179 cancelQuery = Bluebird.resolve();
180 }
181
182 return cancelQuery
183 .catch((cancelError) => {
184 // If the cancellation failed, we need to mark the connection as disposed so that
185 // it gets destroyed by the pool and is never used again. If we don't do this and
186 // return the connection to the pool, it will be useless until the current operation
187 // that timed out, finally finishes.
188 this.connection.__knex__disposed = error;
189
190 // cancellation failed
191 throw Object.assign(cancelError, {
192 message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
193 sql,
194 bindings,
195 timeout,
196 });
197 })
198 .then(() => {
199 // cancellation succeeded, rethrow timeout error
200 throw Object.assign(error, {
201 message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
202 sql,
203 bindings,
204 timeout,
205 });
206 });
207 })
208 .catch((error) => {
209 this.builder.emit(
210 'query-error',
211 error,
212 Object.assign({ __knexUid: this.connection.__knexUid }, obj)
213 );
214 throw error;
215 });
216 },
217
218 // In the case of the "schema builder" we call `queryArray`, which runs each
219 // of the queries in sequence.
220 async queryArray(queries) {
221 if (queries.length === 1) {
222 return this.query(queries[0]);
223 }
224
225 const results = [];
226 for (const query of queries) {
227 results.push(await this.query(query));
228 }
229 return results;
230 },
231
232 // Check whether there's a transaction flag, and that it has a connection.
233 async ensureConnection(cb) {
234 // Use override from a builder if passed
235 if (this.builder._connection) {
236 return cb(this.builder._connection);
237 }
238
239 if (this.connection) {
240 return cb(this.connection);
241 }
242 return this.client
243 .acquireConnection()
244 .catch(Bluebird.TimeoutError, (error) => {
245 if (this.builder) {
246 error.sql = this.builder.sql;
247 error.bindings = this.builder.bindings;
248 }
249 throw error;
250 })
251 .then(async (connection) => {
252 try {
253 return await cb(connection);
254 } finally {
255 await this.client.releaseConnection(this.connection);
256 }
257 });
258 },
259});
260
261module.exports = Runner;