UNPKG

16.7 kBJavaScriptView Raw
1/*
2 * Copyright DataStax, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16'use strict';
17
18const types = require('./types');
19const token = require('./token');
20const utils = require('./utils');
21const MutableLong = require('./types/mutable-long');
22const { Integer } = types;
23
// Murmur3 constants.
// The four-argument MutableLong constructor calls below list the 16-bit words of each
// 64-bit constant starting with the least significant word (compare each hex comment
// with its argument list).
//-0x783C846EEEBDAC2B
const mconst1 = new MutableLong(0x53d5, 0x1142, 0x7b91, 0x87c3);
//0x4cf5ad432745937f
const mconst2 = new MutableLong(0x937f, 0x2745, 0xad43, 0x4cf5);
// Multiplier applied to h1/h2 at the end of every 128-bit block in Murmur3Tokenizer.hash().
const mlongFive = MutableLong.fromNumber(5);
//0xff51afd7ed558ccd
const mconst3 = new MutableLong(0x8ccd, 0xed55, 0xafd7, 0xff51);
//0xc4ceb9fe1a85ec53
const mconst4 = new MutableLong(0xec53, 0x1a85, 0xb9fe, 0xc4ce);
// Addends applied to h1 (mconst5) and h2 (mconst6) right after the multiplication by five.
const mconst5 = MutableLong.fromNumber(0x52dce729);
const mconst6 = MutableLong.fromNumber(0x38495ab5);
36
/**
 * Base contract for generating, parsing and splitting tokens of a C* partitioner.
 * Concrete tokenizers override the methods that throw here.
 * @abstract
 */
class Tokenizer {
  /**
   * Computes the token that corresponds to the provided value.
   * @abstract
   * @param {Buffer|Array} value
   * @returns {Token} Computed token
   */
  hash(value) {
    throw new Error('You must implement a hash function for the tokenizer');
  }

  /**
   * Builds a token from its string form.
   * @abstract
   * @param {String} value
   */
  parse(value) {
    throw new Error('You must implement a parse function for the tokenizer');
  }

  /**
   * Gets the minimum token of the partitioner.
   * @abstract
   */
  minToken() {
    throw new Error('You must implement a minToken function for the tokenizer');
  }

  /**
   * Divides the range delimited by start and end into numberOfSplits equal parts.
   * @abstract
   * @param {Token} start Starting token
   * @param {Token} end End token
   * @param {Number} numberOfSplits Number of splits to make.
   */
  split(start, end, numberOfSplits) {
    throw new Error('You must implement a split function for the tokenizer');
  }

  /**
   * Shared splitting routine for tokenizers whose tokens can be expressed as Integer values.
   *
   * @param {Integer} start Starting token
   * @param {Integer} range How large the range of the split is
   * @param {Integer} ringEnd The end point of the ring so we know where to wrap
   * @param {Integer} ringLength The total size of the ring; when falsy, no wrapping is applied
   * @param {Number} numberOfSplits The number of splits to make
   * @returns {Array<Integer>} The numberOfSplits - 1 boundary points between the parts
   */
  splitBase(start, range, ringEnd, ringLength, numberOfSplits) {
    const splitCount = Integer.fromInt(numberOfSplits);
    const step = range.divide(splitCount);
    let pending = range.modulo(splitCount);
    const longerStep = step.add(Integer.ONE);

    const points = [];
    let cursor = start;

    for (let n = 1; n < numberOfSplits; n++) {
      // While part of the division remainder is still pending, advance one extra unit.
      cursor = cursor.add(pending.greaterThan(Integer.ZERO) ? longerStep : step);

      // Wrap around the ring when the point falls beyond its end.
      if (ringLength && cursor.greaterThan(ringEnd)) {
        cursor = cursor.subtract(ringLength);
      }

      points.push(cursor);
      pending = pending.subtract(Integer.ONE);
    }

    return points;
  }

  /**
   * Gets the internal string representation of a Token.
   * @param {Token} token
   */
  stringify(token) {
    const value = token.getValue();
    return value.toString();
  }
}
122
/**
 * Tokenizer that distributes data uniformly across the cluster using Cassandra flavored
 * Murmur3 hashed values.
 */
