1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Table = void 0;
19const common_1 = require("@google-cloud/common");
20const paginator_1 = require("@google-cloud/paginator");
21const promisify_1 = require("@google-cloud/promisify");
22const arrify = require("arrify");
23const big_js_1 = require("big.js");
24const extend = require("extend");
25const p_event_1 = require("p-event");
26const fs = require("fs");
27const is = require("is");
28const path = require("path");
29const streamEvents = require("stream-events");
30const uuid = require("uuid");
31const _1 = require(".");
32const stream_1 = require("stream");
33const rowQueue_1 = require("./rowQueue");
34// eslint-disable-next-line @typescript-eslint/no-var-requires
35const duplexify = require('duplexify');
36/**
37 * The file formats accepted by BigQuery.
38 *
39 * @type {object}
40 * @private
41 */
42const FORMATS = {
43 avro: 'AVRO',
44 csv: 'CSV',
45 export_metadata: 'DATASTORE_BACKUP',
46 json: 'NEWLINE_DELIMITED_JSON',
47 orc: 'ORC',
48 parquet: 'PARQUET',
49};
50/**
51 * Table objects are returned by methods such as
52 * {@link Dataset#table}, {@link Dataset#createTable}, and
53 * {@link Dataset#getTables}.
54 *
55 * @class
56 * @param {Dataset} dataset {@link Dataset} instance.
57 * @param {string} id The ID of the table.
58 * @param {object} [options] Table options.
59 * @param {string} [options.location] The geographic location of the table, by
60 * default this value is inherited from the dataset. This can be used to
61 * configure the location of all jobs created through a table instance. It
62 * cannot be used to set the actual location of the table. This value will
63 * be superseded by any API responses containing location data for the
64 * table.
65 *
66 * @example
67 * ```
68 * const {BigQuery} = require('@google-cloud/bigquery');
69 * const bigquery = new BigQuery();
70 * const dataset = bigquery.dataset('my-dataset');
71 *
72 * const table = dataset.table('my-table');
73 * ```
74 */
75class Table extends common_1.ServiceObject {
76 constructor(dataset, id, options) {
77 const methods = {
78 /**
79 * @callback CreateTableCallback
80 * @param {?Error} err Request error, if any.
81 * @param {Table} table The table.
82 * @param {object} apiResponse The full API response body.
83 */
84 /**
85 * @typedef {array} CreateTableResponse
86 * @property {Table} 0 The table.
87 * @property {object} 1 The full API response body.
88 */
89 /**
90 * Create a table.
91 *
92 * @method Table#create
93 * @param {object} [options] See {@link Dataset#createTable}.
94 * @param {CreateTableCallback} [callback]
95 * @param {?error} callback.err An error returned while making this
96 * request.
97 * @param {Table} callback.table The new {@link Table}.
98 * @param {object} callback.apiResponse The full API response.
99 * @returns {Promise<CreateTableResponse>}
100 *
101 * @example
102 * ```
103 * const {BigQuery} = require('@google-cloud/bigquery');
104 * const bigquery = new BigQuery();
105 * const dataset = bigquery.dataset('my-dataset');
106 *
107 * const table = dataset.table('my-table');
108 *
109 * table.create((err, table, apiResponse) => {
110 * if (!err) {
111 * // The table was created successfully.
112 * }
113 * });
114 *
115 * //-
116 * // If the callback is omitted, we'll return a Promise.
117 * //-
118 * table.create().then((data) => {
119 * const table = data[0];
120 * const apiResponse = data[1];
121 * });
122 * ```
123 */
124 create: true,
125 /**
126 * @callback DeleteTableCallback
127 * @param {?Error} err Request error, if any.
128 * @param {object} apiResponse The full API response.
129 */
130 /**
131 * @typedef {array} DeleteTableResponse
132 * @property {object} 0 The full API response.
133 */
134 /**
135 * Delete a table and all its data.
136 *
137 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/delete| Tables: delete API Documentation}
138 *
139 * @method Table#delete
140 * @param {DeleteTableCallback} [callback]
141 * @param {?error} callback.err An error returned while making this
142 * request.
143 * @param {object} callback.apiResponse The full API response.
144 * @returns {Promise<DeleteTableResponse>}
145 *
146 * @example
147 * ```
148 * const {BigQuery} = require('@google-cloud/bigquery');
149 * const bigquery = new BigQuery();
150 * const dataset = bigquery.dataset('my-dataset');
151 *
152 * const table = dataset.table('my-table');
153 *
154 * table.delete((err, apiResponse) => {});
155 *
156 * //-
157 * // If the callback is omitted, we'll return a Promise.
158 * //-
159 * table.delete().then((data) => {
160 * const apiResponse = data[0];
161 * });
162 * ```
163 */
164 delete: true,
165 /**
166 * @callback TableExistsCallback
167 * @param {?Error} err Request error, if any.
168 * @param {boolean} exists Indicates if the table exists.
169 */
170 /**
             * @typedef {array} TableExistsResponse
             * @property {boolean} 0 Indicates if the table exists.
             */
            /**
             * Check if the table exists.
             *
             * @method Table#exists
             * @param {TableExistsCallback} [callback]
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {boolean} callback.exists Whether the table exists or not.
             * @returns {Promise<TableExistsResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * table.exists((err, exists) => {});
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.exists().then((data) => {
             *   const exists = data[0];
             * });
             * ```
             */
            exists: true,
            /**
             * @callback GetTableCallback
             * @param {?Error} err Request error, if any.
             * @param {Table} table The table.
             * @param {object} apiResponse The full API response body.
             */
            /**
             * @typedef {array} GetTableResponse
             * @property {Table} 0 The table.
             * @property {object} 1 The full API response body.
             */
            /**
             * Get a table if it exists.
             *
             * You may optionally use this to "get or create" an object by providing
             * an object with `autoCreate` set to `true`. Any extra configuration that
             * is normally required for the `create` method must be contained within
             * this object as well.
             *
             * @method Table#get
             * @param {object} [options] Configuration object.
             * @param {boolean} [options.autoCreate=false] Automatically create the
             *     object if it does not exist.
             * @param {function} [callback]
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {Table} callback.table The {@link Table}.
             * @param {object} callback.apiResponse The full API response.
             * @returns {Promise<GetTableResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * table.get((err, table, apiResponse) => {
             *   // `table.metadata` has been populated.
             * });
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.get().then((data) => {
             *   const table = data[0];
             *   const apiResponse = data[1];
             * });
             * ```
             */
            get: true,
            /**
             * @callback GetTableMetadataCallback
             * @param {?Error} err Request error, if any.
             * @param {object} metadata The table metadata.
             * @param {object} apiResponse The full API response.
             */
            /**
             * @typedef {array} GetTableMetadataResponse
             * @property {object} 0 The table metadata.
             * @property {object} 1 The full API response.
             */
            /**
             * Return the metadata associated with the Table.
             *
             * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/get| Tables: get API Documentation}
             *
             * @method Table#getMetadata
             * @param {GetTableMetadataCallback} [callback] The callback function.
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {object} callback.metadata The metadata of the Table.
             * @param {object} callback.apiResponse The full API response.
             * @returns {Promise<GetTableMetadataResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * table.getMetadata((err, metadata, apiResponse) => {});
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.getMetadata().then((data) => {
             *   const metadata = data[0];
             *   const apiResponse = data[1];
             * });
             * ```
             */
            getMetadata: true,
        };
        super({
            parent: dataset,
            baseUrl: '/tables',
            id,
            createMethod: dataset.createTable.bind(dataset),
            methods,
        });
        if (options && options.location) {
            this.location = options.location;
        }
        this.bigQuery = dataset.bigQuery;
        this.dataset = dataset;
        // Catch all for read-modify-write cycle
        // https://cloud.google.com/bigquery/docs/api-performance#read-patch-write
        this.interceptors.push({
            request: (reqOpts) => {
                if (reqOpts.method === 'PATCH' && reqOpts.json.etag) {
                    reqOpts.headers = reqOpts.headers || {};
                    reqOpts.headers['If-Match'] = reqOpts.json.etag;
                }
                return reqOpts;
            },
        });
        /**
         * Create a readable stream of the rows of data in your table. This method
         * is simply a wrapper around {@link Table#getRows}.
         *
         * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
         *
         * @returns {ReadableStream}
         *
         * @example
         * ```
         * const {BigQuery} = require('@google-cloud/bigquery');
         * const bigquery = new BigQuery();
         * const dataset = bigquery.dataset('my-dataset');
         * const table = dataset.table('my-table');
         *
         * table.createReadStream(options)
         *   .on('error', console.error)
         *   .on('data', row => {})
         *   .on('end', function() {
         *     // All rows have been retrieved.
         *   });
         *
         * //-
         * // If you anticipate many results, you can end a stream early to prevent
         * // unnecessary processing and API requests.
         * //-
         * table.createReadStream()
         *   .on('data', function(row) {
         *     this.end();
         *   });
         * ```
         */
        this.createReadStream = paginator_1.paginator.streamify('getRows');
    }
    createReadStream(options) {
        // placeholder body, overwritten in constructor
        return new paginator_1.ResourceStream({}, () => { });
    }
    /**
     * Convert a comma-separated name:type string to a table schema object.
     *
     * @static
     * @private
     *
     * @param {string} str Comma-separated schema string.
     * @returns {object} Table schema in the format the API expects.
     */
    static createSchemaFromString_(str) {
        return str.split(/\s*,\s*/).reduce((acc, pair) => {
            acc.fields.push({
                name: pair.split(':')[0].trim(),
                type: (pair.split(':')[1] || 'STRING').toUpperCase().trim(),
            });
            return acc;
        }, {
            fields: [],
        });
    }
    /**
     * Convert a row entry from native types to their encoded types that the API
     * expects.
     *
     * @static
     * @private
     *
     * @param {*} value The value to be converted.
     * @returns {*} The converted value.
     */
    static encodeValue_(value) {
        if (typeof value === 'undefined' || value === null) {
            return null;
        }
        if (value instanceof Buffer) {
            return value.toString('base64');
        }
        if (value instanceof big_js_1.default) {
            return value.toFixed();
        }
        const customTypeConstructorNames = [
            'BigQueryDate',
            'BigQueryDatetime',
            'BigQueryInt',
            'BigQueryTime',
            'BigQueryTimestamp',
            'Geography',
        ];
        const constructorName = value.constructor.name;
        const isCustomType = customTypeConstructorNames.indexOf(constructorName) > -1;
        if (isCustomType) {
            return value.value;
        }
        if (is.date(value)) {
            return value.toJSON();
        }
        if (is.array(value)) {
            return value.map(Table.encodeValue_);
        }
        if (typeof value === 'object') {
            return Object.keys(value).reduce((acc, key) => {
                acc[key] = Table.encodeValue_(value[key]);
                return acc;
            }, {});
        }
        return value;
    }
    /**
     * @private
     */
    static formatMetadata_(options) {
        const body = extend(true, {}, options);
        if (options.name) {
            body.friendlyName = options.name;
            delete body.name;
        }
        if (is.string(options.schema)) {
            body.schema = Table.createSchemaFromString_(options.schema);
        }
        if (is.array(options.schema)) {
            body.schema = {
                fields: options.schema,
            };
        }
        if (body.schema && body.schema.fields) {
            body.schema.fields = body.schema.fields.map(field => {
                if (field.fields) {
                    field.type = 'RECORD';
                }
                return field;
            });
        }
        if (is.string(options.partitioning)) {
            body.timePartitioning = {
                type: options.partitioning.toUpperCase(),
            };
            delete body.partitioning;
        }
        if (is.string(options.view)) {
            body.view = {
                query: options.view,
                useLegacySql: false,
            };
        }
        return body;
    }
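    /**
     * Copy the rows from this table to the given destination table. A thin
     * wrapper around {@link Table#createCopyJob} (below) that waits for the
     * underlying job to complete.
     *
     * @param {Table} destination The destination table.
     * @param {object} [metadata] Metadata to set with the copy operation.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; `my-copy` is a placeholder table name, in the
     * // spirit of the `my-dataset`/`my-table` examples above.
     * const destination = dataset.table('my-copy');
     * table.copy(destination, (err, apiResponse) => {});
     * ```
     */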
    copy(destination, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createCopyJob(destination, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', (metadata) => {
                callback(null, metadata);
            });
        });
    }
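    /**
     * Copy the rows from one or more source tables into this table. A thin
     * wrapper around {@link Table#createCopyFromJob} (below) that waits for
     * the underlying job to complete.
     *
     * @param {Table|Table[]} sourceTables The source table(s).
     * @param {object} [metadata] Metadata to set with the copy operation.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; `my-source` is a placeholder table name.
     * table.copyFrom(dataset.table('my-source'), (err, apiResponse) => {});
     * ```
     */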
    copyFrom(sourceTables, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createCopyFromJob(sourceTables, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
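    /**
     * Start a job that copies this table to the given destination table.
     * Unlike {@link Table#copy}, this hands back the {@link Job} without
     * waiting for it to complete. Throws if `destination` is not a Table
     * object.
     *
     * @param {Table} destination The destination table.
     * @param {object} [metadata] Metadata to set with the copy operation.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the returned job can then be polled or listened to.
     * table.createCopyJob(dataset.table('my-copy'), (err, job, apiResponse) => {});
     * ```
     */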
    createCopyJob(destination, metadataOrCallback, cb) {
        if (!(destination instanceof Table)) {
            throw new Error('Destination must be a Table object.');
        }
        const metadata = typeof metadataOrCallback === 'object'
            ? metadataOrCallback
            : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                copy: extend(true, metadata, {
                    destinationTable: {
                        datasetId: destination.dataset.id,
                        projectId: destination.bigQuery.projectId,
                        tableId: destination.id,
                    },
                    sourceTable: {
                        datasetId: this.dataset.id,
                        projectId: this.bigQuery.projectId,
                        tableId: this.id,
                    },
                }),
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
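    /**
     * Start a job that copies one or more source tables into this table,
     * returning the {@link Job} without waiting for completion. Throws if any
     * source is not a Table object.
     *
     * @param {Table|Table[]} source The source table(s).
     * @param {object} [metadata] Metadata to set with the copy operation.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch with two placeholder source tables.
     * const sources = [dataset.table('src-a'), dataset.table('src-b')];
     * table.createCopyFromJob(sources, (err, job, apiResponse) => {});
     * ```
     */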
    createCopyFromJob(source, metadataOrCallback, cb) {
        const sourceTables = arrify(source);
        sourceTables.forEach(sourceTable => {
            if (!(sourceTable instanceof Table)) {
                throw new Error('Source must be a Table object.');
            }
        });
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                copy: extend(true, metadata, {
                    destinationTable: {
                        datasetId: this.dataset.id,
                        projectId: this.bigQuery.projectId,
                        tableId: this.id,
                    },
                    sourceTables: sourceTables.map(sourceTable => {
                        return {
                            datasetId: sourceTable.dataset.id,
                            projectId: sourceTable.bigQuery.projectId,
                            tableId: sourceTable.id,
                        };
                    }),
                }),
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
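    /**
     * Start a job that exports this table to Cloud Storage. The destination
     * must be one or more File objects from `@google-cloud/storage`; when no
     * explicit format is given, one is inferred from each file's extension
     * (see FORMATS above), with the API defaulting to CSV otherwise.
     *
     * @param {File|File[]} destination Destination file(s).
     * @param {object} [options] Extract options.
     * @param {boolean} [options.gzip] Compress the output with GZIP.
     * @param {string} [options.jobId] Custom job id.
     * @param {string} [options.jobPrefix] Prefix to apply to the job id.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the bucket and file names are placeholders.
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const file = storage.bucket('my-bucket').file('export.csv');
     * table.createExtractJob(file, (err, job, apiResponse) => {});
     * ```
     */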
    createExtractJob(destination, optionsOrCallback, cb) {
        let options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        options = extend(true, options, {
            destinationUris: arrify(destination).map(dest => {
                if (!common_1.util.isCustomType(dest, 'storage/file')) {
                    throw new Error('Destination must be a File object.');
                }
                // If no explicit format was provided, attempt to find a match from the
                // file's extension. If no match, don't set, and default upstream to
                // CSV.
                const format = path
                    .extname(dest.name)
                    .substr(1)
                    .toLowerCase();
                if (!options.destinationFormat && !options.format && FORMATS[format]) {
                    options.destinationFormat = FORMATS[format];
                }
                return 'gs://' + dest.bucket.name + '/' + dest.name;
            }),
        });
        if (options.format) {
            options.format = options.format.toLowerCase();
            if (FORMATS[options.format]) {
                options.destinationFormat = FORMATS[options.format];
                delete options.format;
            }
            else {
                throw new Error('Destination format not recognized: ' + options.format);
            }
        }
        if (options.gzip) {
            options.compression = 'GZIP';
            delete options.gzip;
        }
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                extract: extend(true, options, {
                    sourceTable: {
                        datasetId: this.dataset.id,
                        projectId: this.bigQuery.projectId,
                        tableId: this.id,
                    },
                }),
            },
        };
        if (options.jobPrefix) {
            body.jobPrefix = options.jobPrefix;
            delete options.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (options.jobId) {
            body.jobId = options.jobId;
            delete options.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
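    /**
     * Start a job that loads data into this table, either from a local file
     * path (streamed through {@link Table#createWriteStream_}) or from one or
     * more Cloud Storage File objects. Delegates to the private
     * `_createLoadJob` below.
     *
     * @param {string|File|File[]} source The source file.
     * @param {object} [metadata] Metadata to set with the load operation.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; './data.csv' is a placeholder local path.
     * table.createLoadJob('./data.csv', (err, job, apiResponse) => {});
     * ```
     */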
    createLoadJob(source, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this._createLoadJob(source, metadata).then(([resp]) => callback(null, resp, resp.metadata), err => callback(err));
    }
    /**
     * @param {string | File | File[]} source
     * @param {JobLoadMetadata} metadata
     * @returns {Promise<JobResponse>}
     * @private
     */
    async _createLoadJob(source, metadata) {
        if (metadata.format) {
            metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
            delete metadata.format;
        }
        if (this.location) {
            metadata.location = this.location;
        }
        if (typeof source === 'string') {
            // A path to a file was given. If a sourceFormat wasn't specified, try to
            // find a match from the file's extension.
            const detectedFormat = FORMATS[path
                .extname(source)
                .substr(1)
                .toLowerCase()];
            if (!metadata.sourceFormat && detectedFormat) {
                metadata.sourceFormat = detectedFormat;
            }
            // Read the file into a new write stream.
            const jobWritable = fs
                .createReadStream(source)
                .pipe(this.createWriteStream_(metadata));
            const jobResponse = (await p_event_1.default(jobWritable, 'job'));
            return [jobResponse, jobResponse.metadata];
        }
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                load: {
                    destinationTable: {
                        projectId: this.bigQuery.projectId,
                        datasetId: this.dataset.id,
                        tableId: this.id,
                    },
                },
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (metadata.location) {
            body.location = metadata.location;
            delete metadata.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        extend(true, body.configuration.load, metadata, {
            sourceUris: arrify(source).map(src => {
                if (!common_1.util.isCustomType(src, 'storage/file')) {
                    throw new Error('Source must be a File object.');
                }
                // If no explicit format was provided, attempt to find a match from
                // the file's extension. If no match, don't set, and default upstream
                // to CSV.
                const format = FORMATS[path
                    .extname(src.name)
                    .substr(1)
                    .toLowerCase()];
                if (!metadata.sourceFormat && format) {
                    body.configuration.load.sourceFormat = format;
                }
                return 'gs://' + src.bucket.name + '/' + src.name;
            }),
        });
        return this.bigQuery.createJob(body);
    }
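    /**
     * Start a query job scoped to this table's dataset; a pass-through to
     * {@link Dataset#createQueryJob}.
     *
     * @param {object} options Query job options.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the SQL is a placeholder.
     * table.createQueryJob({
     *   query: 'SELECT * FROM `my-dataset.my-table` LIMIT 10',
     * }, (err, job, apiResponse) => {});
     * ```
     */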
    createQueryJob(options, callback) {
        return this.dataset.createQueryJob(options, callback);
    }
    /**
     * Run a query scoped to your dataset as a readable object stream.
     *
     * See {@link BigQuery#createQueryStream} for full documentation of this
     * method.
     *
     * @param {object} query See {@link BigQuery#createQueryStream} for full
     *     documentation of this method.
     * @returns {stream} See {@link BigQuery#createQueryStream} for full
     *     documentation of this method.
     */
    createQueryStream(query) {
        return this.dataset.createQueryStream(query);
    }
    /**
     * Creates a write stream. Unlike the public version, this will not
     * automatically poll the underlying job.
     *
     * @private
     *
     * @param {string|object} [metadata] Metadata to set with the load operation.
     *     The metadata object should be in the format of the
     *     {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
     *     property of a Jobs resource. If a string is given, it will be used
     *     as the filetype.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @returns {WritableStream}
     */
    createWriteStream_(metadata) {
        metadata = metadata || {};
        if (typeof metadata === 'string') {
            metadata = {
                sourceFormat: FORMATS[metadata.toLowerCase()],
            };
        }
        if (typeof metadata.schema === 'string') {
            metadata.schema = Table.createSchemaFromString_(metadata.schema);
        }
        metadata = extend(true, {
            destinationTable: {
                projectId: this.bigQuery.projectId,
                datasetId: this.dataset.id,
                tableId: this.id,
            },
        }, metadata);
        let jobId = metadata.jobId || uuid.v4();
        if (metadata.jobId) {
            delete metadata.jobId;
        }
        if (metadata.jobPrefix) {
            jobId = metadata.jobPrefix + jobId;
            delete metadata.jobPrefix;
        }
        const dup = streamEvents(duplexify());
        dup.once('writing', () => {
            common_1.util.makeWritableStream(dup, {
                makeAuthenticatedRequest: this.bigQuery.makeAuthenticatedRequest,
                metadata: {
                    configuration: {
                        load: metadata,
                    },
                    jobReference: {
                        jobId,
                        projectId: this.bigQuery.projectId,
                        location: this.location,
                    },
                },
                request: {
                    uri: `${this.bigQuery.apiEndpoint}/upload/bigquery/v2/projects/${this.bigQuery.projectId}/jobs`,
                },
            },
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            (data) => {
                const job = this.bigQuery.job(data.jobReference.jobId, {
                    location: data.jobReference.location,
                });
                job.metadata = data;
                dup.emit('job', job);
            });
        });
        return dup;
    }
    /**
     * Load data into your table from a readable stream of AVRO, CSV, JSON, ORC,
     * or PARQUET data.
     *
     * See {@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert| Jobs: insert API Documentation}
     *
     * @param {string|object} [metadata] Metadata to set with the load operation.
     *     The metadata object should be in the format of the
     *     {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
     *     property of a Jobs resource. If a string is given,
     *     it will be used as the filetype.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @returns {WritableStream}
     *
     * @throws {Error} If source format isn't recognized.
     *
     * @example
     * ```
     * const {BigQuery} = require('@google-cloud/bigquery');
     * const bigquery = new BigQuery();
     * const dataset = bigquery.dataset('my-dataset');
     * const table = dataset.table('my-table');
     *
     * //-
     * // Load data from a CSV file.
     * //-
     * const request = require('request');
     *
     * const csvUrl = 'http://goo.gl/kSE7z6';
     *
     * const metadata = {
     *   allowJaggedRows: true,
     *   skipLeadingRows: 1
     * };
     *
     * request.get(csvUrl)
     *   .pipe(table.createWriteStream(metadata))
     *   .on('job', (job) => {
     *     // `job` is a Job object that can be used to check the status of the
     *     // request.
     *   })
     *   .on('complete', (job) => {
     *     // The job has completed successfully.
     *   });
     *
     * //-
     * // Load data from a JSON file.
     * //-
     * const fs = require('fs');
     *
     * fs.createReadStream('./test/testdata/testfile.json')
     *   .pipe(table.createWriteStream('json'))
     *   .on('job', (job) => {
     *     // `job` is a Job object that can be used to check the status of the
     *     // request.
     *   })
     *   .on('complete', (job) => {
     *     // The job has completed successfully.
     *   });
     * ```
     */
    createWriteStream(metadata) {
        const stream = this.createWriteStream_(metadata);
        stream.on('prefinish', () => {
            stream.cork();
        });
        stream.on('job', (job) => {
            job
                .on('error', err => {
                    stream.destroy(err);
                })
                .on('complete', () => {
                    stream.emit('complete', job);
                    stream.uncork();
                });
        });
        return stream;
    }
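    /**
     * Export this table to Cloud Storage and wait for the job to finish. A
     * thin wrapper around {@link Table#createExtractJob}.
     *
     * @param {File|File[]} destination Destination file(s).
     * @param {object} [options] Extract options.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; bucket and file names are placeholders.
     * const {Storage} = require('@google-cloud/storage');
     * const storage = new Storage();
     * const file = storage.bucket('my-bucket').file('export.json');
     * table.extract(file, {format: 'json'}, (err, apiResponse) => {});
     * ```
     */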
    extract(destination, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        this.createExtractJob(destination, options, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
    /**
     * Retrieves table data from a specified set of rows. The rows are returned to
     * your callback as an array of objects matching your table's schema.
     *
     * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
     *
     * @param {object} [options] The configuration object.
     * @param {boolean} [options.autoPaginate=true] Have pagination handled
     *     automatically.
     * @param {number} [options.maxApiCalls] Maximum number of API calls to make.
     * @param {number} [options.maxResults] Maximum number of results to return.
     * @param {boolean|IntegerTypeCastOptions} [options.wrapIntegers=false] Wrap values
     *     of 'INT64' type in {@link BigQueryInt} objects.
     *     If a `boolean`, this will wrap values in {@link BigQueryInt} objects.
     *     If an `object`, this will return a value returned by
     *     `wrapIntegers.integerTypeCastFunction`.
     * @param {RowsCallback} [callback] The callback function. If `autoPaginate`
     *     is set to false a {@link ManualQueryResultsCallback} should be used.
     * @param {?error} callback.err An error returned while making this request
     * @param {array} callback.rows The table data from specified set of rows.
     * @returns {Promise<RowsResponse>}
     *
     * @example
     * ```
     * const {BigQuery} = require('@google-cloud/bigquery');
     * const bigquery = new BigQuery();
     * const dataset = bigquery.dataset('my-dataset');
     * const table = dataset.table('my-table');
     *
     * table.getRows((err, rows) => {
     *   if (!err) {
     *     // rows is an array of results.
     *   }
     * });
     *
     * //-
     * // To control how many API requests are made and page through the results
     * // manually, set `autoPaginate` to `false`.
     * //-
     * function manualPaginationCallback(err, rows, nextQuery, apiResponse) {
     *   if (nextQuery) {
     *     // More results exist.
     *     table.getRows(nextQuery, manualPaginationCallback);
     *   }
     * }
     *
     * table.getRows({
     *   autoPaginate: false
     * }, manualPaginationCallback);
     *
     * //-
     * // If the callback is omitted, we'll return a Promise.
     * //-
     * table.getRows().then((data) => {
     *   const rows = data[0];
     * });
     * ```
     */
    getRows(optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        const wrapIntegers = options.wrapIntegers ? options.wrapIntegers : false;
        delete options.wrapIntegers;
        const onComplete = (err, rows, nextQuery, resp) => {
            if (err) {
                callback(err, null, null, resp);
                return;
            }
            rows = _1.BigQuery.mergeSchemaWithRows_(this.metadata.schema, rows || [], wrapIntegers, options.selectedFields ? options.selectedFields.split(',') : []);
            callback(null, rows, nextQuery, resp);
        };
        this.request({
            uri: '/data',
            qs: options,
        }, (err, resp) => {
            if (err) {
                onComplete(err, null, null, resp);
                return;
            }
            let nextQuery = null;
            if (resp.pageToken) {
                nextQuery = Object.assign({}, options, {
                    pageToken: resp.pageToken,
                });
            }
            if (resp.rows && resp.rows.length > 0 && !this.metadata.schema) {
                // We don't know the schema for this table yet. Do a quick stat.
                this.getMetadata((err, metadata, apiResponse) => {
                    if (err) {
                        onComplete(err, null, null, apiResponse);
                        return;
                    }
                    onComplete(null, resp.rows, nextQuery, resp);
                });
                return;
            }
            onComplete(null, resp.rows, nextQuery, resp);
        });
    }
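    /**
     * Stream rows into the table via the `tabledata.insertAll` streaming API.
     * Partial failures are retried (see `_insertWithRetry` below), and when a
     * `schema` option is given the table is created on demand (see
     * `_insertAndCreateTable` below).
     *
     * @param {object|object[]} rows The rows to insert.
     * @param {object} [options] Insert options.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the row fields are placeholders and must match
     * // the table's schema.
     * table.insert([{name: 'Ada'}, {name: 'Grace'}], (err, apiResponse) => {
     *   if (!err) {
     *     // Rows inserted successfully.
     *   }
     * });
     * ```
     */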
    insert(rows, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object'
            ? optionsOrCallback
            : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        const promise = this._insertAndCreateTable(rows, options);
        if (callback) {
            promise.then(resp => callback(null, resp), err => callback(err, null));
        }
        else {
            return promise.then(r => [r]);
        }
    }
    /**
     * Insert rows with retries, creating the table if it does not exist.
     *
     * @param {RowMetadata | RowMetadata[]} rows
     * @param {InsertRowsOptions} options
     * @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
     * @private
     */
    async _insertAndCreateTable(rows, options) {
        const { schema } = options;
        const delay = 60000;
        try {
            return await this._insertWithRetry(rows, options);
        }
        catch (err) {
            if (err.code !== 404 || !schema) {
                throw err;
            }
        }
        try {
            await this.create({ schema });
        }
        catch (err) {
            if (err.code !== 409) {
                throw err;
            }
        }
        // table creation after failed access is subject to failure caching and
        // eventual consistency, see:
        // https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
        await new Promise(resolve => setTimeout(resolve, delay));
        return this._insertAndCreateTable(rows, options);
    }
    /**
     * This method will attempt to insert rows while retrying any partial failures
     * that occur along the way. Because partial insert failures are returned
     * differently, we can't depend on our usual retry strategy.
     *
     * @private
     *
     * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
     * @param {InsertRowsOptions} options Insert options.
     * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
     */
    async _insertWithRetry(rows, options) {
        const { partialRetries = 3 } = options;
        let error;
        const maxAttempts = Math.max(partialRetries, 0) + 1;
        for (let attempts = 0; attempts < maxAttempts; attempts++) {
            try {
                return await this._insert(rows, options);
            }
            catch (e) {
                error = e;
                rows = (e.errors || [])
                    .filter(err => !!err.row)
                    .map(err => err.row);
                if (!rows.length) {
                    break;
                }
            }
        }
        throw error;
    }
    /**
     * This method does the bulk of the work for processing options and making the
     * network request.
     *
     * @private
     *
     * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
     * @param {InsertRowsOptions} options Insert options.
     * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
     */
    async _insert(rows, options) {
        rows = arrify(rows);
        if (!rows.length) {
            throw new Error('You must provide at least 1 row to be inserted.');
        }
        const json = extend(true, {}, options, { rows });
        if (!options.raw) {
            json.rows = rows.map((row) => {
                const encoded = {
                    json: Table.encodeValue_(row),
                };
                if (options.createInsertId !== false) {
                    encoded.insertId = uuid.v4();
                }
                return encoded;
            });
        }
        delete json.createInsertId;
        delete json.partialRetries;
        delete json.raw;
        delete json.schema;
        const [resp] = await this.request({
            method: 'POST',
            uri: '/insertAll',
            json,
        });
        const partialFailures = (resp.insertErrors || []).map((insertError) => {
            return {
                errors: insertError.errors.map(error => {
                    return {
                        message: error.message,
                        reason: error.reason,
                    };
                }),
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                row: rows[insertError.index],
            };
        });
        if (partialFailures.length > 0) {
            throw new common_1.util.PartialFailureError({
                errors: partialFailures,
                response: resp,
            });
        }
        return resp;
    }
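    /**
     * Create a duplex stream whose written chunks are queued and inserted
     * into the table via a {@link RowQueue}.
     *
     * @param {object} [options] Insert stream options.
     * @returns {Duplex}
     *
     * @example
     * ```
     * // A minimal sketch; each written object is a row.
     * const insertStream = table.createInsertStream();
     * insertStream.write({name: 'Ada'});
     * ```
     */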
    createInsertStream(options) {
        options = typeof options === 'object' ? options : {};
        const dup = new stream_1.Duplex({ objectMode: true });
        dup._write = (chunk, encoding, cb) => {
            this.rowQueue.add(chunk, () => { });
            cb();
        };
        this.rowQueue = new rowQueue_1.RowQueue(this, dup, options);
        return dup;
    }
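    /**
     * Load data into the table and wait for the job to finish. A thin wrapper
     * around {@link Table#createLoadJob}.
     *
     * @param {string|File|File[]} source The source file.
     * @param {object} [metadata] Metadata to set with the load operation.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; './data.csv' is a placeholder local path.
     * table.load('./data.csv', (err, apiResponse) => {});
     * ```
     */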
    load(source, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createLoadJob(source, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
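    /**
     * Run a query scoped to this table's dataset. A string argument is
     * normalized into a `{query}` object before being passed to
     * {@link Dataset#query}.
     *
     * @param {string|object} query The query to run.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the SQL is a placeholder.
     * table.query('SELECT * FROM `my-dataset.my-table` LIMIT 10',
     *   (err, rows) => {});
     * ```
     */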
    query(query, callback) {
        if (typeof query === 'string') {
            query = {
                query,
            };
        }
        this.dataset.query(query, callback);
    }
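    /**
     * Update the table's metadata. The given metadata is first normalized by
     * {@link Table.formatMetadata_} above (e.g. `name` becomes
     * `friendlyName`, a string `schema` is parsed) before being sent.
     *
     * @param {object} metadata The metadata to set.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the values are placeholders.
     * table.setMetadata({
     *   name: 'My Table',
     *   schema: 'name:string, age:integer',
     * }, (err, apiResponse) => {});
     * ```
     */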
    setMetadata(metadata, callback) {
        const body = Table.formatMetadata_(metadata);
        super.setMetadata(body, callback);
    }
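    /**
     * Fetch the table's IAM policy via the `tables.getIamPolicy` endpoint.
     * Only IAM policy version 1 is supported, as enforced below.
     *
     * @param {object} [options] Request options.
     * @param {number} [options.requestedPolicyVersion] Policy version (must be 1).
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch.
     * table.getIamPolicy((err, policy) => {});
     * ```
     */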
    getIamPolicy(optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        if (typeof options.requestedPolicyVersion === 'number' &&
            options.requestedPolicyVersion !== 1) {
            throw new Error('Only IAM policy version 1 is supported.');
        }
        const json = extend(true, {}, { options });
        this.request({
            method: 'POST',
            uri: '/:getIamPolicy',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
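    /**
     * Set the table's IAM policy via the `tables.setIamPolicy` endpoint.
     * Only IAM policy version 1 is supported, as enforced below.
     *
     * @param {object} policy The policy to set.
     * @param {object} [options] Request options.
     * @param {function} [callback] The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the role and member are placeholders.
     * table.setIamPolicy({
     *   bindings: [{
     *     role: 'roles/bigquery.dataViewer',
     *     members: ['user:alice@example.com'],
     *   }],
     * }, (err, policy) => {});
     * ```
     */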
    setIamPolicy(policy, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        if (policy.version && policy.version !== 1) {
            throw new Error('Only IAM policy version 1 is supported.');
        }
        const json = extend(true, {}, options, { policy });
        this.request({
            method: 'POST',
            uri: '/:setIamPolicy',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
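    /**
     * Test which of the given permissions the caller has on this table, via
     * the `tables.testIamPermissions` endpoint.
     *
     * @param {string|string[]} permissions The permissions to test.
     * @param {function} callback The callback function.
     *
     * @example
     * ```
     * // A minimal sketch; the response echoes the permissions the caller has.
     * table.testIamPermissions(['bigquery.tables.get'], (err, resp) => {});
     * ```
     */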
    testIamPermissions(permissions, callback) {
        permissions = arrify(permissions);
        const json = extend(true, {}, { permissions });
        this.request({
            method: 'POST',
            uri: '/:testIamPermissions',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
}
exports.Table = Table;
/*! Developer Documentation
 *
 * These methods can be auto-paginated.
 */
paginator_1.paginator.extend(Table, ['getRows']);
/*! Developer Documentation
 *
 * All async methods (except for streams) will return a Promise in the event
 * that a callback is omitted.
 */
promisify_1.promisifyAll(Table);
//# sourceMappingURL=table.js.map