UNPKG

20 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google LLC.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.DatastoreRequest = void 0;
19const promisify_1 = require("@google-cloud/promisify");
20const arrify = require("arrify");
21// eslint-disable-next-line @typescript-eslint/no-var-requires
22const concat = require('concat-stream');
23const extend = require("extend");
24const split_array_stream_1 = require("split-array-stream");
25const stream_1 = require("stream");
26// eslint-disable-next-line @typescript-eslint/no-var-requires
27const streamEvents = require('stream-events');
28// Import the clients for each version supported by this package.
29const gapic = Object.freeze({
30 v1: require('./v1'),
31});
32const entity_1 = require("./entity");
33const query_1 = require("./query");
34/**
35 * A map of read consistency values to proto codes.
36 *
37 * @type {object}
38 * @private
39 */
40const CONSISTENCY_PROTO_CODE = {
41 eventual: 2,
42 strong: 1,
43};
44/**
45 * Handle logic for Datastore API operations. Handles request logic for
46 * Datastore.
47 *
48 * Creates requests to the Datastore endpoint. Designed to be inherited by
49 * the {@link Datastore} and {@link Transaction} classes.
50 *
51 * @class
52 */
53class DatastoreRequest {
54 /**
55 * Format a user's input to mutation methods. This will create a deep clone of
56 * the input, as well as allow users to pass an object in the format of an
57 * entity.
58 *
59 * Both of the following formats can be supplied supported:
60 *
61 * datastore.save({
62 * key: datastore.key('Kind'),
63 * data: { foo: 'bar' }
64 * }, (err) => {})
65 *
66 * const entity = { foo: 'bar' }
67 * entity[datastore.KEY] = datastore.key('Kind')
68 * datastore.save(entity, (err) => {})
69 *
70 * @internal
71 *
72 * @see {@link https://github.com/GoogleCloudPlatform/google-cloud-node/issues/1803}
73 *
74 * @param {object} obj The user's input object.
75 */
76 static prepareEntityObject_(obj) {
77 const entityObject = extend(true, {}, obj);
78 // Entity objects are also supported.
79 if (obj[entity_1.entity.KEY_SYMBOL]) {
80 return {
81 key: obj[entity_1.entity.KEY_SYMBOL],
82 data: entityObject,
83 };
84 }
85 return entityObject;
86 }
87 allocateIds(key, options, callback) {
88 if (entity_1.entity.isKeyComplete(key)) {
89 throw new Error('An incomplete key should be provided.');
90 }
91 options = typeof options === 'number' ? { allocations: options } : options;
92 this.request_({
93 client: 'DatastoreClient',
94 method: 'allocateIds',
95 reqOpts: {
96 keys: new Array(options.allocations).fill(entity_1.entity.keyToKeyProto(key)),
97 },
98 gaxOpts: options.gaxOptions,
99 }, (err, resp) => {
100 if (err) {
101 callback(err, null, resp);
102 return;
103 }
104 const keys = arrify(resp.keys).map(entity_1.entity.keyFromKeyProto);
105 callback(null, keys, resp);
106 });
107 }
108 /**
109 * Retrieve the entities as a readable object stream.
110 *
111 * @throws {Error} If at least one Key object is not provided.
112 *
113 * @param {Key|Key[]} keys Datastore key object(s).
114 * @param {object} [options] Optional configuration. See {@link Datastore#get}
115 * for a complete list of options.
116 *
117 * @example
118 * ```
119 * const keys = [
120 * datastore.key(['Company', 123]),
121 * datastore.key(['Product', 'Computer'])
122 * ];
123 *
124 * datastore.createReadStream(keys)
125 * .on('error', (err) => {})
126 * .on('data', (entity) => {
127 * // entity is an entity object.
128 * })
129 * .on('end', () => {
130 * // All entities retrieved.
131 * });
132 * ```
133 */
134 createReadStream(keys, options = {}) {
135 keys = arrify(keys).map(entity_1.entity.keyToKeyProto);
136 if (keys.length === 0) {
137 throw new Error('At least one Key object is required.');
138 }
139 const makeRequest = (keys) => {
140 const reqOpts = this.getRequestOptions(options);
141 Object.assign(reqOpts, { keys });
142 this.request_({
143 client: 'DatastoreClient',
144 method: 'lookup',
145 reqOpts,
146 gaxOpts: options.gaxOptions,
147 }, (err, resp) => {
148 if (err) {
149 stream.destroy(err);
150 return;
151 }
152 let entities = [];
153 try {
154 entities = entity_1.entity.formatArray(resp.found, options.wrapNumbers);
155 }
156 catch (err) {
157 stream.destroy(err);
158 return;
159 }
160 const nextKeys = (resp.deferred || [])
161 .map(entity_1.entity.keyFromKeyProto)
162 .map(entity_1.entity.keyToKeyProto);
163 (0, split_array_stream_1.split)(entities, stream).then(streamEnded => {
164 if (streamEnded) {
165 return;
166 }
167 if (nextKeys.length > 0) {
168 makeRequest(nextKeys);
169 return;
170 }
171 stream.push(null);
172 });
173 });
174 };
175 const stream = streamEvents(new stream_1.Transform({ objectMode: true }));
176 stream.once('reading', () => {
177 makeRequest(keys);
178 });
179 return stream;
180 }
181 delete(keys, gaxOptionsOrCallback, cb) {
182 const gaxOptions = typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
183 const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb;
184 const reqOpts = {
185 mutations: arrify(keys).map(key => {
186 return {
187 delete: entity_1.entity.keyToKeyProto(key),
188 };
189 }),
190 // eslint-disable-next-line @typescript-eslint/no-explicit-any
191 };
192 if (this.id) {
193 this.requests_.push(reqOpts);
194 return;
195 }
196 this.request_({
197 client: 'DatastoreClient',
198 method: 'commit',
199 reqOpts,
200 gaxOpts: gaxOptions,
201 }, callback);
202 }
203 get(keys, optionsOrCallback, cb) {
204 const options = typeof optionsOrCallback === 'object' && optionsOrCallback
205 ? optionsOrCallback
206 : {};
207 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
208 this.createReadStream(keys, options)
209 .on('error', callback)
210 .pipe(concat((results) => {
211 const isSingleLookup = !Array.isArray(keys);
212 callback(null, isSingleLookup ? results[0] : results);
213 }));
214 }
215 runAggregationQuery(query, optionsOrCallback, cb) {
216 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
217 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
218 query.query = extend(true, new query_1.Query(), query.query);
219 let queryProto;
220 try {
221 queryProto = entity_1.entity.queryToQueryProto(query.query);
222 }
223 catch (e) {
224 // using setImmediate here to make sure this doesn't throw a
225 // synchronous error
226 setImmediate(callback, e);
227 return;
228 }
229 const sharedQueryOpts = this.getQueryOptions(query.query, options);
230 const aggregationQueryOptions = {
231 nestedQuery: queryProto,
232 aggregations: query.toProto(),
233 };
234 const reqOpts = Object.assign(sharedQueryOpts, {
235 aggregationQuery: aggregationQueryOptions,
236 });
237 this.request_({
238 client: 'DatastoreClient',
239 method: 'runAggregationQuery',
240 reqOpts,
241 gaxOpts: options.gaxOptions,
242 }, (err, res) => {
243 if (res && res.batch) {
244 const results = res.batch.aggregationResults;
245 const finalResults = results
246 .map((aggregationResult) => aggregationResult.aggregateProperties)
247 .map((aggregateProperties) => Object.fromEntries(new Map(Object.keys(aggregateProperties).map(key => [
248 key,
249 entity_1.entity.decodeValueProto(aggregateProperties[key]),
250 ]))));
251 callback(err, finalResults);
252 }
253 else {
254 callback(err, res);
255 }
256 });
257 }
258 runQuery(query, optionsOrCallback, cb) {
259 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
260 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
261 let info;
262 this.runQueryStream(query, options)
263 .on('error', callback)
264 .on('info', info_ => {
265 info = info_;
266 })
267 .pipe(concat((results) => {
268 callback(null, results, info);
269 }));
270 }
271 /**
272 * Get a list of entities as a readable object stream.
273 *
274 * See {@link Datastore#runQuery} for a list of all available options.
275 *
276 * @param {Query} query Query object.
277 * @param {object} [options] Optional configuration.
278 * @param {object} [options.gaxOptions] Request configuration options, outlined
279 * here: https://googleapis.github.io/gax-nodejs/global.html#CallOptions.
280 *
281 * @example
282 * ```
283 * datastore.runQueryStream(query)
284 * .on('error', console.error)
285 * .on('data', (entity) => {
286 * // Access the Key object for this entity.
287 * const key = entity[datastore.KEY];
288 * })
289 * .on('info', (info) => {})
290 * .on('end', () => {
291 * // All entities retrieved.
292 * });
293 *
294 * //-
295 * // If you anticipate many results, you can end a stream early to prevent
296 * // unnecessary processing and API requests.
297 * //-
298 * datastore.runQueryStream(query)
299 * .on('data', (entity) => {
300 * this.end();
301 * });
302 * ```
303 */
304 runQueryStream(query, options = {}) {
305 query = extend(true, new query_1.Query(), query);
306 const makeRequest = (query) => {
307 let queryProto;
308 try {
309 queryProto = entity_1.entity.queryToQueryProto(query);
310 }
311 catch (e) {
312 // using setImmediate here to make sure this doesn't throw a
313 // synchronous error
314 setImmediate(onResultSet, e);
315 return;
316 }
317 const sharedQueryOpts = this.getQueryOptions(query, options);
318 const reqOpts = sharedQueryOpts;
319 reqOpts.query = queryProto;
320 this.request_({
321 client: 'DatastoreClient',
322 method: 'runQuery',
323 reqOpts,
324 gaxOpts: options.gaxOptions,
325 }, onResultSet);
326 };
327 function onResultSet(err, resp) {
328 if (err) {
329 stream.destroy(err);
330 return;
331 }
332 const info = {
333 moreResults: resp.batch.moreResults,
334 };
335 if (resp.batch.endCursor) {
336 info.endCursor = resp.batch.endCursor.toString('base64');
337 }
338 let entities = [];
339 if (resp.batch.entityResults) {
340 try {
341 entities = entity_1.entity.formatArray(resp.batch.entityResults, options.wrapNumbers);
342 }
343 catch (err) {
344 stream.destroy(err);
345 return;
346 }
347 }
348 // Emit each result right away, then get the rest if necessary.
349 (0, split_array_stream_1.split)(entities, stream).then(streamEnded => {
350 if (streamEnded) {
351 return;
352 }
353 if (resp.batch.moreResults !== 'NOT_FINISHED') {
354 stream.emit('info', info);
355 stream.push(null);
356 return;
357 }
358 // The query is "NOT_FINISHED". Get the rest of the results.
359 const offset = query.offsetVal === -1 ? 0 : query.offsetVal;
360 query.start(info.endCursor).offset(offset - resp.batch.skippedResults);
361 const limit = query.limitVal;
362 if (limit && limit > -1) {
363 query.limit(limit - resp.batch.entityResults.length);
364 }
365 makeRequest(query);
366 });
367 }
368 const stream = streamEvents(new stream_1.Transform({ objectMode: true }));
369 stream.once('reading', () => {
370 makeRequest(query);
371 });
372 return stream;
373 }
374 getRequestOptions(options) {
375 const sharedQueryOpts = {};
376 if (options.consistency) {
377 const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()];
378 sharedQueryOpts.readOptions = {
379 readConsistency: code,
380 };
381 }
382 if (options.readTime) {
383 if (sharedQueryOpts.readOptions === undefined) {
384 sharedQueryOpts.readOptions = {};
385 }
386 const readTime = options.readTime;
387 const seconds = readTime / 1000;
388 sharedQueryOpts.readOptions.readTime = {
389 seconds: Math.floor(seconds),
390 };
391 }
392 return sharedQueryOpts;
393 }
394 getQueryOptions(query, options = {}) {
395 const sharedQueryOpts = this.getRequestOptions(options);
396 if (query.namespace) {
397 sharedQueryOpts.partitionId = {
398 namespaceId: query.namespace,
399 };
400 }
401 return sharedQueryOpts;
402 }
403 merge(entities, callback) {
404 const transaction = this.datastore.transaction();
405 transaction.run(async (err) => {
406 if (err) {
407 try {
408 await transaction.rollback();
409 }
410 catch (error) {
411 // Provide the error & API response from the failed run to the user.
412 // Even a failed rollback should be transparent.
413 // RE: https://github.com/GoogleCloudPlatform/gcloud-node/pull/1369#discussion_r66833976
414 }
415 callback(err);
416 return;
417 }
418 try {
419 await Promise.all(arrify(entities).map(async (objEntity) => {
420 const obj = DatastoreRequest.prepareEntityObject_(objEntity);
421 const [data] = await transaction.get(obj.key);
422 obj.method = 'upsert';
423 obj.data = Object.assign({}, data, obj.data);
424 transaction.save(obj);
425 }));
426 const [response] = await transaction.commit();
427 callback(null, response);
428 }
429 catch (err) {
430 try {
431 await transaction.rollback();
432 }
433 catch (error) {
434 // Provide the error & API response from the failed commit to the user.
435 // Even a failed rollback should be transparent.
436 // RE: https://github.com/GoogleCloudPlatform/gcloud-node/pull/1369#discussion_r66833976
437 }
438 callback(err);
439 }
440 });
441 }
442 /**
443 * @private
444 */
445 prepareGaxRequest_(config, callback) {
446 const datastore = this.datastore;
447 const isTransaction = this.id ? true : false;
448 const method = config.method;
449 const reqOpts = extend(true, {}, config.reqOpts);
450 // Set properties to indicate if we're in a transaction or not.
451 if (method === 'commit') {
452 if (isTransaction) {
453 reqOpts.mode = 'TRANSACTIONAL';
454 reqOpts.transaction = this.id;
455 }
456 else {
457 reqOpts.mode = 'NON_TRANSACTIONAL';
458 }
459 }
460 if (datastore.options && datastore.options.databaseId) {
461 reqOpts.databaseId = datastore.options.databaseId;
462 }
463 if (method === 'rollback') {
464 reqOpts.transaction = this.id;
465 }
466 if (isTransaction &&
467 (method === 'lookup' ||
468 method === 'runQuery' ||
469 method === 'runAggregationQuery')) {
470 if (reqOpts.readOptions && reqOpts.readOptions.readConsistency) {
471 throw new Error('Read consistency cannot be specified in a transaction.');
472 }
473 reqOpts.readOptions = {
474 transaction: this.id,
475 };
476 }
477 datastore.auth.getProjectId((err, projectId) => {
478 if (err) {
479 callback(err);
480 return;
481 }
482 const clientName = config.client;
483 if (!datastore.clients_.has(clientName)) {
484 datastore.clients_.set(clientName, new gapic.v1[clientName](datastore.options));
485 }
486 const gaxClient = datastore.clients_.get(clientName);
487 reqOpts.projectId = projectId;
488 const gaxOpts = extend(true, {}, config.gaxOpts, {
489 headers: {
490 'google-cloud-resource-prefix': `projects/${projectId}`,
491 },
492 });
493 const requestFn = gaxClient[method].bind(gaxClient, reqOpts, gaxOpts);
494 callback(null, requestFn);
495 });
496 }
497 request_(config, callback) {
498 this.prepareGaxRequest_(config, (err, requestFn) => {
499 if (err) {
500 callback(err);
501 return;
502 }
503 requestFn(callback);
504 });
505 }
506 /**
507 * Make a request as a stream.
508 *
509 * @param {object} config Configuration object.
510 * @param {object} config.gaxOpts GAX options.
511 * @param {string} config.client The name of the gax client.
512 * @param {string} config.method The gax method to call.
513 * @param {object} config.reqOpts Request options.
514 */
515 requestStream_(config) {
516 let gaxStream;
517 const stream = streamEvents(new stream_1.PassThrough({ objectMode: true }));
518 stream.abort = () => {
519 if (gaxStream && gaxStream.cancel) {
520 gaxStream.cancel();
521 }
522 };
523 stream.once('reading', () => {
524 this.prepareGaxRequest_(config, (err, requestFn) => {
525 if (err) {
526 stream.destroy(err);
527 return;
528 }
529 gaxStream = requestFn();
530 gaxStream
531 .on('error', stream.destroy.bind(stream))
532 .on('response', stream.emit.bind(stream, 'response'))
533 .pipe(stream);
534 });
535 });
536 return stream;
537 }
538}
539exports.DatastoreRequest = DatastoreRequest;
540/*! Developer Documentation
541 *
542 * All async methods (except for streams) will return a Promise in the event
543 * that a callback is omitted.
544 */
545(0, promisify_1.promisifyAll)(DatastoreRequest, {
546 exclude: ['getQueryOptions', 'getRequestOptions'],
547});
548//# sourceMappingURL=request.js.map
\No newline at end of file