class Murmur3Tokenizer extends Tokenizer {
  /**
   * Hashes the value with an adaptation of Cassandra's MurmurHash.hash3_x64_128 (as used for M3P).
   * Only the first 64 bits of the result are kept, since that is all the M3 partitioner uses.
   * @param {Buffer} value
   * @return {Murmur3Token}
   */
  hash(value) {
    const bytes = value;
    const byteLength = bytes.length;

    // The body consumes the input as 128-bit (16-byte) blocks.
    const blockCount = byteLength >> 4;

    const h1 = new MutableLong();
    const h2 = new MutableLong();

    for (let block = 0; block < blockCount; block++) {
      const blockK1 = this.getBlock(bytes, 0, block * 2);
      const blockK2 = this.getBlock(bytes, 0, block * 2 + 1);

      blockK1.multiply(mconst1);
      this.rotl64(blockK1, 31);
      blockK1.multiply(mconst2);

      h1.xor(blockK1);
      this.rotl64(h1, 27);
      h1.add(h2);
      h1.multiply(mlongFive).add(mconst5);

      blockK2.multiply(mconst2);
      this.rotl64(blockK2, 33);
      blockK2.multiply(mconst1);
      h2.xor(blockK2);
      this.rotl64(h2, 31);
      h2.add(h1);
      h2.multiply(mlongFive).add(mconst6);
    }

    //----------
    // tail: the 0 to 15 bytes that did not fill a whole block

    const tailStart = blockCount * 16;
    const tailLength = byteLength & 15;

    const k1 = new MutableLong();
    const k2 = new MutableLong();

    if (tailLength > 8) {
      // Tail bytes 9..14 land in k2 at bit positions 8..48; byte 8 is mixed in unshifted.
      for (let i = tailLength - 1; i > 8; i--) {
        k2.xor(fromSignedByte(bytes[tailStart + i]).shiftLeft((i - 8) * 8));
      }
      k2.xor(fromSignedByte(bytes[tailStart + 8]));
      k2.multiply(mconst2);
      this.rotl64(k2, 33);
      k2.multiply(mconst1);
      h2.xor(k2);
    }

    if (tailLength > 0) {
      // Tail bytes 1..7 land in k1 at bit positions 8..56; byte 0 is mixed in unshifted.
      for (let i = Math.min(tailLength, 8) - 1; i > 0; i--) {
        k1.xor(fromSignedByte(bytes[tailStart + i]).shiftLeft(i * 8));
      }
      k1.xor(fromSignedByte(bytes[tailStart]));
      k1.multiply(mconst1);
      this.rotl64(k1, 31);
      k1.multiply(mconst2);
      h1.xor(k1);
    }

    //----------
    // finalization

    h1.xor(MutableLong.fromNumber(byteLength));
    h2.xor(MutableLong.fromNumber(byteLength));

    h1.add(h2);
    h2.add(h1);

    this.fmix(h1);
    this.fmix(h2);

    h1.add(h2);

    return new token.Murmur3Token(h1);
  }

  /**
   * Reads 8 consecutive bytes as a MutableLong made of four 16-bit words, where the
   * first byte of each pair is the low-order byte of its word.
   * @param {Array<Number>} key
   * @param {Number} offset
   * @param {Number} index Position of the 8-byte group, counted from offset
   * @return {MutableLong}
   */
  getBlock(key, offset, index) {
    const first = offset + (index << 3);
    const word0 = key[first] | (key[first + 1] << 8);
    const word1 = key[first + 2] | (key[first + 3] << 8);
    const word2 = key[first + 4] | (key[first + 5] << 8);
    const word3 = key[first + 6] | (key[first + 7] << 8);
    return new MutableLong(word0, word1, word2, word3);
  }

  /**
   * Rotates the 64-bit value to the left, in place.
   * @param {MutableLong} v Value to rotate (it is modified)
   * @param {Number} n Number of bit positions
   */
  rotl64(v, n) {
    const wrappedBits = v.clone().shiftLeft(n);
    v.shiftRightUnsigned(64 - n).or(wrappedBits);
  }

