1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Table = void 0;
19const common_1 = require("@google-cloud/common");
20const paginator_1 = require("@google-cloud/paginator");
21const promisify_1 = require("@google-cloud/promisify");
22const arrify = require("arrify");
23const Big = require("big.js");
24const extend = require("extend");
25const events_1 = require("events");
26const fs = require("fs");
27const is = require("is");
28const path = require("path");
29const streamEvents = require("stream-events");
30const uuid = require("uuid");
31const _1 = require(".");
32const stream_1 = require("stream");
33const rowQueue_1 = require("./rowQueue");
34// eslint-disable-next-line @typescript-eslint/no-var-requires
35const duplexify = require('duplexify');
36/**
37 * The file formats accepted by BigQuery.
38 *
39 * @type {object}
40 * @private
41 */
42const FORMATS = {
43 avro: 'AVRO',
44 csv: 'CSV',
45 export_metadata: 'DATASTORE_BACKUP',
46 json: 'NEWLINE_DELIMITED_JSON',
47 orc: 'ORC',
48 parquet: 'PARQUET',
49};
50/**
51 * Table objects are returned by methods such as
52 * {@link Dataset#table}, {@link Dataset#createTable}, and
53 * {@link Dataset#getTables}.
54 *
55 * @class
56 * @param {Dataset} dataset {@link Dataset} instance.
57 * @param {string} id The ID of the table.
58 * @param {object} [options] Table options.
59 * @param {string} [options.location] The geographic location of the table, by
60 * default this value is inherited from the dataset. This can be used to
61 * configure the location of all jobs created through a table instance. It
62 * cannot be used to set the actual location of the table. This value will
63 * be superseded by any API responses containing location data for the
64 * table.
65 *
66 * @example
67 * ```
68 * const {BigQuery} = require('@google-cloud/bigquery');
69 * const bigquery = new BigQuery();
70 * const dataset = bigquery.dataset('my-dataset');
71 *
72 * const table = dataset.table('my-table');
73 * ```
74 */
75class Table extends common_1.ServiceObject {
76 createReadStream(options) {
77 // placeholder body, overwritten in constructor
78 return new paginator_1.ResourceStream({}, () => { });
79 }
80 constructor(dataset, id, options) {
81 const methods = {
82 /**
83 * @callback CreateTableCallback
84 * @param {?Error} err Request error, if any.
85 * @param {Table} table The table.
86 * @param {object} apiResponse The full API response body.
87 */
88 /**
89 * @typedef {array} CreateTableResponse
90 * @property {Table} 0 The table.
91 * @property {object} 1 The full API response body.
92 */
93 /**
94 * Create a table.
95 *
96 * @method Table#create
97 * @param {object} [options] See {@link Dataset#createTable}.
98 * @param {CreateTableCallback} [callback]
99 * @param {?error} callback.err An error returned while making this
100 * request.
101 * @param {Table} callback.table The new {@link Table}.
102 * @param {object} callback.apiResponse The full API response.
103 * @returns {Promise<CreateTableResponse>}
104 *
105 * @example
106 * ```
107 * const {BigQuery} = require('@google-cloud/bigquery');
108 * const bigquery = new BigQuery();
109 * const dataset = bigquery.dataset('my-dataset');
110 *
111 * const table = dataset.table('my-table');
112 *
113 * table.create((err, table, apiResponse) => {
114 * if (!err) {
115 * // The table was created successfully.
116 * }
117 * });
118 *
119 * //-
120 * // If the callback is omitted, we'll return a Promise.
121 * //-
122 * table.create().then((data) => {
123 * const table = data[0];
124 * const apiResponse = data[1];
125 * });
126 * ```
127 */
128 create: true,
129 /**
130 * @callback DeleteTableCallback
131 * @param {?Error} err Request error, if any.
132 * @param {object} apiResponse The full API response.
133 */
134 /**
135 * @typedef {array} DeleteTableResponse
136 * @property {object} 0 The full API response.
137 */
138 /**
139 * Delete a table and all its data.
140 *
141 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/delete| Tables: delete API Documentation}
142 *
143 * @method Table#delete
144 * @param {DeleteTableCallback} [callback]
145 * @param {?error} callback.err An error returned while making this
146 * request.
147 * @param {object} callback.apiResponse The full API response.
148 * @returns {Promise<DeleteTableResponse>}
149 *
150 * @example
151 * ```
152 * const {BigQuery} = require('@google-cloud/bigquery');
153 * const bigquery = new BigQuery();
154 * const dataset = bigquery.dataset('my-dataset');
155 *
156 * const table = dataset.table('my-table');
157 *
158 * table.delete((err, apiResponse) => {});
159 *
160 * //-
161 * // If the callback is omitted, we'll return a Promise.
162 * //-
163 * table.delete().then((data) => {
164 * const apiResponse = data[0];
165 * });
166 * ```
167 */
168 delete: true,
169 /**
170 * @callback TableExistsCallback
171 * @param {?Error} err Request error, if any.
172 * @param {boolean} exists Indicates if the table exists.
173 */
174 /**
             * @typedef {array} TableExistsResponse
             * @property {boolean} 0 Indicates if the table exists.
             */
            /**
             * Check if the table exists.
             *
             * @method Table#exists
             * @param {TableExistsCallback} [callback]
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {boolean} callback.exists Whether the table exists or not.
             * @returns {Promise<TableExistsResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * table.exists((err, exists) => {});
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.exists().then((data) => {
             *   const exists = data[0];
             * });
             * ```
             */
            exists: true,
            /**
             * @callback GetTableCallback
             * @param {?Error} err Request error, if any.
             * @param {Table} table The table.
             * @param {object} apiResponse The full API response body.
             */
            /**
             * @typedef {array} GetTableResponse
             * @property {Table} 0 The table.
             * @property {object} 1 The full API response body.
             */
            /**
             * Get a table if it exists.
             *
             * You may optionally use this to "get or create" an object by providing
             * an object with `autoCreate` set to `true`. Any extra configuration that
             * is normally required for the `create` method must be contained within
             * this object as well.
             *
             * If you wish to get a selection of metadata instead of the full table metadata
             * (retrieved by both Table#get by default and by Table#getMetadata), use
             * the `options` parameter to set the `view` and/or `selectedFields` query parameters.
             *
             * See {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView| Tables.get and TableMetadataView}
             *
             * @method Table#get
             * @param {object} [options] Configuration object.
             * @param {boolean} [options.autoCreate=false] Automatically create the
             *     object if it does not exist.
             * @param {function} [callback]
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {Table} callback.table The {@link Table}.
             * @param {object} callback.apiResponse The full API response.
             * @returns {Promise<GetTableResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * const options = {
             *   view: "BASIC"
             * };
             *
             * table.get((err, table, apiResponse) => {
             *   // `table.metadata` has been populated.
             * });
             *
             * table.get(options, (err, table, apiResponse) => {
             *   // A selection of `table.metadata` has been populated
             * });
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.get().then((data) => {
             *   const table = data[0];
             *   const apiResponse = data[1];
             * });
             * ```
             */
            get: true,
            /**
             * @callback GetTableMetadataCallback
             * @param {?Error} err Request error, if any.
             * @param {object} metadata The table metadata.
             * @param {object} apiResponse The full API response.
             */
            /**
             * @typedef {array} GetTableMetadataResponse
             * @property {object} 0 The table metadata.
             * @property {object} 1 The full API response.
             */
            /**
             * Return the metadata associated with the Table.
             *
             * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/get| Tables: get API Documentation}
             *
             * @method Table#getMetadata
             * @param {GetTableMetadataCallback} [callback] The callback function.
             * @param {?error} callback.err An error returned while making this
             *     request.
             * @param {object} callback.metadata The metadata of the Table.
             * @param {object} callback.apiResponse The full API response.
             * @returns {Promise<GetTableMetadataResponse>}
             *
             * @example
             * ```
             * const {BigQuery} = require('@google-cloud/bigquery');
             * const bigquery = new BigQuery();
             * const dataset = bigquery.dataset('my-dataset');
             *
             * const table = dataset.table('my-table');
             *
             * table.getMetadata((err, metadata, apiResponse) => {});
             *
             * //-
             * // If the callback is omitted, we'll return a Promise.
             * //-
             * table.getMetadata().then((data) => {
             *   const metadata = data[0];
             *   const apiResponse = data[1];
             * });
             * ```
             */
            getMetadata: true,
        };
        super({
            parent: dataset,
            baseUrl: '/tables',
            id,
            createMethod: dataset.createTable.bind(dataset),
            methods,
        });
        if (options && options.location) {
            this.location = options.location;
        }
        this.bigQuery = dataset.bigQuery;
        this.dataset = dataset;
        // Catch all for read-modify-write cycle
        // https://cloud.google.com/bigquery/docs/api-performance#read-patch-write
        this.interceptors.push({
            request: (reqOpts) => {
                if (reqOpts.method === 'PATCH' && reqOpts.json.etag) {
                    reqOpts.headers = reqOpts.headers || {};
                    reqOpts.headers['If-Match'] = reqOpts.json.etag;
                }
                return reqOpts;
            },
        });
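        // A sketch of the read-patch-write flow the interceptor above enables
        // (illustrative only; `description` is a placeholder field):
        //
        //   table.getMetadata((err, metadata) => {
        //     // Passing the etag back makes the PATCH conditional: the API
        //     // rejects the write if the table changed since it was read.
        //     table.setMetadata({description: 'Updated', etag: metadata.etag});
        //   });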
        /**
         * Create a readable stream of the rows of data in your table. This method
         * is simply a wrapper around {@link Table#getRows}.
         *
         * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
         *
         * @returns {ReadableStream}
         *
         * @example
         * ```
         * const {BigQuery} = require('@google-cloud/bigquery');
         * const bigquery = new BigQuery();
         * const dataset = bigquery.dataset('my-dataset');
         * const table = dataset.table('my-table');
         *
         * table.createReadStream(options)
         *   .on('error', console.error)
         *   .on('data', row => {})
         *   .on('end', function() {
         *     // All rows have been retrieved.
         *   });
         *
         * //-
         * // If you anticipate many results, you can end a stream early to prevent
         * // unnecessary processing and API requests.
         * //-
         * table.createReadStream()
         *   .on('data', function(row) {
         *     this.end();
         *   });
         * ```
         */
        this.createReadStream = paginator_1.paginator.streamify('getRows');
    }
    /**
     * Convert a comma-separated name:type string to a table schema object.
     *
     * @static
     * @private
     *
     * @param {string} str Comma-separated schema string.
     * @returns {object} Table schema in the format the API expects.
     */
    static createSchemaFromString_(str) {
        return str.split(/\s*,\s*/).reduce((acc, pair) => {
            acc.fields.push({
                name: pair.split(':')[0].trim(),
                type: (pair.split(':')[1] || 'STRING').toUpperCase().trim(),
            });
            return acc;
        }, {
            fields: [],
        });
    }
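    /*
     * Illustration of the schema-string format parsed above (a sketch, not
     * part of the public API): each comma-separated `name:type` pair becomes
     * a field, and an omitted type defaults to STRING.
     *
     *   Table.createSchemaFromString_('name:string, age:integer, nickname');
     *   // => {fields: [
     *   //      {name: 'name', type: 'STRING'},
     *   //      {name: 'age', type: 'INTEGER'},
     *   //      {name: 'nickname', type: 'STRING'},
     *   //    ]}
     */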
    /**
     * Convert a row entry from native types to their encoded types that the API
     * expects.
     *
     * @static
     * @private
     *
     * @param {*} value The value to be converted.
     * @returns {*} The converted value.
     */
    static encodeValue_(value) {
        var _a;
        if (typeof value === 'undefined' || value === null) {
            return null;
        }
        if (value instanceof Buffer) {
            return value.toString('base64');
        }
        if (value instanceof Big) {
            return value.toFixed();
        }
        const customTypeConstructorNames = [
            'BigQueryDate',
            'BigQueryDatetime',
            'BigQueryInt',
            'BigQueryTime',
            'BigQueryTimestamp',
            'BigQueryRange',
            'Geography',
        ];
        const constructorName = (_a = value.constructor) === null || _a === void 0 ? void 0 : _a.name;
        const isCustomType = customTypeConstructorNames.indexOf(constructorName) > -1;
        if (isCustomType) {
            return value.value;
        }
        if (is.date(value)) {
            return value.toJSON();
        }
        if (is.array(value)) {
            return value.map(Table.encodeValue_);
        }
        if (typeof value === 'object') {
            return Object.keys(value).reduce((acc, key) => {
                acc[key] = Table.encodeValue_(value[key]);
                return acc;
            }, {});
        }
        return value;
    }
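    /*
     * A few concrete encodings performed by encodeValue_ above (a sketch):
     *
     *   Table.encodeValue_(Buffer.from('abc'));      // 'YWJj' (base64)
     *   Table.encodeValue_(new Big('123.456'));      // '123.456'
     *   Table.encodeValue_(new Date(0));             // '1970-01-01T00:00:00.000Z'
     *   Table.encodeValue_({b: [Buffer.from('x')]}); // {b: ['eA==']}
     */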
    /**
     * @private
     */
    static formatMetadata_(options) {
        const body = extend(true, {}, options);
        if (options.name) {
            body.friendlyName = options.name;
            delete body.name;
        }
        if (is.string(options.schema)) {
            body.schema = Table.createSchemaFromString_(options.schema);
        }
        if (is.array(options.schema)) {
            body.schema = {
                fields: options.schema,
            };
        }
        if (body.schema && body.schema.fields) {
            body.schema.fields = body.schema.fields.map(field => {
                if (field.fields) {
                    field.type = 'RECORD';
                }
                return field;
            });
        }
        if (is.string(options.partitioning)) {
            body.timePartitioning = {
                type: options.partitioning.toUpperCase(),
            };
            delete body.partitioning;
        }
        if (is.string(options.view)) {
            body.view = {
                query: options.view,
                useLegacySql: false,
            };
        }
        return body;
    }
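    /*
     * Example of the normalization formatMetadata_ applies (a sketch):
     *
     *   Table.formatMetadata_({
     *     name: 'My Table',
     *     schema: 'name:string, age:integer',
     *     partitioning: 'day',
     *   });
     *   // => {
     *   //   friendlyName: 'My Table',
     *   //   schema: {fields: [{name: 'name', type: 'STRING'}, ...]},
     *   //   timePartitioning: {type: 'DAY'},
     *   // }
     */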
    copy(destination, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createCopyJob(destination, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', (metadata) => {
                callback(null, metadata);
            });
        });
    }
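    /*
     * Example use of copy() above (a sketch; 'my-dest-table' is a
     * placeholder):
     *
     *   const destTable = dataset.table('my-dest-table');
     *   table.copy(destTable, (err, apiResponse) => {});
     *
     *   // Job metadata may be passed through, and a Promise is returned when
     *   // the callback is omitted:
     *   table.copy(destTable, {createDisposition: 'CREATE_NEVER'})
     *     .then(([apiResponse]) => {});
     */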
    copyFrom(sourceTables, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createCopyFromJob(sourceTables, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
    createCopyJob(destination, metadataOrCallback, cb) {
        if (!(destination instanceof Table)) {
            throw new Error('Destination must be a Table object.');
        }
        const metadata = typeof metadataOrCallback === 'object'
            ? metadataOrCallback
            : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                copy: extend(true, metadata, {
                    destinationTable: {
                        datasetId: destination.dataset.id,
                        projectId: destination.dataset.projectId,
                        tableId: destination.id,
                    },
                    sourceTable: {
                        datasetId: this.dataset.id,
                        projectId: this.dataset.projectId,
                        tableId: this.id,
                    },
                }),
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
    createCopyFromJob(source, metadataOrCallback, cb) {
        const sourceTables = arrify(source);
        sourceTables.forEach(sourceTable => {
            if (!(sourceTable instanceof Table)) {
                throw new Error('Source must be a Table object.');
            }
        });
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                copy: extend(true, metadata, {
                    destinationTable: {
                        datasetId: this.dataset.id,
                        projectId: this.dataset.projectId,
                        tableId: this.id,
                    },
                    sourceTables: sourceTables.map(sourceTable => {
                        return {
                            datasetId: sourceTable.dataset.id,
                            projectId: sourceTable.dataset.projectId,
                            tableId: sourceTable.id,
                        };
                    }),
                }),
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
    createExtractJob(destination, optionsOrCallback, cb) {
        let options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        options = extend(true, options, {
            destinationUris: arrify(destination).map(dest => {
                if (!common_1.util.isCustomType(dest, 'storage/file')) {
                    throw new Error('Destination must be a File object.');
                }
                // If no explicit format was provided, attempt to find a match from the
                // file's extension. If no match, don't set, and default upstream to
                // CSV.
                const format = path.extname(dest.name).substr(1).toLowerCase();
                if (!options.destinationFormat && !options.format && FORMATS[format]) {
                    options.destinationFormat = FORMATS[format];
                }
                return 'gs://' + dest.bucket.name + '/' + dest.name;
            }),
        });
        if (options.format) {
            options.format = options.format.toLowerCase();
            if (FORMATS[options.format]) {
                options.destinationFormat = FORMATS[options.format];
                delete options.format;
            }
            else {
                throw new Error('Destination format not recognized: ' + options.format);
            }
        }
        if (options.gzip) {
            options.compression = 'GZIP';
            delete options.gzip;
        }
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                extract: extend(true, options, {
                    sourceTable: {
                        datasetId: this.dataset.id,
                        projectId: this.dataset.projectId,
                        tableId: this.id,
                    },
                }),
            },
        };
        if (options.jobPrefix) {
            body.jobPrefix = options.jobPrefix;
            delete options.jobPrefix;
        }
        if (this.location) {
            body.location = this.location;
        }
        if (options.jobId) {
            body.jobId = options.jobId;
            delete options.jobId;
        }
        this.bigQuery.createJob(body, callback);
    }
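    /*
     * Example use of createExtractJob() above (a sketch; assumes
     * @google-cloud/storage is installed and the bucket exists):
     *
     *   const {Storage} = require('@google-cloud/storage');
     *   const storage = new Storage();
     *   const exportedFile = storage.bucket('institutions').file('2014.csv');
     *
     *   // The destination format is inferred from the `.csv` extension via
     *   // FORMATS; `gzip: true` becomes `compression: 'GZIP'`.
     *   table.createExtractJob(exportedFile, {gzip: true}, (err, job, apiResponse) => {});
     */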
    createLoadJob(source, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this._createLoadJob(source, metadata).then(([resp]) => callback(null, resp, resp.metadata), err => callback(err));
    }
    /**
     * @param {string | File | File[]} source
     * @param {JobLoadMetadata} metadata
     * @returns {Promise<JobResponse>}
     * @private
     */
    async _createLoadJob(source, metadata) {
        if (metadata.format) {
            metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
            delete metadata.format;
        }
        if (this.location) {
            metadata.location = this.location;
        }
        if (typeof source === 'string') {
            // A path to a file was given. If a sourceFormat wasn't specified, try to
            // find a match from the file's extension.
            const detectedFormat = FORMATS[path.extname(source).substr(1).toLowerCase()];
            if (!metadata.sourceFormat && detectedFormat) {
                metadata.sourceFormat = detectedFormat;
            }
            // Read the file into a new write stream.
            const jobWritable = fs
                .createReadStream(source)
                .pipe(this.createWriteStream_(metadata));
            const [jobResponse] = (await (0, events_1.once)(jobWritable, 'job'));
            return [jobResponse, jobResponse.metadata];
        }
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const body = {
            configuration: {
                load: {
                    destinationTable: {
                        projectId: this.dataset.projectId,
                        datasetId: this.dataset.id,
                        tableId: this.id,
                    },
                },
            },
        };
        if (metadata.jobPrefix) {
            body.jobPrefix = metadata.jobPrefix;
            delete metadata.jobPrefix;
        }
        if (metadata.location) {
            body.location = metadata.location;
            delete metadata.location;
        }
        if (metadata.jobId) {
            body.jobId = metadata.jobId;
            delete metadata.jobId;
        }
        extend(true, body.configuration.load, metadata, {
            sourceUris: arrify(source).map(src => {
                if (!common_1.util.isCustomType(src, 'storage/file')) {
                    throw new Error('Source must be a File object.');
                }
                // If no explicit format was provided, attempt to find a match from
                // the file's extension. If no match, don't set, and default upstream
                // to CSV.
                const format = FORMATS[path.extname(src.name).substr(1).toLowerCase()];
                if (!metadata.sourceFormat && format) {
                    body.configuration.load.sourceFormat = format;
                }
                return 'gs://' + src.bucket.name + '/' + src.name;
            }),
        });
        return this.bigQuery.createJob(body);
    }
    createQueryJob(options, callback) {
        return this.dataset.createQueryJob(options, callback);
    }
    /**
     * Run a query scoped to your dataset as a readable object stream.
     *
     * See {@link BigQuery#createQueryStream} for full documentation of this
     * method.
     *
     * @param {object} query See {@link BigQuery#createQueryStream} for full
     *     documentation of this method.
     * @returns {stream} See {@link BigQuery#createQueryStream} for full
     *     documentation of this method.
     */
    createQueryStream(query) {
        return this.dataset.createQueryStream(query);
    }
    /**
     * Creates a write stream. Unlike the public version, this will not
     * automatically poll the underlying job.
     *
     * @private
     *
     * @param {string|object} [metadata] Metadata to set with the load operation.
     *     The metadata object should be in the format of the
     *     {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
     *     property of a Jobs resource. If a string is given, it will be used
     *     as the filetype.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @returns {WritableStream}
     */
    createWriteStream_(metadata) {
        metadata = metadata || {};
        if (typeof metadata === 'string') {
            metadata = {
                sourceFormat: FORMATS[metadata.toLowerCase()],
            };
        }
        if (typeof metadata.schema === 'string') {
            metadata.schema = Table.createSchemaFromString_(metadata.schema);
        }
        metadata = extend(true, {
            destinationTable: {
                projectId: this.dataset.projectId,
                datasetId: this.dataset.id,
                tableId: this.id,
            },
        }, metadata);
        let jobId = metadata.jobId || uuid.v4();
        if (metadata.jobId) {
            delete metadata.jobId;
        }
        if (metadata.jobPrefix) {
            jobId = metadata.jobPrefix + jobId;
            delete metadata.jobPrefix;
        }
        const dup = streamEvents(duplexify());
        dup.once('writing', () => {
            common_1.util.makeWritableStream(dup, {
                makeAuthenticatedRequest: this.bigQuery.makeAuthenticatedRequest,
                metadata: {
                    configuration: {
                        load: metadata,
                    },
                    jobReference: {
                        jobId,
                        projectId: this.dataset.projectId,
                        location: this.location,
                    },
                },
                request: {
                    uri: `${this.bigQuery.apiEndpoint}/upload/bigquery/v2/projects/${this.dataset.projectId}/jobs`,
                },
            },
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            (data) => {
                const job = this.bigQuery.job(data.jobReference.jobId, {
                    location: data.jobReference.location,
                    projectId: data.jobReference.projectId,
                });
                job.metadata = data;
                dup.emit('job', job);
            });
        });
        return dup;
    }
    /**
     * Load data into your table from a readable stream of AVRO, CSV, JSON, ORC,
     * or PARQUET data.
     *
     * See {@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert| Jobs: insert API Documentation}
     *
     * @param {string|object} [metadata] Metadata to set with the load operation.
     *     The metadata object should be in the format of the
     *     {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
     *     property of a Jobs resource. If a string is given,
     *     it will be used as the filetype.
     * @param {string} [metadata.jobId] Custom job id.
     * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
     * @returns {WritableStream}
     *
     * @throws {Error} If source format isn't recognized.
     *
     * @example
     * ```
     * const {BigQuery} = require('@google-cloud/bigquery');
     * const bigquery = new BigQuery();
     * const dataset = bigquery.dataset('my-dataset');
     * const table = dataset.table('my-table');
     *
     * //-
     * // Load data from a CSV file.
     * //-
     * const request = require('request');
     *
     * const csvUrl = 'http://goo.gl/kSE7z6';
     *
     * const metadata = {
     *   allowJaggedRows: true,
     *   skipLeadingRows: 1
     * };
     *
     * request.get(csvUrl)
     *   .pipe(table.createWriteStream(metadata))
     *   .on('job', (job) => {
     *     // `job` is a Job object that can be used to check the status of the
     *     // request.
     *   })
     *   .on('complete', (job) => {
     *     // The job has completed successfully.
     *   });
     *
     * //-
     * // Load data from a JSON file.
     * //-
     * const fs = require('fs');
     *
     * fs.createReadStream('./test/testdata/testfile.json')
     *   .pipe(table.createWriteStream('json'))
     *   .on('job', (job) => {
     *     // `job` is a Job object that can be used to check the status of the
     *     // request.
     *   })
     *   .on('complete', (job) => {
     *     // The job has completed successfully.
     *   });
     * ```
     */
    createWriteStream(metadata) {
        const stream = this.createWriteStream_(metadata);
        stream.on('prefinish', () => {
            stream.cork();
        });
        stream.on('job', (job) => {
            job
                .on('error', err => {
                    stream.destroy(err);
                })
                .on('complete', () => {
                    stream.emit('complete', job);
                    stream.uncork();
                });
        });
        return stream;
    }
    extract(destination, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        this.createExtractJob(destination, options, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
    /**
     * Retrieves table data from a specified set of rows. The rows are returned to
     * your callback as an array of objects matching your table's schema.
     *
     * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
     *
     * @param {object} [options] The configuration object.
     * @param {boolean} [options.autoPaginate=true] Have pagination handled
     *     automatically.
     * @param {number} [options.maxApiCalls] Maximum number of API calls to make.
     * @param {number} [options.maxResults] Maximum number of results to return.
     * @param {boolean|IntegerTypeCastOptions} [options.wrapIntegers=false] Wrap values
     *     of 'INT64' type in {@link BigQueryInt} objects.
     *     If a `boolean`, this will wrap values in {@link BigQueryInt} objects.
     *     If an `object`, this will return a value returned by
     *     `wrapIntegers.integerTypeCastFunction`.
     * @param {RowsCallback} [callback] The callback function. If `autoPaginate`
     *     is set to false a {@link ManualQueryResultsCallback} should be used.
     * @param {?error} callback.err An error returned while making this request
     * @param {array} callback.rows The table data from specified set of rows.
     * @returns {Promise<RowsResponse>}
     *
     * @example
     * ```
     * const {BigQuery} = require('@google-cloud/bigquery');
     * const bigquery = new BigQuery();
     * const dataset = bigquery.dataset('my-dataset');
     * const table = dataset.table('my-table');
     *
     * table.getRows((err, rows) => {
     *   if (!err) {
     *     // rows is an array of results.
     *   }
     * });
     *
     * //-
     * // To control how many API requests are made and page through the results
     * // manually, set `autoPaginate` to `false`.
     * //-
     * function manualPaginationCallback(err, rows, nextQuery, apiResponse) {
     *   if (nextQuery) {
     *     // More results exist.
     *     table.getRows(nextQuery, manualPaginationCallback);
     *   }
     * }
     *
     * table.getRows({
     *   autoPaginate: false
     * }, manualPaginationCallback);
     *
     * //-
     * // If the callback is omitted, we'll return a Promise.
     * //-
     * table.getRows().then((data) => {
     *   const rows = data[0];
     * });
     * ```
     */
    getRows(optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        const wrapIntegers = options.wrapIntegers ? options.wrapIntegers : false;
        delete options.wrapIntegers;
        const parseJSON = options.parseJSON ? options.parseJSON : false;
        delete options.parseJSON;
        const onComplete = (err, rows, nextQuery, resp) => {
            if (err) {
                callback(err, null, null, resp);
                return;
            }
            rows = _1.BigQuery.mergeSchemaWithRows_(this.metadata.schema, rows || [], {
                wrapIntegers: wrapIntegers,
                selectedFields: options.selectedFields
                    ? options.selectedFields.split(',')
                    : [],
                parseJSON,
            });
            callback(null, rows, nextQuery, resp);
        };
        const qs = extend({
            'formatOptions.useInt64Timestamp': true,
        }, options);
        this.request({
            uri: '/data',
            qs,
        }, (err, resp) => {
            if (err) {
                onComplete(err, null, null, resp);
                return;
            }
            let nextQuery = null;
            if (resp.pageToken) {
                nextQuery = Object.assign({}, qs, {
                    pageToken: resp.pageToken,
                });
            }
            if (resp.rows && resp.rows.length > 0 && !this.metadata.schema) {
                // We don't know the schema for this table yet. Do a quick stat.
                this.getMetadata((err, metadata, apiResponse) => {
                    if (err) {
                        onComplete(err, null, null, apiResponse);
                        return;
                    }
                    onComplete(null, resp.rows, nextQuery, resp);
                });
                return;
            }
            onComplete(null, resp.rows, nextQuery, resp);
        });
    }
    insert(rows, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object'
            ? optionsOrCallback
            : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        const promise = this._insertAndCreateTable(rows, options);
        if (callback) {
            promise.then(resp => callback(null, resp), err => callback(err, null));
        }
        else {
            return promise.then(r => [r]);
        }
    }
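    /*
     * Example use of insert() above (a sketch): each row is encoded with
     * encodeValue_ and given an insertId unless `createInsertId: false` or
     * `raw: true` is set.
     *
     *   table.insert([{name: 'Ada'}, {name: 'Grace'}], (err, apiResponse) => {
     *     // For partial failures, `err.errors` lists the rows that failed.
     *   });
     *
     *   // With a schema, a missing table (404) is created and the insert
     *   // retried:
     *   table.insert(rows, {schema: 'name:string'}).then(([apiResponse]) => {});
     */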
    /**
     * Insert rows with retries, creating the table if it does not exist.
     *
     * @param {RowMetadata | RowMetadata[]} rows
     * @param {InsertRowsOptions} options
     * @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
     * @private
     */
    async _insertAndCreateTable(rows, options) {
        const { schema } = options;
        const delay = 60000;
        try {
            return await this._insertWithRetry(rows, options);
        }
        catch (err) {
            if (err.code !== 404 || !schema) {
                throw err;
            }
        }
        try {
            await this.create({ schema });
        }
        catch (err) {
            if (err.code !== 409) {
                throw err;
            }
        }
        // table creation after failed access is subject to failure caching and
        // eventual consistency, see:
        // https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
        await new Promise(resolve => setTimeout(resolve, delay));
        return this._insertAndCreateTable(rows, options);
    }
    /**
     * This method will attempt to insert rows while retrying any partial failures
     * that occur along the way. Because partial insert failures are returned
     * differently, we can't depend on our usual retry strategy.
     *
     * @private
     *
     * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
     * @param {InsertRowsOptions} options Insert options.
     * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
     */
    async _insertWithRetry(rows, options) {
        const { partialRetries = 3 } = options;
        let error;
        const maxAttempts = Math.max(partialRetries, 0) + 1;
        for (let attempts = 0; attempts < maxAttempts; attempts++) {
            try {
                return await this._insert(rows, options);
            }
            catch (e) {
                error = e;
                rows = (e.errors || [])
                    .filter(err => !!err.row)
                    .map(err => err.row);
                if (!rows.length) {
                    break;
                }
            }
        }
        throw error;
    }
    /**
     * This method does the bulk of the work for processing options and making the
     * network request.
     *
     * @private
     *
     * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
     * @param {InsertRowsOptions} options Insert options.
     * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
     */
    async _insert(rows, options) {
        rows = arrify(rows);
        if (!rows.length) {
            throw new Error('You must provide at least 1 row to be inserted.');
        }
        const json = extend(true, {}, options, { rows });
        if (!options.raw) {
            json.rows = rows.map((row) => {
                const encoded = {
                    json: Table.encodeValue_(row),
                };
                if (options.createInsertId !== false) {
                    encoded.insertId = uuid.v4();
                }
                return encoded;
            });
        }
        delete json.createInsertId;
        delete json.partialRetries;
        delete json.raw;
        delete json.schema;
        const [resp] = await this.request({
            method: 'POST',
            uri: '/insertAll',
            json,
        });
        const partialFailures = (resp.insertErrors || []).map((insertError) => {
            return {
                errors: insertError.errors.map(error => {
                    return {
                        message: error.message,
                        reason: error.reason,
                    };
                }),
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                row: rows[insertError.index],
            };
        });
        if (partialFailures.length > 0) {
            throw new common_1.util.PartialFailureError({
                errors: partialFailures,
                response: resp,
            });
        }
        return resp;
    }
    createInsertStream(options) {
        options = typeof options === 'object' ? options : {};
        const dup = new stream_1.Duplex({ objectMode: true });
        dup._write = (chunk, encoding, cb) => {
            this.rowQueue.add(chunk, () => { });
            cb();
        };
        this.rowQueue = new rowQueue_1.RowQueue(this, dup, options);
        return dup;
    }
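    /*
     * Example use of createInsertStream() above (a sketch): written rows are
     * buffered by the internal RowQueue and sent via insertAll.
     *
     *   const insertStream = table.createInsertStream();
     *   insertStream.write({name: 'Ada'});
     *   insertStream.end();
     */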
    load(source, metadataOrCallback, cb) {
        const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
        const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
        this.createLoadJob(source, metadata, (err, job, resp) => {
            if (err) {
                callback(err, resp);
                return;
            }
            job.on('error', callback).on('complete', metadata => {
                callback(null, metadata);
            });
        });
    }
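    /*
     * Example use of load() above (a sketch; './data.csv' is a placeholder
     * path): a local path is streamed through createWriteStream_, while a
     * Storage File becomes a sourceUri on the load job.
     *
     *   table.load('./data.csv', {skipLeadingRows: 1}, (err, apiResponse) => {});
     */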
    query(query, callback) {
        if (typeof query === 'string') {
            query = {
                query,
            };
        }
        this.dataset.query(query, callback);
    }
    setMetadata(metadata, callback) {
        const body = Table.formatMetadata_(metadata);
        super.setMetadata(body, callback);
    }
    getIamPolicy(optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        if (typeof options.requestedPolicyVersion === 'number' &&
            options.requestedPolicyVersion !== 1) {
            throw new Error('Only IAM policy version 1 is supported.');
        }
        const json = extend(true, {}, { options });
        this.request({
            method: 'POST',
            uri: '/:getIamPolicy',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
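    /*
     * Example use of getIamPolicy() above (a sketch):
     *
     *   table.getIamPolicy((err, policy) => {
     *     // `policy.bindings` lists members per role.
     *   });
     *
     *   // Only IAM policy version 1 may be requested:
     *   table.getIamPolicy({requestedPolicyVersion: 1}).then(([policy]) => {});
     */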
    setIamPolicy(policy, optionsOrCallback, cb) {
        const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
        const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
        if (policy.version && policy.version !== 1) {
            throw new Error('Only IAM policy version 1 is supported.');
        }
        const json = extend(true, {}, options, { policy });
        this.request({
            method: 'POST',
            uri: '/:setIamPolicy',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
    testIamPermissions(permissions, callback) {
        permissions = arrify(permissions);
        const json = extend(true, {}, { permissions });
        this.request({
            method: 'POST',
            uri: '/:testIamPermissions',
            json,
        }, (err, resp) => {
            if (err) {
                callback(err, null);
                return;
            }
            callback(null, resp);
        });
    }
}
exports.Table = Table;
/*! Developer Documentation
 *
 * These methods can be auto-paginated.
 */
paginator_1.paginator.extend(Table, ['getRows']);
/*! Developer Documentation
 *
 * All async methods (except for streams) will return a Promise in the event
 * that a callback is omitted.
 */
(0, promisify_1.promisifyAll)(Table);
//# sourceMappingURL=table.js.map