UNPKG

16.6 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17'use strict';
18
19const util = require('util');
20const utils = require('./utils');
21const types = require('./types');
22const errors = require('./errors');
23
24/**
25 * Information on the formatting of the returned rows
26 */
27const resultFlag = {
28 globalTablesSpec: 0x0001,
29 hasMorePages: 0x0002,
30 noMetadata: 0x0004,
31 metadataChanged: 0x0008,
32 continuousPaging: 0x40000000,
33 lastContinuousPage: 0x80000000,
34};
35
36// templates for derived error messages.
37const _writeTimeoutQueryMessage = 'Server timeout during write query at consistency %s (%d peer(s) acknowledged the write over %d required)';
38const _writeTimeoutBatchLogMessage = 'Server timeout during batchlog write at consistency %s (%d peer(s) acknowledged the write over %d required)';
39const _writeFailureMessage = 'Server failure during write query at consistency %s (%d responses were required but only %d replicas responded, %d failed)';
40const _unavailableMessage = 'Not enough replicas available for query at consistency %s (%d required but only %d alive)';
41const _readTimeoutMessage = 'Server timeout during read query at consistency %s (%s)';
42const _readFailureMessage = 'Server failure during read query at consistency %s (%d responses were required but only %d replicas responded, %d failed)';
43
44/**
45 * Buffer forward reader of CQL binary frames
46 * @param {FrameHeader} header
47 * @param {Buffer} body
48 * @param {Number} [offset]
49 */
50class FrameReader {
51
52 /**
53 * Creates a new instance of the reader
54 * @param {FrameHeader} header
55 * @param {Buffer} body
56 * @param {Number} [offset]
57 */
58 constructor(header, body, offset) {
59 this.header = header;
60 this.opcode = header.opcode;
61 this.offset = offset || 0;
62 this.buf = body;
63 }
64
65 remainingLength() {
66 return this.buf.length - this.offset;
67 }
68
69 getBuffer() {
70 return this.buf;
71 }
72
73 /**
74 * Slices the underlining buffer
75 * @param {Number} begin
76 * @param {Number} [end]
77 * @returns {Buffer}
78 */
79 slice(begin, end) {
80 if (typeof end === 'undefined') {
81 end = this.buf.length;
82 }
83 return this.buf.slice(begin, end);
84 }
85
86 /**
87 * Modifies the underlying buffer, it concatenates the given buffer with the original (internalBuffer = concat(bytes, internalBuffer)
88 */
89 unshift(bytes) {
90 if (this.offset > 0) {
91 throw new Error('Can not modify the underlying buffer if already read');
92 }
93 this.buf = Buffer.concat([bytes, this.buf], bytes.length + this.buf.length);
94 }
95
96 /**
97 * Reads any number of bytes and moves the offset.
98 * if length not provided or it's larger than the remaining bytes, reads to end.
99 * @param length
100 * @returns {Buffer}
101 */
102 read(length) {
103 let end = this.buf.length;
104 if (typeof length !== 'undefined' && this.offset + length < this.buf.length) {
105 end = this.offset + length;
106 }
107 const bytes = this.slice(this.offset, end);
108 this.offset = end;
109 return bytes;
110 }
111
112 /**
113 * Moves the reader cursor to the end
114 */
115 toEnd() {
116 this.offset = this.buf.length;
117 }
118
119 /**
120 * Reads a BE Int and moves the offset
121 * @returns {Number}
122 */
123 readInt() {
124 const result = this.buf.readInt32BE(this.offset);
125 this.offset += 4;
126 return result;
127 }
128
129 /** @returns {Number} */
130 readShort() {
131 const result = this.buf.readUInt16BE(this.offset);
132 this.offset += 2;
133 return result;
134 }
135
136 readByte() {
137 const result = this.buf.readUInt8(this.offset);
138 this.offset += 1;
139 return result;
140 }
141
142 readString() {
143 const length = this.readShort();
144 this.checkOffset(length);
145 const result = this.buf.toString('utf8', this.offset, this.offset + length);
146 this.offset += length;
147 return result;
148 }
149
150 /**
151 * Checks that the new length to read is within the range of the buffer length. Throws a RangeError if not.
152 * @param {Number} newLength
153 */
154 checkOffset(newLength) {
155 if (this.offset + newLength > this.buf.length) {
156 const err = new RangeError('Trying to access beyond buffer length');
157 err.expectedLength = newLength;
158 throw err;
159 }
160 }
161
162 /**
163 * Reads a protocol string list
164 * @returns {Array}
165 */
166 readStringList() {
167 const length = this.readShort();
168 const list = new Array(length);
169 for (let i = 0; i < length; i++) {
170 list[i] = this.readString();
171 }
172 return list;
173 }
174
175 /**
176 * Reads the amount of bytes that the field has and returns them (slicing them).
177 * @returns {Buffer}
178 */
179 readBytes() {
180 const length = this.readInt();
181 if (length < 0) {
182 return null;
183 }
184 this.checkOffset(length);
185 return this.read(length);
186 }
187
188 readShortBytes() {
189 const length = this.readShort();
190 if (length < 0) {
191 return null;
192 }
193 this.checkOffset(length);
194 return this.read(length);
195 }
196
197 /**
198 * Reads an associative array of strings as keys and bytes as values
199 * @param {Number} length
200 * @param {Function} keyFn
201 * @param {Function} valueFn
202 * @returns {Object}
203 */
204 readMap(length, keyFn, valueFn) {
205 if (length < 0) {
206 return null;
207 }
208 const map = {};
209 for (let i = 0; i < length; i++) {
210 map[keyFn.call(this)] = valueFn.call(this);
211 }
212 return map;
213 }
214
215 /**
216 * Reads an associative array of strings as keys and string lists as values
217 * @returns {Object}
218 */
219 readStringMultiMap() {
220 //A [short] n, followed by n pair <k><v> where <k> is a
221 //[string] and <v> is a [string[]].
222 const length = this.readShort();
223 if (length < 0) {
224 return null;
225 }
226 const map = {};
227 for (let i = 0; i < length; i++) {
228 map[this.readString()] = this.readStringList();
229 }
230 return map;
231 }
232
233 /**
234 * Reads a data type definition
235 * @returns {{code: Number, info: Object|null}} An array of 2 elements
236 */
237 readType() {
238 let i;
239 const type = {
240 code: this.readShort(),
241 type: null
242 };
243 switch (type.code) {
244 case types.dataTypes.custom:
245 type.info = this.readString();
246 break;
247 case types.dataTypes.list:
248 case types.dataTypes.set:
249 type.info = this.readType();
250 break;
251 case types.dataTypes.map:
252 type.info = [this.readType(), this.readType()];
253 break;
254 case types.dataTypes.udt:
255 type.info = {
256 keyspace: this.readString(),
257 name: this.readString(),
258 fields: new Array(this.readShort())
259 };
260 for (i = 0; i < type.info.fields.length; i++) {
261 type.info.fields[i] = {
262 name: this.readString(),
263 type: this.readType()
264 };
265 }
266 break;
267 case types.dataTypes.tuple:
268 type.info = new Array(this.readShort());
269 for (i = 0; i < type.info.length; i++) {
270 type.info[i] = this.readType();
271 }
272 break;
273 }
274 return type;
275 }
276
277 /**
278 * Reads an Ip address and port
279 * @returns {{address: exports.InetAddress, port: Number}}
280 */
281 readInet() {
282 const length = this.readByte();
283 const address = this.read(length);
284 return { address: new types.InetAddress(address), port: this.readInt() };
285 }
286
287 /**
288 * Reads an Ip address
289 * @returns {InetAddress}
290 */
291 readInetAddress() {
292 const length = this.readByte();
293 return new types.InetAddress(this.read(length));
294 }
295
296 /**
297 * Reads the body bytes corresponding to the flags
298 * @returns {{traceId: Uuid, warnings: Array, customPayload}}
299 * @throws {RangeError}
300 */
301 readFlagsInfo() {
302 if (this.header.flags === 0) {
303 return utils.emptyObject;
304 }
305 const result = {};
306 if (this.header.flags & types.frameFlags.tracing) {
307 this.checkOffset(16);
308 result.traceId = new types.Uuid(utils.copyBuffer(this.read(16)));
309 }
310 if (this.header.flags & types.frameFlags.warning) {
311 result.warnings = this.readStringList();
312 }
313 if (this.header.flags & types.frameFlags.customPayload) {
314 // Custom payload is a Map<string, Buffer>
315 result.customPayload = this.readMap(this.readShort(), this.readString, this.readBytes);
316 }
317 return result;
318 }
319
320 /**
321 * Reads the metadata from a row or a prepared result response
322 * @param {Number} kind
323 * @returns {Object}
324 * @throws {RangeError}
325 */
326 readMetadata(kind) {
327 let i;
328 //Determines if its a prepared metadata
329 const isPrepared = (kind === types.resultKind.prepared);
330 const meta = {};
331 if (types.protocolVersion.supportsResultMetadataId(this.header.version) && isPrepared) {
332 meta.resultId = utils.copyBuffer(this.readShortBytes());
333 }
334 //as used in Rows and Prepared responses
335 const flags = this.readInt();
336 const columnLength = this.readInt();
337 if (types.protocolVersion.supportsPreparedPartitionKey(this.header.version) && isPrepared) {
338 //read the pk columns
339 meta.partitionKeys = new Array(this.readInt());
340 for (i = 0; i < meta.partitionKeys.length; i++) {
341 meta.partitionKeys[i] = this.readShort();
342 }
343 }
344 if (flags & resultFlag.hasMorePages) {
345 meta.pageState = utils.copyBuffer(this.readBytes());
346 }
347 if (flags & resultFlag.metadataChanged) {
348 meta.newResultId = utils.copyBuffer(this.readShortBytes());
349 }
350 if (flags & resultFlag.continuousPaging) {
351 meta.continuousPageIndex = this.readInt();
352 meta.lastContinuousPage = !!(flags & resultFlag.lastContinuousPage);
353 }
354 if (flags & resultFlag.globalTablesSpec) {
355 meta.global_tables_spec = true;
356 meta.keyspace = this.readString();
357 meta.table = this.readString();
358 }
359 meta.columns = new Array(columnLength);
360 meta.columnsByName = utils.emptyObject;
361 if (isPrepared) {
362 //for prepared metadata, we will need a index of the columns (param) by name
363 meta.columnsByName = {};
364 }
365 for (i = 0; i < columnLength; i++) {
366 const col = {};
367 if (!meta.global_tables_spec) {
368 col.ksname = this.readString();
369 col.tablename = this.readString();
370 }
371 col.name = this.readString();
372 col.type = this.readType();
373 meta.columns[i] = col;
374 if (isPrepared) {
375 meta.columnsByName[col.name] = i;
376 }
377 }
378 return meta;
379 }
380
381 /**
382 * Reads the error from the frame
383 * @throws {RangeError}
384 * @returns {ResponseError}
385 */
386 readError() {
387 const code = this.readInt();
388 const message = this.readString();
389 const err = new errors.ResponseError(code, message);
390 //read extra info
391 switch (code) {
392 case types.responseErrorCodes.unavailableException:
393 err.consistencies = this.readShort();
394 err.required = this.readInt();
395 err.alive = this.readInt();
396 err.message = util.format(_unavailableMessage, types.consistencyToString[err.consistencies], err.required, err.alive);
397 break;
398 case types.responseErrorCodes.readTimeout:
399 case types.responseErrorCodes.readFailure:
400 err.consistencies = this.readShort();
401 err.received = this.readInt();
402 err.blockFor = this.readInt();
403 if (code === types.responseErrorCodes.readFailure) {
404 if (types.protocolVersion.supportsFailureReasonMap(this.header.version)) {
405 err.failures = this.readInt();
406 err.reasons = this.readMap(err.failures, this.readInetAddress, this.readShort);
407 }
408 else {
409 err.failures = this.readInt();
410 }
411 }
412 err.isDataPresent = this.readByte();
413 if (code === types.responseErrorCodes.readTimeout) {
414 let details;
415 if (err.received < err.blockFor) {
416 details = util.format('%d replica(s) responded over %d required', err.received, err.blockFor);
417 }
418 else if (!err.isDataPresent) {
419 details = 'the replica queried for the data didn\'t respond';
420 }
421 else {
422 details = 'timeout while waiting for repair of inconsistent replica';
423 }
424 err.message = util.format(_readTimeoutMessage, types.consistencyToString[err.consistencies], details);
425 }
426 else {
427 err.message = util.format(_readFailureMessage, types.consistencyToString[err.consistencies], err.blockFor, err.received, err.failures);
428 }
429 break;
430 case types.responseErrorCodes.writeTimeout:
431 case types.responseErrorCodes.writeFailure:
432 err.consistencies = this.readShort();
433 err.received = this.readInt();
434 err.blockFor = this.readInt();
435 if (code === types.responseErrorCodes.writeFailure) {
436 if (types.protocolVersion.supportsFailureReasonMap(this.header.version)) {
437 err.failures = this.readInt();
438 err.reasons = this.readMap(err.failures, this.readInetAddress, this.readShort);
439 }
440 else {
441 err.failures = this.readInt();
442 }
443 }
444 err.writeType = this.readString();
445 if (code === types.responseErrorCodes.writeTimeout) {
446 const template = err.writeType === 'BATCH_LOG' ? _writeTimeoutBatchLogMessage : _writeTimeoutQueryMessage;
447 err.message = util.format(template, types.consistencyToString[err.consistencies], err.received, err.blockFor);
448 }
449 else {
450 err.message = util.format(_writeFailureMessage, types.consistencyToString[err.consistencies], err.blockFor, err.received, err.failures);
451 }
452 break;
453 case types.responseErrorCodes.unprepared:
454 err.queryId = utils.copyBuffer(this.readShortBytes());
455 break;
456 case types.responseErrorCodes.functionFailure:
457 err.keyspace = this.readString();
458 err.functionName = this.readString();
459 err.argTypes = this.readStringList();
460 break;
461 case types.responseErrorCodes.alreadyExists: {
462 err.keyspace = this.readString();
463 const table = this.readString();
464 if (table.length > 0) {
465 err.table = table;
466 }
467 break;
468 }
469 }
470 return err;
471 }
472
473 /**
474 * Reads an event from Cassandra and returns the detail
475 * @returns {{eventType: String, inet: {address: Buffer, port: Number}}, *}
476 */
477 readEvent() {
478 const eventType = this.readString();
479 switch (eventType) {
480 case types.protocolEvents.topologyChange:
481 return {
482 added: this.readString() === 'NEW_NODE',
483 inet: this.readInet(),
484 eventType: eventType
485 };
486 case types.protocolEvents.statusChange:
487 return {
488 up: this.readString() === 'UP',
489 inet: this.readInet(),
490 eventType: eventType
491 };
492 case types.protocolEvents.schemaChange:
493 return this.parseSchemaChange();
494 }
495 //Forward compatibility
496 return { eventType: eventType };
497 }
498
499 parseSchemaChange() {
500 let result;
501 if (!types.protocolVersion.supportsSchemaChangeFullMetadata(this.header.version)) {
502 //v1/v2: 3 strings, the table value can be empty
503 result = {
504 eventType: types.protocolEvents.schemaChange,
505 schemaChangeType: this.readString(),
506 keyspace: this.readString(),
507 table: this.readString()
508 };
509 result.isKeyspace = !result.table;
510 return result;
511 }
512 //v3+: 3 or 4 strings: change_type, target, keyspace and (table, type, functionName or aggregate)
513 result = {
514 eventType: types.protocolEvents.schemaChange,
515 schemaChangeType: this.readString(),
516 target: this.readString(),
517 keyspace: this.readString(),
518 table: null,
519 udt: null,
520 signature: null
521 };
522 result.isKeyspace = result.target === 'KEYSPACE';
523 switch (result.target) {
524 case 'TABLE':
525 result.table = this.readString();
526 break;
527 case 'TYPE':
528 result.udt = this.readString();
529 break;
530 case 'FUNCTION':
531 result.functionName = this.readString();
532 result.signature = this.readStringList();
533 break;
534 case 'AGGREGATE':
535 result.aggregate = this.readString();
536 result.signature = this.readStringList();
537 }
538 return result;
539 }
540}
541
542module.exports = { FrameReader };