  /**
   * Final mixing step, applied in place.
   * @param {MutableLong} k
   */
  fmix(k) {
    // Xors k with a long whose words 0 and 1 hold words 2 and 3 of k shifted right by
    // one bit (that is, k >>> 33), built directly from the 16-bit parts.
    const xorShiftRight33 = () => {
      const word2 = k.getUint16(2);
      const word3 = k.getUint16(3);
      k.xor(new MutableLong((word2 >>> 1) | ((word3 << 15) & 0xffff), word3 >>> 1, 0, 0));
    };

    xorShiftRight33();
    k.multiply(mconst3);
    xorShiftRight33();
    k.multiply(mconst4);
    xorShiftRight33();
  }

  /**
   * Builds a token from the decimal string representation of an int64.
   * @param {String} value
   * @returns {Murmur3Token}
   */
  parse(value) {
    const longValue = MutableLong.fromString(value);
    return new token.Murmur3Token(longValue);
  }

  /**
   * Gets the token for the minimum long value; created on first use and cached.
   * @returns {Murmur3Token}
   */
  minToken() {
    if (this._minToken) {
      return this._minToken;
    }
    this._minToken = this.parse('-9223372036854775808');
    return this._minToken;
  }

  /**
   * Gets the token for the maximum long value; created on first use and cached.
   * @returns {Murmur3Token}
   */
  maxToken() {
    if (this._maxToken) {
      return this._maxToken;
    }
    this._maxToken = this.parse('9223372036854775807');
    return this._maxToken;
  }

  /**
   * Gets the maximum long value as an Integer; created on first use and cached.
   * @returns {Integer}
   */
  maxValue() {
    if (this._maxValue) {
      return this._maxValue;
    }
    this._maxValue = Integer.fromString('9223372036854775807');
    return this._maxValue;
  }

  /**
   * Gets the minimum long value as an Integer; created on first use and cached.
   * @returns {Integer}
   */
  minValue() {
    if (this._minValue) {
      return this._minValue;
    }
    this._minValue = Integer.fromString('-9223372036854775808');
    return this._minValue;
  }

  /**
   * Gets the difference between the maximum and the minimum value; computed on first use and cached.
   * @returns {Integer}
   */
  ringLength() {
    if (this._ringLength) {
      return this._ringLength;
    }
    this._ringLength = this.maxValue().subtract(this.minValue());
    return this._ringLength;
  }

  /**
   * Splits the range delimited by start and end into numberOfSplits parts.
   * @param {Token} start Starting token
   * @param {Token} end End token
   * @param {Number} numberOfSplits Number of splits to make.
   * @returns {Array<Murmur3Token>} The tokens at the boundaries between the parts
   */
  split(start, end, numberOfSplits) {
    // ]min, min] means the whole ring.
    const isWholeRing = start.equals(end) && start.equals(this.minToken());
    const lastToken = isWholeRing ? this.maxToken() : end;

    // Go through the decimal string form to move from the token value to Integer.
    const startVal = Integer.fromString(start.getValue().toString());
    const endVal = Integer.fromString(lastToken.getValue().toString());

    let range = endVal.subtract(startVal);
    if (range.isNegative()) {
      // The range wraps around the ring.
      range = range.add(this.ringLength());
    }

    return this
      .splitBase(startVal, range, this.maxValue(), this.ringLength(), numberOfSplits)
      .map(point => this.parse(point.toString()));
  }

  /**
   * Gets a string that uniquely identifies the token. It is not the decimal representation:
   * the four uint16 parts are used to avoid divisions and other expensive operations on longs.
   * @param {Token} token
   * @returns {String}
   */
  stringify(token) {
    // The underlying MutableLong
    const value = token.getValue();
    return `${value.getUint16(0)},${value.getUint16(1)},${value.getUint16(2)},${value.getUint16(3)}`;
  }
}
353
/**
 * Tokenizer that distributes data uniformly across the cluster using MD5 hash values.
 */
class RandomTokenizer extends Tokenizer {
  constructor() {
    super();
    // eslint-disable-next-line
    this._crypto = require('crypto');
  }

  /**
   * Computes the token as the absolute value of the MD5 digest of the value.
   * @param {Buffer|Array} value
   * @returns {RandomToken}
   */
  hash(value) {
    const input = Array.isArray(value) ? utils.allocBufferFromArray(value) : value;
    const digest = this._crypto.createHash('md5').update(input).digest();
    return new token.RandomToken(Integer.fromBuffer(digest).abs());
  }

  /**
   * Builds a token from its string representation.
   * @param {String} value
   * @returns {Token}
   */
  parse(value) {
    const parsed = Integer.fromString(value);
    return new token.RandomToken(parsed);
  }

