1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.DatastoreRequest = void 0;
|
19 | const promisify_1 = require("@google-cloud/promisify");
|
20 | const arrify = require("arrify");
|
21 |
|
22 | const concat = require('concat-stream');
|
23 | const extend = require("extend");
|
24 | const split_array_stream_1 = require("split-array-stream");
|
25 | const stream_1 = require("stream");
|
26 |
|
27 | const streamEvents = require('stream-events');
|
28 |
|
29 | const gapic = Object.freeze({
|
30 | v1: require('./v1'),
|
31 | });
|
32 | const entity_1 = require("./entity");
|
33 | const query_1 = require("./query");
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
/**
 * Lookup table from a lower-cased consistency name to the numeric code that
 * is sent as `readOptions.readConsistency`.
 *
 * Frozen so the shared table cannot be modified at runtime, in line with the
 * frozen `gapic` table declared in this module.
 */
const CONSISTENCY_PROTO_CODE = Object.freeze({
    eventual: 2,
    strong: 1,
});
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
/**
 * Request layer for Datastore operations.
 *
 * Methods read the following instance state, which is expected to be set up
 * by whoever constructs or extends this class (not visible in this class):
 *  - `this.datastore`  : object exposing `auth`, `clients_`, `options` and
 *                        `transaction()`.
 *  - `this.id`         : when truthy, the instance is treated as running
 *                        inside a transaction.
 *  - `this.requests_`  : array used to queue mutations while `this.id` is set.
 */
class DatastoreRequest {
    /**
     * Deep-copy an entity object and, when the input carries a key under
     * `entity.KEY_SYMBOL`, wrap the copy as `{key, data}`.
     *
     * @param {object} obj Entity object to normalise.
     * @returns {object} `{key, data}` when `obj` has a key symbol, otherwise
     *     the deep copy itself.
     */
    static prepareEntityObject_(obj) {
        const entityObject = extend(true, {}, obj);
        const key = obj[entity_1.entity.KEY_SYMBOL];
        if (key) {
            return { key, data: entityObject };
        }
        return entityObject;
    }

    /**
     * Request IDs for an incomplete key.
     *
     * @param {object} key Incomplete key to allocate IDs for.
     * @param {number|object} options Number of IDs to allocate, or an object
     *     with `allocations` and optional `gaxOptions`.
     * @param {function} callback Called as `(err, keys, apiResponse)`.
     * @throws {Error} If `key` is already complete.
     */
    allocateIds(key, options, callback) {
        if (entity_1.entity.isKeyComplete(key)) {
            throw new Error('An incomplete key should be provided.');
        }
        options = typeof options === 'number' ? { allocations: options } : options;
        this.request_({
            client: 'DatastoreClient',
            method: 'allocateIds',
            reqOpts: {
                // One copy of the key proto per requested allocation.
                keys: new Array(options.allocations).fill(entity_1.entity.keyToKeyProto(key)),
            },
            gaxOpts: options.gaxOptions,
        }, (err, resp) => {
            if (err) {
                callback(err, null, resp);
                return;
            }
            const keys = arrify(resp.keys).map(entity_1.entity.keyFromKeyProto);
            callback(null, keys, resp);
        });
    }

