1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 | 'use strict';
|
18 |
|
19 | const util = require('util');
|
20 | const utils = require('./utils');
|
21 | const types = require('./types');
|
22 | const errors = require('./errors');
|
23 |
|
24 |
|
25 |
|
26 |
|
/**
 * Bit masks tested against the 32-bit flags integer that readMetadata() reads
 * at the start of a result metadata section.
 *
 * The table is frozen because it is shared by every reader instance: an
 * accidental assignment would otherwise change how all subsequent metadata
 * sections are decoded.
 */
const resultFlag = Object.freeze({
  // Keyspace and table names are sent once for all columns instead of per column.
  globalTablesSpec: 0x0001,
  // A paging state follows, to be used for retrieving the next page.
  hasMorePages: 0x0002,
  // Not tested anywhere in this file.
  noMetadata: 0x0004,
  // A new result metadata id follows.
  metadataChanged: 0x0008,
  // A continuous page index follows.
  continuousPaging: 0x40000000,
  // Only inspected together with continuousPaging; marks the final page.
  lastContinuousPage: 0x80000000,
});
|
35 |
|
36 |
|
// util.format() templates used by FrameReader#readError() to build
// human-readable messages for server-side errors. In every template the first
// placeholder (%s) receives the consistency level name.

// Remaining placeholders: acknowledgements received, acknowledgements required.
const _writeTimeoutQueryMessage =
  'Server timeout during write query at consistency %s ' +
  '(%d peer(s) acknowledged the write over %d required)';

// Same placeholders as above; selected when the write type is 'BATCH_LOG'.
const _writeTimeoutBatchLogMessage =
  'Server timeout during batchlog write at consistency %s ' +
  '(%d peer(s) acknowledged the write over %d required)';

// Remaining placeholders: responses required, responses received, failures.
const _writeFailureMessage =
  'Server failure during write query at consistency %s ' +
  '(%d responses were required but only %d replicas responded, %d failed)';

// Remaining placeholders: replicas required, replicas alive.
const _unavailableMessage =
  'Not enough replicas available for query at consistency %s ' +
  '(%d required but only %d alive)';

// Second placeholder (%s) receives a pre-formatted details sentence.
const _readTimeoutMessage =
  'Server timeout during read query at consistency %s (%s)';

// Remaining placeholders: responses required, responses received, failures.
const _readFailureMessage =
  'Server failure during read query at consistency %s ' +
  '(%d responses were required but only %d replicas responded, %d failed)';
|
43 |
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
/**
 * Stores `value` under `key` as an own, enumerable, writable data property.
 *
 * A plain `target[key] = value` goes through the inherited `__proto__` setter
 * when the key is the string '__proto__': an object value would replace the
 * prototype of `target` and a primitive value would be silently dropped.
 * Keys handled here come from the wire, so they must never be interpreted.
 * @param {Object} target Object to add the entry to.
 * @param {*} key Property key (converted to a property key as usual).
 * @param {*} value Value to store.
 * @private
 */
function setOwnEntry(target, key, value) {
  Object.defineProperty(target, key, {
    value,
    writable: true,
    enumerable: true,
    configurable: true
  });
}

/**
 * Sequential reader over the body of a protocol frame.
 *
 * Keeps a cursor (`offset`) into the body buffer; each read method decodes a
 * value at the cursor and advances it. Multi-byte integers are big-endian.
 */
class FrameReader {
  /**
   * @param {Object} header Frame header. This class reads its `opcode`,
   * `flags` and `version` properties.
   * @param {Buffer} body Frame body.
   * @param {Number} [offset] Initial cursor position, defaults to 0.
   */
  constructor(header, body, offset) {
    this.header = header;
    this.opcode = header.opcode;
    this.offset = offset || 0;
    this.buf = body;
  }

  /**
   * Number of bytes between the cursor and the end of the body.
   * @returns {Number}
   */
  remainingLength() {
    return this.buf.length - this.offset;
  }

  /**
   * Gets the underlying body buffer (not a copy).
   * @returns {Buffer}
   */
  getBuffer() {
    return this.buf;
  }

