1 | /*
|
2 | * Copyright DataStax, Inc.
|
3 | *
|
4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | * you may not use this file except in compliance with the License.
|
6 | * You may obtain a copy of the License at
|
7 | *
|
8 | * http://www.apache.org/licenses/LICENSE-2.0
|
9 | *
|
10 | * Unless required by applicable law or agreed to in writing, software
|
11 | * distributed under the License is distributed on an "AS IS" BASIS,
|
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
13 | * See the License for the specific language governing permissions and
|
14 | * limitations under the License.
|
15 | */
|
16 | ;
|
17 |
|
18 | const events = require('events');
|
19 | const util = require('util');
|
20 |
|
21 | const utils = require('./utils.js');
|
22 | const errors = require('./errors.js');
|
23 | const types = require('./types');
|
24 | const { ProfileManager } = require('./execution-profile');
|
25 | const requests = require('./requests');
|
26 | const clientOptions = require('./client-options');
|
27 | const ClientState = require('./metadata/client-state');
|
28 | const description = require('../package.json').description;
|
29 | const { version } = require('../package.json');
|
30 | const { DefaultExecutionOptions } = require('./execution-options');
|
31 | const ControlConnection = require('./control-connection');
|
32 | const RequestHandler = require('./request-handler');
|
33 | const PrepareHandler = require('./prepare-handler');
|
34 | const InsightsClient = require('./insights-client');
|
35 | const cloud = require('./datastax/cloud');
|
36 | const GraphExecutor = require('./datastax/graph/graph-executor');
|
37 | const promiseUtils = require('./promise-utils');
|
38 |
|
39 | /**
|
40 | * Max amount of pools being warmup in parallel, when warmup is enabled
|
41 | * @private
|
42 | */
|
43 | const warmupLimit = 32;
|
44 |
|
45 | /**
|
46 | * Client options.
|
47 | * <p>While the driver provides lots of extensibility points and configurability, few client options are required.</p>
|
48 | * <p>Default values for all settings are designed to be suitable for the majority of use cases, you should avoid
|
49 | * fine tuning it when not needed.</p>
|
50 | * <p>See [Client constructor]{@link Client} documentation for recommended options.</p>
|
51 | * @typedef {Object} ClientOptions
|
52 | * @property {Array.<string>} contactPoints
|
53 | * Array of addresses or host names of the nodes to add as contact points.
|
54 | * <p>
|
55 | * Contact points are addresses of Cassandra nodes that the driver uses to discover the cluster topology.
|
56 | * </p>
|
57 | * <p>
|
58 | * Only one contact point is required (the driver will retrieve the address of the other nodes automatically),
|
59 | * but it is usually a good idea to provide more than one contact point, because if that single contact point is
|
60 | * unavailable, the driver will not be able to initialize correctly.
|
61 | * </p>
|
62 | * @property {String} [localDataCenter] The local data center to use.
|
63 | * <p>
|
64 | * If using DCAwareRoundRobinPolicy (default), this option is required and only hosts from this data center are
|
65 | * connected to and used in query plans.
|
66 | * </p>
|
67 | * @property {String} [keyspace] The logged keyspace for all the connections created within the {@link Client} instance.
|
68 | * @property {Object} [credentials] An object containing the username and password for plain-text authentication.
|
69 | * It configures the authentication provider to be used against Apache Cassandra's PasswordAuthenticator or DSE's
|
70 | * DseAuthenticator, when default auth scheme is plain-text.
|
71 | * <p>
|
72 | * Note that you should configure either <code>credentials</code> or <code>authProvider</code> to connect to an
|
73 | * auth-enabled cluster, but not both.
|
74 | * </p>
|
75 | * @property {String} [credentials.username] The username to use for plain-text authentication.
|
76 | * @property {String} [credentials.password] The password to use for plain-text authentication.
|
77 | * @property {Uuid} [id] A unique identifier assigned to a {@link Client} object, that will be communicated to the
|
78 | * server (DSE 6.0+) to identify the client instance created with this options. When not defined, the driver will
|
79 | * generate a random identifier.
|
80 | * @property {String} [applicationName] An optional setting identifying the name of the application using
|
81 | * the {@link Client} instance.
|
82 | * <p>This value is passed to DSE and is useful as metadata for describing a client connection on the server side.</p>
|
83 | * @property {String} [applicationVersion] An optional setting identifying the version of the application using
|
84 | * the {@link Client} instance.
|
85 | * <p>This value is passed to DSE and is useful as metadata for describing a client connection on the server side.</p>
|
86 | * @property {Object} [monitorReporting] Options for reporting mechanism from the client to the DSE server, for
|
87 | * versions that support it.
|
88 | * @property {Boolean} [monitorReporting.enabled=true] Determines whether the reporting mechanism is enabled.
|
89 | * Defaults to <code>true</code>.
|
90 | * @property {Object} [cloud] The options to connect to a cloud instance.
|
91 | * @property {String|URL} cloud.secureConnectBundle Determines the file path for the credentials file bundle.
|
92 | * @property {Number} [refreshSchemaDelay] The default window size in milliseconds used to debounce node list and schema
|
93 | * refresh metadata requests. Default: 1000.
|
94 | * @property {Boolean} [isMetadataSyncEnabled] Determines whether client-side schema metadata retrieval and update is
|
95 | * enabled.
|
96 | * <p>Setting this value to <code>false</code> will cause keyspace information not to be automatically loaded, affecting
|
97 | * replica calculation per token in the different keyspaces. When disabling metadata synchronization, use
|
98 | * [Metadata.refreshKeyspaces()]{@link module:metadata~Metadata#refreshKeyspaces} to keep keyspace information up to
|
99 | * date or token-awareness will not work correctly.</p>
|
100 | * Default: <code>true</code>.
|
101 | * @property {Boolean} [prepareOnAllHosts] Determines if the driver should prepare queries on all hosts in the cluster.
|
102 | * Default: <code>true</code>.
|
103 | * @property {Boolean} [rePrepareOnUp] Determines if the driver should re-prepare all cached prepared queries on a
|
104 | * host when it marks it back up.
|
105 | * Default: <code>true</code>.
|
106 | * @property {Number} [maxPrepared] Determines the maximum amount of different prepared queries before evicting items
|
107 | * from the internal cache. Reaching a high threshold hints that the queries are not being reused, like when
|
108 | * hard-coding parameter values inside the queries.
|
109 | * Default: <code>500</code>.
|
110 | * @property {Object} [policies]
|
111 | * @property {LoadBalancingPolicy} [policies.loadBalancing] The load balancing policy instance to be used to determine
|
112 | * the coordinator per query.
|
113 | * @property {RetryPolicy} [policies.retry] The retry policy.
|
114 | * @property {ReconnectionPolicy} [policies.reconnection] The reconnection policy to be used.
|
115 | * @property {AddressTranslator} [policies.addressResolution] The address resolution policy.
|
116 | * @property {SpeculativeExecutionPolicy} [policies.speculativeExecution] The <code>SpeculativeExecutionPolicy</code>
|
117 | * instance to be used to determine if the client should send speculative queries when the selected host takes more
|
118 | * time than expected.
|
119 | * <p>
|
120 | * Default: <code>[NoSpeculativeExecutionPolicy]{@link
|
121 | * module:policies/speculativeExecution~NoSpeculativeExecutionPolicy}</code>
|
122 | * </p>
|
123 | * @property {TimestampGenerator} [policies.timestampGeneration] The client-side
|
124 | * [query timestamp generator]{@link module:policies/timestampGeneration~TimestampGenerator}.
|
125 | * <p>
|
126 | * Default: <code>[MonotonicTimestampGenerator]{@link module:policies/timestampGeneration~MonotonicTimestampGenerator}
|
127 | * </code>
|
128 | * </p>
|
129 | * <p>Use <code>null</code> to disable client-side timestamp generation.</p>
|
130 | * @property {QueryOptions} [queryOptions] Default options for all queries.
|
131 | * @property {Object} [pooling] Pooling options.
|
132 | * @property {Number} [pooling.heartBeatInterval] The amount of idle time in milliseconds that has to pass before the
|
133 | * driver issues a request on an active connection to avoid idle time disconnections. Default: 30000.
|
134 | * @property {Object} [pooling.coreConnectionsPerHost] Associative array containing amount of connections per host
|
135 | * distance.
|
136 | * @property {Number} [pooling.maxRequestsPerConnection] The maximum number of requests per connection. The default
|
137 | * value is:
|
138 | * <ul>
|
139 | * <li>For modern protocol versions (v3 and above): 2048</li>
|
140 | * <li>For older protocol versions (v1 and v2): 128</li>
|
141 | * </ul>
|
142 | * @property {Boolean} [pooling.warmup] Determines if all connections to hosts in the local datacenter must be opened on
|
143 | * connect. Default: true.
|
144 | * @property {Object} [protocolOptions]
|
145 | * @property {Number} [protocolOptions.port] The port to use to connect to the Cassandra host. If not set through this
|
146 | * method, the default port (9042) will be used instead.
|
147 | * @property {Number} [protocolOptions.maxSchemaAgreementWaitSeconds] The maximum time in seconds to wait for schema
|
148 | * agreement between nodes before returning from a DDL query. Default: 10.
|
149 | * @property {Number} [protocolOptions.maxVersion] When set, it limits the maximum protocol version used to connect to
|
150 | * the nodes.
|
151 | * Useful for using the driver against a cluster that contains nodes with different major/minor versions of Cassandra.
|
152 | * @property {Boolean} [protocolOptions.noCompact] When set to true, enables the NO_COMPACT startup option.
|
153 | * <p>
|
154 | * When this option is supplied <code>SELECT</code>, <code>UPDATE</code>, <code>DELETE</code>, and <code>BATCH</code>
|
155 | * statements on <code>COMPACT STORAGE</code> tables function in "compatibility" mode which allows seeing these tables
|
156 | * as if they were "regular" CQL tables.
|
157 | * </p>
|
158 | * <p>
|
159 | * This option only effects interactions with interactions with tables using <code>COMPACT STORAGE</code> and is only
|
160 | * supported by C* 3.0.16+, 3.11.2+, 4.0+ and DSE 6.0+.
|
161 | * </p>
|
162 | * @property {Object} [socketOptions]
|
163 | * @property {Number} [socketOptions.connectTimeout] Connection timeout in milliseconds. Default: 5000.
|
164 | * @property {Number} [socketOptions.defunctReadTimeoutThreshold] Determines the amount of requests that simultaneously
|
165 | * have to timeout before closing the connection. Default: 64.
|
166 | * @property {Boolean} [socketOptions.keepAlive] Whether to enable TCP keep-alive on the socket. Default: true.
|
167 | * @property {Number} [socketOptions.keepAliveDelay] TCP keep-alive delay in milliseconds. Default: 0.
|
168 | * @property {Number} [socketOptions.readTimeout] Per-host read timeout in milliseconds.
|
169 | * <p>
|
170 | * Please note that this is not the maximum time a call to {@link Client#execute} may have to wait;
|
171 | * this is the maximum time that call will wait for one particular Cassandra host, but other hosts will be tried if
|
172 | * one of them timeout. In other words, a {@link Client#execute} call may theoretically wait up to
|
173 | * <code>readTimeout * number_of_cassandra_hosts</code> (though the total number of hosts tried for a given query also
|
174 | * depends on the LoadBalancingPolicy in use).
|
175 | * <p>When setting this value, keep in mind the following:</p>
|
176 | * <ul>
|
177 | * <li>the timeout settings used on the Cassandra side (*_request_timeout_in_ms in cassandra.yaml) should be taken
|
178 | * into account when picking a value for this read timeout. You should pick a value a couple of seconds greater than
|
179 | * the Cassandra timeout settings.
|
180 | * </li>
|
181 | * <li>
|
182 | * the read timeout is only approximate and only control the timeout to one Cassandra host, not the full query.
|
183 | * </li>
|
184 | * </ul>
|
185 | * Setting a value of 0 disables read timeouts. Default: <code>12000</code>.
|
186 | * @property {Boolean} [socketOptions.tcpNoDelay] When set to true, it disables the Nagle algorithm. Default: true.
|
187 | * @property {Number} [socketOptions.coalescingThreshold] Buffer length in bytes use by the write queue before flushing
|
188 | * the frames. Default: 8000.
|
189 | * @property {AuthProvider} [authProvider] Provider to be used to authenticate to an auth-enabled cluster.
|
190 | * @property {RequestTracker} [requestTracker] The instance of RequestTracker used to monitor or log requests executed
|
191 | * with this instance.
|
192 | * @property {Object} [sslOptions] Client-to-node ssl options. When set the driver will use the secure layer.
|
193 | * You can specify cert, ca, ... options named after the Node.js <code>tls.connect()</code> options.
|
194 | * <p>
|
195 | * It uses the same default values as Node.js <code>tls.connect()</code> except for <code>rejectUnauthorized</code>
|
196 | * which is set to <code>false</code> by default (for historical reasons). This setting is likely to change
|
197 | * in upcoming versions to enable validation by default.
|
198 | * </p>
|
199 | * @property {Object} [encoding] Encoding options.
|
200 | * @property {Function} [encoding.map] Map constructor to use for Cassandra map<k,v> type encoding and decoding.
|
201 | * If not set, it will default to Javascript Object with map keys as property names.
|
202 | * @property {Function} [encoding.set] Set constructor to use for Cassandra set<k> type encoding and decoding.
|
203 | * If not set, it will default to Javascript Array.
|
204 | * @property {Boolean} [encoding.copyBuffer] Determines if the network buffer should be copied for buffer based data
|
205 | * types (blob, uuid, timeuuid and inet).
|
206 | * <p>
|
207 | * Setting it to true will cause that the network buffer is copied for each row value of those types,
|
208 | * causing additional allocations but freeing the network buffer to be reused.
|
209 | * Setting it to true is a good choice for cases where the Row and ResultSet returned by the queries are long-lived
|
210 | * objects.
|
211 | * </p>
|
212 | * <p>
|
213 | * Setting it to false will cause less overhead and the reference of the network buffer to be maintained until the row
|
214 | * / result set are de-referenced.
|
215 | * Default: true.
|
216 | * </p>
|
217 | * @property {Boolean} [encoding.useUndefinedAsUnset] Valid for Cassandra 2.2 and above. Determines that, if a parameter
|
218 | * is set to
|
219 | * <code>undefined</code> it should be encoded as <code>unset</code>.
|
220 | * <p>
|
221 | * By default, ECMAScript <code>undefined</code> is encoded as <code>null</code> in the driver. Cassandra 2.2
|
222 | * introduced the concept of unset.
|
223 | * At driver level, you can set a parameter to unset using the field <code>types.unset</code>. Setting this flag to
|
224 | * true allows you to use ECMAScript undefined as Cassandra <code>unset</code>.
|
225 | * </p>
|
226 | * <p>
|
227 | * Default: true.
|
228 | * </p>
|
229 | * @property {Boolean} [encoding.useBigIntAsLong] Use [BigInt ECMAScript type](https://tc39.github.io/proposal-bigint/)
|
230 | * to represent CQL bigint and counter data types.
|
231 | * @property {Boolean} [encoding.useBigIntAsVarint] Use [BigInt ECMAScript
|
232 | * type](https://tc39.github.io/proposal-bigint/) to represent CQL varint data type.
|
233 | * @property {Array.<ExecutionProfile>} [profiles] The array of [execution profiles]{@link ExecutionProfile}.
|
234 | * @property {Function} [promiseFactory] Function to be used to create a <code>Promise</code> from a
|
235 | * callback-style function.
|
236 | * <p>
|
237 | * Promise libraries often provide different methods to create a promise. For example, you can use Bluebird's
|
238 | * <code>Promise.fromCallback()</code> method.
|
239 | * </p>
|
240 | * <p>
|
241 | * By default, the driver will use the
|
242 | * [Promise constructor]{@link https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise}.
|
243 | * </p>
|
244 | */
|
245 |
|
246 | /**
|
247 | * Query options
|
248 | * @typedef {Object} QueryOptions
|
249 | * @property {Boolean} [autoPage] Determines if the driver must retrieve the following result pages automatically.
|
250 | * <p>
|
251 | * This setting is only considered by the [Client#eachRow()]{@link Client#eachRow} method. For more information,
|
252 | * check the
|
253 | * [paging results documentation]{@link https://docs.datastax.com/en/developer/nodejs-driver/latest/features/paging/}.
|
254 | * </p>
|
255 | * @property {Boolean} [captureStackTrace] Determines if the stack trace before the query execution should be
|
256 | * maintained.
|
257 | * <p>
|
258 | * Useful for debugging purposes, it should be set to <code>false</code> under production environment as it adds an
|
259 | * unnecessary overhead to each execution.
|
260 | * </p>
|
261 | * Default: false.
|
262 | * @property {Number} [consistency] [Consistency level]{@link module:types~consistencies}.
|
263 | * <p>
|
264 | * Defaults to <code>localOne</code> for Apache Cassandra and DSE deployments.
|
265 | * For DataStax Astra, it defaults to <code>localQuorum</code>.
|
266 | * </p>
|
267 | * @property {Object} [customPayload] Key-value payload to be passed to the server. On the Cassandra side,
|
268 | * implementations of QueryHandler can use this data.
|
269 | * @property {String} [executeAs] The user or role name to act as when executing this statement.
|
270 | * <p>When set, it executes as a different user/role than the one currently authenticated (a.k.a. proxy execution).</p>
|
271 | * <p>This feature is only available in DSE 5.1+.</p>
|
272 | * @property {String|ExecutionProfile} [executionProfile] Name or instance of the [profile]{@link ExecutionProfile} to
|
273 | * be used for this execution. If not set, it will the use "default" execution profile.
|
274 | * @property {Number} [fetchSize] Amount of rows to retrieve per page.
|
275 | * @property {Array|Array<Array>} [hints] Type hints for parameters given in the query, ordered as for the parameters.
|
276 | * <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p>
|
277 | * @property {Host} [host] The host that should handle the query.
|
278 | * <p>
|
279 | * Use of this option is <em>heavily discouraged</em> and should only be used in the following cases:
|
280 | * </p>
|
281 | * <ol>
|
282 | * <li>
|
283 | * Querying node-local tables, such as tables in the <code>system</code> and <code>system_views</code>
|
284 | * keyspaces.
|
285 | * </li>
|
286 | * <li>
|
287 | * Applying a series of schema changes, where it may be advantageous to execute schema changes in sequence on the
|
288 | * same node.
|
289 | * </li>
|
290 | * </ol>
|
291 | * <p>
|
292 | * Configuring a specific host causes the configured
|
293 | * [LoadBalancingPolicy]{@link module:policies/loadBalancing~LoadBalancingPolicy} to be completely bypassed.
|
294 | * However, if the load balancing policy dictates that the host is at a
|
295 | * [distance of ignored]{@link module:types~distance} or there is no active connectivity to the host, the request will
|
296 | * fail with a [NoHostAvailableError]{@link module:errors~NoHostAvailableError}.
|
297 | * </p>
|
298 | * @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result
|
299 | * beyond the initial application.
|
300 | * <p>
|
301 | * The query execution idempotence can be used at [RetryPolicy]{@link module:policies/retry~RetryPolicy} level to
|
302 | * determine if an statement can be retried in case of request error or write timeout.
|
303 | * </p>
|
304 | * <p>Default: <code>false</code>.</p>
|
305 | * @property {String} [keyspace] Specifies the keyspace for the query. It is used for the following:
|
306 | * <ol>
|
307 | * <li>To indicate what keyspace the statement is applicable to (protocol V5+ only). This is useful when the
|
308 | * query does not provide an explicit keyspace and you want to override the current {@link Client#keyspace}.</li>
|
309 | * <li>For query routing when the query operates on a different keyspace than the current {@link Client#keyspace}.</li>
|
310 | * </ol>
|
311 | * @property {Boolean} [logged] Determines if the batch should be written to the batchlog. Only valid for
|
312 | * [Client#batch()]{@link Client#batch}, it will be ignored by other methods. Default: true.
|
313 | * @property {Boolean} [counter] Determines if its a counter batch. Only valid for
|
314 | * [Client#batch()]{@link Client#batch}, it will be ignored by other methods. Default: false.
|
315 | * @property {Buffer|String} [pageState] Buffer or string token representing the paging state.
|
316 | * <p>Useful for manual paging, if provided, the query will be executed starting from a given paging state.</p>
|
317 | * @property {Boolean} [prepare] Determines if the query must be executed as a prepared statement.
|
318 | * @property {Number} [readTimeout] When defined, it overrides the default read timeout
|
319 | * (<code>socketOptions.readTimeout</code>) in milliseconds for this execution per coordinator.
|
320 | * <p>
|
321 | * Suitable for statements for which the coordinator may allow a longer server-side timeout, for example aggregation
|
322 | * queries.
|
323 | * </p>
|
324 | * <p>
|
325 | * A value of <code>0</code> disables client side read timeout for the execution. Default: <code>undefined</code>.
|
326 | * </p>
|
327 | * @property {RetryPolicy} [retry] Retry policy for the query.
|
328 | * <p>
|
329 | * This property can be used to specify a different [retry policy]{@link module:policies/retry} to the one specified
|
330 | * in the {@link ClientOptions}.policies.
|
331 | * </p>
|
332 | * @property {Array} [routingIndexes] Index of the parameters that are part of the partition key to determine
|
333 | * the routing.
|
334 | * @property {Buffer|Array} [routingKey] Partition key(s) to determine which coordinator should be used for the query.
|
335 | * @property {Array} [routingNames] Array of the parameters names that are part of the partition key to determine the
|
336 | * routing. Only valid for non-prepared requests, it's recommended that you use the prepare flag instead.
|
337 | * @property {Number} [serialConsistency] Serial consistency is the consistency level for the serial phase of
|
338 | * conditional updates.
|
339 | * This option will be ignored for anything else that a conditional update/insert.
|
340 | * @property {Number|Long} [timestamp] The default timestamp for the query in microseconds from the unix epoch
|
341 | * (00:00:00, January 1st, 1970).
|
342 | * <p>If provided, this will replace the server side assigned timestamp as default timestamp.</p>
|
343 | * <p>Use [generateTimestamp()]{@link module:types~generateTimestamp} utility method to generate a valid timestamp
|
344 | * based on a Date and microseconds parts.</p>
|
345 | * @property {Boolean} [traceQuery] Enable query tracing for the execution. Use query tracing to diagnose performance
|
346 | * problems related to query executions. Default: false.
|
347 | * <p>To retrieve trace, you can call [Metadata.getTrace()]{@link module:metadata~Metadata#getTrace} method.</p>
|
348 | * @property {Object} [graphOptions] Default options for graph query executions.
|
349 | * <p>
|
350 | * These options are meant to provide defaults for all graph query executions. Consider using
|
351 | * [execution profiles]{@link ExecutionProfile} if you plan to reuse different set of options across different
|
352 | * query executions.
|
353 | * </p>
|
354 | * @property {String} [graphOptions.language] The graph language to use in graph queries. Default:
|
355 | * <code>'gremlin-groovy'</code>.
|
356 | * @property {String} [graphOptions.name] The graph name to be used in all graph queries.
|
357 | * <p>
|
358 | * This property is required but there is no default value for it. This value can be overridden at query level.
|
359 | * </p>
|
360 | * @property {Number} [graphOptions.readConsistency] Overrides the
|
361 | * [consistency level]{@link module:types~consistencies}
|
362 | * defined in the query options for graph read queries.
|
363 | * @property {Number} [graphOptions.readTimeout] Overrides the default per-host read timeout (in milliseconds) for all
|
364 | * graph queries. Default: <code>0</code>.
|
365 | * <p>
|
366 | * Use <code>null</code> to reset the value and use the default on <code>socketOptions.readTimeout</code> .
|
367 | * </p>
|
368 | * @property {String} [graphOptions.source] The graph traversal source name to use in graph queries. Default:
|
369 | * <code>'g'</code>.
|
370 | * @property {Number} [graphOptions.writeConsistency] Overrides the [consistency
|
371 | * level]{@link module:types~consistencies} defined in the query options for graph write queries.
|
372 | */
|
373 |
|
374 | /**
|
375 | * Creates a new instance of {@link Client}.
|
376 | * @classdesc
|
377 | * Represents a database client that maintains multiple connections to the cluster nodes, providing methods to
|
378 | * execute CQL statements.
|
379 | * <p>
|
380 | * The <code>Client</code> uses [policies]{@link module:policies} to decide which nodes to connect to, which node
|
381 | * to use per each query execution, when it should retry failed or timed-out executions and how reconnection to down
|
382 | * nodes should be made.
|
383 | * </p>
|
384 | * @extends EventEmitter
|
385 | * @param {ClientOptions} options The options for this instance.
|
386 | * @example <caption>Creating a new client instance</caption>
|
387 | * const client = new Client({
|
388 | * contactPoints: ['10.0.1.101', '10.0.1.102'],
|
389 | * localDataCenter: 'datacenter1'
|
390 | * });
|
391 | * @example <caption>Executing a query</caption>
|
392 | * const result = await client.connect();
|
393 | * console.log(`Connected to ${client.hosts.length} nodes in the cluster: ${client.hosts.keys().join(', ')}`);
|
394 | * @example <caption>Executing a query</caption>
|
395 | * const result = await client.execute('SELECT key FROM system.local');
|
396 | * const row = result.first();
|
397 | * console.log(row['key']);
|
398 | * @constructor
|
399 | */
|
400 | function Client(options) {
|
401 | events.EventEmitter.call(this);
|
402 | this.options = clientOptions.extend({ logEmitter: this.emit.bind(this), id: types.Uuid.random() }, options);
|
403 | Object.defineProperty(this, 'profileManager', { value: new ProfileManager(this.options) });
|
404 | Object.defineProperty(this, 'controlConnection', {
|
405 | value: new ControlConnection(this.options, this.profileManager), writable: true }
|
406 | );
|
407 | Object.defineProperty(this, 'insightsClient', { value: new InsightsClient(this)});
|
408 |
|
409 | //Unlimited amount of listeners for internal event queues by default
|
410 | this.setMaxListeners(0);
|
411 | this.connected = false;
|
412 | this.isShuttingDown = false;
|
413 | /**
|
414 | * Gets the name of the active keyspace.
|
415 | * @type {String}
|
416 | */
|
417 | this.keyspace = options.keyspace;
|
418 | /**
|
419 | * Gets the schema and cluster metadata information.
|
420 | * @type {Metadata}
|
421 | */
|
422 | this.metadata = this.controlConnection.metadata;
|
423 | /**
|
424 | * Gets an associative array of cluster hosts.
|
425 | * @type {HostMap}
|
426 | */
|
427 | this.hosts = this.controlConnection.hosts;
|
428 |
|
429 | /**
|
430 | * The [ClientMetrics]{@link module:metrics~ClientMetrics} instance used to expose measurements of its internal
|
431 | * behavior and of the server as seen from the driver side.
|
432 | * <p>By default, a [DefaultMetrics]{@link module:metrics~DefaultMetrics} instance is used.</p>
|
433 | * @type {ClientMetrics}
|
434 | */
|
435 | this.metrics = this.options.metrics;
|
436 |
|
437 | this._graphExecutor = new GraphExecutor(this, options, this._execute);
|
438 | }
|
439 |
|
440 | util.inherits(Client, events.EventEmitter);
|
441 |
|
442 | /**
|
443 | * Emitted when a new host is added to the cluster.
|
444 | * <ul>
|
445 | * <li>{@link Host} The host being added.</li>
|
446 | * </ul>
|
447 | * @event Client#hostAdd
|
448 | */
|
449 | /**
|
450 | * Emitted when a host is removed from the cluster
|
451 | * <ul>
|
452 | * <li>{@link Host} The host being removed.</li>
|
453 | * </ul>
|
454 | * @event Client#hostRemove
|
455 | */
|
456 | /**
|
457 | * Emitted when a host in the cluster changed status from down to up.
|
458 | * <ul>
|
459 | * <li>{@link Host host} The host that changed the status.</li>
|
460 | * </ul>
|
461 | * @event Client#hostUp
|
462 | */
|
463 | /**
|
464 | * Emitted when a host in the cluster changed status from up to down.
|
465 | * <ul>
|
466 | * <li>{@link Host host} The host that changed the status.</li>
|
467 | * </ul>
|
468 | * @event Client#hostDown
|
469 | */
|
470 |
|
471 | /**
|
472 | * Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the
|
473 | * cluster.
|
474 | * <p>When the {@link Client} is already connected, it resolves immediately.</p>
|
475 | * <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
|
476 | * @param {function} [callback] The optional callback that is invoked when the pool is connected or it failed to
|
477 | * connect.
|
478 | * @example <caption>Usage example</caption>
|
479 | * await client.connect();
|
480 | */
|
481 | Client.prototype.connect = function (callback) {
|
482 | if (this.connected && callback) {
|
483 | // Avoid creating Promise to immediately resolve them
|
484 | return callback();
|
485 | }
|
486 |
|
487 | return promiseUtils.optionalCallback(this._connect(), callback);
|
488 | };
|
489 |
|
490 | /**
|
491 | * Async-only version of {@link Client#connect()}.
|
492 | * @private
|
493 | */
|
494 | Client.prototype._connect = async function () {
|
495 | if (this.connected) {
|
496 | return;
|
497 | }
|
498 |
|
499 | if (this.isShuttingDown) {
|
500 | //it is being shutdown, don't allow further calls to connect()
|
501 | throw new errors.NoHostAvailableError(null, 'Connecting after shutdown is not supported');
|
502 | }
|
503 |
|
504 | if (this.connecting) {
|
505 | return promiseUtils.fromEvent(this, 'connected');
|
506 | }
|
507 |
|
508 | this.connecting = true;
|
509 | this.log('info', util.format("Connecting to cluster using '%s' version %s", description, version));
|
510 |
|
511 | try {
|
512 | await cloud.init(this.options);
|
513 | await this.controlConnection.init();
|
514 | this.hosts = this.controlConnection.hosts;
|
515 | await this.profileManager.init(this, this.hosts);
|
516 |
|
517 | if (this.keyspace) {
|
518 | await RequestHandler.setKeyspace(this);
|
519 | }
|
520 |
|
521 | clientOptions.setMetadataDependent(this);
|
522 |
|
523 | await this._warmup();
|
524 |
|
525 | } catch (err) {
|
526 | // We should close the pools (if any) and reset the state to allow successive calls to connect()
|
527 | await this.controlConnection.reset();
|
528 | this.connected = false;
|
529 | this.connecting = false;
|
530 | this.emit('connected', err);
|
531 | throw err;
|
532 | }
|
533 |
|
534 | this._setHostListeners();
|
535 |
|
536 | // Set the distance of the control connection host relatively to this instance
|
537 | this.profileManager.getDistance(this.controlConnection.host);
|
538 | this.insightsClient.init();
|
539 | this.connected = true;
|
540 | this.connecting = false;
|
541 | this.emit('connected');
|
542 | };
|
543 |
|
544 | /**
|
545 | * Executes a query on an available connection.
|
546 | * <p>The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.</p>
|
547 | * <p>
|
548 | * Some execution failures can be handled transparently by the driver, according to the
|
549 | * [RetryPolicy]{@linkcode module:policies/retry~RetryPolicy} or the
|
550 | * [SpeculativeExecutionPolicy]{@linkcode module:policies/speculativeExecution} used.
|
551 | * </p>
|
552 | * <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
|
553 | * @param {String} query The query to execute.
|
554 | * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
|
555 | * as keys and its value.
|
556 | * @param {QueryOptions} [options] The query options for the execution.
|
557 | * @param {ResultCallback} [callback] Executes callback(err, result) when execution completed. When not defined, the
|
558 | * method will return a promise.
|
559 | * @example <caption>Promise-based API, using async/await</caption>
|
560 | * const query = 'SELECT name, email FROM users WHERE id = ?';
|
561 | * const result = await client.execute(query, [ id ], { prepare: true });
|
562 | * const row = result.first();
|
563 | * console.log('%s: %s', row['name'], row['email']);
|
564 | * @example <caption>Callback-based API</caption>
|
565 | * const query = 'SELECT name, email FROM users WHERE id = ?';
|
566 | * client.execute(query, [ id ], { prepare: true }, function (err, result) {
|
567 | * assert.ifError(err);
|
568 | * const row = result.first();
|
569 | * console.log('%s: %s', row['name'], row['email']);
|
570 | * });
|
571 | * @see {@link ExecutionProfile} to reuse a set of options across different query executions.
|
572 | */
|
573 | Client.prototype.execute = function (query, params, options, callback) {
|
574 | // This method acts as a wrapper for the async method _execute()
|
575 |
|
576 | if (!callback) {
|
577 | // Set default argument values for optional parameters
|
578 | if (typeof options === 'function') {
|
579 | callback = options;
|
580 | options = null;
|
581 | } else if (typeof params === 'function') {
|
582 | callback = params;
|
583 | params = null;
|
584 | }
|
585 | }
|
586 |
|
587 | try {
|
588 | const execOptions = DefaultExecutionOptions.create(options, this);
|
589 | return promiseUtils.optionalCallback(this._execute(query, params, execOptions), callback);
|
590 | }
|
591 | catch (err) {
|
592 | // There was an error when parsing the user options
|
593 | if (callback) {
|
594 | return callback(err);
|
595 | }
|
596 |
|
597 | return Promise.reject(err);
|
598 | }
|
599 | };
|
600 |
|
601 | /**
|
602 | * Executes a graph query.
|
603 | * <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
|
604 | * @param {String} query The gremlin query.
|
605 | * @param {Object|null} [parameters] An associative array containing the key and values of the parameters.
|
606 | * @param {GraphQueryOptions|null} [options] The graph query options.
|
607 | * @param {Function} [callback] Function to execute when the response is retrieved, taking two arguments:
|
608 | * <code>err</code> and <code>result</code>. When not defined, the method will return a promise.
|
609 | * @example <caption>Promise-based API, using async/await</caption>
|
610 | * const result = await client.executeGraph('g.V()');
|
611 | * // Get the first item (vertex, edge, scalar value, ...)
|
612 | * const vertex = result.first();
|
613 | * console.log(vertex.label);
|
614 | * @example <caption>Callback-based API</caption>
|
615 | * client.executeGraph('g.V()', (err, result) => {
|
616 | * const vertex = result.first();
|
617 | * console.log(vertex.label);
|
618 | * });
|
619 | * @example <caption>Iterating through the results</caption>
|
620 | * const result = await client.executeGraph('g.E()');
|
621 | * for (let edge of result) {
|
622 | * console.log(edge.label); // created
|
623 | * });
|
624 | * @example <caption>Using result.forEach()</caption>
|
625 | * const result = await client.executeGraph('g.V().hasLabel("person")');
|
626 | * result.forEach(function(vertex) {
|
627 | * console.log(vertex.type); // vertex
|
628 | * console.log(vertex.label); // person
|
629 | * });
|
630 | * @see {@link ExecutionProfile} to reuse a set of options across different query executions.
|
631 | */
|
632 | Client.prototype.executeGraph = function (query, parameters, options, callback) {
|
633 | callback = callback || (options ? options : parameters);
|
634 |
|
635 | if (typeof callback === 'function') {
|
636 | parameters = typeof parameters !== 'function' ? parameters : null;
|
637 | return promiseUtils.toCallback(this._graphExecutor.send(query, parameters, options), callback);
|
638 | }
|
639 |
|
640 | return this._graphExecutor.send(query, parameters, options);
|
641 | };
|
642 |
|
643 | /**
|
644 | * Executes the query and calls <code>rowCallback</code> for each row as soon as they are received. Calls the final
|
645 | * <code>callback</code> after all rows have been sent, or when there is an error.
|
646 | * <p>
|
647 | * The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.
|
648 | * </p>
|
649 | * @param {String} query The query to execute
|
650 | * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
|
651 | * as keys and its value.
|
652 | * @param {QueryOptions} [options] The query options.
|
653 | * @param {function} rowCallback Executes <code>rowCallback(n, row)</code> per each row received, where n is the row
|
654 | * index and row is the current Row.
|
655 | * @param {function} [callback] Executes <code>callback(err, result)</code> after all rows have been received.
|
656 | * <p>
|
657 | * When dealing with paged results, [ResultSet#nextPage()]{@link module:types~ResultSet#nextPage} method can be used
|
658 | * to retrieve the following page. In that case, <code>rowCallback()</code> will be again called for each row and
|
659 | * the final callback will be invoked when all rows in the following page has been retrieved.
|
660 | * </p>
|
661 | * @example <caption>Using per-row callback and arrow functions</caption>
|
662 | * client.eachRow(query, params, { prepare: true }, (n, row) => console.log(n, row), err => console.error(err));
|
663 | * @example <caption>Overloads</caption>
|
664 | * client.eachRow(query, rowCallback);
|
665 | * client.eachRow(query, params, rowCallback);
|
666 | * client.eachRow(query, params, options, rowCallback);
|
667 | * client.eachRow(query, params, rowCallback, callback);
|
668 | * client.eachRow(query, params, options, rowCallback, callback);
|
669 | */
|
670 | Client.prototype.eachRow = function (query, params, options, rowCallback, callback) {
|
671 | if (!callback && rowCallback && typeof options === 'function') {
|
672 | callback = utils.validateFn(rowCallback, 'rowCallback');
|
673 | rowCallback = options;
|
674 | } else {
|
675 | callback = callback || utils.noop;
|
676 | rowCallback = utils.validateFn(rowCallback || options || params, 'rowCallback');
|
677 | }
|
678 |
|
679 | params = typeof params !== 'function' ? params : null;
|
680 |
|
681 | let execOptions;
|
682 | try {
|
683 | execOptions = DefaultExecutionOptions.create(options, this, rowCallback);
|
684 | }
|
685 | catch (e) {
|
686 | return callback(e);
|
687 | }
|
688 |
|
689 | let rowLength = 0;
|
690 |
|
691 | const nextPage = () => promiseUtils.toCallback(this._execute(query, params, execOptions), pageCallback);
|
692 |
|
693 | function pageCallback (err, result) {
|
694 | if (err) {
|
695 | return callback(err);
|
696 | }
|
697 | // Next requests in case paging (auto or explicit) is used
|
698 | rowLength += result.rowLength;
|
699 |
|
700 | if (result.rawPageState !== undefined) {
|
701 | // Use new page state as next request page state
|
702 | execOptions.setPageState(result.rawPageState);
|
703 | if (execOptions.isAutoPage()) {
|
704 | // Issue next request for the next page
|
705 | return nextPage();
|
706 | }
|
707 | // Allows for explicit (manual) paging, in case the caller needs it
|
708 | result.nextPage = nextPage;
|
709 | }
|
710 |
|
711 | // Finished auto-paging
|
712 | result.rowLength = rowLength;
|
713 | callback(null, result);
|
714 | }
|
715 |
|
716 | promiseUtils.toCallback(this._execute(query, params, execOptions), pageCallback);
|
717 | };
|
718 |
|
719 | /**
|
720 | * Executes the query and pushes the rows to the result stream as soon as they received.
|
721 | * <p>
|
722 | * The stream is a [ReadableStream]{@linkcode https://nodejs.org/api/stream.html#stream_class_stream_readable} object
|
723 | * that emits rows.
|
724 | * It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).
|
725 | * </p>
|
726 | * <p>
|
727 | * The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple
|
728 | * hosts if needed.
|
729 | * </p>
|
730 | * @param {String} query The query to prepare and execute.
|
731 | * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
|
732 | * as keys and its value
|
733 | * @param {QueryOptions} [options] The query options.
|
734 | * @param {function} [callback] executes callback(err) after all rows have been received or if there is an error
|
735 | * @returns {ResultStream}
|
736 | */
|
737 | Client.prototype.stream = function (query, params, options, callback) {
|
738 | callback = callback || utils.noop;
|
739 | // NOTE: the nodejs stream maintains yet another internal buffer
|
740 | // we rely on the default stream implementation to keep memory
|
741 | // usage reasonable.
|
742 | const resultStream = new types.ResultStream({ objectMode: 1 });
|
743 | function onFinish(err, result) {
|
744 | if (err) {
|
745 | resultStream.emit('error', err);
|
746 | }
|
747 | if (result && result.nextPage ) {
|
748 | // allows for throttling as per the
|
749 | // default nodejs stream implementation
|
750 | resultStream._valve(function pageValve() {
|
751 | try {
|
752 | result.nextPage();
|
753 | }
|
754 | catch( ex ) {
|
755 | resultStream.emit('error', ex );
|
756 | }
|
757 | });
|
758 | return;
|
759 | }
|
760 | // Explicitly dropping the valve (closure)
|
761 | resultStream._valve(null);
|
762 | resultStream.add(null);
|
763 | callback(err);
|
764 | }
|
765 | let sync = true;
|
766 | this.eachRow(query, params, options, function rowCallback(n, row) {
|
767 | resultStream.add(row);
|
768 | }, function eachRowFinished(err, result) {
|
769 | if (sync) {
|
770 | // Prevent sync callback
|
771 | return setImmediate(function eachRowFinishedImmediate() {
|
772 | onFinish(err, result);
|
773 | });
|
774 | }
|
775 | onFinish(err, result);
|
776 | });
|
777 | sync = false;
|
778 | return resultStream;
|
779 | };
|
780 |
|
781 | /**
|
782 | * Executes batch of queries on an available connection to a host.
|
783 | * <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
|
784 | * @param {Array.<string>|Array.<{query, params}>} queries The queries to execute as an Array of strings or as an array
|
785 | * of object containing the query and params
|
786 | * @param {QueryOptions} [options] The query options.
|
787 | * @param {ResultCallback} [callback] Executes callback(err, result) when the batch was executed
|
788 | */
|
789 | Client.prototype.batch = function (queries, options, callback) {
|
790 | if (!callback && typeof options === 'function') {
|
791 | callback = options;
|
792 | options = null;
|
793 | }
|
794 |
|
795 | return promiseUtils.optionalCallback(this._batch(queries, options), callback);
|
796 | };
|
797 |
|
798 | /**
|
799 | * Async-only version of {@link Client#batch()} .
|
800 | * @param {Array.<string>|Array.<{query, params}>}queries
|
801 | * @param {QueryOptions} options
|
802 | * @returns {Promise<ResultSet>}
|
803 | * @private
|
804 | */
|
805 | Client.prototype._batch = async function (queries, options) {
|
806 | if (!Array.isArray(queries)) {
|
807 | throw new errors.ArgumentError('Queries should be an Array');
|
808 | }
|
809 |
|
810 | if (queries.length === 0) {
|
811 | throw new errors.ArgumentError('Queries array should not be empty');
|
812 | }
|
813 |
|
814 | await this._connect();
|
815 |
|
816 | const execOptions = DefaultExecutionOptions.create(options, this);
|
817 | let queryItems;
|
818 |
|
819 | if (execOptions.isPrepared()) {
|
820 | // use keyspace from query options if protocol supports per-query keyspace, otherwise use connection keyspace.
|
821 | const version = this.controlConnection.protocolVersion;
|
822 | const queryKeyspace = types.protocolVersion.supportsKeyspaceInRequest(version) && options.keyspace || this.keyspace;
|
823 | queryItems = await PrepareHandler.getPreparedMultiple(
|
824 | this, execOptions.getLoadBalancingPolicy(), queries, queryKeyspace);
|
825 | } else {
|
826 | queryItems = new Array(queries.length);
|
827 |
|
828 | for (let i = 0; i < queries.length; i++) {
|
829 | const item = queries[i];
|
830 | if (!item) {
|
831 | throw new errors.ArgumentError(`Invalid query at index ${i}`);
|
832 | }
|
833 |
|
834 | const query = typeof item === 'string' ? item : item.query;
|
835 | if (!query) {
|
836 | throw new errors.ArgumentError(`Invalid query at index ${i}`);
|
837 | }
|
838 |
|
839 | queryItems[i] = { query, params: item.params };
|
840 | }
|
841 | }
|
842 |
|
843 | const request = await this._createBatchRequest(queryItems, execOptions);
|
844 | return await RequestHandler.send(request, execOptions, this);
|
845 | };
|
846 |
|
847 | /**
|
848 | * Gets the host that are replicas of a given token.
|
849 | * @param {String} keyspace
|
850 | * @param {Buffer} token
|
851 | * @returns {Array<Host>}
|
852 | */
|
853 | Client.prototype.getReplicas = function (keyspace, token) {
|
854 | return this.metadata.getReplicas(keyspace, token);
|
855 | };
|
856 |
|
857 | /**
|
858 | * Gets a snapshot containing information on the connections pools held by this Client at the current time.
|
859 | * <p>
|
860 | * The information provided in the returned object only represents the state at the moment this method was called and
|
861 | * it's not maintained in sync with the driver metadata.
|
862 | * </p>
|
863 | * @returns {ClientState} A [ClientState]{@linkcode module:metadata~ClientState} instance.
|
864 | */
|
865 | Client.prototype.getState = function () {
|
866 | return ClientState.from(this);
|
867 | };
|
868 |
|
869 | Client.prototype.log = utils.log;
|
870 |
|
871 | /**
|
872 | * Closes all connections to all hosts.
|
873 | * <p>It returns a <code>Promise</code> when a <code>callback</code> is not provided.</p>
|
874 | * @param {Function} [callback] Optional callback to be invoked when finished closing all connections.
|
875 | */
|
876 | Client.prototype.shutdown = function (callback) {
|
877 | return promiseUtils.optionalCallback(this._shutdown(), callback);
|
878 | };
|
879 |
|
880 | /** @private */
|
881 | Client.prototype._shutdown = async function () {
|
882 | this.log('info', 'Shutting down');
|
883 |
|
884 | if (!this.hosts || !this.connected) {
|
885 | // not initialized
|
886 | this.connected = false;
|
887 | return;
|
888 | }
|
889 |
|
890 | if (this.connecting) {
|
891 | this.log('warning', 'Shutting down while connecting');
|
892 | // wait until finish connecting for easier troubleshooting
|
893 | await promiseUtils.fromEvent(this, 'connected');
|
894 | }
|
895 |
|
896 | this.connected = false;
|
897 | this.isShuttingDown = true;
|
898 | const hosts = this.hosts.values();
|
899 |
|
900 | this.insightsClient.shutdown();
|
901 |
|
902 | // Shutdown the ControlConnection before shutting down the pools
|
903 | this.controlConnection.shutdown();
|
904 | this.options.policies.speculativeExecution.shutdown();
|
905 |
|
906 | if (this.options.requestTracker) {
|
907 | this.options.requestTracker.shutdown();
|
908 | }
|
909 |
|
910 | // go through all the host and shut down their pools
|
911 | await Promise.all(hosts.map(h => h.shutdown(false)));
|
912 | };
|
913 |
|
914 | /**
|
915 | * Waits until that the schema version in all nodes is the same or the waiting time passed.
|
916 | * @param {Connection} connection
|
917 | * @returns {Promise<boolean>}
|
918 | * @ignore
|
919 | */
|
920 | Client.prototype._waitForSchemaAgreement = async function (connection) {
|
921 | if (this.hosts.length === 1) {
|
922 | return true;
|
923 | }
|
924 |
|
925 | const start = process.hrtime();
|
926 | const maxWaitSeconds = this.options.protocolOptions.maxSchemaAgreementWaitSeconds;
|
927 |
|
928 | this.log('info', 'Waiting for schema agreement');
|
929 |
|
930 | let versionsMatch;
|
931 |
|
932 | while (!versionsMatch && process.hrtime(start)[0] < maxWaitSeconds) {
|
933 | versionsMatch = await this.metadata.compareSchemaVersions(connection);
|
934 |
|
935 | if (versionsMatch) {
|
936 | this.log('info', 'Schema versions match');
|
937 | break;
|
938 | }
|
939 |
|
940 | // Let some time pass before the next check
|
941 | await promiseUtils.delay(500);
|
942 | }
|
943 |
|
944 | return versionsMatch;
|
945 | };
|
946 |
|
947 | /**
|
948 | * Waits for schema agreements and schedules schema metadata refresh.
|
949 | * @param {Connection} connection
|
950 | * @param event
|
951 | * @returns {Promise<boolean>}
|
952 | * @ignore
|
953 | * @internal
|
954 | */
|
955 | Client.prototype.handleSchemaAgreementAndRefresh = async function (connection, event) {
|
956 | let agreement = false;
|
957 |
|
958 | try {
|
959 | agreement = await this._waitForSchemaAgreement(connection);
|
960 | } catch (err) {
|
961 | //we issue a warning but we continue with the normal flow
|
962 | this.log('warning', 'There was an error while waiting for the schema agreement between nodes', err);
|
963 | }
|
964 |
|
965 | if (!this.options.isMetadataSyncEnabled) {
|
966 | return agreement;
|
967 | }
|
968 |
|
969 | // Refresh metadata immediately
|
970 | try {
|
971 | await this.controlConnection.handleSchemaChange(event, true);
|
972 | } catch (err) {
|
973 | this.log('warning', 'There was an error while handling schema change', err);
|
974 | }
|
975 |
|
976 | return agreement;
|
977 | };
|
978 |
|
979 | /**
|
980 | * Connects and handles the execution of prepared and simple statements.
|
981 | * @param {string} query
|
982 | * @param {Array} params
|
983 | * @param {ExecutionOptions} execOptions
|
984 | * @returns {Promise<ResultSet>}
|
985 | * @private
|
986 | */
|
987 | Client.prototype._execute = async function (query, params, execOptions) {
|
988 | const version = this.controlConnection.protocolVersion;
|
989 |
|
990 | if (!execOptions.isPrepared() && params && !Array.isArray(params) &&
|
991 | !types.protocolVersion.supportsNamedParameters(version)) {
|
992 | // Only Cassandra 2.1 and above supports named parameters
|
993 | throw new errors.ArgumentError('Named parameters for simple statements are not supported, use prepare flag');
|
994 | }
|
995 |
|
996 | let request;
|
997 |
|
998 | if (!this.connected) {
|
999 | // Micro optimization to avoid an async execution for a simple check
|
1000 | await this._connect();
|
1001 | }
|
1002 |
|
1003 | if (!execOptions.isPrepared()) {
|
1004 | request = await this._createQueryRequest(query, execOptions, params);
|
1005 | } else {
|
1006 | const lbp = execOptions.getLoadBalancingPolicy();
|
1007 |
|
1008 | // Use keyspace from query options if protocol supports per-query keyspace, otherwise use connection keyspace.
|
1009 | const queryKeyspace = types.protocolVersion.supportsKeyspaceInRequest(version) &&
|
1010 | execOptions.getKeyspace() || this.keyspace;
|
1011 |
|
1012 | const { queryId, meta } = await PrepareHandler.getPrepared(this, lbp, query, queryKeyspace);
|
1013 | request = await this._createExecuteRequest(query, queryId, execOptions, params, meta);
|
1014 | }
|
1015 |
|
1016 | return await RequestHandler.send(request, execOptions, this);
|
1017 | };
|
1018 |
|
1019 | /**
|
1020 | * Sets the listeners for the nodes.
|
1021 | * @private
|
1022 | */
|
1023 | Client.prototype._setHostListeners = function () {
|
1024 | function getHostUpListener(emitter, h) {
|
1025 | return () => emitter.emit('hostUp', h);
|
1026 | }
|
1027 |
|
1028 | function getHostDownListener(emitter, h) {
|
1029 | return () => emitter.emit('hostDown', h);
|
1030 | }
|
1031 |
|
1032 | const self = this;
|
1033 |
|
1034 | // Add status listeners when new nodes are added and emit hostAdd
|
1035 | this.hosts.on('add', function hostAddedListener(h) {
|
1036 | h.on('up', getHostUpListener(self, h));
|
1037 | h.on('down', getHostDownListener(self, h));
|
1038 | self.emit('hostAdd', h);
|
1039 | });
|
1040 |
|
1041 | // Remove all listeners and emit hostRemove
|
1042 | this.hosts.on('remove', function hostRemovedListener(h) {
|
1043 | h.removeAllListeners();
|
1044 | self.emit('hostRemove', h);
|
1045 | });
|
1046 |
|
1047 | // Add status listeners for existing hosts
|
1048 | this.hosts.forEach(function (h) {
|
1049 | h.on('up', getHostUpListener(self, h));
|
1050 | h.on('down', getHostDownListener(self, h));
|
1051 | });
|
1052 | };
|
1053 |
|
1054 | /**
|
1055 | * Sets the distance to each host and when warmup is true, creates all connections to local hosts.
|
1056 | * @returns {Promise}
|
1057 | * @private
|
1058 | */
|
1059 | Client.prototype._warmup = function () {
|
1060 | const hosts = this.hosts.values();
|
1061 |
|
1062 | return promiseUtils.times(hosts.length, warmupLimit, async (index) => {
|
1063 | const h = hosts[index];
|
1064 | const distance = this.profileManager.getDistance(h);
|
1065 |
|
1066 | if (distance === types.distance.ignored) {
|
1067 | return;
|
1068 | }
|
1069 |
|
1070 | if (this.options.pooling.warmup && distance === types.distance.local) {
|
1071 | try {
|
1072 | await h.warmupPool(this.keyspace);
|
1073 | } catch (err) {
|
1074 | // An error while trying to create a connection to one of the hosts.
|
1075 | // Warn the user and move on.
|
1076 | this.log('warning', `Connection pool to host ${h.address} could not be created: ${err}`, err);
|
1077 | }
|
1078 | } else {
|
1079 | h.initializePool();
|
1080 | }
|
1081 | });
|
1082 | };
|
1083 |
|
1084 | /**
|
1085 | * @returns {Encoder}
|
1086 | * @private
|
1087 | */
|
1088 | Client.prototype._getEncoder = function () {
|
1089 | const encoder = this.controlConnection.getEncoder();
|
1090 | if (!encoder) {
|
1091 | throw new errors.DriverInternalError('Encoder is not defined');
|
1092 | }
|
1093 | return encoder;
|
1094 | };
|
1095 |
|
1096 | /**
|
1097 | * Returns a BatchRequest instance and fills the routing key information in the provided options.
|
1098 | * @private
|
1099 | */
|
1100 | Client.prototype._createBatchRequest = async function (queryItems, info) {
|
1101 | const firstQuery = queryItems[0];
|
1102 | if (!firstQuery.meta) {
|
1103 | return new requests.BatchRequest(queryItems, info);
|
1104 | }
|
1105 |
|
1106 | await this._setRoutingInfo(info, firstQuery.params, firstQuery.meta);
|
1107 | return new requests.BatchRequest(queryItems, info);
|
1108 | };
|
1109 |
|
1110 | /**
|
1111 | * Returns an ExecuteRequest instance and fills the routing key information in the provided options.
|
1112 | * @private
|
1113 | */
|
1114 | Client.prototype._createExecuteRequest = async function(query, queryId, info, params, meta) {
|
1115 | params = utils.adaptNamedParamsPrepared(params, meta.columns);
|
1116 | await this._setRoutingInfo(info, params, meta);
|
1117 | return new requests.ExecuteRequest(query, queryId, params, info, meta);
|
1118 | };
|
1119 |
|
1120 | /**
|
1121 | * Returns a QueryRequest instance and fills the routing key information in the provided options.
|
1122 | * @private
|
1123 | */
|
1124 | Client.prototype._createQueryRequest = async function (query, execOptions, params) {
|
1125 | await this.metadata.adaptUserHints(this.keyspace, execOptions.getHints());
|
1126 | const paramsInfo = utils.adaptNamedParamsWithHints(params, execOptions);
|
1127 | this._getEncoder().setRoutingKeyFromUser(paramsInfo.params, execOptions, paramsInfo.keyIndexes);
|
1128 |
|
1129 | return new requests.QueryRequest(query, paramsInfo.params, execOptions, paramsInfo.namedParameters);
|
1130 | };
|
1131 |
|
1132 | /**
|
1133 | * Sets the routing key based on the parameter values or the provided routing key components.
|
1134 | * @param {ExecutionOptions} execOptions
|
1135 | * @param {Array} params
|
1136 | * @param meta
|
1137 | * @private
|
1138 | */
|
1139 | Client.prototype._setRoutingInfo = async function (execOptions, params, meta) {
|
1140 | const encoder = this._getEncoder();
|
1141 |
|
1142 | if (!execOptions.getKeyspace() && meta.keyspace) {
|
1143 | execOptions.setKeyspace(meta.keyspace);
|
1144 | }
|
1145 | if (execOptions.getRoutingKey()) {
|
1146 | // Routing information provided by the user
|
1147 | return encoder.setRoutingKeyFromUser(params, execOptions);
|
1148 | }
|
1149 | if (Array.isArray(meta.partitionKeys)) {
|
1150 | // The partition keys are provided as part of the metadata for modern protocol versions
|
1151 | execOptions.setRoutingIndexes(meta.partitionKeys);
|
1152 | return encoder.setRoutingKeyFromMeta(meta, params, execOptions);
|
1153 | }
|
1154 |
|
1155 | // Older versions of the protocol (v3 and below) don't provide routing information
|
1156 | try {
|
1157 | const tableInfo = await this.metadata.getTable(meta.keyspace, meta.table);
|
1158 |
|
1159 | if (!tableInfo) {
|
1160 | // The schema data is not there, maybe it is being recreated, avoid setting the routing information
|
1161 | return;
|
1162 | }
|
1163 |
|
1164 | execOptions.setRoutingIndexes(tableInfo.partitionKeys.map(c => meta.columnsByName[c.name]));
|
1165 | // Skip parsing metadata next time
|
1166 | meta.partitionKeys = execOptions.getRoutingIndexes();
|
1167 | encoder.setRoutingKeyFromMeta(meta, params, execOptions);
|
1168 | } catch (err) {
|
1169 | this.log('warning', util.format('Table %s.%s metadata could not be retrieved', meta.keyspace, meta.table));
|
1170 | }
|
1171 | };
|
1172 |
|
1173 | /**
|
1174 | * Callback used by execution methods.
|
1175 | * @callback ResultCallback
|
1176 | * @param {Error} err Error occurred in the execution of the query.
|
1177 | * @param {ResultSet} [result] Result of the execution of the query.
|
1178 | */
|
1179 |
|
1180 | module.exports = Client;
|