    /**
     * Look up entities by key and emit them on an object-mode stream.
     *
     * The first `lookup` request is issued when the stream emits `reading`.
     * Keys returned in `resp.deferred` are requested again until none remain.
     *
     * @param {object|object[]} keys Key or keys to look up.
     * @param {object} [options] May contain `consistency`, `readTime`,
     *     `gaxOptions` and `wrapNumbers`.
     * @returns {object} Object-mode stream of formatted entities.
     * @throws {Error} If no keys are supplied.
     */
    createReadStream(keys, options = {}) {
        keys = arrify(keys).map(entity_1.entity.keyToKeyProto);
        if (keys.length === 0) {
            throw new Error('At least one Key object is required.');
        }
        const makeRequest = (keys) => {
            const reqOpts = this.getRequestOptions(options);
            Object.assign(reqOpts, { keys });
            this.request_({
                client: 'DatastoreClient',
                method: 'lookup',
                reqOpts,
                gaxOpts: options.gaxOptions,
            }, (err, resp) => {
                if (err) {
                    stream.destroy(err);
                    return;
                }
                let entities = [];
                try {
                    entities = entity_1.entity.formatArray(resp.found, options.wrapNumbers);
                }
                catch (err) {
                    stream.destroy(err);
                    return;
                }
                const nextKeys = (resp.deferred || [])
                    .map(entity_1.entity.keyFromKeyProto)
                    .map(entity_1.entity.keyToKeyProto);
                (0, split_array_stream_1.split)(entities, stream)
                    .then(streamEnded => {
                        if (streamEnded) {
                            return;
                        }
                        if (nextKeys.length > 0) {
                            makeRequest(nextKeys);
                            return;
                        }
                        stream.push(null);
                    })
                    // Surface failures from the follow-up request on the stream
                    // instead of leaving an unhandled promise rejection.
                    .catch(err => stream.destroy(err));
            });
        };
        const stream = streamEvents(new stream_1.Transform({ objectMode: true }));
        stream.once('reading', () => {
            makeRequest(keys);
        });
        return stream;
    }

    /**
     * Delete the entities identified by `keys`.
     *
     * When `this.id` is set, the mutation is only queued on `this.requests_`:
     * no request is sent and the callback is not invoked by this method.
     *
     * @param {object|object[]} keys Key or keys to delete.
     * @param {object|function} [gaxOptionsOrCallback] Call options, or the
     *     callback.
     * @param {function} [cb] Callback, when call options are given.
     */
    delete(keys, gaxOptionsOrCallback, cb) {
        // `typeof null` is 'object', so null needs an explicit guard.
        const gaxOptions = typeof gaxOptionsOrCallback === 'object' && gaxOptionsOrCallback
            ? gaxOptionsOrCallback
            : {};
        const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb;
        const reqOpts = {
            mutations: arrify(keys).map(key => ({
                delete: entity_1.entity.keyToKeyProto(key),
            })),
        };
        if (this.id) {
            this.requests_.push(reqOpts);
            return;
        }
        this.request_({
            client: 'DatastoreClient',
            method: 'commit',
            reqOpts,
            gaxOpts: gaxOptions,
        }, callback);
    }

    /**
     * Look up entities by key and collect the results.
     *
     * @param {object|object[]} keys Key or keys to look up.
     * @param {object|function} [optionsOrCallback] Options forwarded to
     *     `createReadStream`, or the callback.
     * @param {function} [cb] Called as `(err, result)`. `result` is a single
     *     entity when `keys` is not an array, otherwise an array.
     */
    get(keys, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' && optionsOrCallback
            ? optionsOrCallback
            : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        this.createReadStream(keys, options)
            .on('error', callback)
            .pipe(concat((results) => {
                const isSingleLookup = !Array.isArray(keys);
                callback(null, isSingleLookup ? results[0] : results);
            }));
    }

