UNPKG

9.26 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16'use strict';
17const util = require('util');
18
19const errors = require('./errors');
20const types = require('./types');
21const utils = require('./utils');
22const RequestExecution = require('./request-execution');
23const promiseUtils = require('./promise-utils');
24
25/**
26 * Handles a BATCH, QUERY and EXECUTE request to the server, dealing with host fail-over and retries on error
27 */
28class RequestHandler {
29 /**
30 * Creates a new instance of RequestHandler.
31 * @param {Request} request
32 * @param {ExecutionOptions} execOptions
33 * @param {Client} client Client instance used to retrieve and set the keyspace.
34 */
35 constructor(request, execOptions, client) {
36 this.client = client;
37 this._speculativeExecutionPlan = client.options.policies.speculativeExecution.newPlan(
38 client.keyspace, request.query || request.queries);
39 this.logEmitter = client.options.logEmitter;
40 this.log = utils.log;
41 this.request = request;
42 this.executionOptions = execOptions;
43 this.stackContainer = null;
44 this.triedHosts = {};
45 // start at -1 as first request does not count.
46 this.speculativeExecutions = -1;
47 this._hostIterator = null;
48 this._resolveCallback = null;
49 this._rejectCallback = null;
50 this._newExecutionTimeout = null;
51 /** @type {RequestExecution[]} */
52 this._executions = [];
53 }
54
55 /**
56 * Sends a new BATCH, QUERY or EXECUTE request.
57 * @param {Request} request
58 * @param {ExecutionOptions} execOptions
59 * @param {Client} client Client instance used to retrieve and set the keyspace.
60 * @returns {Promise<ResultSet>}
61 */
62 static send(request, execOptions, client) {
63 const instance = new RequestHandler(request, execOptions, client);
64 return instance.send();
65 }
66
67 /**
68 * Gets a connection from the next host according to the query plan or throws a NoHostAvailableError.
69 * @returns {{host, connection}}
70 * @throws {NoHostAvailableError}
71 */
72 getNextConnection() {
73 let host;
74 let connection;
75 const iterator = this._hostIterator;
76
77 // Get a host that is UP in a sync loop
78 while (true) {
79 const item = iterator.next();
80 if (item.done) {
81 throw new errors.NoHostAvailableError(this.triedHosts);
82 }
83
84 host = item.value;
85
86 // Set the distance relative to the client first
87 const distance = this.client.profileManager.getDistance(host);
88 if (distance === types.distance.ignored) {
89 //If its marked as ignore by the load balancing policy, move on.
90 continue;
91 }
92
93 if (!host.isUp()) {
94 this.triedHosts[host.address] = 'Host considered as DOWN';
95 continue;
96 }
97
98 try {
99 connection = host.borrowConnection();
100 this.triedHosts[host.address] = null;
101 break;
102 } catch (err) {
103 this.triedHosts[host.address] = err;
104 }
105 }
106
107 return { connection, host };
108 }
109
110 /**
111 * Gets an available connection and sends the request
112 * @returns {Promise<ResultSet>}
113 */
114 send() {
115 if (this.executionOptions.getCaptureStackTrace()) {
116 Error.captureStackTrace(this.stackContainer = {});
117 }
118
119 return new Promise((resolve, reject) => {
120 this._resolveCallback = resolve;
121 this._rejectCallback = reject;
122
123 const lbp = this.executionOptions.getLoadBalancingPolicy();
124 const fixedHost = this.executionOptions.getFixedHost();
125
126 if (fixedHost) {
127 // if host is configured bypass load balancing policy and use
128 // a single host plan.
129 this._hostIterator = utils.arrayIterator([fixedHost]);
130 promiseUtils.toBackground(this._startNewExecution());
131 } else {
132 lbp.newQueryPlan(this.client.keyspace, this.executionOptions, (err, iterator) => {
133 if (err) {
134 return reject(err);
135 }
136
137 this._hostIterator = iterator;
138 promiseUtils.toBackground(this._startNewExecution());
139 });
140 }
141 });
142 }
143
144 /**
145 * Starts a new execution on the next host of the query plan.
146 * @param {Boolean} [isSpecExec]
147 * @returns {Promise<void>}
148 * @private
149 */
150 async _startNewExecution(isSpecExec) {
151 if (isSpecExec) {
152 this.client.metrics.onSpeculativeExecution();
153 }
154
155 let host;
156 let connection;
157
158 try {
159 ({ host, connection } = this.getNextConnection());
160 } catch (err) {
161 return this.handleNoHostAvailable(err, null);
162 }
163
164 if (isSpecExec && this._executions.length >= 0 && this._executions[0].wasCancelled()) {
165 // This method was called on the next tick and could not be cleared, the previous execution was cancelled so
166 // there's no point in launching a new execution.
167 return;
168 }
169
170 if (this.client.keyspace && this.client.keyspace !== connection.keyspace) {
171 try {
172 await connection.changeKeyspace(this.client.keyspace);
173 } catch (err) {
174 this.triedHosts[host.address] = err;
175 // The error occurred asynchronously
176 // We can blindly re-try to obtain a different host/connection.
177 return this._startNewExecution(isSpecExec);
178 }
179 }
180
181 const execution = new RequestExecution(this, host, connection);
182 this._executions.push(execution);
183 execution.start();
184
185 if (this.executionOptions.isIdempotent()) {
186 this._scheduleSpeculativeExecution(host);
187 }
188 }
189
190 /**
191 * Schedules next speculative execution, if any.
192 * @param {Host!} host
193 * @private
194 */
195 _scheduleSpeculativeExecution(host) {
196 const delay = this._speculativeExecutionPlan.nextExecution(host);
197 if (typeof delay !== 'number' || delay < 0) {
198 return;
199 }
200
201 if (delay === 0) {
202 // Parallel speculative execution
203 return process.nextTick(() => {
204 promiseUtils.toBackground(this._startNewExecution(true));
205 });
206 }
207
208 // Create timer for speculative execution
209 this._newExecutionTimeout = setTimeout(() =>
210 promiseUtils.toBackground(this._startNewExecution(true)), delay);
211 }
212
213 /**
214 * Sets the keyspace in any connection that is already opened.
215 * @param {Client} client
216 * @returns {Promise}
217 */
218 static setKeyspace(client) {
219 let connection;
220
221 for (const host of client.hosts.values()) {
222 connection = host.getActiveConnection();
223 if (connection) {
224 break;
225 }
226 }
227
228 if (!connection) {
229 throw new errors.DriverInternalError('No active connection found');
230 }
231
232 return connection.changeKeyspace(client.keyspace);
233 }
234
235 /**
236 * @param {Error} err
237 * @param {ResultSet} [result]
238 */
239 setCompleted(err, result) {
240 if (this._newExecutionTimeout !== null) {
241 clearTimeout(this._newExecutionTimeout);
242 }
243
244 // Mark all executions as cancelled
245 for (const execution of this._executions) {
246 execution.cancel();
247 }
248
249 if (err) {
250 if (this.executionOptions.getCaptureStackTrace()) {
251 utils.fixStack(this.stackContainer.stack, err);
252 }
253
254 // Reject the promise
255 return this._rejectCallback(err);
256 }
257
258 if (result.info.warnings) {
259 // Log the warnings from the response
260 result.info.warnings.forEach(function (message, i, warnings) {
261 this.log('warning', util.format(
262 'Received warning (%d of %d) "%s" for "%s"',
263 i + 1,
264 warnings.length,
265 message,
266 this.request.query || 'batch'));
267 }, this);
268 }
269
270 // We used to invoke the callback on next tick to allow stack unwinding and prevent the optimizing compiler to
271 // optimize read and write functions together.
272 // As we are resolving a Promise then() and catch() are always scheduled in the microtask queue
273 // We can invoke the resolve method directly.
274 this._resolveCallback(result);
275 }
276
277 /**
278 * @param {NoHostAvailableError} err
279 * @param {RequestExecution|null} execution
280 */
281 handleNoHostAvailable(err, execution) {
282 if (execution !== null) {
283 // Remove the execution
284 const index = this._executions.indexOf(execution);
285 this._executions.splice(index, 1);
286 }
287
288 if (this._executions.length === 0) {
289 // There aren't any other executions, we should report back to the user that there isn't
290 // a host available for executing the request
291 this.setCompleted(err);
292 }
293 }
294
295 /**
296 * Gets a long lived closure that can fetch the next page.
297 * @returns {Function}
298 */
299 getNextPageHandler() {
300 const request = this.request;
301 const execOptions = this.executionOptions;
302 const client = this.client;
303
304 return function nextPageHandler(pageState) {
305 execOptions.setPageState(pageState);
306 return new RequestHandler(request, execOptions, client).send();
307 };
308 }
309}
310
311module.exports = RequestHandler;