UNPKG

17.7 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16'use strict';
17const util = require('util');
18
19const { FrameWriter } = require('./writers');
20const types = require('./types');
21const utils = require('./utils');
22const { ExecutionOptions } = require('./execution-options');
23const packageInfo = require('../package.json');
24
25/**
26 * Options for the execution of the query / prepared statement
27 * @private
28 */
29const queryFlag = {
30 values: 0x01,
31 skipMetadata: 0x02,
32 pageSize: 0x04,
33 withPagingState: 0x08,
34 withSerialConsistency: 0x10,
35 withDefaultTimestamp: 0x20,
36 withNameForValues: 0x40,
37 withKeyspace: 0x80,
38 withPageSizeBytes: 0x40000000,
39 withContinuousPaging: 0x80000000
40};
41
42/**
43 * Options for the execution of a batch request from protocol v3 and above
44 * @private
45 */
46const batchFlag = {
47 withSerialConsistency: 0x10,
48 withDefaultTimestamp: 0x20,
49 withNameForValues: 0x40,
50 withKeyspace: 0x80
51};
52
53/**
54 * Options for execution of a prepare request from protocol DSE_V2 and above
55 * @private
56 */
57const prepareFlag = {
58 withKeyspace: 0x01
59};
60
61const batchType = {
62 logged: 0,
63 unlogged: 1,
64 counter: 2
65};
66
67/**
68 * Abstract class Request
69 */
70class Request {
71 constructor() {
72 this.length = 0;
73 }
74
75 /**
76 * @abstract
77 * @param {Encoder} encoder
78 * @param {Number} streamId
79 * @throws {TypeError}
80 * @returns {Buffer}
81 */
82 write(encoder, streamId) {
83 throw new Error('Method must be implemented');
84 }
85
86 /**
87 * Creates a new instance using the same constructor as the current instance, copying the properties.
88 * @return {Request}
89 */
90 clone() {
91 const newRequest = new (this.constructor)();
92 const keysArray = Object.keys(this);
93 for (let i = 0; i < keysArray.length; i++) {
94 const key = keysArray[i];
95 newRequest[key] = this[key];
96 }
97 return newRequest;
98 }
99}
100
101/**
102 * Writes a execute query (given a prepared queryId)
103 * @param {String} query
104 * @param {Buffer} queryId
105 * @param {Array} params
106 * @param options
107 */
108class ExecuteRequest extends Request {
109 /**
110 * @param {String} query
111 * @param queryId
112 * @param params
113 * @param {ExecutionOptions} execOptions
114 * @param meta
115 */
116 constructor(query, queryId, params, execOptions, meta) {
117 super();
118
119 this.query = query;
120 this.queryId = queryId;
121 this.params = params;
122 this.meta = meta;
123 this.options = execOptions || ExecutionOptions.empty();
124 this.consistency = this.options.getConsistency() || types.consistencies.one;
125 // Only QUERY request parameters are encoded as named parameters
126 // EXECUTE request parameters are always encoded as positional parameters
127 this.namedParameters = false;
128 }
129
130 getParamType(index) {
131 const columnInfo = this.meta.columns[index];
132 return columnInfo ? columnInfo.type : null;
133 }
134
135 write(encoder, streamId) {
136 //v1: <queryId>
137 // <n><value_1>....<value_n><consistency>
138 //v2: <queryId>
139 // <consistency><flags>[<n><value_1>...<value_n>][<result_page_size>][<paging_state>][<serial_consistency>]
140 //v3: <queryId>
141 // <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
142 const frameWriter = new FrameWriter(types.opcodes.execute);
143 let headerFlags = this.options.isQueryTracing() ? types.frameFlags.tracing : 0;
144 if (this.options.getCustomPayload()) {
145 //The body may contain the custom payload
146 headerFlags |= types.frameFlags.customPayload;
147 frameWriter.writeCustomPayload(this.options.getCustomPayload());
148 }
149 frameWriter.writeShortBytes(this.queryId);
150 if(types.protocolVersion.supportsResultMetadataId(encoder.protocolVersion)) {
151 frameWriter.writeShortBytes(this.meta.resultId);
152 }
153 this.writeQueryParameters(frameWriter, encoder);
154
155 // Record the length of the body of the request before writing it
156 this.length = frameWriter.bodyLength;
157
158 return frameWriter.write(encoder.protocolVersion, streamId, headerFlags);
159 }
160
161 /**
162 * Writes v1 and v2 execute query parameters
163 * @param {FrameWriter} frameWriter
164 * @param {Encoder} encoder
165 * @param {Boolean} [isQuery] True if query, otherwise assumed to be execute request.
166 */
167 writeQueryParameters(frameWriter, encoder, isQuery) {
168 //v1: <n><value_1>....<value_n><consistency>
169 //v2: <consistency><flags>[<n><value_1>...<value_n>][<result_page_size>][<paging_state>][<serial_consistency>]
170 //v3: <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
171 //dse_v1: <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>]
172 // [<serial_consistency>][<timestamp>][continuous_paging_options]
173 //dse_v2: <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>]
174 // [<serial_consistency>][<timestamp>][keyspace][continuous_paging_options]
175 let flags = 0;
176
177 const timestamp = this.options.getOrGenerateTimestamp();
178
179 if (types.protocolVersion.supportsPaging(encoder.protocolVersion)) {
180 flags |= (this.params && this.params.length) ? queryFlag.values : 0;
181 flags |= (this.options.getFetchSize() > 0) ? queryFlag.pageSize : 0;
182 flags |= this.options.getPageState() ? queryFlag.withPagingState : 0;
183 flags |= this.options.getSerialConsistency() ? queryFlag.withSerialConsistency : 0;
184 flags |= timestamp !== null && timestamp !== undefined ? queryFlag.withDefaultTimestamp : 0;
185 flags |= this.namedParameters ? queryFlag.withNameForValues : 0;
186
187 // Don't inject keyspace for EXECUTE requests as inherited from prepared statement.
188 const supportsKeyspace = isQuery && types.protocolVersion.supportsKeyspaceInRequest(encoder.protocolVersion);
189 flags |= supportsKeyspace && this.options.getKeyspace() ? queryFlag.withKeyspace : 0;
190
191 frameWriter.writeShort(this.consistency);
192 if (types.protocolVersion.uses4BytesQueryFlags(encoder.protocolVersion)) {
193 frameWriter.writeInt(flags);
194 }
195 else {
196 frameWriter.writeByte(flags);
197 }
198 }
199
200 if (this.params && this.params.length) {
201 frameWriter.writeShort(this.params.length);
202 for (let i = 0; i < this.params.length; i++) {
203 let paramValue = this.params[i];
204 if (flags & queryFlag.withNameForValues) {
205 //parameter is composed by name / value
206 frameWriter.writeString(paramValue.name);
207 paramValue = paramValue.value;
208 }
209 frameWriter.writeBytes(encoder.encode(paramValue, this.getParamType(i)));
210 }
211 }
212
213 if (!types.protocolVersion.supportsPaging(encoder.protocolVersion)) {
214 if (!this.params || !this.params.length) {
215 //zero parameters
216 frameWriter.writeShort(0);
217 }
218 frameWriter.writeShort(this.consistency);
219 return;
220 }
221 if (flags & queryFlag.pageSize) {
222 frameWriter.writeInt(this.options.getFetchSize());
223 }
224 if (flags & queryFlag.withPagingState) {
225 frameWriter.writeBytes(this.options.getPageState());
226 }
227 if (flags & queryFlag.withSerialConsistency) {
228 frameWriter.writeShort(this.options.getSerialConsistency());
229 }
230 if (flags & queryFlag.withDefaultTimestamp) {
231 frameWriter.writeLong(timestamp);
232 }
233 if (flags & queryFlag.withKeyspace) {
234 frameWriter.writeString(this.options.getKeyspace());
235 }
236 }
237}
238
239class QueryRequest extends ExecuteRequest {
240 /**
241 * @param {String} query
242 * @param params
243 * @param {ExecutionOptions} [execOptions]
244 * @param {Boolean} [namedParameters]
245 */
246 constructor(query, params, execOptions, namedParameters) {
247 super(query, null, params, execOptions, null);
248 this.hints = this.options.getHints() || utils.emptyArray;
249 this.namedParameters = namedParameters;
250 }
251
252 getParamType(index) {
253 return this.hints[index];
254 }
255
256 write(encoder, streamId) {
257 //v1: <query><consistency>
258 //v2: <query>
259 // <consistency><flags>[<n><value_1>...<value_n>][<result_page_size>][<paging_state>][<serial_consistency>]
260 //v3: <query>
261 // <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
262 const frameWriter = new FrameWriter(types.opcodes.query);
263 let headerFlags = this.options.isQueryTracing() ? types.frameFlags.tracing : 0;
264 if (this.options.getCustomPayload()) {
265 //The body may contain the custom payload
266 headerFlags |= types.frameFlags.customPayload;
267 frameWriter.writeCustomPayload(this.options.getCustomPayload());
268 }
269
270 frameWriter.writeLString(this.query);
271
272 if (!types.protocolVersion.supportsPaging(encoder.protocolVersion)) {
273 frameWriter.writeShort(this.consistency);
274 } else {
275 //Use the same fields as the execute writer
276 this.writeQueryParameters(frameWriter, encoder, true);
277 }
278
279 // Record the length of the body of the request before writing it
280 this.length = frameWriter.bodyLength;
281
282 return frameWriter.write(encoder.protocolVersion, streamId, headerFlags);
283 }
284}
285
286class PrepareRequest extends Request {
287 constructor(query, keyspace) {
288 super();
289 this.query = query;
290 this.keyspace = keyspace;
291 }
292
293 write(encoder, streamId) {
294 const frameWriter = new FrameWriter(types.opcodes.prepare);
295 frameWriter.writeLString(this.query);
296 if (types.protocolVersion.supportsPrepareFlags(encoder.protocolVersion)) {
297 const flags = this.keyspace && types.protocolVersion.supportsKeyspaceInRequest(encoder.protocolVersion) ? prepareFlag.withKeyspace : 0;
298 frameWriter.writeInt(flags);
299 if (flags & prepareFlag.withKeyspace) {
300 frameWriter.writeString(this.keyspace);
301 }
302 }
303 return frameWriter.write(encoder.protocolVersion, streamId);
304 }
305}
306
307class StartupRequest extends Request {
308
309 /**
310 * Creates a new instance of {@link StartupRequest}.
311 * @param {Object} [options]
312 * @param [options.cqlVersion]
313 * @param [options.noCompact]
314 * @param [options.clientId]
315 * @param [options.applicationName]
316 * @param [options.applicationVersion]
317 */
318 constructor(options) {
319 super();
320 this.options = options || {};
321 }
322
323 write(encoder, streamId) {
324 const frameWriter = new FrameWriter(types.opcodes.startup);
325
326 const startupOptions = {
327 CQL_VERSION: this.options.cqlVersion || '3.0.0',
328 DRIVER_NAME: packageInfo.description,
329 DRIVER_VERSION: packageInfo.version
330 };
331
332 if(this.options.noCompact) {
333 startupOptions['NO_COMPACT'] = 'true';
334 }
335
336 if (this.options.clientId) {
337 startupOptions['CLIENT_ID'] = this.options.clientId.toString();
338 }
339
340 if (this.options.applicationName) {
341 startupOptions['APPLICATION_NAME'] = this.options.applicationName;
342 }
343
344 if (this.options.applicationVersion) {
345 startupOptions['APPLICATION_VERSION'] = this.options.applicationVersion;
346 }
347
348 frameWriter.writeStringMap(startupOptions);
349 return frameWriter.write(encoder.protocolVersion, streamId);
350 }
351}
352
353class RegisterRequest extends Request {
354 constructor(events) {
355 super();
356 this.events = events;
357 }
358
359 write(encoder, streamId) {
360 const frameWriter = new FrameWriter(types.opcodes.register);
361 frameWriter.writeStringList(this.events);
362 return frameWriter.write(encoder.protocolVersion, streamId);
363 }
364}
365
366/**
367 * Represents an AUTH_RESPONSE request
368 * @param {Buffer} token
369 */
370class AuthResponseRequest extends Request {
371 constructor(token) {
372 super();
373 this.token = token;
374 }
375
376 write(encoder, streamId) {
377 const frameWriter = new FrameWriter(types.opcodes.authResponse);
378 frameWriter.writeBytes(this.token);
379 return frameWriter.write(encoder.protocolVersion, streamId);
380 }
381}
382
383/**
384 * Represents a protocol v1 CREDENTIALS request message
385 */
386class CredentialsRequest extends Request {
387 constructor(username, password) {
388 super();
389 this.username = username;
390 this.password = password;
391 }
392
393 write(encoder, streamId) {
394 const frameWriter = new FrameWriter(types.opcodes.credentials);
395 frameWriter.writeStringMap({ username:this.username, password:this.password });
396 return frameWriter.write(encoder.protocolVersion, streamId);
397 }
398}
399
400class BatchRequest extends Request {
401 /**
402 * Creates a new instance of BatchRequest.
403 * @param {Array.<{query, params, [info]}>} queries Array of objects with the properties query and params
404 * @param {ExecutionOptions} execOptions
405 */
406 constructor(queries, execOptions) {
407 super();
408 this.queries = queries;
409 this.options = execOptions;
410 this.hints = execOptions.getHints() || utils.emptyArray;
411 this.type = batchType.logged;
412
413 if (execOptions.isBatchCounter()) {
414 this.type = batchType.counter;
415 } else if (!execOptions.isBatchLogged()) {
416 this.type = batchType.unlogged;
417 }
418 }
419
420 /**
421 * Writes a batch request
422 */
423 write(encoder, streamId) {
424 //v2: <type><n><query_1>...<query_n><consistency>
425 //v3: <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
426 //dseV1+: similar to v3/v4, flags is an int instead of a byte
427 if (!this.queries || !(this.queries.length > 0)) {
428 throw new TypeError(util.format('Invalid queries provided %s', this.queries));
429 }
430 const frameWriter = new FrameWriter(types.opcodes.batch);
431 let headerFlags = this.options.isQueryTracing() ? types.frameFlags.tracing : 0;
432 if (this.options.getCustomPayload()) {
433 //The body may contain the custom payload
434 headerFlags |= types.frameFlags.customPayload;
435 frameWriter.writeCustomPayload(this.options.getCustomPayload());
436 }
437 frameWriter.writeByte(this.type);
438 frameWriter.writeShort(this.queries.length);
439 const self = this;
440 this.queries.forEach(function eachQuery(item, i) {
441 const hints = self.hints[i];
442 const params = item.params || utils.emptyArray;
443 let getParamType;
444 if (item.queryId) {
445 // Contains prepared queries
446 frameWriter.writeByte(1);
447 frameWriter.writeShortBytes(item.queryId);
448 getParamType = i => item.meta.columns[i].type;
449 }
450 else {
451 // Contains string queries
452 frameWriter.writeByte(0);
453 frameWriter.writeLString(item.query);
454 getParamType = hints ? (i => hints[i]) : (() => null);
455 }
456
457 frameWriter.writeShort(params.length);
458 params.forEach((param, index) => frameWriter.writeBytes(encoder.encode(param, getParamType(index))));
459 }, this);
460
461 frameWriter.writeShort(this.options.getConsistency());
462
463 if (types.protocolVersion.supportsTimestamp(encoder.protocolVersion)) {
464 // Batch flags
465 let flags = this.options.getSerialConsistency() ? batchFlag.withSerialConsistency : 0;
466 const timestamp = this.options.getOrGenerateTimestamp();
467 flags |= timestamp !== null && timestamp !== undefined ? batchFlag.withDefaultTimestamp : 0;
468
469 flags |= this.options.getKeyspace() && types.protocolVersion.supportsKeyspaceInRequest(encoder.protocolVersion)
470 ? batchFlag.withKeyspace : 0;
471
472 if (types.protocolVersion.uses4BytesQueryFlags(encoder.protocolVersion)) {
473 frameWriter.writeInt(flags);
474 }
475 else {
476 frameWriter.writeByte(flags);
477 }
478
479 if (flags & batchFlag.withSerialConsistency) {
480 frameWriter.writeShort(this.options.getSerialConsistency());
481 }
482
483 if (flags & batchFlag.withDefaultTimestamp) {
484 frameWriter.writeLong(timestamp);
485 }
486
487 if (flags & batchFlag.withKeyspace) {
488 frameWriter.writeString(this.options.getKeyspace());
489 }
490 }
491
492 // Set the length of the body of the request before writing it
493 this.length = frameWriter.bodyLength;
494
495 return frameWriter.write(encoder.protocolVersion, streamId, headerFlags);
496 }
497
498 clone() {
499 return new BatchRequest(this.queries, this.options);
500 }
501}
502
503function CancelRequest(operationId) {
504 this.streamId = null;
505 this.operationId = operationId;
506}
507
508util.inherits(CancelRequest, Request);
509
510CancelRequest.prototype.write = function (encoder, streamId) {
511 const frameWriter = new FrameWriter(types.opcodes.cancel);
512 frameWriter.writeInt(1);
513 frameWriter.writeInt(this.operationId);
514 return frameWriter.write(encoder.protocolVersion, streamId);
515};
516
517class OptionsRequest extends Request {
518
519 write(encoder, streamId) {
520 const frameWriter = new FrameWriter(types.opcodes.options);
521 return frameWriter.write(encoder.protocolVersion, streamId, 0);
522 }
523
524 clone() {
525 // since options has no unique state, simply return self.
526 return this;
527 }
528}
529
530const options = new OptionsRequest();
531
532exports.AuthResponseRequest = AuthResponseRequest;
533exports.BatchRequest = BatchRequest;
534exports.CancelRequest = CancelRequest;
535exports.CredentialsRequest = CredentialsRequest;
536exports.ExecuteRequest = ExecuteRequest;
537exports.PrepareRequest = PrepareRequest;
538exports.QueryRequest = QueryRequest;
539exports.Request = Request;
540exports.RegisterRequest = RegisterRequest;
541exports.StartupRequest = StartupRequest;
542exports.options = options;