  /**
   * Gets the token parsed from '-1'; created on first use and cached.
   * @returns {Token}
   */
  minToken() {
    if (this._minToken) {
      return this._minToken;
    }
    this._minToken = this.parse('-1');
    return this._minToken;
  }

  /**
   * Gets 2^127 as an Integer; created on first use and cached.
   * @returns {Integer}
   */
  maxValue() {
    if (this._maxValue) {
      return this._maxValue;
    }
    this._maxValue = Integer.fromNumber(Math.pow(2, 127));
    return this._maxValue;
  }

  /**
   * Gets the token that wraps maxValue(); created on first use and cached.
   * @returns {RandomToken}
   */
  maxToken() {
    if (this._maxToken) {
      return this._maxToken;
    }
    this._maxToken = new token.RandomToken(this.maxValue());
    return this._maxToken;
  }

  /**
   * Gets maxValue() plus one; computed on first use and cached.
   * @returns {Integer}
   */
  ringLength() {
    if (this._ringLength) {
      return this._ringLength;
    }
    this._ringLength = this.maxValue().add(Integer.ONE);
    return this._ringLength;
  }

  /**
   * Splits the range delimited by start and end into numberOfSplits parts.
   * @param {Token} start Starting token
   * @param {Token} end End token
   * @param {Number} numberOfSplits Number of splits to make.
   * @returns {Array<RandomToken>} The tokens at the boundaries between the parts
   */
  split(start, end, numberOfSplits) {
    // ]min, min] means the whole ring.
    const isWholeRing = start.equals(end) && start.equals(this.minToken());
    const lastToken = isWholeRing ? this.maxToken() : end;

    const startVal = start.getValue();
    const endVal = lastToken.getValue();

    let range = endVal.subtract(startVal);
    if (range.lessThan(Integer.ZERO)) {
      // The range wraps around the ring.
      range = range.add(this.ringLength());
    }

    return this
      .splitBase(startVal, range, this.maxValue(), this.ringLength(), numberOfSplits)
      .map(point => new token.RandomToken(point));
  }
}
429
/**
 * Tokenizer whose tokens are the bytes of the value itself, without trailing zeros.
 */
class ByteOrderedTokenizer extends Tokenizer {
  /**
   * Builds the token from the bytes of the value, dropping trailing zero bytes.
   * @param {Buffer} value A Buffer (an Array of bytes is converted to a Buffer first)
   * @returns {ByteOrderedToken}
   */
  hash(value) {
    const bytes = Array.isArray(value) ? utils.allocBufferFromArray(value) : value;

    // Tokens with trailing zeros are equivalent to those without them, so strip them.
    // The byte at index 0 is never examined: it is kept even when it is zero.
    let end = bytes.length;
    while (end > 1 && bytes[end - 1] === 0) {
      end--;
    }

    return new token.ByteOrderedToken(bytes.slice(0, end));
  }

  /**
   * Gets the hex representation of the token value.
   * @param {Token} token
   * @returns {String}
   */
  stringify(token) {
    const bytes = token.getValue();
    return bytes.toString('hex');
  }

  /**
   * Builds a token from a hex string.
   * @param {String} value
   * @returns {ByteOrderedToken}
   */
  parse(value) {
    const bytes = utils.allocBufferFromString(value, 'hex');
    return this.hash(bytes);
  }

  /**
   * Gets the token obtained by hashing an empty array; created on first use and cached.
   * @returns {ByteOrderedToken}
   */
  minToken() {
    if (this._minToken) {
      return this._minToken;
    }
    this._minToken = this.hash([]);
    return this._minToken;
  }

