UNPKG

8.58 kBJavaScriptView Raw
1const Bluebird = require('bluebird');
2
3let PassThrough;
4
5// The "Runner" constructor takes a "builder" (query, schema, or raw)
6// and runs through each of the query statements, calling any additional
7// "output" method provided alongside the query and bindings.
8function Runner(client, builder) {
9 this.client = client;
10 this.builder = builder;
11 this.queries = [];
12
13 // The "connection" object is set on the runner when
14 // "run" is called.
15 this.connection = void 0;
16}
17
18Object.assign(Runner.prototype, {
19 // "Run" the target, calling "toSQL" on the builder, returning
20 // an object or array of queries to run, each of which are run on
21 // a single connection.
22 run() {
23 const runner = this;
24 return (
25 Bluebird.using(this.ensureConnection(), function(connection) {
26 runner.connection = connection;
27
28 runner.client.emit('start', runner.builder);
29 runner.builder.emit('start', runner.builder);
30 const sql = runner.builder.toSQL();
31
32 if (runner.builder._debug) {
33 runner.client.logger.debug(sql);
34 }
35
36 if (Array.isArray(sql)) {
37 return runner.queryArray(sql);
38 }
39 return runner.query(sql);
40 })
41
42 // If there are any "error" listeners, we fire an error event
43 // and then re-throw the error to be eventually handled by
44 // the promise chain. Useful if you're wrapping in a custom `Promise`.
45 .catch(function(err) {
46 if (runner.builder._events && runner.builder._events.error) {
47 runner.builder.emit('error', err);
48 }
49 throw err;
50 })
51
52 // Fire a single "end" event on the builder when
53 // all queries have successfully completed.
54 .then(function(res) {
55 runner.builder.emit('end');
56 return res;
57 })
58 );
59 },
60
61 // Stream the result set, by passing through to the dialect's streaming
62 // capabilities. If the options are
63 stream(options, handler) {
64 // If we specify stream(handler).then(...
65 if (arguments.length === 1) {
66 if (typeof options === 'function') {
67 handler = options;
68 options = {};
69 }
70 }
71
72 // Determines whether we emit an error or throw here.
73 const hasHandler = typeof handler === 'function';
74
75 // Lazy-load the "PassThrough" dependency.
76 PassThrough = PassThrough || require('stream').PassThrough;
77
78 const runner = this;
79 const stream = new PassThrough({ objectMode: true });
80
81 let hasConnection = false;
82 const promise = Bluebird.using(this.ensureConnection(), function(
83 connection
84 ) {
85 hasConnection = true;
86 runner.connection = connection;
87 try {
88 const sql = runner.builder.toSQL();
89
90 if (Array.isArray(sql) && hasHandler) {
91 throw new Error(
92 'The stream may only be used with a single query statement.'
93 );
94 }
95
96 return runner.client.stream(runner.connection, sql, stream, options);
97 } catch (e) {
98 stream.emit('error', e);
99 throw e;
100 }
101 });
102
103 // If a function is passed to handle the stream, send the stream
104 // there and return the promise, otherwise just return the stream
105 // and the promise will take care of itself.
106 if (hasHandler) {
107 handler(stream);
108 return promise;
109 }
110
111 // Emit errors on the stream if the error occurred before a connection
112 // could be acquired.
113 // If the connection was acquired, assume the error occurred in the client
114 // code and has already been emitted on the stream. Don't emit it twice.
115 promise.catch(function(err) {
116 if (!hasConnection) stream.emit('error', err);
117 });
118 return stream;
119 },
120
121 // Allow you to pipe the stream to a writable stream.
122 pipe(writable, options) {
123 return this.stream(options).pipe(writable);
124 },
125
126 // "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
127 // to run in sequence, and on the same connection, especially helpful when schema building
128 // and dealing with foreign key constraints, etc.
129 query: async function(obj) {
130 const { __knexUid, __knexTxId } = this.connection;
131
132 this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
133
134 const runner = this;
135 let queryPromise = this.client.query(this.connection, obj);
136
137 if (obj.timeout) {
138 queryPromise = queryPromise.timeout(obj.timeout);
139 }
140
141 // Await the return value of client.processResponse; in the case of sqlite3's
142 // dropColumn()/renameColumn(), it will be a Promise for the transaction
143 // containing the complete rename procedure.
144 return queryPromise
145 .then((resp) => this.client.processResponse(resp, runner))
146 .then((processedResponse) => {
147 const queryContext = this.builder.queryContext();
148 const postProcessedResponse = this.client.postProcessResponse(
149 processedResponse,
150 queryContext
151 );
152
153 this.builder.emit(
154 'query-response',
155 postProcessedResponse,
156 Object.assign({ __knexUid: this.connection.__knexUid }, obj),
157 this.builder
158 );
159
160 this.client.emit(
161 'query-response',
162 postProcessedResponse,
163 Object.assign({ __knexUid: this.connection.__knexUid }, obj),
164 this.builder
165 );
166
167 return postProcessedResponse;
168 })
169 .catch(Bluebird.TimeoutError, (error) => {
170 const { timeout, sql, bindings } = obj;
171
172 let cancelQuery;
173 if (obj.cancelOnTimeout) {
174 cancelQuery = this.client.cancelQuery(this.connection);
175 } else {
176 // If we don't cancel the query, we need to mark the connection as disposed so that
177 // it gets destroyed by the pool and is never used again. If we don't do this and
178 // return the connection to the pool, it will be useless until the current operation
179 // that timed out, finally finishes.
180 this.connection.__knex__disposed = error;
181 cancelQuery = Bluebird.resolve();
182 }
183
184 return cancelQuery
185 .catch((cancelError) => {
186 // If the cancellation failed, we need to mark the connection as disposed so that
187 // it gets destroyed by the pool and is never used again. If we don't do this and
188 // return the connection to the pool, it will be useless until the current operation
189 // that timed out, finally finishes.
190 this.connection.__knex__disposed = error;
191
192 // cancellation failed
193 throw Object.assign(cancelError, {
194 message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
195 sql,
196 bindings,
197 timeout,
198 });
199 })
200 .then(() => {
201 // cancellation succeeded, rethrow timeout error
202 throw Object.assign(error, {
203 message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
204 sql,
205 bindings,
206 timeout,
207 });
208 });
209 })
210 .catch((error) => {
211 this.builder.emit(
212 'query-error',
213 error,
214 Object.assign({ __knexUid: this.connection.__knexUid }, obj)
215 );
216 throw error;
217 });
218 },
219
220 // In the case of the "schema builder" we call `queryArray`, which runs each
221 // of the queries in sequence.
222 queryArray(queries) {
223 return queries.length === 1
224 ? this.query(queries[0])
225 : Bluebird.bind(this)
226 .return(queries)
227 .reduce(function(memo, query) {
228 return this.query(query).then(function(resp) {
229 memo.push(resp);
230 return memo;
231 });
232 }, []);
233 },
234
235 // Check whether there's a transaction flag, and that it has a connection.
236 ensureConnection() {
237 // Use override from a builder if passed
238 if (this.builder._connection) {
239 return Bluebird.resolve(this.builder._connection);
240 }
241
242 if (this.connection) {
243 return Bluebird.resolve(this.connection);
244 }
245 return this.client
246 .acquireConnection()
247 .catch(Bluebird.TimeoutError, (error) => {
248 if (this.builder) {
249 error.sql = this.builder.sql;
250 error.bindings = this.builder.bindings;
251 }
252 throw error;
253 })
254 .disposer(() => {
255 // need to return promise or null from handler to prevent warning from bluebird
256 return this.client.releaseConnection(this.connection);
257 });
258 },
259});
260
261module.exports = Runner;