  /**
   * Returns a view of the body between two absolute positions, without moving
   * the cursor. The returned buffer shares memory with the body.
   * @param {Number} begin Start position.
   * @param {Number} [end] End position (exclusive), defaults to the body length.
   * @returns {Buffer}
   */
  slice(begin, end) {
    if (typeof end === 'undefined') {
      end = this.buf.length;
    }
    // Buffer#slice() is deprecated; subarray() returns the same kind of view.
    return this.buf.subarray(begin, end);
  }

  /**
   * Prepends bytes to the body. Only allowed before anything has been read,
   * as the cursor would otherwise point to the wrong position.
   * @param {Buffer} bytes
   * @throws {Error} When the cursor is past the start of the body.
   */
  unshift(bytes) {
    if (this.offset > 0) {
      throw new Error('Can not modify the underlying buffer if already read');
    }
    this.buf = Buffer.concat([bytes, this.buf], bytes.length + this.buf.length);
  }

  /**
   * Reads bytes from the cursor and advances it.
   *
   * When `length` is omitted, or when fewer than `length` bytes remain,
   * everything up to the end of the body is returned: this method does not
   * throw on short bodies, callers use checkOffset() for that.
   * @param {Number} [length] Number of bytes to read.
   * @returns {Buffer} A view sharing memory with the body.
   */
  read(length) {
    let end = this.buf.length;
    if (typeof length !== 'undefined' && this.offset + length < this.buf.length) {
      end = this.offset + length;
    }
    const bytes = this.slice(this.offset, end);
    this.offset = end;
    return bytes;
  }

  /**
   * Moves the cursor to the end of the body.
   */
  toEnd() {
    this.offset = this.buf.length;
  }

  /**
   * Reads a signed 32-bit big-endian integer and advances the cursor 4 bytes.
   * @returns {Number}
   */
  readInt() {
    const result = this.buf.readInt32BE(this.offset);
    this.offset += 4;
    return result;
  }

  /**
   * Reads an unsigned 16-bit big-endian integer and advances the cursor 2 bytes.
   * @returns {Number} A value between 0 and 65535.
   */
  readShort() {
    const result = this.buf.readUInt16BE(this.offset);
    this.offset += 2;
    return result;
  }

  /**
   * Reads an unsigned byte and advances the cursor 1 byte.
   * @returns {Number}
   */
  readByte() {
    const result = this.buf.readUInt8(this.offset);
    this.offset += 1;
    return result;
  }

  /**
   * Reads a UTF-8 string prefixed by its byte length as a short.
   * @returns {String}
   * @throws {RangeError} When the declared length exceeds the remaining bytes.
   */
  readString() {
    const length = this.readShort();
    this.checkOffset(length);
    const result = this.buf.toString('utf8', this.offset, this.offset + length);
    this.offset += length;
    return result;
  }

  /**
   * Verifies that `newLength` bytes are available from the cursor.
   * @param {Number} newLength Number of bytes about to be read.
   * @throws {RangeError} With an `expectedLength` property set to `newLength`
   * when the body is too short.
   */
  checkOffset(newLength) {
    if (this.offset + newLength > this.buf.length) {
      const err = new RangeError('Trying to access beyond buffer length');
      err.expectedLength = newLength;
      throw err;
    }
  }

  /**
   * Reads a short `n` followed by `n` strings.
   * @returns {Array<String>}
   */
  readStringList() {
    const length = this.readShort();
    const list = new Array(length);
    for (let i = 0; i < length; i++) {
      list[i] = this.readString();
    }
    return list;
  }

  /**
   * Reads a byte sequence prefixed by its length as an int.
   * @returns {Buffer|null} `null` when the length is negative; otherwise a
   * view sharing memory with the body.
   * @throws {RangeError} When the declared length exceeds the remaining bytes.
   */
  readBytes() {
    const length = this.readInt();
    if (length < 0) {
      return null;
    }
    this.checkOffset(length);
    return this.read(length);
  }

  /**
   * Reads a byte sequence prefixed by its length as a short.
   * @returns {Buffer} A view sharing memory with the body.
   * @throws {RangeError} When the declared length exceeds the remaining bytes.
   */
  readShortBytes() {
    // readShort() is unsigned, so the length can never be negative here.
    const length = this.readShort();
    this.checkOffset(length);
    return this.read(length);
  }

