1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 | 'use strict';
|
17 | const util = require('util');
|
18 |
|
19 | const errors = require('./errors');
|
20 | const types = require('./types');
|
21 | const utils = require('./utils');
|
22 | const RequestExecution = require('./request-execution');
|
23 | const promiseUtils = require('./promise-utils');
|
24 |
|
25 |
|
26 |
|
27 |
|
/**
 * Drives a single request: picks hosts from a query plan (or a fixed host), starts
 * {@link RequestExecution} instances on borrowed connections, optionally schedules
 * speculative executions and settles the promise returned by {@link RequestHandler#send}.
 */
class RequestHandler {
  /**
   * Creates a handler for one request.
   * @param {Object} request The request to send. Its `query` (or `queries`) is passed to the
   * speculative execution policy and used when logging server warnings.
   * @param {Object} execOptions Execution options; this class reads the load balancing policy,
   * fixed host, idempotence flag and stack-trace-capture flag from it.
   * @param {Object} client The client providing options, keyspace, hosts, metrics and profiles.
   */
  constructor(request, execOptions, client) {
    this.client = client;
    this._speculativeExecutionPlan = client.options.policies.speculativeExecution.newPlan(
      client.keyspace, request.query || request.queries);
    this.logEmitter = client.options.logEmitter;
    this.log = utils.log;
    this.request = request;
    this.executionOptions = execOptions;
    // Holds the captured stack (see send()) when stack trace capture is enabled.
    this.stackContainer = null;
    // Map of host address -> error / reason (or null when a connection was obtained).
    this.triedHosts = {};
    // Initialised to -1; not updated inside this class.
    // NOTE(review): presumably maintained by RequestExecution - confirm.
    this.speculativeExecutions = -1;
    this._hostIterator = null;
    this._resolveCallback = null;
    this._rejectCallback = null;
    // Handle of the pending timer for the next speculative execution, if any.
    this._newExecutionTimeout = null;
    // Executions started so far for this request.
    this._executions = [];
  }

  /**
   * Creates a handler for the request and sends it.
   * @param {Object} request
   * @param {Object} execOptions
   * @param {Object} client
   * @returns {Promise} The promise returned by the instance-level send().
   */
  static send(request, execOptions, client) {
    const instance = new RequestHandler(request, execOptions, client);
    return instance.send();
  }

  /**
   * Walks the host iterator until a connection can be borrowed.
   * Hosts at `ignored` distance are skipped silently; hosts that are down or whose pool
   * fails to provide a connection are recorded in `triedHosts`.
   * @returns {{connection: Object, host: Object}}
   * @throws {NoHostAvailableError} When the iterator is exhausted.
   */
  getNextConnection() {
    const iterator = this._hostIterator;
    let host;
    let connection;

    // The iterator only needs to expose next(), so it is advanced manually.
    for (;;) {
      const item = iterator.next();
      if (item.done) {
        throw new errors.NoHostAvailableError(this.triedHosts);
      }

      host = item.value;

      if (this.client.profileManager.getDistance(host) === types.distance.ignored) {
        // Ignored hosts are not even recorded as tried.
        continue;
      }

      if (!host.isUp()) {
        this.triedHosts[host.address] = 'Host considered as DOWN';
        continue;
      }

      try {
        connection = host.borrowConnection();
        this.triedHosts[host.address] = null;
        break;
      } catch (err) {
        // Remember why this host could not be used and move on to the next one.
        this.triedHosts[host.address] = err;
      }
    }

    return { connection, host };
  }

  /**
   * Sends the request: obtains the host iterator (fixed host or load-balancing query plan)
   * and starts the first execution in the background.
   * @returns {Promise} Settled later through setCompleted(), or rejected immediately when the
   * query plan can not be created.
   */
  send() {
    if (this.executionOptions.getCaptureStackTrace()) {
      this.stackContainer = {};
      Error.captureStackTrace(this.stackContainer);
    }

    return new Promise((resolve, reject) => {
      this._resolveCallback = resolve;
      this._rejectCallback = reject;

      const lbp = this.executionOptions.getLoadBalancingPolicy();
      const fixedHost = this.executionOptions.getFixedHost();

      if (fixedHost) {
        // A fixed host bypasses the load balancing policy: the plan is that single host.
        this._hostIterator = utils.arrayIterator([fixedHost]);
        promiseUtils.toBackground(this._startNewExecution());
        return;
      }

      lbp.newQueryPlan(this.client.keyspace, this.executionOptions, (err, iterator) => {
        if (err) {
          return reject(err);
        }

        this._hostIterator = iterator;
        promiseUtils.toBackground(this._startNewExecution());
      });
    });
  }