    /**
     * Run an aggregation query.
     *
     * Note: `query.query` is replaced on the passed-in object with a merged
     * `Query` instance.
     *
     * @param {object} query Aggregation query exposing `query` (the nested
     *     query) and `toProto()`.
     * @param {object|function} [optionsOrCallback] Options (`consistency`,
     *     `readTime`, `gaxOptions`), or the callback.
     * @param {function} [cb] Called as `(err, results)`. When the response has
     *     a `batch`, `results` is one plain object per aggregation result with
     *     decoded values; otherwise the raw response is passed through.
     */
    runAggregationQuery(query, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' && optionsOrCallback
            ? optionsOrCallback
            : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        query.query = extend(true, new query_1.Query(), query.query);
        let queryProto;
        try {
            queryProto = entity_1.entity.queryToQueryProto(query.query);
        }
        catch (e) {
            // Report conversion failures asynchronously, like any other error.
            setImmediate(callback, e);
            return;
        }
        const sharedQueryOpts = this.getQueryOptions(query.query, options);
        const aggregationQueryOptions = {
            nestedQuery: queryProto,
            aggregations: query.toProto(),
        };
        const reqOpts = Object.assign(sharedQueryOpts, {
            aggregationQuery: aggregationQueryOptions,
        });
        this.request_({
            client: 'DatastoreClient',
            method: 'runAggregationQuery',
            reqOpts,
            gaxOpts: options.gaxOptions,
        }, (err, res) => {
            if (!(res && res.batch)) {
                callback(err, res);
                return;
            }
            const finalResults = (res.batch.aggregationResults || []).map(aggregationResult => {
                const decodedPairs = Object.entries(aggregationResult.aggregateProperties)
                    .map(([name, valueProto]) => [
                        name,
                        entity_1.entity.decodeValueProto(valueProto),
                    ]);
                return Object.fromEntries(decodedPairs);
            });
            callback(err, finalResults);
        });
    }

    /**
     * Run a query and collect every result.
     *
     * @param {object} query Query to run.
     * @param {object|function} [optionsOrCallback] Options forwarded to
     *     `runQueryStream`, or the callback.
     * @param {function} [cb] Called as `(err, entities, info)`, where `info`
     *     is the payload of the stream's `info` event.
     */
    runQuery(query, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' && optionsOrCallback
            ? optionsOrCallback
            : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        let info;
        this.runQueryStream(query, options)
            .on('error', callback)
            .on('info', info_ => {
                info = info_;
            })
            .pipe(concat((results) => {
                callback(null, results, info);
            }));
    }

    /**
     * Run a query and emit its results on an object-mode stream.
     *
     * The first `runQuery` request is issued when the stream emits `reading`.
     * While a batch reports `moreResults === 'NOT_FINISHED'`, the query is
     * advanced (cursor, offset, limit) and requested again. Before ending,
     * the stream emits `info` with `moreResults` and, when present, a
     * base64 `endCursor`.
     *
     * @param {object} query Query to run; it is merged into a fresh `Query`.
     * @param {object} [options] May contain `consistency`, `readTime`,
     *     `gaxOptions` and `wrapNumbers`.
     * @returns {object} Object-mode stream of formatted entities.
     */
    runQueryStream(query, options = {}) {
        query = extend(true, new query_1.Query(), query);
        const makeRequest = (query) => {
            let queryProto;
            try {
                queryProto = entity_1.entity.queryToQueryProto(query);
            }
            catch (e) {
                // Route conversion failures through the normal result handler.
                setImmediate(onResultSet, e);
                return;
            }
            const reqOpts = this.getQueryOptions(query, options);
            reqOpts.query = queryProto;
            this.request_({
                client: 'DatastoreClient',
                method: 'runQuery',
                reqOpts,
                gaxOpts: options.gaxOptions,
            }, onResultSet);
        };
        function onResultSet(err, resp) {
            if (err) {
                stream.destroy(err);
                return;
            }
            const info = {
                moreResults: resp.batch.moreResults,
            };
            if (resp.batch.endCursor) {
                info.endCursor = resp.batch.endCursor.toString('base64');
            }
            let entities = [];
            if (resp.batch.entityResults) {
                try {
                    entities = entity_1.entity.formatArray(resp.batch.entityResults, options.wrapNumbers);
                }
                catch (err) {
                    stream.destroy(err);
                    return;
                }
            }
            (0, split_array_stream_1.split)(entities, stream)
                .then(streamEnded => {
                    if (streamEnded) {
                        return;
                    }
                    if (resp.batch.moreResults !== 'NOT_FINISHED') {
                        stream.emit('info', info);
                        stream.push(null);
                        return;
                    }
                    // More batches remain: continue from the end cursor and
                    // discount what this batch already skipped and returned.
                    const offset = query.offsetVal === -1 ? 0 : query.offsetVal;
                    query.start(info.endCursor).offset(offset - resp.batch.skippedResults);
                    const limit = query.limitVal;
                    if (limit && limit > -1) {
                        // entityResults is treated as optional above, so do
                        // not dereference it unguarded here.
                        const returned = (resp.batch.entityResults || []).length;
                        query.limit(limit - returned);
                    }
                    makeRequest(query);
                })
                // Surface failures from the follow-up request on the stream
                // instead of leaving an unhandled promise rejection.
                .catch(err => stream.destroy(err));
        }
        const stream = streamEvents(new stream_1.Transform({ objectMode: true }));
        stream.once('reading', () => {
            makeRequest(query);
        });
        return stream;
    }