  /**
   * Reads `length` key/value pairs into a plain object.
   * @param {Number} length Number of entries to read.
   * @param {Function} keyFn Reader method used for each key; invoked with this
   * instance as `this`. Non-string keys are converted to property keys.
   * @param {Function} valueFn Reader method used for each value; invoked with
   * this instance as `this`.
   * @returns {Object|null} `null` when `length` is negative.
   */
  readMap(length, keyFn, valueFn) {
    if (length < 0) {
      return null;
    }
    const map = {};
    for (let i = 0; i < length; i++) {
      // The key precedes the value on the wire: keep this read order.
      const key = keyFn.call(this);
      const value = valueFn.call(this);
      setOwnEntry(map, key, value);
    }
    return map;
  }

  /**
   * Reads a short `n` followed by `n` pairs of string key and string list.
   * @returns {Object<String, Array<String>>}
   */
  readStringMultiMap() {
    // readShort() is unsigned, so the length can never be negative here.
    const length = this.readShort();
    const map = {};
    for (let i = 0; i < length; i++) {
      const key = this.readString();
      setOwnEntry(map, key, this.readStringList());
    }
    return map;
  }

  /**
   * Reads a data type definition: a short type code followed, for some codes,
   * by additional information stored in `info`:
   * - custom: a string.
   * - list / set: the nested element type.
   * - map: an array with the key type and the value type.
   * - udt: `{ keyspace, name, fields }` where each field is `{ name, type }`.
   * - tuple: an array of nested types.
   * For any other code, `info` is not set.
   * @returns {{code: Number, type: null, info: (*|undefined)}}
   */
  readType() {
    const type = {
      code: this.readShort(),
      type: null
    };
    switch (type.code) {
      case types.dataTypes.custom:
        type.info = this.readString();
        break;
      case types.dataTypes.list:
      case types.dataTypes.set:
        type.info = this.readType();
        break;
      case types.dataTypes.map:
        type.info = [this.readType(), this.readType()];
        break;
      case types.dataTypes.udt:
        type.info = {
          keyspace: this.readString(),
          name: this.readString(),
          fields: new Array(this.readShort())
        };
        for (let i = 0; i < type.info.fields.length; i++) {
          type.info.fields[i] = {
            name: this.readString(),
            type: this.readType()
          };
        }
        break;
      case types.dataTypes.tuple:
        type.info = new Array(this.readShort());
        for (let i = 0; i < type.info.length; i++) {
          type.info[i] = this.readType();
        }
        break;
    }
    return type;
  }

  /**
   * Reads an address (a length byte followed by that many address bytes) and
   * then a port as an int.
   * @returns {{address: InetAddress, port: Number}}
   */
  readInet() {
    const length = this.readByte();
    const address = this.read(length);
    return { address: new types.InetAddress(address), port: this.readInt() };
  }

  /**
   * Reads an address: a length byte followed by that many address bytes.
   * @returns {InetAddress}
   */
  readInetAddress() {
    const length = this.readByte();
    return new types.InetAddress(this.read(length));
  }

  /**
   * Reads the optional sections announced by the header flags, in this order:
   * tracing id (16 bytes), warnings (string list) and custom payload (a short
   * count followed by string/bytes pairs).
   * @returns {{traceId: (Uuid|undefined), warnings: (Array|undefined),
   * customPayload: (Object|undefined)}} The shared `utils.emptyObject` when
   * no flag is set.
   */
  readFlagsInfo() {
    if (this.header.flags === 0) {
      return utils.emptyObject;
    }
    const result = {};
    if (this.header.flags & types.frameFlags.tracing) {
      this.checkOffset(16);
      result.traceId = new types.Uuid(utils.copyBuffer(this.read(16)));
    }
    if (this.header.flags & types.frameFlags.warning) {
      result.warnings = this.readStringList();
    }
    if (this.header.flags & types.frameFlags.customPayload) {
      result.customPayload = this.readMap(this.readShort(), this.readString, this.readBytes);
    }
    return result;
  }

