UNPKG

10.2 kBJavaScriptView Raw
1const ddb = require("./aws/ddb/internal");
2const Error = require("./Error");
3const utils = require("./utils");
4const Condition = require("./Condition");
5const Document = require("./Document");
6
// DocumentRetriever backs both Scan and Query, since the two operations share most of their code.
8
// Supported retriever kinds. `type` is the operation name and `pastTense` is the
// word used when naming result properties (for example `scannedCount`).
const documentRetrieverTypes = [
	["scan", "scanned"],
	["query", "queried"]
].map(([type, pastTense]) => ({type, pastTense}));
13
/**
 * Creates the factory for a document retriever of the given kind.
 *
 * @param {string} documentRetrieverTypeString The retriever kind; must match the `type` of an entry in `documentRetrieverTypes` ("scan" or "query").
 * @returns {Function} `Carrier(model)`, which builds and returns the retriever class bound to `model`.
 * @throws {Error.InvalidType} When no entry in `documentRetrieverTypes` matches the given string.
 */
function main(documentRetrieverTypeString) {
	const documentRetrieverType = documentRetrieverTypes.find((a) => a.type === documentRetrieverTypeString);

	if (!documentRetrieverType) {
		throw new Error.InvalidType(`The type: ${documentRetrieverTypeString} for setting up a document retriever is invalid.`);
	}

	/**
	 * Builds the retriever class (named "Scan" or "Query") for one model.
	 *
	 * @param {object} model The model the retriever reads from. This block uses `name`, `schema`, `Document` and `pendingTaskPromise()` on it.
	 * @returns {Function} The retriever class.
	 */
	function Carrier(model) {
		const capitalizedType = utils.capitalize_first_letter(documentRetrieverType.type);
		const capitalizedPastTense = utils.capitalize_first_letter(documentRetrieverType.pastTense);
		// `getScanRequest` or `getQueryRequest`.
		const requestMethodName = `get${capitalizedType}Request`;

		const C = class {
			constructor(object) {
				this.settings = {};
				this.settings.limit = null;

				try {
					this.settings.condition = new Condition(object);
				} catch (e) {
					// Reword the Condition error so that it names the operation the caller was setting up.
					e.message = `${e.message.replace(" is invalid.", "")} is invalid for the ${documentRetrieverType.type} operation.`;
					throw e;
				}

				return this;
			}
		};

		// Expose every enumerable Condition method (except `requestObject`) as a chainable method that
		// forwards to this retriever's own condition instance.
		Object.entries(Condition.prototype).forEach(([key, func]) => {
			if (key !== "requestObject") {
				C.prototype[key] = function (...args) {
					func.bind(this.settings.condition)(...args);
					return this;
				};
			}
		});

		/**
		 * Builds the request object that `exec` passes to `ddb` for this retriever.
		 *
		 * @returns {Promise<object>} The request parameters.
		 * @throws {Error.InvalidParameter} For a query when neither an index nor the table hash key is compared with "EQ".
		 */
		C.prototype[requestMethodName] = async function () {
			const object = {
				...this.settings.condition.requestObject({"defaultPrefix": "", "conditionString": "FilterExpression", "conditionStringType": "array"}),
				"TableName": model.name
			};

			if (this.settings.limit) {
				object.Limit = this.settings.limit;
			}
			if (this.settings.startAt) {
				object.ExclusiveStartKey = Document.isDynamoObject(this.settings.startAt) ? this.settings.startAt : model.Document.toDynamo(this.settings.startAt);
			}
			if (this.settings.attributes) {
				// NOTE(review): AttributesToGet is documented by DynamoDB as a legacy parameter (ProjectionExpression
				// is its replacement) - confirm that it is accepted together with the expression parameters built here.
				object.AttributesToGet = this.settings.attributes;
			}

			// Turns a KeySchema array into `{hash: attributeName, range: attributeName}`.
			const keySchemaToKeys = (keySchema) => keySchema.reduce((res, item) => {
				res[item.KeyType.toLowerCase()] = item.AttributeName;
				return res;
			}, {});

			const indexes = await model.schema.getIndexes(model);
			if (this.settings.index) {
				object.IndexName = this.settings.index;
			} else if (documentRetrieverType.type === "query") {
				// Attribute name -> comparison type for every condition that was set.
				const comparisonChart = this.settings.condition.settings.conditions.reduce((res, item) => {
					res[item[0]] = {"type": item[1].type};
					return res;
				}, {});
				const index = utils.array_flatten(Object.values(indexes)).find((index) => {
					const {hash/*, range*/} = keySchemaToKeys(index.KeySchema);
					// TODO: we need to write logic here to prioritize indexes with a range key that is being queried.
					return (comparisonChart[hash] || {}).type === "EQ"/* && (!range || comparisonChart[range])*/;
				});
				if (!index) {
					// No usable index: the query is only possible against the table's own hash key.
					if ((comparisonChart[model.schema.getHashKey()] || {}).type !== "EQ") {
						throw new Error.InvalidParameter("Index can't be found for query.");
					}
				} else {
					object.IndexName = index.IndexName;
				}
			}

			const escapeRegExp = (text) => text.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");

			// Moves the condition on attribute `val` out of FilterExpression and into KeyConditionExpression,
			// renaming its name placeholder to `#<prefix>a` and its value placeholders to `:<prefix>v...`.
			function moveParameterNames(val, prefix) {
				const entry = Object.entries(object.ExpressionAttributeNames || {}).find((entry) => entry[1] === val);
				if (!entry) {
					return;
				}
				const [key, value] = entry;
				const valueKey = key.replace("#a", ":v");
				// The `(?!\d)` lookahead makes the match exact, so `#a1` never matches inside `#a10` and
				// `:v1` never matches inside `:v10`.
				const nameToken = new RegExp(`${escapeRegExp(key)}(?!\\d)`);
				const ownValueKey = new RegExp(`^${escapeRegExp(valueKey)}(?!\\d)`);
				const valueToken = new RegExp(`${escapeRegExp(valueKey)}(?!\\d)`, "g");

				const filterExpressionIndex = (object.FilterExpression || []).findIndex((item) => typeof item === "string" && nameToken.test(item));
				if (filterExpressionIndex === -1) {
					// The condition is not a top-level element (for example it sits inside a nested group), so there is nothing to move.
					return;
				}
				const filterExpression = object.FilterExpression[filterExpressionIndex];
				if (filterExpression.includes("attribute_exists") || filterExpression.includes("contains")) {
					return;
				}
				object.ExpressionAttributeNames[`#${prefix}a`] = value;
				delete object.ExpressionAttributeNames[key];

				Object.keys(object.ExpressionAttributeValues || {}).filter((name) => ownValueKey.test(name)).forEach((name) => {
					// Only the leading placeholder is replaced; any suffix on the key is kept as is.
					object.ExpressionAttributeValues[name.replace(valueKey, `:${prefix}v`)] = object.ExpressionAttributeValues[name];
					delete object.ExpressionAttributeValues[name];
				});
				const newExpression = filterExpression.replace(nameToken, `#${prefix}a`).replace(valueToken, `:${prefix}v`);

				object.KeyConditionExpression = `${object.KeyConditionExpression || ""}${object.KeyConditionExpression ? " AND " : ""}${newExpression}`;
				utils.object.delete(object.FilterExpression, filterExpressionIndex);
				// Also drop the "AND" that joined the moved condition to its neighbour.
				const previousElementIndex = filterExpressionIndex === 0 ? 0 : filterExpressionIndex - 1;
				if (object.FilterExpression[previousElementIndex] === "AND") {
					utils.object.delete(object.FilterExpression, previousElementIndex);
				}
			}

			if (documentRetrieverType.type === "query") {
				const index = utils.array_flatten(Object.values(indexes)).find((index) => index.IndexName === object.IndexName);
				if (index) {
					const {hash, range} = keySchemaToKeys(index.KeySchema);

					moveParameterNames(hash, "qh");
					if (range) {
						moveParameterNames(range, "qr");
					}
				} else {
					moveParameterNames(model.schema.getHashKey(), "qh");
					if (model.schema.getRangeKey()) {
						moveParameterNames(model.schema.getRangeKey(), "qr");
					}
				}
			}
			if (this.settings.consistent) {
				object.ConsistentRead = this.settings.consistent;
			}
			if (this.settings.count) {
				object.Select = "COUNT";
			}
			if (this.settings.parallel) {
				object.TotalSegments = this.settings.parallel;
			}

			if (object.FilterExpression) {
				object.FilterExpression = utils.dynamoose.convertConditionArrayRequestObjectToString(object.FilterExpression);
			}
			if (object.FilterExpression === "") {
				delete object.FilterExpression;
			}

			return object;
		};

		/**
		 * Runs the request and converts the response.
		 *
		 * @param {Function} [callback] Optional `(error, result)` callback. When given, nothing is returned.
		 * @returns {Promise<Array|object>|undefined} Without a callback, a promise for the result: `{count, <pastTense>Count}`
		 * when `count` is set, otherwise the array of documents carrying `lastKey`, `count`, `<pastTense>Count` and
		 * `times<PastTense>` properties.
		 */
		C.prototype.exec = function (callback) {
			let timesRequested = 0;
			const prepareForReturn = async (result) => {
				if (Array.isArray(result)) {
					// Parallel requests produce one response per segment; combine them first.
					result = utils.merge_objects(...result);
				}
				if (this.settings.count) {
					return {
						"count": result.Count,
						[`${documentRetrieverType.pastTense}Count`]: result[`${capitalizedPastTense}Count`]
					};
				}
				const documents = await Promise.all(result.Items.map(async (item) => await ((new model.Document(item, {"fromDynamo": true})).conformToSchema({"customTypesDynamo": true, "checkExpiredItem": true, "saveUnknown": true, "modifiers": ["get"], "type": "fromDynamo"}))));
				// Falsy results from conformToSchema are dropped.
				const array = documents.filter((a) => Boolean(a));
				array.lastKey = result.LastEvaluatedKey ? (Array.isArray(result.LastEvaluatedKey) ? result.LastEvaluatedKey.map((key) => model.Document.fromDynamo(key)) : model.Document.fromDynamo(result.LastEvaluatedKey)) : undefined;
				array.count = result.Count;
				array[`${documentRetrieverType.pastTense}Count`] = result[`${capitalizedPastTense}Count`];
				array[`times${capitalizedPastTense}`] = timesRequested;
				return array;
			};
			const promise = model.pendingTaskPromise().then(() => this[requestMethodName]()).then((request) => {
				const allRequest = (extraParameters = {}) => {
					let promise = ddb(documentRetrieverType.type, {...request, ...extraParameters});
					timesRequested++;

					if (this.settings.all) {
						promise = promise.then(async (result) => {
							// NOTE(review): the delay is awaited here and again at the top of the first loop iteration, so the
							// first follow-up request waits twice - confirm whether that is intended.
							if (this.settings.all.delay && this.settings.all.delay > 0) {
								await utils.timeout(this.settings.all.delay);
							}

							let lastKey = result.LastEvaluatedKey;
							let requestedTimes = 1;
							// A `max` of 0 means no cap on the number of requests.
							while (lastKey && (this.settings.all.max === 0 || requestedTimes < this.settings.all.max)) {
								if (this.settings.all.delay && this.settings.all.delay > 0) {
									await utils.timeout(this.settings.all.delay);
								}

								const nextRequest = await ddb(documentRetrieverType.type, {...request, ...extraParameters, "ExclusiveStartKey": lastKey});
								timesRequested++;
								result = utils.merge_objects(result, nextRequest);
								// The operation below is safe because right above we are overwriting the entire `result` variable, so there is no chance it'll be reassigned based on an outdated value since it's already been overwritten. There might be a better way to do this than ignoring the rule on the line below.
								result.LastEvaluatedKey = nextRequest.LastEvaluatedKey; // eslint-disable-line require-atomic-updates
								lastKey = nextRequest.LastEvaluatedKey;
								requestedTimes++;
							}

							return result;
						});
					}

					return promise;
				};

				if (this.settings.parallel) {
					// One request per segment, numbered 0..parallel-1.
					return Promise.all(new Array(this.settings.parallel).fill(0).map((a, index) => allRequest({"Segment": index})));
				} else {
					return allRequest();
				}
			});

			// TODO: we do something similar to do this below in other functions as well (ex. get, save), where we allow a callback or a promise, we should figure out a way to make this code more DRY and have a standard way of doing this throughout Dynamoose
			if (callback) {
				// Both handlers go on the same `then`, so an exception thrown by the callback is never fed back
				// into the callback as an error; the callback runs at most once.
				promise.then((result) => prepareForReturn(result)).then((result) => callback(null, result), (error) => callback(error));
			} else {
				return promise.then((result) => prepareForReturn(result));
			}
		};

		// Chainable setters. A plain string is both the method name and the settings key. `only` restricts a
		// setter to the listed retriever types, `settingsName` overrides the settings key, and `boolean`
		// setters take no value and flip the stored flag on every call.
		const settings = [
			"limit",
			"startAt",
			"attributes",
			{"name": "parallel", "only": ["scan"]},
			{"name": "count", "boolean": true},
			{"name": "consistent", "boolean": true},
			{"name": "using", "settingsName": "index"}
		];
		settings.forEach((item) => {
			if (!item.only || item.only.includes(documentRetrieverType.type)) {
				C.prototype[item.name || item] = function (value) {
					const key = item.settingsName || item.name || item;
					this.settings[key] = item.boolean ? !this.settings[key] : value;
					return this;
				};
			}
		});

		// Keeps requesting while the response carries a LastEvaluatedKey. `delay` is in whatever unit
		// `utils.timeout` takes, and a `max` of 0 means unlimited requests.
		C.prototype.all = function (delay = 0, max = 0) {
			this.settings.all = {delay, max};
			return this;
		};

		Object.defineProperty(C, "name", {"value": capitalizedType});
		return C;
	}

	return Carrier;
}
251
// The module exports the factory itself: call it with "scan" or "query" to get the Carrier for that retriever type.
module.exports = main;