  /**
   * Converts the bytes of a token into a number in order to perform computations.
   * The significant bytes normalize all tokens to the same size: when the token is 0x01
   * and significantBytes is 2, the result is 0x0100.
   * @param {Buffer} buffer
   * @param {Number} significantBytes
   * @returns {Integer}
   */
  _toNumber(buffer, significantBytes) {
    let padded = buffer;
    if (buffer.length !== significantBytes) {
      // Copy into a zero-filled buffer of the target size, aligned at the start.
      padded = Buffer.alloc(significantBytes);
      buffer.copy(padded);
    }

    // Similar to Integer.fromBuffer except that the sign is forced to 0.
    const wordCount = Math.ceil(padded.length / 4);
    const words = [];
    for (let w = 0; w < wordCount; w++) {
      const wordStart = padded.length - (w + 1) * 4;
      if (wordStart >= 0) {
        words.push(padded.readInt32BE(wordStart));
        continue;
      }

      // The buffer length is not a multiple of 4: the last word is assembled from
      // the leading 1 to 3 bytes.
      const leadingBytes = wordStart + 4;
      let partial = 0;
      for (let j = 0; j < leadingBytes; j++) {
        partial = partial | (padded[j] << ((leadingBytes - j - 1) * 8));
      }
      words.push(partial);
    }

    return new Integer(words, 0);
  }

  /**
   * Converts the numeric representation back to a buffer of significantBytes length.
   * @param {Integer} number
   * @param {Number} significantBytes
   * @returns {Buffer}
   */
  _toBuffer(number, significantBytes) {
    const raw = Integer.toBuffer(number);
    if (raw.length === significantBytes) {
      return raw;
    }

    // When the first byte is a sign byte (zero), skip it.
    const start = raw[0] === 0 ? 1 : 0;
    const length = raw.length - start;

    // Copy the remaining bytes aligned to the end of a zero-filled buffer.
    const target = Buffer.alloc(significantBytes);
    raw.copy(target, significantBytes - length, start, start + length);
    return target;
  }

  /**
   * Splits the range delimited by start and end into numberOfSplits parts.
   * @param {Token} start Starting token
   * @param {Token} end End token
   * @param {Number} numberOfSplits Number of splits to make.
   * @returns {Array<ByteOrderedToken>} The tokens at the boundaries between the parts
   * @throws {Error} When start and end are both the minimum token (the whole ring).
   */
  split(start, end, numberOfSplits) {
    const tokenOrder = start.compare(end);

    if (tokenOrder === 0 && start.equals(this.minToken())) {
      throw new Error("Cannot split whole ring with ordered partitioner");
    }

    // When start is not lower than end, the range goes around the ring.
    const wrapsAround = !(tokenOrder < 0);
    const intNumberOfSplits = Integer.fromNumber(numberOfSplits);

    // Since tokens are compared lexicographically, convert to numbers using the
    // largest length (i.e. given 0x0A and 0x0BCD, switch to 0x0A00 and 0x0BCD)
    let significantBytes = Math.max(start.getValue().length, end.getValue().length);

    let startVal;
    let endVal;
    let range;
    let ringLength;
    let ringEnd;

    // Retry with one more significant byte (4 extra bytes at most) while the range
    // is smaller than the number of splits.
    for (let addedBytes = 0; ; addedBytes++, significantBytes++) {
      startVal = this._toNumber(start.getValue(), significantBytes);
      endVal = this._toNumber(end.getValue(), significantBytes);

      if (wrapsAround) {
        ringLength = Integer.fromNumber(Math.pow(2, significantBytes * 8));
        ringEnd = ringLength.subtract(Integer.ONE);
        range = endVal.subtract(startVal).add(ringLength);
      } else {
        range = endVal.subtract(startVal);
      }

      if (addedBytes === 4 || range.compare(intNumberOfSplits) >= 0) {
        break;
      }
    }

    // ringEnd and ringLength are left undefined when the range does not wrap.
    return this
      .splitBase(startVal, range, ringEnd, ringLength, numberOfSplits)
      .map(point => new token.ByteOrderedToken(this._toBuffer(point, significantBytes)));
  }
}
570
/**
 * Builds a MutableLong from a byte value, treating values that are not lower than 128
 * as negative: they are reduced by 256 and the three upper 16-bit words are set to 0xffff.
 * @param {Number} value
 * @return {MutableLong}
 */
function fromSignedByte(value) {
  return value < 128
    ? new MutableLong(value, 0, 0, 0)
    : new MutableLong((value - 256) & 0xffff, 0xffff, 0xffff, 0xffff);
}
581
// Only the concrete tokenizers are exported; the abstract Tokenizer base class is not.
exports.Murmur3Tokenizer = Murmur3Tokenizer;
exports.RandomTokenizer = RandomTokenizer;
exports.ByteOrderedTokenizer = ByteOrderedTokenizer;