  /**
   * Reads a result metadata section.
   *
   * Fields are read in wire order: optional result id, flags, column count,
   * optional partition key indexes, then the flag-dependent sections (paging
   * state, new result id, continuous page index, global keyspace/table) and
   * finally one specification per column.
   * @param {Number} kind Result kind; prepared results carry extra fields and
   * get a `columnsByName` index (column name to position).
   * @returns {Object} Metadata with `columns` and `columnsByName`, plus the
   * optional properties set for the sections that were present.
   */
  readMetadata(kind) {
    const isPrepared = (kind === types.resultKind.prepared);
    const meta = {};
    if (types.protocolVersion.supportsResultMetadataId(this.header.version) && isPrepared) {
      meta.resultId = utils.copyBuffer(this.readShortBytes());
    }

    const flags = this.readInt();
    const columnLength = this.readInt();
    if (types.protocolVersion.supportsPreparedPartitionKey(this.header.version) && isPrepared) {
      meta.partitionKeys = new Array(this.readInt());
      for (let i = 0; i < meta.partitionKeys.length; i++) {
        meta.partitionKeys[i] = this.readShort();
      }
    }
    if (flags & resultFlag.hasMorePages) {
      meta.pageState = utils.copyBuffer(this.readBytes());
    }
    if (flags & resultFlag.metadataChanged) {
      meta.newResultId = utils.copyBuffer(this.readShortBytes());
    }
    if (flags & resultFlag.continuousPaging) {
      meta.continuousPageIndex = this.readInt();
      meta.lastContinuousPage = !!(flags & resultFlag.lastContinuousPage);
    }
    if (flags & resultFlag.globalTablesSpec) {
      meta.global_tables_spec = true;
      meta.keyspace = this.readString();
      meta.table = this.readString();
    }
    meta.columns = new Array(columnLength);
    // The name index is only built for prepared results; the other kinds
    // share the same empty object.
    meta.columnsByName = isPrepared ? {} : utils.emptyObject;
    for (let i = 0; i < columnLength; i++) {
      const col = {};
      if (!meta.global_tables_spec) {
        col.ksname = this.readString();
        col.tablename = this.readString();
      }
      col.name = this.readString();
      col.type = this.readType();
      meta.columns[i] = col;
      if (isPrepared) {
        // Own-property definition keeps a column named '__proto__' indexed.
        setOwnEntry(meta.columnsByName, col.name, i);
      }
    }
    return meta;
  }

