var crypto = require('crypto'),
    events = require('events'),
    async = require('async'),
    Lazy = require('lazy'),
    levelup = require('levelup'),
    memdown = require('memdown'),
    sub = require('subleveldown'),
    lock = require('lock'),
    Big = require('big.js'),
    once = require('once')

exports.MAX_SIZE = 409600 // TODO: get rid of this? or leave for backwards compat?
exports.create = create
exports.lazy = lazyStream
exports.validateKey = validateKey
exports.validateItem = validateItem
exports.validateUpdates = validateUpdates
exports.validateKeyPiece = validateKeyPiece
exports.validateKeyPaths = validateKeyPaths
exports.createKey = createKey
exports.createIndexKey = createIndexKey
exports.traverseKey = traverseKey
exports.traverseIndexes = traverseIndexes
exports.toRangeStr = toRangeStr
exports.toLexiStr = toLexiStr
exports.hashPrefix = hashPrefix
exports.validationError = validationError
exports.limitError = limitError
exports.checkConditional = checkConditional
exports.itemSize = itemSize
exports.capacityUnits = capacityUnits
exports.addConsumedCapacity = addConsumedCapacity
exports.matchesFilter = matchesFilter
exports.matchesExprFilter = matchesExprFilter
exports.compare = compare
exports.mapPaths = mapPaths
exports.mapPath = mapPath
exports.queryTable = queryTable
exports.updateIndexes = updateIndexes
exports.getIndexActions = getIndexActions

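// Creates a data store backed by levelup: leveldown on disk when options.path
// is given, otherwise in-memory via memdown. The store is partitioned into
// sub-databases (tables, items, indexes, tags), each guarded by its own lock.
//
// Illustrative usage (the module path is an assumption, adjust to your layout):
//   var db = require('./db')
//   var store = db.create({})
//   store.getTable('some-table', function(err, table) { /* ... */ })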
function create(options) {
  options = options || {}
  if (options.createTableMs == null) options.createTableMs = 500
  if (options.deleteTableMs == null) options.deleteTableMs = 500
  if (options.updateTableMs == null) options.updateTableMs = 500
  if (options.maxItemSizeKb == null) options.maxItemSizeKb = exports.MAX_SIZE / 1024
  options.maxItemSize = options.maxItemSizeKb * 1024

  var db = levelup(options.path ? require('leveldown')(options.path) : memdown()),
      subDbs = Object.create(null),
      tableDb = getSubDb('table')

  // XXX: Is there a better way to get this?
  tableDb.awsAccountId = (process.env.AWS_ACCOUNT_ID || '0000-0000-0000').replace(/[^\d]/g, '')
  tableDb.awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1'

  function getItemDb(name) {
    return getSubDb('item-' + name)
  }

  function deleteItemDb(name, cb) {
    deleteSubDb('item-' + name, cb)
  }

  function getIndexDb(indexType, tableName, indexName) {
    return getSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName)
  }

  function deleteIndexDb(indexType, tableName, indexName, cb) {
    deleteSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName, cb)
  }

  function getTagDb(name) {
    return getSubDb('tag-' + name)
  }

  function deleteTagDb(name, cb) {
    deleteSubDb('tag-' + name, cb)
  }

  function getSubDb(name) {
    if (!subDbs[name]) {
      subDbs[name] = sub(db, name, {valueEncoding: 'json'})
      subDbs[name].lock = lock.Lock()
    }
    return subDbs[name]
  }

  function deleteSubDb(name, cb) {
    cb = once(cb)
    var subDb = getSubDb(name)
    delete subDbs[name]
    lazyStream(subDb.createKeyStream(), cb).join(function(keys) {
      subDb.batch(keys.map(function(key) { return {type: 'del', key: key} }), cb)
    })
  }

  function getTable(name, checkStatus, cb) {
    if (typeof checkStatus == 'function') cb = checkStatus

    tableDb.get(name, function(err, table) {
      if (!err && checkStatus && (table.TableStatus == 'CREATING' || table.TableStatus == 'DELETING')) {
        err = new Error('NotFoundError')
        err.name = 'NotFoundError'
      }
      if (err) {
        if (err.name == 'NotFoundError') {
          err.statusCode = 400
          err.body = {
            __type: 'com.amazonaws.dynamodb.v20120810#ResourceNotFoundException',
            message: 'Requested resource not found',
          }
          if (!checkStatus) err.body.message += ': Table: ' + name + ' not found'
        }
        return cb(err)
      }

      cb(null, table)
    })
  }

  function recreate() {
    var self = this, newStore = create(options)
    Object.keys(newStore).forEach(function(key) {
      self[key] = newStore[key]
    })
  }

  return {
    options: options,
    db: db,
    tableDb: tableDb,
    getItemDb: getItemDb,
    deleteItemDb: deleteItemDb,
    getIndexDb: getIndexDb,
    deleteIndexDb: deleteIndexDb,
    getTagDb: getTagDb,
    deleteTagDb: deleteTagDb,
    getTable: getTable,
    recreate: recreate,
  }
}

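// Wraps a readable stream in a Lazy sequence so callers can use map/filter/
// takeWhile/join, routes stream errors to errHandler, and destroys the
// underlying stream once the Lazy consumer signals it is done (via 'pipe').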
function lazyStream(stream, errHandler) {
  if (errHandler) stream.on('error', errHandler)
  var streamAsLazy = new Lazy(stream)
  stream.removeAllListeners('readable')
  stream.on('data', streamAsLazy.emit.bind(streamAsLazy, 'data'))
  if (stream.destroy) streamAsLazy.on('pipe', stream.destroy.bind(stream))
  return streamAsLazy
}

function validateKey(dataKey, table, keySchema) {
  if (keySchema == null) keySchema = table.KeySchema
  if (keySchema.length != Object.keys(dataKey).length) {
    return validationError('The provided key element does not match the schema')
  }
  return traverseKey(table, keySchema, function(attr, type, isHash) {
    return validateKeyPiece(dataKey, attr, type, isHash)
  })
}

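// Checks that an item to be written carries every table key attribute with the
// declared type and a legal size, and that any attribute shared with a
// secondary index key has the type the index declares. Returns a validation
// error object on failure, otherwise undefined.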
function validateItem(dataItem, table) {
  return traverseKey(table, function(attr, type, isHash) {
    if (dataItem[attr] == null) {
      return validationError('One or more parameter values were invalid: ' +
        'Missing the key ' + attr + ' in the item')
    }
    if (dataItem[attr][type] == null) {
      return validationError('One or more parameter values were invalid: ' +
        'Type mismatch for key ' + attr + ' expected: ' + type +
        ' actual: ' + Object.keys(dataItem[attr])[0])
    }
    return checkKeySize(dataItem[attr][type], type, isHash)
  }) || traverseIndexes(table, function(attr, type, index) {
    if (dataItem[attr] != null && dataItem[attr][type] == null) {
      return validationError('One or more parameter values were invalid: ' +
        'Type mismatch for Index Key ' + attr + ' Expected: ' + type +
        ' Actual: ' + Object.keys(dataItem[attr])[0] + ' IndexName: ' + index.IndexName)
    }
  })
}

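// Rejects updates that touch table key attributes, assign the wrong type to an
// index key attribute, or address a key attribute through a nested path.
// Handles both legacy AttributeUpdates and parsed UpdateExpression input.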
function validateUpdates(attributeUpdates, expressionUpdates, table) {
  if (attributeUpdates == null && expressionUpdates == null) return

  return traverseKey(table, function(attr) {
    var hasKey = false
    if (expressionUpdates) {
      var sections = expressionUpdates.sections
      for (var j = 0; j < sections.length; j++) {
        if (sections[j].path[0] == attr) {
          hasKey = true
          break
        }
      }
    } else {
      hasKey = attributeUpdates[attr] != null
    }
    if (hasKey) {
      return validationError('One or more parameter values were invalid: ' +
        'Cannot update attribute ' + attr + '. This attribute is part of the key')
    }
  }) || traverseIndexes(table, function(attr, type, index) {
    var actualType
    if (expressionUpdates) {
      var sections = expressionUpdates.sections
      for (var i = 0; i < sections.length; i++) {
        var section = sections[i]
        if (section.path.length == 1 && section.path[0] == attr) {
          actualType = section.attrType
          break
        }
      }
    } else {
      actualType = attributeUpdates[attr] && attributeUpdates[attr].Value ?
        Object.keys(attributeUpdates[attr].Value)[0] : null
    }
    if (actualType != null && actualType != type) {
      return validationError('One or more parameter values were invalid: ' +
        'Type mismatch for Index Key ' + attr + ' Expected: ' + type +
        ' Actual: ' + actualType + ' IndexName: ' + index.IndexName)
    }
  }) || validateKeyPaths((expressionUpdates || {}).nestedPaths, table)
}

function validateKeyPiece(key, attr, type, isHash) {
  if (key[attr] == null || key[attr][type] == null) {
    return validationError('The provided key element does not match the schema')
  }
  return checkKeySize(key[attr][type], type, isHash)
}

function validateKeyPaths(nestedPaths, table) {
  if (!nestedPaths) return
  return traverseKey(table, function(attr) {
    if (nestedPaths[attr]) {
      return validationError('Key attributes must be scalars; ' +
        'list random access \'[]\' and map lookup \'.\' are not allowed: Key: ' + attr)
    }
  }) || traverseIndexes(table, function(attr) {
    if (nestedPaths[attr]) {
      return validationError('Key attributes must be scalars; ' +
        'list random access \'[]\' and map lookup \'.\' are not allowed: IndexKey: ' + attr)
    }
  })
}

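// Builds the storage key for an item: a short hash prefix of the hash key
// followed by '/' and a lexicographically ordered encoding of each key piece,
// so that range reads over the underlying level store return items in
// DynamoDB key order.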
function createKey(item, table, keySchema) {
  if (keySchema == null) keySchema = table.KeySchema
  var keyStr
  traverseKey(table, keySchema, function(attr, type, isHash) {
    if (isHash) keyStr = hashPrefix(item[attr][type], type) + '/'
    keyStr += toRangeStr(item[attr][type], type) + '/'
  })
  return keyStr
}

function createIndexKey(item, table, keySchema) {
  var tableKeyPieces = []
  traverseKey(table, function(attr, type) { tableKeyPieces.push(item[attr][type], type) })
  return createKey(item, table, keySchema) + hashPrefix.apply(this, tableKeyPieces)
}

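// Visits each attribute in a key schema (default: the table's own KeySchema),
// resolving its type from AttributeDefinitions and calling
// visitKey(attr, type, isHash). Stops at, and returns, the first truthy result,
// which is how the validate* helpers short-circuit on the first error.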
function traverseKey(table, keySchema, visitKey) {
  if (typeof keySchema == 'function') { visitKey = keySchema; keySchema = table.KeySchema }
  var i, j, attr, type, found
  for (i = 0; i < keySchema.length; i++) {
    attr = keySchema[i].AttributeName
    for (j = 0; j < table.AttributeDefinitions.length; j++) {
      if (table.AttributeDefinitions[j].AttributeName != attr) continue
      type = table.AttributeDefinitions[j].AttributeType
      break
    }
    found = visitKey(attr, type, !i)
    if (found) return found
  }
}

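// Visits every key attribute of each global secondary index, then the range
// key attribute of each local secondary index, calling
// visitIndex(attr, type, index, isGlobal) and returning the first truthy result.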
function traverseIndexes(table, visitIndex) {
  var i, j, k, attr, type, found
  if (table.GlobalSecondaryIndexes) {
    for (i = 0; i < table.GlobalSecondaryIndexes.length; i++) {
      for (k = 0; k < table.GlobalSecondaryIndexes[i].KeySchema.length; k++) {
        attr = table.GlobalSecondaryIndexes[i].KeySchema[k].AttributeName
        for (j = 0; j < table.AttributeDefinitions.length; j++) {
          if (table.AttributeDefinitions[j].AttributeName != attr) continue
          type = table.AttributeDefinitions[j].AttributeType
          break
        }
        found = visitIndex(attr, type, table.GlobalSecondaryIndexes[i], true)
        if (found) return found
      }
    }
  }
  if (table.LocalSecondaryIndexes) {
    for (i = 0; i < table.LocalSecondaryIndexes.length; i++) {
      attr = table.LocalSecondaryIndexes[i].KeySchema[1].AttributeName
      for (j = 0; j < table.AttributeDefinitions.length; j++) {
        if (table.AttributeDefinitions[j].AttributeName != attr) continue
        type = table.AttributeDefinitions[j].AttributeType
        break
      }
      found = visitIndex(attr, type, table.LocalSecondaryIndexes[i], false)
      if (found) return found
    }
  }
}

function checkKeySize(keyPiece, type, isHash) {
  // Numbers are always fine
  if (type == 'N') return null
  if (type == 'B') keyPiece = Buffer.from(keyPiece, 'base64')
  if (isHash && keyPiece.length > 2048)
    return validationError('One or more parameter values were invalid: ' +
      'Size of hashkey has exceeded the maximum size limit of2048 bytes')
  else if (!isHash && keyPiece.length > 1024)
    return validationError('One or more parameter values were invalid: ' +
      'Aggregated size of all range keys has exceeded the size limit of 1024 bytes')
}

function toRangeStr(keyPiece, type) {
  if (type == null) {
    type = Object.keys(keyPiece)[0]
    keyPiece = keyPiece[type]
  }
  if (type == 'S') return Buffer.from(keyPiece, 'utf8').toString('hex')
  return toLexiStr(keyPiece, type)
}

// Creates lexicographically sortable number strings
//
//  0     7c     009   = '07c009' = -99.1
// |-|   |--|   |---|
// sign   exp   digits
//
// Sign is 0 for negative, 1 for positive
// Exp is the exponent in hex, made order-preserving by adding 130 if the sign is positive,
// or by subtracting the exponent from 125 if negative
// Digits are the mantissa unchanged if the sign is positive, or the mantissa added to 10 if negative
// Hence '07c009' decodes as: sign 0 => negative, exponent 125 - 0x7c = 125 - 124 = 1,
// digits 009 => 0.09, mantissa 0.09 - 10 = -9.91, giving -9.91e1 = -99.1
//
function toLexiStr(keyPiece, type) {
  if (keyPiece == null) return ''
  if (type == null) {
    type = Object.keys(keyPiece)[0]
    keyPiece = keyPiece[type]
  }
  if (type == 'B') return Buffer.from(keyPiece, 'base64').toString('hex')
  if (type != 'N') return keyPiece
  var bigNum = new Big(keyPiece), digits,
      exp = !bigNum.c[0] ? 0 : bigNum.s == -1 ? 125 - bigNum.e : 130 + bigNum.e
  if (bigNum.s == -1) {
    bigNum.e = 0
    digits = new Big(10).plus(bigNum).toFixed().replace(/\./, '')
  } else {
    digits = bigNum.c.join('')
  }
  return (bigNum.s == -1 ? '0' : '1') + ('0' + exp.toString(16)).slice(-2) + digits
}

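// Derives a short deterministic prefix for an item's storage key by
// MD5-hashing the raw hash-key bytes (plus the table key pieces when building
// index keys) and keeping the first six hex characters.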
function hashPrefix(hashKey, hashType, rangeKey, rangeType) {
  if (hashType == 'S') {
    hashKey = Buffer.from(hashKey, 'utf8')
  } else if (hashType == 'N') {
    hashKey = numToBuffer(hashKey)
  } else if (hashType == 'B') {
    hashKey = Buffer.from(hashKey, 'base64')
  }
  if (rangeKey) {
    if (rangeType == 'S') {
      rangeKey = Buffer.from(rangeKey, 'utf8')
    } else if (rangeType == 'N') {
      rangeKey = numToBuffer(rangeKey)
    } else if (rangeType == 'B') {
      rangeKey = Buffer.from(rangeKey, 'base64')
    }
  } else {
    rangeKey = Buffer.from([])
  }
  // TODO: Can use the whole hash if we deem it important - for now just first six chars
  return crypto.createHash('md5').update('Outliers').update(hashKey).update(rangeKey).digest('hex').slice(0, 6)
}

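// Converts a DynamoDB number into the byte representation used when hashing
// numeric keys: the first byte holds a biased exponent, each following byte
// packs two decimal mantissa digits, negative numbers are complemented, and
// short negative values get a trailing terminator byte (102).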
function numToBuffer(num) {
  if (+num === 0) return Buffer.from([-128])

  num = new Big(num)

  var scale = num.s, mantissa = num.c, exponent = num.e + 1, appendZero = exponent % 2 ? 1 : 0,
      byteArrayLengthWithoutExponent = Math.floor((mantissa.length + appendZero + 1) / 2),
      byteArray, appendedZero = false, mantissaIndex, byteArrayIndex

  if (byteArrayLengthWithoutExponent < 20 && scale == -1) {
    byteArray = new Array(byteArrayLengthWithoutExponent + 2)
    byteArray[byteArrayLengthWithoutExponent + 1] = 102
  } else {
    byteArray = new Array(byteArrayLengthWithoutExponent + 1)
  }

  byteArray[0] = Math.floor((exponent + appendZero) / 2) - 64
  if (scale == -1)
    byteArray[0] ^= 0xffffffff

  for (mantissaIndex = 0; mantissaIndex < mantissa.length; mantissaIndex++) {
    byteArrayIndex = Math.floor((mantissaIndex + appendZero) / 2) + 1
    if (appendZero && !mantissaIndex && !appendedZero) {
      byteArray[byteArrayIndex] = 0
      appendedZero = true
      mantissaIndex--
    } else if ((mantissaIndex + appendZero) % 2 === 0) {
      byteArray[byteArrayIndex] = mantissa[mantissaIndex] * 10
    } else {
      byteArray[byteArrayIndex] += mantissa[mantissaIndex]
    }
    if (((mantissaIndex + appendZero) % 2) || (mantissaIndex == mantissa.length - 1)) {
      if (scale == -1)
        byteArray[byteArrayIndex] = 101 - byteArray[byteArrayIndex]
      else
        byteArray[byteArrayIndex]++
    }
  }

  return Buffer.from(byteArray)
}

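// Evaluates a conditional write: prefers a parsed ConditionExpression
// (data._condition) and falls back to the legacy Expected map, returning a
// ConditionalCheckFailedException-shaped error when the existing item does not
// satisfy the condition.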
function checkConditional(data, existingItem) {
  existingItem = existingItem || {}

  if (data._condition) {
    if (!matchesExprFilter(existingItem, data._condition.expression)) {
      return conditionalError()
    }
    return null
  } else if (!data.Expected) {
    return null
  }
  if (!matchesFilter(existingItem, data.Expected, data.ConditionalOperator)) {
    return conditionalError()
  }
}

function validationError(msg) {
  var err = new Error(msg)
  err.statusCode = 400
  err.body = {
    __type: 'com.amazon.coral.validate#ValidationException',
    message: msg,
  }
  return err
}

function conditionalError(msg) {
  if (msg == null) msg = 'The conditional request failed'
  var err = new Error(msg)
  err.statusCode = 400
  err.body = {
    __type: 'com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException',
    message: msg,
  }
  return err
}

function limitError(msg) {
  var err = new Error(msg)
  err.statusCode = 400
  err.body = {
    __type: 'com.amazonaws.dynamodb.v20120810#LimitExceededException',
    message: msg,
  }
  return err
}

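// Approximates an item's size in bytes: attribute name lengths plus per-type
// value sizes. With compress set it instead estimates the internal storage
// size used for the 1MB query/scan accounting, and addMetaSize folds in
// per-item overhead that grows with the item's size and its range key.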
function itemSize(item, compress, addMetaSize, rangeKey) {
  // Size of compressed item (for checking query/scan limit) seems complicated,
  // probably due to some internal serialization format.
  var rangeKeySize = 0
  var size = Object.keys(item).reduce(function(sum, attr) {
    var size = valSizeWithStorage(item[attr], compress && attr != rangeKey)
    if (compress && attr == rangeKey) {
      rangeKeySize = size
      return sum
    }
    return sum + size + (compress ? 1 : attr.length)
  }, 0)
  return !addMetaSize ? size : 2 + size + ((1 + Math.floor((1 + size) / 3072)) * (18 + rangeKeySize))
}

function valSizeWithStorage(itemAttr, compress) {
  var type = Object.keys(itemAttr)[0]
  var val = itemAttr[type]
  var size = valSize(val, type, compress)
  if (!compress) return size
  switch (type) {
    case 'S':
      return size + (size < 128 ? 1 : size < 16384 ? 2 : 3)
    case 'B':
      return size + 1
    case 'N':
      return size + 1
    case 'SS':
      return size + val.length + 1
    case 'BS':
      return size + val.length + 1
    case 'NS':
      return size + val.length + 1
    case 'NULL':
      return 0
    case 'BOOL':
      return 1
    case 'L':
      return size
    case 'M':
      return size
  }
}

function valSize(val, type, compress) {
  switch (type) {
    case 'S':
      return val.length
    case 'B':
      return Buffer.from(val, 'base64').length
    case 'N':
      val = new Big(val)
      var numDigits = val.c.length
      if (numDigits == 1 && val.c[0] === 0) return 1
      return 1 + Math.ceil(numDigits / 2) + (numDigits % 2 || val.e % 2 ? 0 : 1) + (val.s == -1 ? 1 : 0)
    case 'SS':
      return val.reduce(function(sum, x) { return sum + valSize(x, 'S') }, 0) // eslint-disable-line no-loop-func
    case 'BS':
      return val.reduce(function(sum, x) { return sum + valSize(x, 'B') }, 0) // eslint-disable-line no-loop-func
    case 'NS':
      return val.reduce(function(sum, x) { return sum + valSize(x, 'N') }, 0) // eslint-disable-line no-loop-func
    case 'NULL':
      return 1
    case 'BOOL':
      return 1
    case 'L':
      return 3 + val.reduce(function(sum, val) { return sum + 1 + valSizeWithStorage(val, compress) }, 0)
    case 'M':
      return 3 + Object.keys(val).length + itemSize(val, compress)
  }
}

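// Computes capacity units for a single item: reads are rounded up per 4KB and
// halved for eventually consistent reads, writes are rounded up per 1KB.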
function capacityUnits(item, isRead, isConsistent) {
  var size = item ? Math.ceil(itemSize(item) / 1024 / (isRead ? 4 : 1)) : 1
  return size / (!isRead || isConsistent ? 1 : 2)
}

function addConsumedCapacity(data, isRead, newItem, oldItem) {
  if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) {
    var capacity = capacityUnits(newItem, isRead, data.ConsistentRead)
    if (oldItem != null) {
      capacity = Math.max(capacity, capacityUnits(oldItem, isRead, data.ConsistentRead))
    }
    return {
      CapacityUnits: capacity,
      TableName: data.TableName,
      Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: capacity} : undefined,
    }
  }
}

function valsEqual(val1, val2) {
  if (Array.isArray(val1) && Array.isArray(val2)) {
    if (val1.length != val2.length) return false
    return val1.every(function(val) { return ~val2.indexOf(val) })
  } else {
    return val1 == val2
  }
}

function matchesFilter(val, filter, conditionalOperator) {
  for (var attr in filter) {
    var comp = filter[attr].Exists != null ? (filter[attr].Exists ? 'NOT_NULL' : 'NULL') :
      filter[attr].ComparisonOperator || 'EQ'
    var result = compare(comp, val[attr], filter[attr].AttributeValueList || filter[attr].Value)
    if (!result) {
      return false
    } else if (conditionalOperator == 'OR') {
      return true
    }
  }
  return true
}

function matchesExprFilter(item, expr) {
  if (expr.type == 'and') {
    return matchesExprFilter(item, expr.args[0]) && matchesExprFilter(item, expr.args[1])
  } else if (expr.type == 'or') {
    return matchesExprFilter(item, expr.args[0]) || matchesExprFilter(item, expr.args[1])
  } else if (expr.type == 'not') {
    return !matchesExprFilter(item, expr.args[0])
  }
  var args = expr.args.map(function(arg) { return resolveArg(arg, item) })
  return compare(expr.type == 'function' ? expr.name : expr.type, args[0], args.slice(1))
}

function resolveArg(arg, item) {
  if (Array.isArray(arg)) {
    return mapPath(arg, item)
  } else if (arg.type == 'function' && arg.name == 'size') {
    var args = arg.args.map(function(arg) { return resolveArg(arg, item) })
    var val = args[0], length
    if (!val) {
      return null
    } else if (val.S) {
      length = val.S.length
    } else if (val.B) {
      length = Buffer.from(val.B, 'base64').length
    } else if (val.SS || val.BS || val.NS || val.L) {
      length = (val.SS || val.BS || val.NS || val.L).length
    } else if (val.M) {
      length = Object.keys(val.M).length
    }
    return length != null ? {N: length.toString()} : null
  } else {
    return arg
  }
}

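// Implements both the legacy ComparisonOperator values (EQ, LT, BETWEEN, ...)
// and the expression operators/functions (=, <>, begins_with, attribute_exists,
// ...), comparing a stored attribute value against one or more supplied values
// with DynamoDB's type rules (Big for numbers, lexicographic encoding otherwise).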
function compare(comp, val, compVals) {
  if (!Array.isArray(compVals)) compVals = [compVals]

  var attrType = val ? Object.keys(val)[0] : null
  var attrVal = attrType ? val[attrType] : null
  var compType = compVals && compVals[0] ? Object.keys(compVals[0])[0] : null
  var compVal = compType ? compVals[0][compType] : null

  switch (comp) {
    case 'EQ':
    case '=':
      if (compType != attrType || !valsEqual(attrVal, compVal)) return false
      break
    case 'NE':
    case '<>':
      if (compType == attrType && valsEqual(attrVal, compVal)) return false
      break
    case 'LE':
    case '<=':
      if (compType != attrType ||
        (attrType == 'N' && !new Big(attrVal).lte(compVal)) ||
        (attrType != 'N' && toLexiStr(attrVal, attrType) > toLexiStr(compVal, attrType))) return false
      break
    case 'LT':
    case '<':
      if (compType != attrType ||
        (attrType == 'N' && !new Big(attrVal).lt(compVal)) ||
        (attrType != 'N' && toLexiStr(attrVal, attrType) >= toLexiStr(compVal, attrType))) return false
      break
    case 'GE':
    case '>=':
      if (compType != attrType ||
        (attrType == 'N' && !new Big(attrVal).gte(compVal)) ||
        (attrType != 'N' && toLexiStr(attrVal, attrType) < toLexiStr(compVal, attrType))) return false
      break
    case 'GT':
    case '>':
      if (compType != attrType ||
        (attrType == 'N' && !new Big(attrVal).gt(compVal)) ||
        (attrType != 'N' && toLexiStr(attrVal, attrType) <= toLexiStr(compVal, attrType))) return false
      break
    case 'NOT_NULL':
    case 'attribute_exists':
      if (attrVal == null) return false
      break
    case 'NULL':
    case 'attribute_not_exists':
      if (attrVal != null) return false
      break
    case 'CONTAINS':
    case 'contains':
      return contains(compType, compVal, attrType, attrVal)
    case 'NOT_CONTAINS':
      return !contains(compType, compVal, attrType, attrVal)
    case 'BEGINS_WITH':
    case 'begins_with':
      if (compType != attrType) return false
      if (compType == 'B') {
        attrVal = Buffer.from(attrVal, 'base64').toString()
        compVal = Buffer.from(compVal, 'base64').toString()
      }
      if (attrVal.indexOf(compVal) !== 0) return false
      break
    case 'IN':
    case 'in':
      if (!attrVal) return false
      if (!compVals.some(function(compVal) {
        compType = Object.keys(compVal)[0]
        compVal = compVal[compType]
        return compType == attrType && valsEqual(attrVal, compVal)
      })) return false
      break
    case 'BETWEEN':
    case 'between':
      if (!attrVal || compType != attrType ||
        (attrType == 'N' && (!new Big(attrVal).gte(compVal) || !new Big(attrVal).lte(compVals[1].N))) ||
        (attrType != 'N' && (toLexiStr(attrVal, attrType) < toLexiStr(compVal, attrType) ||
          toLexiStr(attrVal, attrType) > toLexiStr(compVals[1][compType], attrType)))) return false
      break
    case 'attribute_type':
      if (!attrVal || !valsEqual(attrType, compVal)) return false
  }
  return true
}

function contains(compType, compVal, attrType, attrVal) {
  if (compType === 'S') {
    if (attrType === 'S') return !!~attrVal.indexOf(compVal)
    if (attrType === 'SS') return attrVal.some(function(val) {
      return val === compVal
    })
    if (attrType === 'L') return attrVal.some(function(val) {
      return val && val.S && val.S === compVal
    })
    return false
  }
  if (compType === 'N') {
    if (attrType === 'NS') return attrVal.some(function(val) {
      return val === compVal
    })
    if (attrType === 'L') return attrVal.some(function(val) {
      return val && val.N && val.N === compVal
    })
    return false
  }
  if (compType === 'B') {
    if (attrType !== 'B' && attrType !== 'BS' && attrType !== 'L') return false
    var compValString = Buffer.from(compVal, 'base64').toString()
    if (attrType === 'B') {
      var attrValString = Buffer.from(attrVal, 'base64').toString()
      return !!~attrValString.indexOf(compValString)
    }
    return attrVal.some(function(val) {
      if (attrType !== 'L') return compValString === Buffer.from(val, 'base64').toString()
      if (attrType === 'L' && val.B) return compValString === Buffer.from(val.B, 'base64').toString()
      return false
    })
  }
}

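// Projects an item onto a set of document paths (ProjectionExpression or
// AttributesToGet), rebuilding only the selected attributes. Nested list and
// map pieces are reassembled along each path, and sparse lists are squashed so
// the projected lists have no holes.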
function mapPaths(paths, item) {
  var returnItem = Object.create(null), toSquash = []
  for (var i = 0; i < paths.length; i++) {
    var path = paths[i]
    if (!Array.isArray(path)) path = [path]
    var resolved = mapPath(path, item)
    if (resolved == null) {
      continue
    }
    if (path.length == 1) {
      returnItem[path[0]] = resolved
      continue
    }
    var curItem = {M: returnItem}
    for (var j = 0; j < path.length; j++) {
      var piece = path[j]
      if (typeof piece == 'number') {
        curItem.L = curItem.L || []
        if (piece > curItem.L.length && !~toSquash.indexOf(curItem)) {
          toSquash.push(curItem)
        }
        if (j < paths[i].length - 1) {
          curItem.L[piece] = curItem.L[piece] || {}
          curItem = curItem.L[piece]
        } else {
          curItem.L[piece] = resolved
        }
      } else {
        curItem.M = curItem.M || {}
        if (j < paths[i].length - 1) {
          curItem.M[piece] = curItem.M[piece] || {}
          curItem = curItem.M[piece]
        } else {
          curItem.M[piece] = resolved
        }
      }
    }
  }
  toSquash.forEach(function(obj) { obj.L = obj.L.filter(Boolean) })
  return returnItem
}

function mapPath(path, item) {
  if (path.length == 1) {
    return item[path[0]]
  }
  var resolved = {M: item}
  for (var i = 0; i < path.length; i++) {
    var piece = path[i]
    if (typeof piece == 'number' && resolved.L) {
      resolved = resolved.L[piece]
    } else if (resolved.M) {
      resolved = resolved.M[piece]
    } else {
      resolved = null
    }
    if (resolved == null) {
      break
    }
  }
  return resolved
}

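// Core of Query and Scan: streams values from the item db (or from an index
// db, optionally fetching the full items back from the item db), stops at the
// Limit or the 1MB scanned-size cap, applies filters and projections, and
// builds Count/ScannedCount, LastEvaluatedKey and ConsumedCapacity for the
// response.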
function queryTable(store, table, data, opts, isLocal, fetchFromItemDb, startKeyNames, cb) {
  cb = once(cb)
  var itemDb = store.getItemDb(data.TableName), vals

  if (data.IndexName) {
    var indexDb = store.getIndexDb(isLocal ? 'local' : 'global', data.TableName, data.IndexName)
    vals = lazyStream(indexDb.createValueStream(opts), cb)
  } else {
    vals = lazyStream(itemDb.createValueStream(opts), cb)
  }

  var tableCapacity = 0, indexCapacity = 0,
      calculateCapacity = ~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)

  if (fetchFromItemDb) {
    var em = new events.EventEmitter
    var queue = async.queue(function(key, cb) {
      if (!key) {
        em.emit('end')
        return cb()
      }
      itemDb.get(key, function(err, item) {
        if (err) {
          em.emit('error', err)
          return cb(err)
        }
        if (calculateCapacity) tableCapacity += itemSize(item)
        em.emit('data', item)
        cb()
      })
    })
    var oldVals = vals
    vals = new Lazy(em)

    oldVals.map(function(item) {
      if (calculateCapacity) indexCapacity += itemSize(item)
      queue.push(createKey(item, table))
    }).once('pipe', queue.push.bind(queue, ''))
  }

  var size = 0, count = 0, rangeKey = table.KeySchema[1] && table.KeySchema[1].AttributeName

  vals = vals.takeWhile(function(val) {
    if (count >= data.Limit || size >= 1024 * 1024) {
      return false
    }

    if (calculateCapacity && !fetchFromItemDb) {
      var capacitySize = itemSize(val)
      if (data.IndexName) {
        indexCapacity += capacitySize
      } else {
        tableCapacity += capacitySize
      }
    }

    count++

    size += itemSize(val, true, true, rangeKey)

    return true
  })

  vals.join(function(items) {
    var lastItem = items[items.length - 1]

    var queryFilter = data.QueryFilter || data.ScanFilter

    if (data._filter) {
      items = items.filter(function(val) { return matchesExprFilter(val, data._filter.expression) })
    } else if (queryFilter) {
      items = items.filter(function(val) { return matchesFilter(val, queryFilter, data.ConditionalOperator) })
    }

    var result = {ScannedCount: count}
    if (count >= data.Limit || size >= 1024 * 1024) {
      if (data.Limit) items.splice(data.Limit)
      if (lastItem) {
        result.LastEvaluatedKey = startKeyNames.reduce(function(key, attr) {
          key[attr] = lastItem[attr]
          return key
        }, {})
      }
    }

    var paths = data._projection ? data._projection.paths : data.AttributesToGet
    if (paths) {
      items = items.map(mapPaths.bind(this, paths))
    }

    result.Count = items.length
    if (data.Select != 'COUNT') result.Items = items
    if (calculateCapacity) {
      var tableUnits = Math.ceil(tableCapacity / 1024 / 4) * (data.ConsistentRead ? 1 : 0.5)
      var indexUnits = Math.ceil(indexCapacity / 1024 / 4) * (data.ConsistentRead ? 1 : 0.5)
      result.ConsumedCapacity = {
        CapacityUnits: tableUnits + indexUnits,
        TableName: data.TableName,
      }
      if (data.ReturnConsumedCapacity == 'INDEXES') {
        result.ConsumedCapacity.Table = {CapacityUnits: tableUnits}
        if (data.IndexName) {
          var indexAttr = isLocal ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
          result.ConsumedCapacity[indexAttr] = {}
          result.ConsumedCapacity[indexAttr][data.IndexName] = {CapacityUnits: indexUnits}
        }
      }
    }
    cb(null, result)
  })
}

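// After a put or delete on the main item, applies the corresponding puts and
// deletes to every local and global secondary index db, running all deletes
// before all puts.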
function updateIndexes(store, table, existingItem, item, cb) {
  if (!existingItem && !item) return cb()
  var puts = [], deletes = []
  ;['Local', 'Global'].forEach(function(indexType) {
    var indexes = table[indexType + 'SecondaryIndexes'] || []
    var actions = getIndexActions(indexes, existingItem, item, table)
    puts = puts.concat(actions.puts.map(function(action) {
      var indexDb = store.getIndexDb(indexType, table.TableName, action.index)
      return indexDb.put.bind(indexDb, action.key, action.item)
    }))
    deletes = deletes.concat(actions.deletes.map(function(action) {
      var indexDb = store.getIndexDb(indexType, table.TableName, action.index)
      return indexDb.del.bind(indexDb, action.key)
    }))
  })

  async.parallel(deletes, function(err) {
    if (err) return cb(err)
    async.parallel(puts, cb)
  })
}

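// Works out, per index, which index entries a write implies: a put when the
// new item has all the index key attributes (projected according to the
// index's ProjectionType), and a delete when the existing item was indexed
// under a key that the new item no longer produces.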
function getIndexActions(indexes, existingItem, item, table) {
  var puts = [], deletes = [], tableKeys = table.KeySchema.map(function(key) { return key.AttributeName })
  indexes.forEach(function(index) {
    var indexKeys = index.KeySchema.map(function(key) { return key.AttributeName }), key = null, itemPieces = item

    if (item && indexKeys.every(function(key) { return item[key] != null })) {
      if (index.Projection.ProjectionType != 'ALL') {
        var indexAttrs = indexKeys.concat(tableKeys, index.Projection.NonKeyAttributes || [])
        itemPieces = indexAttrs.reduce(function(obj, attr) {
          obj[attr] = item[attr]
          return obj
        }, Object.create(null))
      }

      key = createIndexKey(itemPieces, table, index.KeySchema)
      puts.push({index: index.IndexName, key: key, item: itemPieces})
    }

    if (existingItem && indexKeys.every(function(key) { return existingItem[key] != null })) {
      var existingKey = createIndexKey(existingItem, table, index.KeySchema)
      if (existingKey != key) {
        deletes.push({index: index.IndexName, key: existingKey})
      }
    }
  })
  return {puts: puts, deletes: deletes}
}