    /**
     * Build the read options shared by lookups and queries.
     *
     * @param {object} [options] May contain `consistency` (looked up
     *     case-insensitively in `CONSISTENCY_PROTO_CODE`) and `readTime`
     *     (divided by 1000 and floored, i.e. treated as milliseconds).
     * @returns {object} Object with `readOptions` set only when at least one
     *     of the two options was provided.
     */
    getRequestOptions(options = {}) {
        const sharedQueryOpts = {};
        if (options.consistency) {
            const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()];
            sharedQueryOpts.readOptions = {
                readConsistency: code,
            };
        }
        if (options.readTime) {
            if (sharedQueryOpts.readOptions === undefined) {
                sharedQueryOpts.readOptions = {};
            }
            const seconds = options.readTime / 1000;
            sharedQueryOpts.readOptions.readTime = {
                seconds: Math.floor(seconds),
            };
        }
        return sharedQueryOpts;
    }

    /**
     * Build the request options for a query: the shared read options plus a
     * `partitionId` when the query names a namespace.
     *
     * @param {object} query Query whose `namespace` is consulted.
     * @param {object} [options] Forwarded to `getRequestOptions`.
     * @returns {object} Request options for the query.
     */
    getQueryOptions(query, options = {}) {
        const sharedQueryOpts = this.getRequestOptions(options);
        if (query.namespace) {
            sharedQueryOpts.partitionId = {
                namespaceId: query.namespace,
            };
        }
        return sharedQueryOpts;
    }

    /**
     * Merge the given entities into their stored versions inside a new
     * transaction: each entity is fetched, its stored data is overlaid with
     * the supplied data, and the result is saved with method `upsert`.
     *
     * @param {object|object[]} entities Entity or entities to merge.
     * @param {function} callback Called as `(err)` on failure or
     *     `(null, commitResponse)` on success.
     */
    merge(entities, callback) {
        const transaction = this.datastore.transaction();
        transaction.run(async (err) => {
            if (err) {
                try {
                    await transaction.rollback();
                }
                catch (error) {
                    // Rollback is best-effort; the error from the failed run
                    // is the one reported to the caller.
                }
                callback(err);
                return;
            }
            try {
                await Promise.all(arrify(entities).map(async (objEntity) => {
                    const obj = DatastoreRequest.prepareEntityObject_(objEntity);
                    const [data] = await transaction.get(obj.key);
                    obj.method = 'upsert';
                    // Supplied fields win over stored fields.
                    obj.data = Object.assign({}, data, obj.data);
                    transaction.save(obj);
                }));
                const [response] = await transaction.commit();
                callback(null, response);
            }
            catch (err) {
                try {
                    await transaction.rollback();
                }
                catch (error) {
                    // Rollback is best-effort; the original failure is the
                    // one reported to the caller.
                }
                callback(err);
            }
        });
    }

    /**
     * Prepare a bound client call for the given request configuration.
     *
     * Adds transaction, database and project details to a copy of
     * `config.reqOpts`, creates the named client on first use (cached in
     * `datastore.clients_`) and hands back the client method with request and
     * call options already bound.
     *
     * For `lookup`, `runQuery` and `runAggregationQuery` inside a transaction
     * (`this.id` set), `readOptions` is replaced with
     * `{transaction: this.id}`.
     *
     * @param {object} config `{client, method, reqOpts, gaxOpts}`.
     * @param {function} callback Called as `(err)` or `(null, requestFn)`.
     * @throws {Error} Synchronously, if a read consistency is specified for a
     *     read made inside a transaction.
     */
    prepareGaxRequest_(config, callback) {
        const datastore = this.datastore;
        const isTransaction = Boolean(this.id);
        const method = config.method;
        const reqOpts = extend(true, {}, config.reqOpts);
        if (method === 'commit') {
            if (isTransaction) {
                reqOpts.mode = 'TRANSACTIONAL';
                reqOpts.transaction = this.id;
            }
            else {
                reqOpts.mode = 'NON_TRANSACTIONAL';
            }
        }
        if (datastore.options && datastore.options.databaseId) {
            reqOpts.databaseId = datastore.options.databaseId;
        }
        if (method === 'rollback') {
            reqOpts.transaction = this.id;
        }
        const isRead = method === 'lookup' ||
            method === 'runQuery' ||
            method === 'runAggregationQuery';
        if (isTransaction && isRead) {
            if (reqOpts.readOptions && reqOpts.readOptions.readConsistency) {
                throw new Error('Read consistency cannot be specified in a transaction.');
            }
            reqOpts.readOptions = {
                transaction: this.id,
            };
        }
        datastore.auth.getProjectId((err, projectId) => {
            if (err) {
                callback(err);
                return;
            }
            const clientName = config.client;
            if (!datastore.clients_.has(clientName)) {
                datastore.clients_.set(clientName, new gapic.v1[clientName](datastore.options));
            }
            const gaxClient = datastore.clients_.get(clientName);
            reqOpts.projectId = projectId;
            const gaxOpts = extend(true, {}, config.gaxOpts, {
                headers: {
                    'google-cloud-resource-prefix': `projects/${projectId}`,
                },
            });
            const requestFn = gaxClient[method].bind(gaxClient, reqOpts, gaxOpts);
            callback(null, requestFn);
        });
    }

    /**
     * Prepare and execute a callback-style request.
     *
     * @param {object} config Request configuration; see `prepareGaxRequest_`.
     * @param {function} callback Receives the preparation error, or is passed
     *     to the prepared client call.
     */
    request_(config, callback) {
        this.prepareGaxRequest_(config, (err, requestFn) => {
            if (err) {
                callback(err);
                return;
            }
            requestFn(callback);
        });
    }

    /**
     * Prepare and execute a streaming request.
     *
     * The underlying call is made when the returned stream emits `reading`.
     * The returned stream has an `abort()` method that cancels the underlying
     * stream when it exposes `cancel`.
     *
     * @param {object} config Request configuration; see `prepareGaxRequest_`.
     * @returns {object} Object-mode pass-through stream of the response data.
     */
    requestStream_(config) {
        let gaxStream;
        const stream = streamEvents(new stream_1.PassThrough({ objectMode: true }));
        stream.abort = () => {
            if (gaxStream && gaxStream.cancel) {
                gaxStream.cancel();
            }
        };
        stream.once('reading', () => {
            this.prepareGaxRequest_(config, (err, requestFn) => {
                if (err) {
                    stream.destroy(err);
                    return;
                }
                gaxStream = requestFn();
                gaxStream
                    .on('error', stream.destroy.bind(stream))
                    .on('response', stream.emit.bind(stream, 'response'))
                    .pipe(stream);
            });
        });
        return stream;
    }
}
|
539 | exports.DatastoreRequest = DatastoreRequest;
|
540 |
|
541 |
|
542 |
|
543 |
|
544 |
|
// Hand the class to promisifyAll from @google-cloud/promisify. The two option
// builders are opted out: they return their result directly rather than
// reporting it through a callback.
(0, promisify_1.promisifyAll)(
    DatastoreRequest,
    { exclude: ['getQueryOptions', 'getRequestOptions'] }
);
|
548 |
|
\ | No newline at end of file |