  /**
   * Reads an error response: an int code and a string message, followed by
   * code-specific details that are set as properties of the returned error.
   * For unavailable, timeout and failure errors the message is replaced by a
   * formatted description built from those details.
   * @returns {ResponseError}
   */
  readError() {
    const code = this.readInt();
    const message = this.readString();
    const err = new errors.ResponseError(code, message);

    switch (code) {
      case types.responseErrorCodes.unavailableException:
        err.consistencies = this.readShort();
        err.required = this.readInt();
        err.alive = this.readInt();
        err.message = util.format(_unavailableMessage, types.consistencyToString[err.consistencies], err.required, err.alive);
        break;
      case types.responseErrorCodes.readTimeout:
      case types.responseErrorCodes.readFailure:
        err.consistencies = this.readShort();
        err.received = this.readInt();
        err.blockFor = this.readInt();
        if (code === types.responseErrorCodes.readFailure) {
          err.failures = this.readInt();
          if (types.protocolVersion.supportsFailureReasonMap(this.header.version)) {
            // The failure count doubles as the length of the reason map.
            err.reasons = this.readMap(err.failures, this.readInetAddress, this.readShort);
          }
        }
        err.isDataPresent = this.readByte();
        if (code === types.responseErrorCodes.readTimeout) {
          let details;
          if (err.received < err.blockFor) {
            details = util.format('%d replica(s) responded over %d required', err.received, err.blockFor);
          }
          else if (!err.isDataPresent) {
            details = 'the replica queried for the data didn\'t respond';
          }
          else {
            details = 'timeout while waiting for repair of inconsistent replica';
          }
          err.message = util.format(_readTimeoutMessage, types.consistencyToString[err.consistencies], details);
        }
        else {
          err.message = util.format(_readFailureMessage, types.consistencyToString[err.consistencies], err.blockFor, err.received, err.failures);
        }
        break;
      case types.responseErrorCodes.writeTimeout:
      case types.responseErrorCodes.writeFailure:
        err.consistencies = this.readShort();
        err.received = this.readInt();
        err.blockFor = this.readInt();
        if (code === types.responseErrorCodes.writeFailure) {
          err.failures = this.readInt();
          if (types.protocolVersion.supportsFailureReasonMap(this.header.version)) {
            // The failure count doubles as the length of the reason map.
            err.reasons = this.readMap(err.failures, this.readInetAddress, this.readShort);
          }
        }
        err.writeType = this.readString();
        if (code === types.responseErrorCodes.writeTimeout) {
          const template = err.writeType === 'BATCH_LOG' ? _writeTimeoutBatchLogMessage : _writeTimeoutQueryMessage;
          err.message = util.format(template, types.consistencyToString[err.consistencies], err.received, err.blockFor);
        }
        else {
          err.message = util.format(_writeFailureMessage, types.consistencyToString[err.consistencies], err.blockFor, err.received, err.failures);
        }
        break;
      case types.responseErrorCodes.unprepared:
        err.queryId = utils.copyBuffer(this.readShortBytes());
        break;
      case types.responseErrorCodes.functionFailure:
        err.keyspace = this.readString();
        err.functionName = this.readString();
        err.argTypes = this.readStringList();
        break;
      case types.responseErrorCodes.alreadyExists: {
        err.keyspace = this.readString();
        const table = this.readString();
        // An empty table name leaves `table` unset on the error.
        if (table.length > 0) {
          err.table = table;
        }
        break;
      }
    }
    return err;
  }

  /**
   * Reads a server event: the event type as a string, followed by details
   * that depend on the type.
   * @returns {Object} For topology changes `{ added, inet, eventType }`, for
   * status changes `{ up, inet, eventType }`, for schema changes the result
   * of parseSchemaChange(), and `{ eventType }` for any other type.
   */
  readEvent() {
    const eventType = this.readString();
    switch (eventType) {
      case types.protocolEvents.topologyChange:
        return {
          added: this.readString() === 'NEW_NODE',
          inet: this.readInet(),
          eventType: eventType
        };
      case types.protocolEvents.statusChange:
        return {
          up: this.readString() === 'UP',
          inet: this.readInet(),
          eventType: eventType
        };
      case types.protocolEvents.schemaChange:
        return this.parseSchemaChange();
    }

    return { eventType: eventType };
  }

  /**
   * Reads the details of a schema change.
   *
   * Protocol versions without full schema change metadata send the change
   * type, the keyspace and the table; an empty table name marks a
   * keyspace-level change. Other versions send the change type, the target
   * and the keyspace, followed by target-specific names.
   * @returns {Object} Always includes `eventType`, `schemaChangeType`,
   * `keyspace` and `isKeyspace`.
   */
  parseSchemaChange() {
    let result;
    if (!types.protocolVersion.supportsSchemaChangeFullMetadata(this.header.version)) {
      result = {
        eventType: types.protocolEvents.schemaChange,
        schemaChangeType: this.readString(),
        keyspace: this.readString(),
        table: this.readString()
      };
      result.isKeyspace = !result.table;
      return result;
    }

    result = {
      eventType: types.protocolEvents.schemaChange,
      schemaChangeType: this.readString(),
      target: this.readString(),
      keyspace: this.readString(),
      table: null,
      udt: null,
      signature: null
    };
    result.isKeyspace = result.target === 'KEYSPACE';
    switch (result.target) {
      case 'TABLE':
        result.table = this.readString();
        break;
      case 'TYPE':
        result.udt = this.readString();
        break;
      case 'FUNCTION':
        result.functionName = this.readString();
        result.signature = this.readStringList();
        break;
      case 'AGGREGATE':
        result.aggregate = this.readString();
        result.signature = this.readStringList();
        break;
    }
    return result;
  }
}
|
541 |
|
542 | module.exports = { FrameReader };
|