1 | var crypto = require('crypto'),
|
2 | events = require('events'),
|
3 | async = require('async'),
|
4 | Lazy = require('lazy'),
|
5 | levelup = require('levelup'),
|
6 | memdown = require('memdown'),
|
7 | sub = require('subleveldown'),
|
8 | lock = require('lock'),
|
9 | Big = require('big.js'),
|
10 | once = require('once')
|
11 |
|
12 | exports.MAX_SIZE = 409600
|
13 | exports.create = create
|
14 | exports.lazy = lazyStream
|
15 | exports.validateKey = validateKey
|
16 | exports.validateItem = validateItem
|
17 | exports.validateUpdates = validateUpdates
|
18 | exports.validateKeyPiece = validateKeyPiece
|
19 | exports.validateKeyPaths = validateKeyPaths
|
20 | exports.createKey = createKey
|
21 | exports.createIndexKey = createIndexKey
|
22 | exports.traverseKey = traverseKey
|
23 | exports.traverseIndexes = traverseIndexes
|
24 | exports.toRangeStr = toRangeStr
|
25 | exports.toLexiStr = toLexiStr
|
26 | exports.hashPrefix = hashPrefix
|
27 | exports.validationError = validationError
|
28 | exports.limitError = limitError
|
29 | exports.checkConditional = checkConditional
|
30 | exports.itemSize = itemSize
|
31 | exports.capacityUnits = capacityUnits
|
32 | exports.addConsumedCapacity = addConsumedCapacity
|
33 | exports.matchesFilter = matchesFilter
|
34 | exports.matchesExprFilter = matchesExprFilter
|
35 | exports.compare = compare
|
36 | exports.mapPaths = mapPaths
|
37 | exports.mapPath = mapPath
|
38 | exports.queryTable = queryTable
|
39 | exports.updateIndexes = updateIndexes
|
40 | exports.getIndexActions = getIndexActions
|
41 |
|
42 | function create(options) {
|
43 | options = options || {}
|
44 | if (options.createTableMs == null) options.createTableMs = 500
|
45 | if (options.deleteTableMs == null) options.deleteTableMs = 500
|
46 | if (options.updateTableMs == null) options.updateTableMs = 500
|
47 | if (options.maxItemSizeKb == null) options.maxItemSizeKb = exports.MAX_SIZE / 1024
|
48 | options.maxItemSize = options.maxItemSizeKb * 1024
|
49 |
|
50 | var db = levelup(options.path ? require('leveldown')(options.path) : memdown()),
|
51 | subDbs = Object.create(null),
|
52 | tableDb = getSubDb('table')
|
53 |
|
54 |
|
55 | tableDb.awsAccountId = (process.env.AWS_ACCOUNT_ID || '0000-0000-0000').replace(/[^\d]/g, '')
|
56 | tableDb.awsRegion = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION || 'us-east-1'
|
57 |
|
58 | function getItemDb(name) {
|
59 | return getSubDb('item-' + name)
|
60 | }
|
61 |
|
62 | function deleteItemDb(name, cb) {
|
63 | deleteSubDb('item-' + name, cb)
|
64 | }
|
65 |
|
66 | function getIndexDb(indexType, tableName, indexName) {
|
67 | return getSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName)
|
68 | }
|
69 |
|
70 | function deleteIndexDb(indexType, tableName, indexName, cb) {
|
71 | deleteSubDb('index-' + indexType.toLowerCase() + '~' + tableName + '~' + indexName, cb)
|
72 | }
|
73 |
|
74 | function getTagDb(name) {
|
75 | return getSubDb('tag-' + name)
|
76 | }
|
77 |
|
78 | function deleteTagDb(name, cb) {
|
79 | deleteSubDb('tag-' + name, cb)
|
80 | }
|
81 |
|
82 | function getSubDb(name) {
|
83 | if (!subDbs[name]) {
|
84 | subDbs[name] = sub(db, name, {valueEncoding: 'json'})
|
85 | subDbs[name].lock = lock.Lock()
|
86 | }
|
87 | return subDbs[name]
|
88 | }
|
89 |
|
90 | function deleteSubDb(name, cb) {
|
91 | cb = once(cb)
|
92 | var subDb = getSubDb(name)
|
93 | delete subDbs[name]
|
94 | lazyStream(subDb.createKeyStream(), cb).join(function(keys) {
|
95 | subDb.batch(keys.map(function(key) { return {type: 'del', key: key} }), cb)
|
96 | })
|
97 | }
|
98 |
|
99 | function getTable(name, checkStatus, cb) {
|
100 | if (typeof checkStatus == 'function') cb = checkStatus
|
101 |
|
102 | tableDb.get(name, function(err, table) {
|
103 | if (!err && checkStatus && (table.TableStatus == 'CREATING' || table.TableStatus == 'DELETING')) {
|
104 | err = new Error('NotFoundError')
|
105 | err.name = 'NotFoundError'
|
106 | }
|
107 | if (err) {
|
108 | if (err.name == 'NotFoundError') {
|
109 | err.statusCode = 400
|
110 | err.body = {
|
111 | __type: 'com.amazonaws.dynamodb.v20120810#ResourceNotFoundException',
|
112 | message: 'Requested resource not found',
|
113 | }
|
114 | if (!checkStatus) err.body.message += ': Table: ' + name + ' not found'
|
115 | }
|
116 | return cb(err)
|
117 | }
|
118 |
|
119 | cb(null, table)
|
120 | })
|
121 | }
|
122 |
|
123 | function recreate() {
|
124 | var self = this, newStore = create(options)
|
125 | Object.keys(newStore).forEach(function(key) {
|
126 | self[key] = newStore[key]
|
127 | })
|
128 | }
|
129 |
|
130 | return {
|
131 | options: options,
|
132 | db: db,
|
133 | tableDb: tableDb,
|
134 | getItemDb: getItemDb,
|
135 | deleteItemDb: deleteItemDb,
|
136 | getIndexDb: getIndexDb,
|
137 | deleteIndexDb: deleteIndexDb,
|
138 | getTagDb: getTagDb,
|
139 | deleteTagDb: deleteTagDb,
|
140 | getTable: getTable,
|
141 | recreate: recreate,
|
142 | }
|
143 | }
|
144 |
|
145 | function lazyStream(stream, errHandler) {
|
146 | if (errHandler) stream.on('error', errHandler)
|
147 | var streamAsLazy = new Lazy(stream)
|
148 | stream.removeAllListeners('readable')
|
149 | stream.on('data', streamAsLazy.emit.bind(streamAsLazy, 'data'))
|
150 | if (stream.destroy) streamAsLazy.on('pipe', stream.destroy.bind(stream))
|
151 | return streamAsLazy
|
152 | }
|
153 |
|
154 | function validateKey(dataKey, table, keySchema) {
|
155 | if (keySchema == null) keySchema = table.KeySchema
|
156 | if (keySchema.length != Object.keys(dataKey).length) {
|
157 | return validationError('The provided key element does not match the schema')
|
158 | }
|
159 | return traverseKey(table, keySchema, function(attr, type, isHash) {
|
160 | return validateKeyPiece(dataKey, attr, type, isHash)
|
161 | })
|
162 | }
|
163 |
|
164 | function validateItem(dataItem, table) {
|
165 | return traverseKey(table, function(attr, type, isHash) {
|
166 | if (dataItem[attr] == null) {
|
167 | return validationError('One or more parameter values were invalid: ' +
|
168 | 'Missing the key ' + attr + ' in the item')
|
169 | }
|
170 | if (dataItem[attr][type] == null) {
|
171 | return validationError('One or more parameter values were invalid: ' +
|
172 | 'Type mismatch for key ' + attr + ' expected: ' + type +
|
173 | ' actual: ' + Object.keys(dataItem[attr])[0])
|
174 | }
|
175 | return checkKeySize(dataItem[attr][type], type, isHash)
|
176 | }) || traverseIndexes(table, function(attr, type, index) {
|
177 | if (dataItem[attr] != null && dataItem[attr][type] == null) {
|
178 | return validationError('One or more parameter values were invalid: ' +
|
179 | 'Type mismatch for Index Key ' + attr + ' Expected: ' + type +
|
180 | ' Actual: ' + Object.keys(dataItem[attr])[0] + ' IndexName: ' + index.IndexName)
|
181 | }
|
182 | })
|
183 | }
|
184 |
|
185 | function validateUpdates(attributeUpdates, expressionUpdates, table) {
|
186 | if (attributeUpdates == null && expressionUpdates == null) return
|
187 |
|
188 | return traverseKey(table, function(attr) {
|
189 | var hasKey = false
|
190 | if (expressionUpdates) {
|
191 | var sections = expressionUpdates.sections
|
192 | for (var j = 0; j < sections.length; j++) {
|
193 | if (sections[j].path[0] == attr) {
|
194 | hasKey = true
|
195 | break
|
196 | }
|
197 | }
|
198 | } else {
|
199 | hasKey = attributeUpdates[attr] != null
|
200 | }
|
201 | if (hasKey) {
|
202 | return validationError('One or more parameter values were invalid: ' +
|
203 | 'Cannot update attribute ' + attr + '. This attribute is part of the key')
|
204 | }
|
205 | }) || traverseIndexes(table, function(attr, type, index) {
|
206 | var actualType
|
207 | if (expressionUpdates) {
|
208 | var sections = expressionUpdates.sections
|
209 | for (var i = 0; i < sections.length; i++) {
|
210 | var section = sections[i]
|
211 | if (section.path.length == 1 && section.path[0] == attr) {
|
212 | actualType = section.attrType
|
213 | break
|
214 | }
|
215 | }
|
216 | } else {
|
217 | actualType = attributeUpdates[attr] && attributeUpdates[attr].Value ?
|
218 | Object.keys(attributeUpdates[attr].Value)[0] : null
|
219 | }
|
220 | if (actualType != null && actualType != type) {
|
221 | return validationError('One or more parameter values were invalid: ' +
|
222 | 'Type mismatch for Index Key ' + attr + ' Expected: ' + type +
|
223 | ' Actual: ' + actualType + ' IndexName: ' + index.IndexName)
|
224 | }
|
225 | }) || validateKeyPaths((expressionUpdates || {}).nestedPaths, table)
|
226 | }
|
227 |
|
228 | function validateKeyPiece(key, attr, type, isHash) {
|
229 | if (key[attr] == null || key[attr][type] == null) {
|
230 | return validationError('The provided key element does not match the schema')
|
231 | }
|
232 | return checkKeySize(key[attr][type], type, isHash)
|
233 | }
|
234 |
|
235 | function validateKeyPaths(nestedPaths, table) {
|
236 | if (!nestedPaths) return
|
237 | return traverseKey(table, function(attr) {
|
238 | if (nestedPaths[attr]) {
|
239 | return validationError('Key attributes must be scalars; ' +
|
240 | 'list random access \'[]\' and map lookup \'.\' are not allowed: Key: ' + attr)
|
241 | }
|
242 | }) || traverseIndexes(table, function(attr) {
|
243 | if (nestedPaths[attr]) {
|
244 | return validationError('Key attributes must be scalars; ' +
|
245 | 'list random access \'[]\' and map lookup \'.\' are not allowed: IndexKey: ' + attr)
|
246 | }
|
247 | })
|
248 | }
|
249 |
|
250 | function createKey(item, table, keySchema) {
|
251 | if (keySchema == null) keySchema = table.KeySchema
|
252 | var keyStr
|
253 | traverseKey(table, keySchema, function(attr, type, isHash) {
|
254 | if (isHash) keyStr = hashPrefix(item[attr][type], type) + '/'
|
255 | keyStr += toRangeStr(item[attr][type], type) + '/'
|
256 | })
|
257 | return keyStr
|
258 | }
|
259 |
|
260 | function createIndexKey(item, table, keySchema) {
|
261 | var tableKeyPieces = []
|
262 | traverseKey(table, function(attr, type) { tableKeyPieces.push(item[attr][type], type) })
|
263 | return createKey(item, table, keySchema) + hashPrefix.apply(this, tableKeyPieces)
|
264 | }
|
265 |
|
266 | function traverseKey(table, keySchema, visitKey) {
|
267 | if (typeof keySchema == 'function') { visitKey = keySchema; keySchema = table.KeySchema }
|
268 | var i, j, attr, type, found
|
269 | for (i = 0; i < keySchema.length; i++) {
|
270 | attr = keySchema[i].AttributeName
|
271 | for (j = 0; j < table.AttributeDefinitions.length; j++) {
|
272 | if (table.AttributeDefinitions[j].AttributeName != attr) continue
|
273 | type = table.AttributeDefinitions[j].AttributeType
|
274 | break
|
275 | }
|
276 | found = visitKey(attr, type, !i)
|
277 | if (found) return found
|
278 | }
|
279 | }
|
280 |
|
281 | function traverseIndexes(table, visitIndex) {
|
282 | var i, j, k, attr, type, found
|
283 | if (table.GlobalSecondaryIndexes) {
|
284 | for (i = 0; i < table.GlobalSecondaryIndexes.length; i++) {
|
285 | for (k = 0; k < table.GlobalSecondaryIndexes[i].KeySchema.length; k++) {
|
286 | attr = table.GlobalSecondaryIndexes[i].KeySchema[k].AttributeName
|
287 | for (j = 0; j < table.AttributeDefinitions.length; j++) {
|
288 | if (table.AttributeDefinitions[j].AttributeName != attr) continue
|
289 | type = table.AttributeDefinitions[j].AttributeType
|
290 | break
|
291 | }
|
292 | found = visitIndex(attr, type, table.GlobalSecondaryIndexes[i], true)
|
293 | if (found) return found
|
294 | }
|
295 | }
|
296 | }
|
297 | if (table.LocalSecondaryIndexes) {
|
298 | for (i = 0; i < table.LocalSecondaryIndexes.length; i++) {
|
299 | attr = table.LocalSecondaryIndexes[i].KeySchema[1].AttributeName
|
300 | for (j = 0; j < table.AttributeDefinitions.length; j++) {
|
301 | if (table.AttributeDefinitions[j].AttributeName != attr) continue
|
302 | type = table.AttributeDefinitions[j].AttributeType
|
303 | break
|
304 | }
|
305 | found = visitIndex(attr, type, table.LocalSecondaryIndexes[i], false)
|
306 | if (found) return found
|
307 | }
|
308 | }
|
309 | }
|
310 |
|
311 | function checkKeySize(keyPiece, type, isHash) {
|
312 |
|
313 | if (type == 'N') return null
|
314 | if (type == 'B') keyPiece = Buffer.from(keyPiece, 'base64')
|
315 | if (isHash && keyPiece.length > 2048)
|
316 | return validationError('One or more parameter values were invalid: ' +
|
317 | 'Size of hashkey has exceeded the maximum size limit of2048 bytes')
|
318 | else if (!isHash && keyPiece.length > 1024)
|
319 | return validationError('One or more parameter values were invalid: ' +
|
320 | 'Aggregated size of all range keys has exceeded the size limit of 1024 bytes')
|
321 | }
|
322 |
|
323 | function toRangeStr(keyPiece, type) {
|
324 | if (type == null) {
|
325 | type = Object.keys(keyPiece)[0]
|
326 | keyPiece = keyPiece[type]
|
327 | }
|
328 | if (type == 'S') return Buffer.from(keyPiece, 'utf8').toString('hex')
|
329 | return toLexiStr(keyPiece, type)
|
330 | }
|
331 |
|
332 |
|
333 |
|
334 |
|
335 |
|
336 |
|
337 |
|
338 |
|
339 |
|
340 |
|
341 |
|
342 | function toLexiStr(keyPiece, type) {
|
343 | if (keyPiece == null) return ''
|
344 | if (type == null) {
|
345 | type = Object.keys(keyPiece)[0]
|
346 | keyPiece = keyPiece[type]
|
347 | }
|
348 | if (type == 'B') return Buffer.from(keyPiece, 'base64').toString('hex')
|
349 | if (type != 'N') return keyPiece
|
350 | var bigNum = new Big(keyPiece), digits,
|
351 | exp = !bigNum.c[0] ? 0 : bigNum.s == -1 ? 125 - bigNum.e : 130 + bigNum.e
|
352 | if (bigNum.s == -1) {
|
353 | bigNum.e = 0
|
354 | digits = new Big(10).plus(bigNum).toFixed().replace(/\./, '')
|
355 | } else {
|
356 | digits = bigNum.c.join('')
|
357 | }
|
358 | return (bigNum.s == -1 ? '0' : '1') + ('0' + exp.toString(16)).slice(-2) + digits
|
359 | }
|
360 |
|
361 | function hashPrefix(hashKey, hashType, rangeKey, rangeType) {
|
362 | if (hashType == 'S') {
|
363 | hashKey = Buffer.from(hashKey, 'utf8')
|
364 | } else if (hashType == 'N') {
|
365 | hashKey = numToBuffer(hashKey)
|
366 | } else if (hashType == 'B') {
|
367 | hashKey = Buffer.from(hashKey, 'base64')
|
368 | }
|
369 | if (rangeKey) {
|
370 | if (rangeType == 'S') {
|
371 | rangeKey = Buffer.from(rangeKey, 'utf8')
|
372 | } else if (rangeType == 'N') {
|
373 | rangeKey = numToBuffer(rangeKey)
|
374 | } else if (rangeType == 'B') {
|
375 | rangeKey = Buffer.from(rangeKey, 'base64')
|
376 | }
|
377 | } else {
|
378 | rangeKey = Buffer.from([])
|
379 | }
|
380 |
|
381 | return crypto.createHash('md5').update('Outliers').update(hashKey).update(rangeKey).digest('hex').slice(0, 6)
|
382 | }
|
383 |
|
384 | function numToBuffer(num) {
|
385 | if (+num === 0) return Buffer.from([-128])
|
386 |
|
387 | num = new Big(num)
|
388 |
|
389 | var scale = num.s, mantissa = num.c, exponent = num.e + 1, appendZero = exponent % 2 ? 1 : 0,
|
390 | byteArrayLengthWithoutExponent = Math.floor((mantissa.length + appendZero + 1) / 2),
|
391 | byteArray, appendedZero = false, mantissaIndex, byteArrayIndex
|
392 |
|
393 | if (byteArrayLengthWithoutExponent < 20 && scale == -1) {
|
394 | byteArray = new Array(byteArrayLengthWithoutExponent + 2)
|
395 | byteArray[byteArrayLengthWithoutExponent + 1] = 102
|
396 | } else {
|
397 | byteArray = new Array(byteArrayLengthWithoutExponent + 1)
|
398 | }
|
399 |
|
400 | byteArray[0] = Math.floor((exponent + appendZero) / 2) - 64
|
401 | if (scale == -1)
|
402 | byteArray[0] ^= 0xffffffff
|
403 |
|
404 | for (mantissaIndex = 0; mantissaIndex < mantissa.length; mantissaIndex++) {
|
405 | byteArrayIndex = Math.floor((mantissaIndex + appendZero) / 2) + 1
|
406 | if (appendZero && !mantissaIndex && !appendedZero) {
|
407 | byteArray[byteArrayIndex] = 0
|
408 | appendedZero = true
|
409 | mantissaIndex--
|
410 | } else if ((mantissaIndex + appendZero) % 2 === 0) {
|
411 | byteArray[byteArrayIndex] = mantissa[mantissaIndex] * 10
|
412 | } else {
|
413 | byteArray[byteArrayIndex] += mantissa[mantissaIndex]
|
414 | }
|
415 | if (((mantissaIndex + appendZero) % 2) || (mantissaIndex == mantissa.length - 1)) {
|
416 | if (scale == -1)
|
417 | byteArray[byteArrayIndex] = 101 - byteArray[byteArrayIndex]
|
418 | else
|
419 | byteArray[byteArrayIndex]++
|
420 | }
|
421 | }
|
422 |
|
423 | return Buffer.from(byteArray)
|
424 | }
|
425 |
|
426 | function checkConditional(data, existingItem) {
|
427 | existingItem = existingItem || {}
|
428 |
|
429 | if (data._condition) {
|
430 | if (!matchesExprFilter(existingItem, data._condition.expression)) {
|
431 | return conditionalError()
|
432 | }
|
433 | return null
|
434 | } else if (!data.Expected) {
|
435 | return null
|
436 | }
|
437 | if (!matchesFilter(existingItem, data.Expected, data.ConditionalOperator)) {
|
438 | return conditionalError()
|
439 | }
|
440 | }
|
441 |
|
442 | function validationError(msg) {
|
443 | var err = new Error(msg)
|
444 | err.statusCode = 400
|
445 | err.body = {
|
446 | __type: 'com.amazon.coral.validate#ValidationException',
|
447 | message: msg,
|
448 | }
|
449 | return err
|
450 | }
|
451 |
|
452 | function conditionalError(msg) {
|
453 | if (msg == null) msg = 'The conditional request failed'
|
454 | var err = new Error(msg)
|
455 | err.statusCode = 400
|
456 | err.body = {
|
457 | __type: 'com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException',
|
458 | message: msg,
|
459 | }
|
460 | return err
|
461 | }
|
462 |
|
463 | function limitError(msg) {
|
464 | var err = new Error(msg)
|
465 | err.statusCode = 400
|
466 | err.body = {
|
467 | __type: 'com.amazonaws.dynamodb.v20120810#LimitExceededException',
|
468 | message: msg,
|
469 | }
|
470 | return err
|
471 | }
|
472 |
|
473 | function itemSize(item, compress, addMetaSize, rangeKey) {
|
474 |
|
475 |
|
476 | var rangeKeySize = 0
|
477 | var size = Object.keys(item).reduce(function(sum, attr) {
|
478 | var size = valSizeWithStorage(item[attr], compress && attr != rangeKey)
|
479 | if (compress && attr == rangeKey) {
|
480 | rangeKeySize = size
|
481 | return sum
|
482 | }
|
483 | return sum + size + (compress ? 1 : attr.length)
|
484 | }, 0)
|
485 | return !addMetaSize ? size : 2 + size + ((1 + Math.floor((1 + size) / 3072)) * (18 + rangeKeySize))
|
486 | }
|
487 |
|
488 | function valSizeWithStorage(itemAttr, compress) {
|
489 | var type = Object.keys(itemAttr)[0]
|
490 | var val = itemAttr[type]
|
491 | var size = valSize(val, type, compress)
|
492 | if (!compress) return size
|
493 | switch (type) {
|
494 | case 'S':
|
495 | return size + (size < 128 ? 1 : size < 16384 ? 2 : 3)
|
496 | case 'B':
|
497 | return size + 1
|
498 | case 'N':
|
499 | return size + 1
|
500 | case 'SS':
|
501 | return size + val.length + 1
|
502 | case 'BS':
|
503 | return size + val.length + 1
|
504 | case 'NS':
|
505 | return size + val.length + 1
|
506 | case 'NULL':
|
507 | return 0
|
508 | case 'BOOL':
|
509 | return 1
|
510 | case 'L':
|
511 | return size
|
512 | case 'M':
|
513 | return size
|
514 | }
|
515 | }
|
516 |
|
517 | function valSize(val, type, compress) {
|
518 | switch (type) {
|
519 | case 'S':
|
520 | return val.length
|
521 | case 'B':
|
522 | return Buffer.from(val, 'base64').length
|
523 | case 'N':
|
524 | val = new Big(val)
|
525 | var numDigits = val.c.length
|
526 | if (numDigits == 1 && val.c[0] === 0) return 1
|
527 | return 1 + Math.ceil(numDigits / 2) + (numDigits % 2 || val.e % 2 ? 0 : 1) + (val.s == -1 ? 1 : 0)
|
528 | case 'SS':
|
529 | return val.reduce(function(sum, x) { return sum + valSize(x, 'S') }, 0)
|
530 | case 'BS':
|
531 | return val.reduce(function(sum, x) { return sum + valSize(x, 'B') }, 0)
|
532 | case 'NS':
|
533 | return val.reduce(function(sum, x) { return sum + valSize(x, 'N') }, 0)
|
534 | case 'NULL':
|
535 | return 1
|
536 | case 'BOOL':
|
537 | return 1
|
538 | case 'L':
|
539 | return 3 + val.reduce(function(sum, val) { return sum + 1 + valSizeWithStorage(val, compress) }, 0)
|
540 | case 'M':
|
541 | return 3 + Object.keys(val).length + itemSize(val, compress)
|
542 | }
|
543 | }
|
544 |
|
545 | function capacityUnits(item, isRead, isConsistent) {
|
546 | var size = item ? Math.ceil(itemSize(item) / 1024 / (isRead ? 4 : 1)) : 1
|
547 | return size / (!isRead || isConsistent ? 1 : 2)
|
548 | }
|
549 |
|
550 | function addConsumedCapacity(data, isRead, newItem, oldItem) {
|
551 | if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) {
|
552 | var capacity = capacityUnits(newItem, isRead, data.ConsistentRead)
|
553 | if (oldItem != null) {
|
554 | capacity = Math.max(capacity, capacityUnits(oldItem, isRead, data.ConsistentRead))
|
555 | }
|
556 | return {
|
557 | CapacityUnits: capacity,
|
558 | TableName: data.TableName,
|
559 | Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: capacity} : undefined,
|
560 | }
|
561 | }
|
562 | }
|
563 |
|
564 | function valsEqual(val1, val2) {
|
565 | if (Array.isArray(val1) && Array.isArray(val2)) {
|
566 | if (val1.length != val2.length) return false
|
567 | return val1.every(function(val) { return ~val2.indexOf(val) })
|
568 | } else {
|
569 | return val1 == val2
|
570 | }
|
571 | }
|
572 |
|
573 | function matchesFilter(val, filter, conditionalOperator) {
|
574 | for (var attr in filter) {
|
575 | var comp = filter[attr].Exists != null ? (filter[attr].Exists ? 'NOT_NULL' : 'NULL') :
|
576 | filter[attr].ComparisonOperator || 'EQ'
|
577 | var result = compare(comp, val[attr], filter[attr].AttributeValueList || filter[attr].Value)
|
578 | if (!result) {
|
579 | return false
|
580 | } else if (conditionalOperator == 'OR') {
|
581 | return true
|
582 | }
|
583 | }
|
584 | return true
|
585 | }
|
586 |
|
587 | function matchesExprFilter(item, expr) {
|
588 | if (expr.type == 'and') {
|
589 | return matchesExprFilter(item, expr.args[0]) && matchesExprFilter(item, expr.args[1])
|
590 | } else if (expr.type == 'or') {
|
591 | return matchesExprFilter(item, expr.args[0]) || matchesExprFilter(item, expr.args[1])
|
592 | } else if (expr.type == 'not') {
|
593 | return !matchesExprFilter(item, expr.args[0])
|
594 | }
|
595 | var args = expr.args.map(function(arg) { return resolveArg(arg, item) })
|
596 | return compare(expr.type == 'function' ? expr.name : expr.type, args[0], args.slice(1))
|
597 | }
|
598 |
|
599 | function resolveArg(arg, item) {
|
600 | if (Array.isArray(arg)) {
|
601 | return mapPath(arg, item)
|
602 | } else if (arg.type == 'function' && arg.name == 'size') {
|
603 | var args = arg.args.map(function(arg) { return resolveArg(arg, item) })
|
604 | var val = args[0], length
|
605 | if (!val) {
|
606 | return null
|
607 | } else if (val.S) {
|
608 | length = val.S.length
|
609 | } else if (val.B) {
|
610 | length = Buffer.from(val.B, 'base64').length
|
611 | } else if (val.SS || val.BS || val.NS || val.L) {
|
612 | length = (val.SS || val.BS || val.NS || val.L).length
|
613 | } else if (val.M) {
|
614 | length = Object.keys(val.M).length
|
615 | }
|
616 | return length != null ? {N: length.toString()} : null
|
617 | } else {
|
618 | return arg
|
619 | }
|
620 | }
|
621 |
|
622 | function compare(comp, val, compVals) {
|
623 | if (!Array.isArray(compVals)) compVals = [compVals]
|
624 |
|
625 | var attrType = val ? Object.keys(val)[0] : null
|
626 | var attrVal = attrType ? val[attrType] : null
|
627 | var compType = compVals && compVals[0] ? Object.keys(compVals[0])[0] : null
|
628 | var compVal = compType ? compVals[0][compType] : null
|
629 |
|
630 | switch (comp) {
|
631 | case 'EQ':
|
632 | case '=':
|
633 | if (compType != attrType || !valsEqual(attrVal, compVal)) return false
|
634 | break
|
635 | case 'NE':
|
636 | case '<>':
|
637 | if (compType == attrType && valsEqual(attrVal, compVal)) return false
|
638 | break
|
639 | case 'LE':
|
640 | case '<=':
|
641 | if (compType != attrType ||
|
642 | (attrType == 'N' && !new Big(attrVal).lte(compVal)) ||
|
643 | (attrType != 'N' && toLexiStr(attrVal, attrType) > toLexiStr(compVal, attrType))) return false
|
644 | break
|
645 | case 'LT':
|
646 | case '<':
|
647 | if (compType != attrType ||
|
648 | (attrType == 'N' && !new Big(attrVal).lt(compVal)) ||
|
649 | (attrType != 'N' && toLexiStr(attrVal, attrType) >= toLexiStr(compVal, attrType))) return false
|
650 | break
|
651 | case 'GE':
|
652 | case '>=':
|
653 | if (compType != attrType ||
|
654 | (attrType == 'N' && !new Big(attrVal).gte(compVal)) ||
|
655 | (attrType != 'N' && toLexiStr(attrVal, attrType) < toLexiStr(compVal, attrType))) return false
|
656 | break
|
657 | case 'GT':
|
658 | case '>':
|
659 | if (compType != attrType ||
|
660 | (attrType == 'N' && !new Big(attrVal).gt(compVal)) ||
|
661 | (attrType != 'N' && toLexiStr(attrVal, attrType) <= toLexiStr(compVal, attrType))) return false
|
662 | break
|
663 | case 'NOT_NULL':
|
664 | case 'attribute_exists':
|
665 | if (attrVal == null) return false
|
666 | break
|
667 | case 'NULL':
|
668 | case 'attribute_not_exists':
|
669 | if (attrVal != null) return false
|
670 | break
|
671 | case 'CONTAINS':
|
672 | case 'contains':
|
673 | return contains(compType, compVal, attrType, attrVal)
|
674 | case 'NOT_CONTAINS':
|
675 | return !contains(compType, compVal, attrType, attrVal)
|
676 | case 'BEGINS_WITH':
|
677 | case 'begins_with':
|
678 | if (compType != attrType) return false
|
679 | if (compType == 'B') {
|
680 | attrVal = Buffer.from(attrVal, 'base64').toString()
|
681 | compVal = Buffer.from(compVal, 'base64').toString()
|
682 | }
|
683 | if (attrVal.indexOf(compVal) !== 0) return false
|
684 | break
|
685 | case 'IN':
|
686 | case 'in':
|
687 | if (!attrVal) return false
|
688 | if (!compVals.some(function(compVal) {
|
689 | compType = Object.keys(compVal)[0]
|
690 | compVal = compVal[compType]
|
691 | return compType == attrType && valsEqual(attrVal, compVal)
|
692 | })) return false
|
693 | break
|
694 | case 'BETWEEN':
|
695 | case 'between':
|
696 | if (!attrVal || compType != attrType ||
|
697 | (attrType == 'N' && (!new Big(attrVal).gte(compVal) || !new Big(attrVal).lte(compVals[1].N))) ||
|
698 | (attrType != 'N' && (toLexiStr(attrVal, attrType) < toLexiStr(compVal, attrType) ||
|
699 | toLexiStr(attrVal, attrType) > toLexiStr(compVals[1][compType], attrType)))) return false
|
700 | break
|
701 | case 'attribute_type':
|
702 | if (!attrVal || !valsEqual(attrType, compVal)) return false
|
703 | }
|
704 | return true
|
705 | }
|
706 |
|
707 | function contains(compType, compVal, attrType, attrVal) {
|
708 | if (compType === 'S') {
|
709 | if (attrType === 'S') return !!~attrVal.indexOf(compVal)
|
710 | if (attrType === 'SS') return attrVal.some(function(val) {
|
711 | return val === compVal
|
712 | })
|
713 | if (attrType === 'L') return attrVal.some(function(val) {
|
714 | return val && val.S && val.S === compVal
|
715 | })
|
716 | return false
|
717 | }
|
718 | if (compType === 'N') {
|
719 | if (attrType === 'NS') return attrVal.some(function(val) {
|
720 | return val === compVal
|
721 | })
|
722 | if (attrType === 'L') return attrVal.some(function(val) {
|
723 | return val && val.N && val.N === compVal
|
724 | })
|
725 | return false
|
726 | }
|
727 | if (compType === 'B') {
|
728 | if (attrType !== 'B' && attrType !== 'BS' && attrType !== 'L') return false
|
729 | var compValString = Buffer.from(compVal, 'base64').toString()
|
730 | if (attrType === 'B') {
|
731 | var attrValString = Buffer.from(attrVal, 'base64').toString()
|
732 | return !!~attrValString.indexOf(compValString)
|
733 | }
|
734 | return attrVal.some(function(val) {
|
735 | if (attrType !== 'L') return compValString === Buffer.from(val, 'base64').toString()
|
736 | if (attrType === 'L' && val.B) return compValString === Buffer.from(val.B, 'base64').toString()
|
737 | return false
|
738 | })
|
739 | }
|
740 | }
|
741 |
|
742 | function mapPaths(paths, item) {
|
743 | var returnItem = Object.create(null), toSquash = []
|
744 | for (var i = 0; i < paths.length; i++) {
|
745 | var path = paths[i]
|
746 | if (!Array.isArray(path)) path = [path]
|
747 | var resolved = mapPath(path, item)
|
748 | if (resolved == null) {
|
749 | continue
|
750 | }
|
751 | if (path.length == 1) {
|
752 | returnItem[path[0]] = resolved
|
753 | continue
|
754 | }
|
755 | var curItem = {M: returnItem}
|
756 | for (var j = 0; j < path.length; j++) {
|
757 | var piece = path[j]
|
758 | if (typeof piece == 'number') {
|
759 | curItem.L = curItem.L || []
|
760 | if (piece > curItem.L.length && !~toSquash.indexOf(curItem)) {
|
761 | toSquash.push(curItem)
|
762 | }
|
763 | if (j < paths[i].length - 1) {
|
764 | curItem.L[piece] = curItem.L[piece] || {}
|
765 | curItem = curItem.L[piece]
|
766 | } else {
|
767 | curItem.L[piece] = resolved
|
768 | }
|
769 | } else {
|
770 | curItem.M = curItem.M || {}
|
771 | if (j < paths[i].length - 1) {
|
772 | curItem.M[piece] = curItem.M[piece] || {}
|
773 | curItem = curItem.M[piece]
|
774 | } else {
|
775 | curItem.M[piece] = resolved
|
776 | }
|
777 | }
|
778 | }
|
779 | }
|
780 | toSquash.forEach(function(obj) { obj.L = obj.L.filter(Boolean) })
|
781 | return returnItem
|
782 | }
|
783 |
|
784 | function mapPath(path, item) {
|
785 | if (path.length == 1) {
|
786 | return item[path[0]]
|
787 | }
|
788 | var resolved = {M: item}
|
789 | for (var i = 0; i < path.length; i++) {
|
790 | var piece = path[i]
|
791 | if (typeof piece == 'number' && resolved.L) {
|
792 | resolved = resolved.L[piece]
|
793 | } else if (resolved.M) {
|
794 | resolved = resolved.M[piece]
|
795 | } else {
|
796 | resolved = null
|
797 | }
|
798 | if (resolved == null) {
|
799 | break
|
800 | }
|
801 | }
|
802 | return resolved
|
803 | }
|
804 |
|
805 | function queryTable(store, table, data, opts, isLocal, fetchFromItemDb, startKeyNames, cb) {
|
806 | cb = once(cb)
|
807 | var itemDb = store.getItemDb(data.TableName), vals
|
808 |
|
809 | if (data.IndexName) {
|
810 | var indexDb = store.getIndexDb(isLocal ? 'local' : 'global', data.TableName, data.IndexName)
|
811 | vals = lazyStream(indexDb.createValueStream(opts), cb)
|
812 | } else {
|
813 | vals = lazyStream(itemDb.createValueStream(opts), cb)
|
814 | }
|
815 |
|
816 | var tableCapacity = 0, indexCapacity = 0,
|
817 | calculateCapacity = ~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)
|
818 |
|
819 | if (fetchFromItemDb) {
|
820 | var em = new events.EventEmitter
|
821 | var queue = async.queue(function(key, cb) {
|
822 | if (!key) {
|
823 | em.emit('end')
|
824 | return cb()
|
825 | }
|
826 | itemDb.get(key, function(err, item) {
|
827 | if (err) {
|
828 | em.emit('error', err)
|
829 | return cb(err)
|
830 | }
|
831 | if (calculateCapacity) tableCapacity += itemSize(item)
|
832 | em.emit('data', item)
|
833 | cb()
|
834 | })
|
835 | })
|
836 | var oldVals = vals
|
837 | vals = new Lazy(em)
|
838 |
|
839 | oldVals.map(function(item) {
|
840 | if (calculateCapacity) indexCapacity += itemSize(item)
|
841 | queue.push(createKey(item, table))
|
842 | }).once('pipe', queue.push.bind(queue, ''))
|
843 | }
|
844 |
|
845 | var size = 0, count = 0, rangeKey = table.KeySchema[1] && table.KeySchema[1].AttributeName
|
846 |
|
847 | vals = vals.takeWhile(function(val) {
|
848 | if (count >= data.Limit || size >= 1024 * 1024) {
|
849 | return false
|
850 | }
|
851 |
|
852 | if (calculateCapacity && !fetchFromItemDb) {
|
853 | var capacitySize = itemSize(val)
|
854 | if (data.IndexName) {
|
855 | indexCapacity += capacitySize
|
856 | } else {
|
857 | tableCapacity += capacitySize
|
858 | }
|
859 | }
|
860 |
|
861 | count++
|
862 |
|
863 | size += itemSize(val, true, true, rangeKey)
|
864 |
|
865 | return true
|
866 | })
|
867 |
|
868 | vals.join(function(items) {
|
869 | var lastItem = items[items.length - 1]
|
870 |
|
871 | var queryFilter = data.QueryFilter || data.ScanFilter
|
872 |
|
873 | if (data._filter) {
|
874 | items = items.filter(function(val) { return matchesExprFilter(val, data._filter.expression) })
|
875 | } else if (queryFilter) {
|
876 | items = items.filter(function(val) { return matchesFilter(val, queryFilter, data.ConditionalOperator) })
|
877 | }
|
878 |
|
879 | var result = {ScannedCount: count}
|
880 | if (count >= data.Limit || size >= 1024 * 1024) {
|
881 | if (data.Limit) items.splice(data.Limit)
|
882 | if (lastItem) {
|
883 | result.LastEvaluatedKey = startKeyNames.reduce(function(key, attr) {
|
884 | key[attr] = lastItem[attr]
|
885 | return key
|
886 | }, {})
|
887 | }
|
888 | }
|
889 |
|
890 | var paths = data._projection ? data._projection.paths : data.AttributesToGet
|
891 | if (paths) {
|
892 | items = items.map(mapPaths.bind(this, paths))
|
893 | }
|
894 |
|
895 | result.Count = items.length
|
896 | if (data.Select != 'COUNT') result.Items = items
|
897 | if (calculateCapacity) {
|
898 | var tableUnits = Math.ceil(tableCapacity / 1024 / 4) * (data.ConsistentRead ? 1 : 0.5)
|
899 | var indexUnits = Math.ceil(indexCapacity / 1024 / 4) * (data.ConsistentRead ? 1 : 0.5)
|
900 | result.ConsumedCapacity = {
|
901 | CapacityUnits: tableUnits + indexUnits,
|
902 | TableName: data.TableName,
|
903 | }
|
904 | if (data.ReturnConsumedCapacity == 'INDEXES') {
|
905 | result.ConsumedCapacity.Table = {CapacityUnits: tableUnits}
|
906 | if (data.IndexName) {
|
907 | var indexAttr = isLocal ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
|
908 | result.ConsumedCapacity[indexAttr] = {}
|
909 | result.ConsumedCapacity[indexAttr][data.IndexName] = {CapacityUnits: indexUnits}
|
910 | }
|
911 | }
|
912 | }
|
913 | cb(null, result)
|
914 | })
|
915 | }
|
916 |
|
917 | function updateIndexes(store, table, existingItem, item, cb) {
|
918 | if (!existingItem && !item) return cb()
|
919 | var puts = [], deletes = []
|
920 | ;['Local', 'Global'].forEach(function(indexType) {
|
921 | var indexes = table[indexType + 'SecondaryIndexes'] || []
|
922 | var actions = getIndexActions(indexes, existingItem, item, table)
|
923 | puts = puts.concat(actions.puts.map(function(action) {
|
924 | var indexDb = store.getIndexDb(indexType, table.TableName, action.index)
|
925 | return indexDb.put.bind(indexDb, action.key, action.item)
|
926 | }))
|
927 | deletes = deletes.concat(actions.deletes.map(function(action) {
|
928 | var indexDb = store.getIndexDb(indexType, table.TableName, action.index)
|
929 | return indexDb.del.bind(indexDb, action.key)
|
930 | }))
|
931 | })
|
932 |
|
933 | async.parallel(deletes, function(err) {
|
934 | if (err) return cb(err)
|
935 | async.parallel(puts, cb)
|
936 | })
|
937 | }
|
938 |
|
939 | function getIndexActions(indexes, existingItem, item, table) {
|
940 | var puts = [], deletes = [], tableKeys = table.KeySchema.map(function(key) { return key.AttributeName })
|
941 | indexes.forEach(function(index) {
|
942 | var indexKeys = index.KeySchema.map(function(key) { return key.AttributeName }), key = null, itemPieces = item
|
943 |
|
944 | if (item && indexKeys.every(function(key) { return item[key] != null })) {
|
945 | if (index.Projection.ProjectionType != 'ALL') {
|
946 | var indexAttrs = indexKeys.concat(tableKeys, index.Projection.NonKeyAttributes || [])
|
947 | itemPieces = indexAttrs.reduce(function(obj, attr) {
|
948 | obj[attr] = item[attr]
|
949 | return obj
|
950 | }, Object.create(null))
|
951 | }
|
952 |
|
953 | key = createIndexKey(itemPieces, table, index.KeySchema)
|
954 | puts.push({index: index.IndexName, key: key, item: itemPieces})
|
955 | }
|
956 |
|
957 | if (existingItem && indexKeys.every(function(key) { return existingItem[key] != null })) {
|
958 | var existingKey = createIndexKey(existingItem, table, index.KeySchema)
|
959 | if (existingKey != key) {
|
960 | deletes.push({index: index.IndexName, key: existingKey})
|
961 | }
|
962 | }
|
963 | })
|
964 | return {puts: puts, deletes: deletes}
|
965 | }
|