1 | var async = require('async'),
|
2 | getItem = require('./getItem'),
|
3 | db = require('../db')
|
4 |
|
// Emulates DynamoDB's BatchGetItem: validates each requested key, fans out a
// GetItem call per key (grouped by table), then assembles the combined
// Responses / UnprocessedKeys / ConsumedCapacity payload.
// Calls back with (err) on validation/store failure, else (null, result).
module.exports = function batchGetItem(store, data, cb) {
  var requests = {}

  async.series([
    // Phase 1: for every table in the request, build a bound async.map task
    // and stash it in `requests` (side effect of addTableRequests below).
    async.each.bind(async, Object.keys(data.RequestItems), addTableRequests),
    // Phase 2: run all per-table tasks. Binding `requests` here is safe only
    // because async.series guarantees phase 1 has fully populated it first.
    async.parallel.bind(async, requests),
  ], function(err, responses) {
    if (err) return cb(err)
    // responses[0] is the (unused) async.each result; responses[1] maps
    // tableName -> array of getItem results, in the same order as req.Keys.
    var res = {Responses: {}, UnprocessedKeys: {}}, table, tableResponses = responses[1], totalSize = 0, capacities = {}

    for (table in tableResponses) {

      // Tag each response with the key that produced it (same index as
      // req.Keys) so it can be reported in UnprocessedKeys after shuffling.
      tableResponses[table].forEach(function(tableRes, ix) { tableRes._key = data.RequestItems[table].Keys[ix] })
      // NOTE(review): results are randomly reordered, presumably to mimic
      // DynamoDB's unordered batch responses — confirm against service docs.
      shuffle(tableResponses[table])
      res.Responses[table] = tableResponses[table].map(function(tableRes) {
        if (tableRes.Item) {

          var newSize = totalSize + db.itemSize(tableRes.Item)
          // Response size cap: roughly 1MB plus a maxItemSize-derived margin
          // (the -3 fudge matches observed service limits — TODO confirm).
          // Once exceeded, every further found item is deferred instead.
          if (newSize > (1024 * 1024 + store.options.maxItemSize - 3)) {
            if (!res.UnprocessedKeys[table]) {
              // First unprocessed key for this table: echo back the request
              // options so the client can resubmit the remainder verbatim.
              res.UnprocessedKeys[table] = {Keys: []}
              if (data.RequestItems[table].AttributesToGet)
                res.UnprocessedKeys[table].AttributesToGet = data.RequestItems[table].AttributesToGet
              if (data.RequestItems[table].ConsistentRead)
                res.UnprocessedKeys[table].ConsistentRead = data.RequestItems[table].ConsistentRead
            }
            // Each deferred key still counts one capacity unit against the
            // table — NOTE(review): intentional per emulated billing? verify.
            if (!capacities[table]) capacities[table] = 0
            capacities[table] += 1
            res.UnprocessedKeys[table].Keys.push(tableRes._key)
            // Returning null drops this entry from Responses via filter(Boolean).
            return null
          }
          totalSize = newSize
        }
        // Accumulate per-table consumed capacity reported by each getItem.
        if (tableRes.ConsumedCapacity) {
          if (!capacities[table]) capacities[table] = 0
          capacities[table] += tableRes.ConsumedCapacity.CapacityUnits
        }
        // Misses (no Item) return undefined and are filtered out below.
        return tableRes.Item
      }).filter(Boolean)
    }

    // Only report capacity when explicitly requested; INDEXES additionally
    // nests the per-table breakdown under a Table sub-object.
    if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) {
      res.ConsumedCapacity = Object.keys(tableResponses).map(function(table) {
        return {
          CapacityUnits: capacities[table],
          TableName: table,
          Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: capacities[table]} : undefined,
        }
      })
    }

    cb(null, res)
  })

  // Validates every key for `tableName` and registers a bound async.map task
  // in the enclosing `requests` object. Calls back with the first validation
  // or table-lookup error, else with no arguments on success.
  function addTableRequests(tableName, cb) {
    store.getTable(tableName, function(err, table) {
      if (err) return cb(err)

      var req = data.RequestItems[tableName], i, key, options, gets = []

      for (i = 0; i < req.Keys.length; i++) {
        key = req.Keys[i]

        // Fail fast: any invalid key aborts the whole batch.
        if ((err = db.validateKey(key, table)) != null) return cb(err)

        // Build a per-key GetItem request, propagating the relevant
        // request-level and table-level options.
        options = {TableName: tableName, Key: key}
        if (req._projection) options._projection = req._projection
        if (req.AttributesToGet) options.AttributesToGet = req.AttributesToGet
        if (req.ConsistentRead) options.ConsistentRead = req.ConsistentRead
        if (data.ReturnConsumedCapacity) options.ReturnConsumedCapacity = data.ReturnConsumedCapacity
        gets.push(options)
      }

      // `data` here shadows the outer request object: it is one element of
      // `gets` passed through async.map to the single-item getItem action.
      requests[tableName] = async.map.bind(async, gets, function(data, cb) { return getItem(store, data, cb) })

      cb()
    })
  }
}
|
85 |
|
// Shuffles `arr` in place using the Fisher-Yates algorithm: walks the array
// from the end, swapping each element with one chosen uniformly at random
// from the (inclusive) prefix up to it. Returns nothing; mutates `arr`.
function shuffle(arr) {
  for (var i = arr.length - 1; i > 0; i--) {
    var j = Math.floor(Math.random() * (i + 1))
    var swap = arr[j]
    arr[j] = arr[i]
    arr[i] = swap
  }
}
|