  /**
   * Borrows the next connection and starts a new execution on it, switching the connection
   * keyspace first when it differs from the client keyspace.
   * @param {Boolean} [isSpecExec] Whether this is a speculative execution.
   * @returns {Promise<void>}
   * @private
   */
  async _startNewExecution(isSpecExec) {
    if (isSpecExec) {
      this.client.metrics.onSpeculativeExecution();
    }

    let host;
    let connection;

    try {
      ({ host, connection } = this.getNextConnection());
    } catch (err) {
      return this.handleNoHostAvailable(err, null);
    }

    if (isSpecExec) {
      // A speculative execution scheduled with process.nextTick() can not be cleared.
      // Skip it when the first execution was cancelled, or when no execution is tracked any
      // more: executions are only removed by handleNoHostAvailable(), which completes the
      // request once the list becomes empty. The emptiness check also avoids dereferencing
      // an undefined first element.
      if (this._executions.length === 0 || this._executions[0].wasCancelled()) {
        return;
      }
    }

    if (this.client.keyspace && this.client.keyspace !== connection.keyspace) {
      try {
        await connection.changeKeyspace(this.client.keyspace);
      } catch (err) {
        this.triedHosts[host.address] = err;

        // The keyspace could not be set on this connection: try the next host in the plan.
        return this._startNewExecution(isSpecExec);
      }
    }

    const execution = new RequestExecution(this, host, connection);
    this._executions.push(execution);
    execution.start();

    // Speculative executions are only scheduled for idempotent requests.
    if (this.executionOptions.isIdempotent()) {
      this._scheduleSpeculativeExecution(host);
    }
  }

  /**
   * Asks the speculative execution plan for the next delay and schedules a new execution.
   * A non-numeric or negative delay means no further speculative execution.
   * @param {Object} host The host used by the execution that was just started.
   * @private
   */
  _scheduleSpeculativeExecution(host) {
    const delay = this._speculativeExecutionPlan.nextExecution(host);
    if (typeof delay !== 'number' || delay < 0) {
      return;
    }

    const startSpeculative = () => promiseUtils.toBackground(this._startNewExecution(true));

    if (delay === 0) {
      // Zero delay: run on the next tick. This can not be cancelled afterwards.
      return process.nextTick(() => {
        startSpeculative();
      });
    }

    // Keep the handle so setCompleted() can clear the pending timer.
    this._newExecutionTimeout = setTimeout(startSpeculative, delay);
  }

  /**
   * Changes the keyspace on the first active connection found among the client hosts.
   * @param {Object} client
   * @returns {*} The value returned by `connection.changeKeyspace()`.
   * @throws {DriverInternalError} When no host has an active connection.
   */
  static setKeyspace(client) {
    let connection;

    for (const host of client.hosts.values()) {
      connection = host.getActiveConnection();
      if (connection) {
        break;
      }
    }

    if (!connection) {
      throw new errors.DriverInternalError('No active connection found');
    }

    return connection.changeKeyspace(client.keyspace);
  }

  /**
   * Marks the request as completed: clears the pending speculative timer, cancels every
   * tracked execution and settles the promise returned by send().
   * @param {Error} [err] When set, the promise is rejected with it.
   * @param {Object} [result] The result to resolve with; `result.info.warnings` are logged.
   */
  setCompleted(err, result) {
    if (this._newExecutionTimeout !== null) {
      clearTimeout(this._newExecutionTimeout);
      this._newExecutionTimeout = null;
    }

    // Cancel all executions, including the one that produced the outcome.
    for (const execution of this._executions) {
      execution.cancel();
    }

    if (err) {
      // Guard against a missing container so that a failure here can never prevent
      // the promise from being rejected.
      if (this.executionOptions.getCaptureStackTrace() && this.stackContainer !== null) {
        utils.fixStack(this.stackContainer.stack, err);
      }

      return this._rejectCallback(err);
    }

    const warnings = result.info.warnings;
    if (warnings) {
      // Log each warning sent by the server.
      for (const [i, message] of warnings.entries()) {
        this.log('warning', util.format(
          'Received warning (%d of %d) "%s" for "%s"',
          i + 1,
          warnings.length,
          message,
          this.request.query || 'batch'));
      }
    }

    this._resolveCallback(result);
  }

  /**
   * Handles an execution running out of hosts. The execution (when provided) is removed from
   * the tracked list; the request fails only when no other execution remains.
   * @param {Error} err The error to complete the request with.
   * @param {Object|null} execution The execution that failed, or null when no execution
   * was started.
   */
  handleNoHostAvailable(err, execution) {
    if (execution !== null) {
      const index = this._executions.indexOf(execution);
      // indexOf() yields -1 for an untracked execution and splice(-1, 1) would drop the
      // last (unrelated) execution, so only remove when it was found.
      if (index !== -1) {
        this._executions.splice(index, 1);
      }
    }

    if (this._executions.length === 0) {
      // No other execution is in flight that could still yield a response.
      this.setCompleted(err);
    }
  }

  /**
   * Builds a function that fetches the following page of results.
   * @returns {function(*): Promise} A function that stores the given page state in the
   * execution options and sends the same request again with a new handler.
   */
  getNextPageHandler() {
    const request = this.request;
    const execOptions = this.executionOptions;
    const client = this.client;

    return function nextPageHandler(pageState) {
      execOptions.setPageState(pageState);
      return new RequestHandler(request, execOptions, client).send();
    };
  }
}
|
310 |
|
311 | module.exports = RequestHandler;
|