1 | var async = require('async'),
|
2 | putItem = require('./putItem'),
|
3 | deleteItem = require('./deleteItem'),
|
4 | db = require('../db')
|
5 |
|
6 | module.exports = function batchWriteItem(store, data, cb) {
|
7 | var actions = []
|
8 |
|
9 | async.series([
|
10 | async.each.bind(async, Object.keys(data.RequestItems), addTableActions),
|
11 | async.parallel.bind(async, actions),
|
12 | ], function(err, responses) {
|
13 | if (err) {
|
14 | if (err.body && (/Missing the key/.test(err.body.message) || /Type mismatch for key/.test(err.body.message)))
|
15 | err.body.message = 'The provided key element does not match the schema'
|
16 | return cb(err)
|
17 | }
|
18 | var res = {UnprocessedItems: {}}, tableUnits = {}
|
19 |
|
20 | if (~['TOTAL', 'INDEXES'].indexOf(data.ReturnConsumedCapacity)) {
|
21 | responses[1].forEach(function(action) {
|
22 | var table = action.ConsumedCapacity.TableName
|
23 | if (!tableUnits[table]) tableUnits[table] = 0
|
24 | tableUnits[table] += action.ConsumedCapacity.CapacityUnits
|
25 | })
|
26 | res.ConsumedCapacity = Object.keys(tableUnits).map(function(table) {
|
27 | return {
|
28 | CapacityUnits: tableUnits[table],
|
29 | TableName: table,
|
30 | Table: data.ReturnConsumedCapacity == 'INDEXES' ? {CapacityUnits: tableUnits[table]} : undefined,
|
31 | }
|
32 | })
|
33 | }
|
34 |
|
35 | cb(null, res)
|
36 | })
|
37 |
|
38 | function addTableActions(tableName, cb) {
|
39 | store.getTable(tableName, function(err, table) {
|
40 | if (err) return cb(err)
|
41 |
|
42 | var reqs = data.RequestItems[tableName], i, req, key, seenKeys = {}, options
|
43 |
|
44 | for (i = 0; i < reqs.length; i++) {
|
45 | req = reqs[i]
|
46 |
|
47 | options = {TableName: tableName}
|
48 | if (data.ReturnConsumedCapacity) options.ReturnConsumedCapacity = data.ReturnConsumedCapacity
|
49 |
|
50 | if (req.PutRequest) {
|
51 |
|
52 | if ((err = db.validateItem(req.PutRequest.Item, table)) != null) return cb(err)
|
53 |
|
54 | options.Item = req.PutRequest.Item
|
55 | actions.push(putItem.bind(null, store, options))
|
56 |
|
57 | key = db.createKey(options.Item, table)
|
58 |
|
59 | } else if (req.DeleteRequest) {
|
60 |
|
61 | if ((err = db.validateKey(req.DeleteRequest.Key, table) != null)) return cb(err)
|
62 |
|
63 | options.Key = req.DeleteRequest.Key
|
64 | actions.push(deleteItem.bind(null, store, options))
|
65 |
|
66 | key = db.createKey(options.Key, table)
|
67 | }
|
68 | if (seenKeys[key])
|
69 | return cb(db.validationError('Provided list of item keys contains duplicates'))
|
70 | seenKeys[key] = true
|
71 | }
|
72 |
|
73 | cb()
|
74 | })
|
75 | }
|
76 | }
|