UNPKG

3.52 kBJavaScriptView Raw
1var async = require('async'),
2 getItem = require('./getItem'),
3 db = require('../db')
4
5module.exports = function batchGetItem(store, data, cb) {
6 var requests = {}
7
8 async.series([
9 async.each.bind(async, Object.keys(data.RequestItems), addTableRequests),
10 async.parallel.bind(async, requests),
11 ], function(err, responses) {
12 if (err) return cb(err)
13 var res = {Responses: {}, UnprocessedKeys: {}}, table, tableResponses = responses[1], totalSize = 0, capacities = {}
14
15 for (table in tableResponses) {
16 // Order is pretty random
17 // Assign keys before we shuffle
18 tableResponses[table].forEach(function(tableRes, ix) { tableRes._key = data.RequestItems[table].Keys[ix] }) // eslint-disable-line no-loop-func
19 shuffle(tableResponses[table])
20 res.Responses[table] = tableResponses[table].map(function(tableRes) { // eslint-disable-line no-loop-func
21 if (tableRes.Item) {
22 // TODO: This is totally inefficient - should fix this
23 var newSize = totalSize + db.itemSize(tableRes.Item)
24 if (newSize > (1024 * 1024 + store.options.maxItemSize - 3)) {
25 if (!res.UnprocessedKeys[table]) {
26 res.UnprocessedKeys[table] = {Keys: []}
27 if (data.RequestItems[table].AttributesToGet)
28 res.UnprocessedKeys[table].AttributesToGet = data.RequestItems[table].AttributesToGet
29 if (data.RequestItems[table].ConsistentRead)
30 res.UnprocessedKeys[table].ConsistentRead = data.RequestItems[table].ConsistentRead
31 }
32 if (!capacities[table]) capacities[table] = 0
33 capacities[table] += 1
34 res.UnprocessedKeys[table].Keys.push(tableRes._key)
35 return null
36 }
37 totalSize = newSize
38 }
39 if (tableRes.ConsumedCapacity) {
40 if (!capacities[table]) capacities[table] = 0
41 capacities[table] += tableRes.ConsumedCapacity.CapacityUnits
42 }
43 return tableRes.Item
44 }).filter(Boolean)
45 }
46
47 if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) {
48 res.ConsumedCapacity = Object.keys(tableResponses).map(function(table) {
49 return {
50 CapacityUnits: capacities[table],
51 TableName: table,
52 Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: capacities[table]} : undefined,
53 }
54 })
55 }
56
57 cb(null, res)
58 })
59
60 function addTableRequests(tableName, cb) {
61 store.getTable(tableName, function(err, table) {
62 if (err) return cb(err)
63
64 var req = data.RequestItems[tableName], i, key, options, gets = []
65
66 for (i = 0; i < req.Keys.length; i++) {
67 key = req.Keys[i]
68
69 if ((err = db.validateKey(key, table)) != null) return cb(err)
70
71 options = {TableName: tableName, Key: key}
72 if (req._projection) options._projection = req._projection
73 if (req.AttributesToGet) options.AttributesToGet = req.AttributesToGet
74 if (req.ConsistentRead) options.ConsistentRead = req.ConsistentRead
75 if (data.ReturnConsumedCapacity) options.ReturnConsumedCapacity = data.ReturnConsumedCapacity
76 gets.push(options)
77 }
78
79 requests[tableName] = async.map.bind(async, gets, function(data, cb) { return getItem(store, data, cb) })
80
81 cb()
82 })
83 }
84}
85
/**
 * Randomly reorders the elements of `arr` in place using a Fisher-Yates pass:
 * walking from the last index down to 1, each slot is swapped with a slot
 * chosen via Math.random from the range [0, current index].
 *
 * @param {Array} arr - array to reorder; mutated directly
 * @returns {undefined}
 */
function shuffle(arr) {
  var last = arr.length - 1
  var pick, held

  while (last >= 1) {
    pick = Math.floor(Math.random() * (last + 1))
    held = arr[last]
    arr[last] = arr[pick]
    arr[pick] = held
    last--
  }
}