UNPKG

45.6 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2014 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.Table = void 0;
19const common_1 = require("@google-cloud/common");
20const paginator_1 = require("@google-cloud/paginator");
21const promisify_1 = require("@google-cloud/promisify");
22const arrify = require("arrify");
23const big_js_1 = require("big.js");
24const extend = require("extend");
25const p_event_1 = require("p-event");
26const fs = require("fs");
27const is = require("is");
28const path = require("path");
29const streamEvents = require("stream-events");
30const uuid = require("uuid");
31const _1 = require(".");
32const stream_1 = require("stream");
33const rowQueue_1 = require("./rowQueue");
34// eslint-disable-next-line @typescript-eslint/no-var-requires
35const duplexify = require('duplexify');
36/**
37 * The file formats accepted by BigQuery.
38 *
39 * @type {object}
40 * @private
41 */
42const FORMATS = {
43 avro: 'AVRO',
44 csv: 'CSV',
45 export_metadata: 'DATASTORE_BACKUP',
46 json: 'NEWLINE_DELIMITED_JSON',
47 orc: 'ORC',
48 parquet: 'PARQUET',
49};
50/**
51 * Table objects are returned by methods such as
52 * {@link Dataset#table}, {@link Dataset#createTable}, and
53 * {@link Dataset#getTables}.
54 *
55 * @class
56 * @param {Dataset} dataset {@link Dataset} instance.
57 * @param {string} id The ID of the table.
58 * @param {object} [options] Table options.
59 * @param {string} [options.location] The geographic location of the table, by
60 * default this value is inherited from the dataset. This can be used to
61 * configure the location of all jobs created through a table instance. It
62 * cannot be used to set the actual location of the table. This value will
63 * be superseded by any API responses containing location data for the
64 * table.
65 *
66 * @example
67 * ```
68 * const {BigQuery} = require('@google-cloud/bigquery');
69 * const bigquery = new BigQuery();
70 * const dataset = bigquery.dataset('my-dataset');
71 *
72 * const table = dataset.table('my-table');
73 * ```
74 */
75class Table extends common_1.ServiceObject {
76 createReadStream(options) {
77 // placeholder body, overwritten in constructor
78 return new paginator_1.ResourceStream({}, () => { });
79 }
80 constructor(dataset, id, options) {
81 const methods = {
82 /**
83 * @callback CreateTableCallback
84 * @param {?Error} err Request error, if any.
85 * @param {Table} table The table.
86 * @param {object} apiResponse The full API response body.
87 */
88 /**
89 * @typedef {array} CreateTableResponse
90 * @property {Table} 0 The table.
91 * @property {object} 1 The full API response body.
92 */
93 /**
94 * Create a table.
95 *
96 * @method Table#create
97 * @param {object} [options] See {@link Dataset#createTable}.
98 * @param {CreateTableCallback} [callback]
99 * @param {?error} callback.err An error returned while making this
100 * request.
101 * @param {Table} callback.table The new {@link Table}.
102 * @param {object} callback.apiResponse The full API response.
103 * @returns {Promise<CreateTableResponse>}
104 *
105 * @example
106 * ```
107 * const {BigQuery} = require('@google-cloud/bigquery');
108 * const bigquery = new BigQuery();
109 * const dataset = bigquery.dataset('my-dataset');
110 *
111 * const table = dataset.table('my-table');
112 *
113 * table.create((err, table, apiResponse) => {
114 * if (!err) {
115 * // The table was created successfully.
116 * }
117 * });
118 *
119 * //-
120 * // If the callback is omitted, we'll return a Promise.
121 * //-
122 * table.create().then((data) => {
123 * const table = data[0];
124 * const apiResponse = data[1];
125 * });
126 * ```
127 */
128 create: true,
129 /**
130 * @callback DeleteTableCallback
131 * @param {?Error} err Request error, if any.
132 * @param {object} apiResponse The full API response.
133 */
134 /**
135 * @typedef {array} DeleteTableResponse
136 * @property {object} 0 The full API response.
137 */
138 /**
139 * Delete a table and all its data.
140 *
141 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/delete| Tables: delete API Documentation}
142 *
143 * @method Table#delete
144 * @param {DeleteTableCallback} [callback]
145 * @param {?error} callback.err An error returned while making this
146 * request.
147 * @param {object} callback.apiResponse The full API response.
148 * @returns {Promise<DeleteTableResponse>}
149 *
150 * @example
151 * ```
152 * const {BigQuery} = require('@google-cloud/bigquery');
153 * const bigquery = new BigQuery();
154 * const dataset = bigquery.dataset('my-dataset');
155 *
156 * const table = dataset.table('my-table');
157 *
158 * table.delete((err, apiResponse) => {});
159 *
160 * //-
161 * // If the callback is omitted, we'll return a Promise.
162 * //-
163 * table.delete().then((data) => {
164 * const apiResponse = data[0];
165 * });
166 * ```
167 */
168 delete: true,
169 /**
170 * @callback TableExistsCallback
171 * @param {?Error} err Request error, if any.
172 * @param {boolean} exists Indicates if the table exists.
173 */
174 /**
175 * @typedef {array} TableExistsCallback
176 * @property {boolean} 0 Indicates if the table exists.
177 */
178 /**
179 * Check if the table exists.
180 *
181 * @method Table#exists
182 * @param {TableExistsCallback} [callback]
183 * @param {?error} callback.err An error returned while making this
184 * request.
185 * @param {boolean} callback.exists Whether the table exists or not.
186 * @returns {Promise<TableExistsCallback>}
187 *
188 * @example
189 * ```
190 * const {BigQuery} = require('@google-cloud/bigquery');
191 * const bigquery = new BigQuery();
192 * const dataset = bigquery.dataset('my-dataset');
193 *
194 * const table = dataset.table('my-table');
195 *
196 * table.exists((err, exists) => {});
197 *
198 * //-
199 * // If the callback is omitted, we'll return a Promise.
200 * //-
201 * table.exists().then((data) => {
202 * const exists = data[0];
203 * });
204 * ```
205 */
206 exists: true,
207 /**
208 * @callback GetTableCallback
209 * @param {?Error} err Request error, if any.
210 * @param {Table} table The table.
211 * @param {object} apiResponse The full API response body.
212 */
213 /**
214 * @typedef {array} GetTableResponse
215 * @property {Table} 0 The table.
216 * @property {object} 1 The full API response body.
217 */
218 /**
219 * Get a table if it exists.
220 *
221 * You may optionally use this to "get or create" an object by providing
222 * an object with `autoCreate` set to `true`. Any extra configuration that
223 * is normally required for the `create` method must be contained within
224 * this object as well.
225 *
226 * @method Table#get
227 * @param {options} [options] Configuration object.
228 * @param {boolean} [options.autoCreate=false] Automatically create the
229 * object if it does not exist.
230 * @param {function} [callback]
231 * @param {?error} callback.err An error returned while making this
232 * request.
233 * @param {Table} callback.table The {@link Table}.
234 * @param {object} callback.apiResponse The full API response.
235 * @returns {Promise<GetTableResponse>}
236 *
237 * @example
238 * ```
239 * const {BigQuery} = require('@google-cloud/bigquery');
240 * const bigquery = new BigQuery();
241 * const dataset = bigquery.dataset('my-dataset');
242 *
243 * const table = dataset.table('my-table');
244 *
245 * table.get((err, table, apiResponse) => {
246 * // `table.metadata` has been populated.
247 * });
248 *
249 * //-
250 * // If the callback is omitted, we'll return a Promise.
251 * //-
252 * table.get().then((data) => {
253 * const table = data[0];
254 * const apiResponse = data[1];
255 * });
256 * ```
257 */
258 get: true,
259 /**
260 * @callback GetTableMetadataCallback
261 * @param {?Error} err Request error, if any.
262 * @param {object} metadata The table metadata.
263 * @param {object} apiResponse The full API response.
264 */
265 /**
266 * @typedef {array} GetTableMetadataResponse
267 * @property {object} 0 The table metadata.
268 * @property {object} 1 The full API response.
269 */
270 /**
271 * Return the metadata associated with the Table.
272 *
273 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tables/get| Tables: get API Documentation}
274 *
275 * @method Table#getMetadata
276 * @param {GetTableMetadataCallback} [callback] The callback function.
277 * @param {?error} callback.err An error returned while making this
278 * request.
279 * @param {object} callback.metadata The metadata of the Table.
280 * @param {object} callback.apiResponse The full API response.
281 * @returns {Promise<GetTableMetadataResponse>}
282 *
283 * @example
284 * ```
285 * const {BigQuery} = require('@google-cloud/bigquery');
286 * const bigquery = new BigQuery();
287 * const dataset = bigquery.dataset('my-dataset');
288 *
289 * const table = dataset.table('my-table');
290 *
291 * table.getMetadata((err, metadata, apiResponse) => {});
292 *
293 * //-
294 * // If the callback is omitted, we'll return a Promise.
295 * //-
296 * table.getMetadata().then((data) => {
297 * const metadata = data[0];
298 * const apiResponse = data[1];
299 * });
300 * ```
301 */
302 getMetadata: true,
303 };
304 super({
305 parent: dataset,
306 baseUrl: '/tables',
307 id,
308 createMethod: dataset.createTable.bind(dataset),
309 methods,
310 });
311 if (options && options.location) {
312 this.location = options.location;
313 }
314 this.bigQuery = dataset.bigQuery;
315 this.dataset = dataset;
316 // Catch all for read-modify-write cycle
317 // https://cloud.google.com/bigquery/docs/api-performance#read-patch-write
318 this.interceptors.push({
319 request: (reqOpts) => {
320 if (reqOpts.method === 'PATCH' && reqOpts.json.etag) {
321 reqOpts.headers = reqOpts.headers || {};
322 reqOpts.headers['If-Match'] = reqOpts.json.etag;
323 }
324 return reqOpts;
325 },
326 });
327 /**
328 * Create a readable stream of the rows of data in your table. This method
329 * is simply a wrapper around {@link Table#getRows}.
330 *
331 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
332 *
333 * @returns {ReadableStream}
334 *
335 * @example
336 * ```
337 * const {BigQuery} = require('@google-cloud/bigquery');
338 * const bigquery = new BigQuery();
339 * const dataset = bigquery.dataset('my-dataset');
340 * const table = dataset.table('my-table');
341 *
342 * table.createReadStream(options)
343 * .on('error', console.error)
344 * .on('data', row => {})
345 * .on('end', function() {
346 * // All rows have been retrieved.
347 * });
348 *
349 * //-
350 * // If you anticipate many results, you can end a stream early to prevent
351 * // unnecessary processing and API requests.
352 * //-
353 * table.createReadStream()
354 * .on('data', function(row) {
355 * this.end();
356 * });
357 * ```
358 */
359 this.createReadStream = paginator_1.paginator.streamify('getRows');
360 }
361 /**
362 * Convert a comma-separated name:type string to a table schema object.
363 *
364 * @static
365 * @private
366 *
367 * @param {string} str Comma-separated schema string.
368 * @returns {object} Table schema in the format the API expects.
369 */
370 static createSchemaFromString_(str) {
371 return str.split(/\s*,\s*/).reduce((acc, pair) => {
372 acc.fields.push({
373 name: pair.split(':')[0].trim(),
374 type: (pair.split(':')[1] || 'STRING').toUpperCase().trim(),
375 });
376 return acc;
377 }, {
378 fields: [],
379 });
380 }
381 /**
382 * Convert a row entry from native types to their encoded types that the API
383 * expects.
384 *
385 * @static
386 * @private
387 *
388 * @param {*} value The value to be converted.
389 * @returns {*} The converted value.
390 */
391 static encodeValue_(value) {
392 if (typeof value === 'undefined' || value === null) {
393 return null;
394 }
395 if (value instanceof Buffer) {
396 return value.toString('base64');
397 }
398 if (value instanceof big_js_1.default) {
399 return value.toFixed();
400 }
401 const customTypeConstructorNames = [
402 'BigQueryDate',
403 'BigQueryDatetime',
404 'BigQueryInt',
405 'BigQueryTime',
406 'BigQueryTimestamp',
407 'Geography',
408 ];
409 const constructorName = value.constructor.name;
410 const isCustomType = customTypeConstructorNames.indexOf(constructorName) > -1;
411 if (isCustomType) {
412 return value.value;
413 }
414 if (is.date(value)) {
415 return value.toJSON();
416 }
417 if (is.array(value)) {
418 return value.map(Table.encodeValue_);
419 }
420 if (typeof value === 'object') {
421 return Object.keys(value).reduce((acc, key) => {
422 acc[key] = Table.encodeValue_(value[key]);
423 return acc;
424 }, {});
425 }
426 return value;
427 }
428 /**
429 * @private
430 */
431 static formatMetadata_(options) {
432 const body = extend(true, {}, options);
433 if (options.name) {
434 body.friendlyName = options.name;
435 delete body.name;
436 }
437 if (is.string(options.schema)) {
438 body.schema = Table.createSchemaFromString_(options.schema);
439 }
440 if (is.array(options.schema)) {
441 body.schema = {
442 fields: options.schema,
443 };
444 }
445 if (body.schema && body.schema.fields) {
446 body.schema.fields = body.schema.fields.map(field => {
447 if (field.fields) {
448 field.type = 'RECORD';
449 }
450 return field;
451 });
452 }
453 if (is.string(options.partitioning)) {
454 body.timePartitioning = {
455 type: options.partitioning.toUpperCase(),
456 };
457 delete body.partitioning;
458 }
459 if (is.string(options.view)) {
460 body.view = {
461 query: options.view,
462 useLegacySql: false,
463 };
464 }
465 return body;
466 }
467 copy(destination, metadataOrCallback, cb) {
468 const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
469 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
470 this.createCopyJob(destination, metadata, (err, job, resp) => {
471 if (err) {
472 callback(err, resp);
473 return;
474 }
475 job.on('error', callback).on('complete', (metadata) => {
476 callback(null, metadata);
477 });
478 });
479 }
480 copyFrom(sourceTables, metadataOrCallback, cb) {
481 const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
482 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
483 this.createCopyFromJob(sourceTables, metadata, (err, job, resp) => {
484 if (err) {
485 callback(err, resp);
486 return;
487 }
488 job.on('error', callback).on('complete', metadata => {
489 callback(null, metadata);
490 });
491 });
492 }
493 createCopyJob(destination, metadataOrCallback, cb) {
494 if (!(destination instanceof Table)) {
495 throw new Error('Destination must be a Table object.');
496 }
497 const metadata = typeof metadataOrCallback === 'object'
498 ? metadataOrCallback
499 : {};
500 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
501 // eslint-disable-next-line @typescript-eslint/no-explicit-any
502 const body = {
503 configuration: {
504 copy: extend(true, metadata, {
505 destinationTable: {
506 datasetId: destination.dataset.id,
507 projectId: destination.bigQuery.projectId,
508 tableId: destination.id,
509 },
510 sourceTable: {
511 datasetId: this.dataset.id,
512 projectId: this.bigQuery.projectId,
513 tableId: this.id,
514 },
515 }),
516 },
517 };
518 if (metadata.jobPrefix) {
519 body.jobPrefix = metadata.jobPrefix;
520 delete metadata.jobPrefix;
521 }
522 if (this.location) {
523 body.location = this.location;
524 }
525 if (metadata.jobId) {
526 body.jobId = metadata.jobId;
527 delete metadata.jobId;
528 }
529 this.bigQuery.createJob(body, callback);
530 }
531 createCopyFromJob(source, metadataOrCallback, cb) {
532 const sourceTables = arrify(source);
533 sourceTables.forEach(sourceTable => {
534 if (!(sourceTable instanceof Table)) {
535 throw new Error('Source must be a Table object.');
536 }
537 });
538 const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
539 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
540 // eslint-disable-next-line @typescript-eslint/no-explicit-any
541 const body = {
542 configuration: {
543 copy: extend(true, metadata, {
544 destinationTable: {
545 datasetId: this.dataset.id,
546 projectId: this.bigQuery.projectId,
547 tableId: this.id,
548 },
549 sourceTables: sourceTables.map(sourceTable => {
550 return {
551 datasetId: sourceTable.dataset.id,
552 projectId: sourceTable.bigQuery.projectId,
553 tableId: sourceTable.id,
554 };
555 }),
556 }),
557 },
558 };
559 if (metadata.jobPrefix) {
560 body.jobPrefix = metadata.jobPrefix;
561 delete metadata.jobPrefix;
562 }
563 if (this.location) {
564 body.location = this.location;
565 }
566 if (metadata.jobId) {
567 body.jobId = metadata.jobId;
568 delete metadata.jobId;
569 }
570 this.bigQuery.createJob(body, callback);
571 }
572 createExtractJob(destination, optionsOrCallback, cb) {
573 let options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
574 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
575 options = extend(true, options, {
576 destinationUris: arrify(destination).map(dest => {
577 if (!common_1.util.isCustomType(dest, 'storage/file')) {
578 throw new Error('Destination must be a File object.');
579 }
580 // If no explicit format was provided, attempt to find a match from the
581 // file's extension. If no match, don't set, and default upstream to
582 // CSV.
583 const format = path
584 .extname(dest.name)
585 .substr(1)
586 .toLowerCase();
587 if (!options.destinationFormat && !options.format && FORMATS[format]) {
588 options.destinationFormat = FORMATS[format];
589 }
590 return 'gs://' + dest.bucket.name + '/' + dest.name;
591 }),
592 });
593 if (options.format) {
594 options.format = options.format.toLowerCase();
595 if (FORMATS[options.format]) {
596 options.destinationFormat = FORMATS[options.format];
597 delete options.format;
598 }
599 else {
600 throw new Error('Destination format not recognized: ' + options.format);
601 }
602 }
603 if (options.gzip) {
604 options.compression = 'GZIP';
605 delete options.gzip;
606 }
607 // eslint-disable-next-line @typescript-eslint/no-explicit-any
608 const body = {
609 configuration: {
610 extract: extend(true, options, {
611 sourceTable: {
612 datasetId: this.dataset.id,
613 projectId: this.bigQuery.projectId,
614 tableId: this.id,
615 },
616 }),
617 },
618 };
619 if (options.jobPrefix) {
620 body.jobPrefix = options.jobPrefix;
621 delete options.jobPrefix;
622 }
623 if (this.location) {
624 body.location = this.location;
625 }
626 if (options.jobId) {
627 body.jobId = options.jobId;
628 delete options.jobId;
629 }
630 this.bigQuery.createJob(body, callback);
631 }
632 createLoadJob(source, metadataOrCallback, cb) {
633 const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
634 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
635 this._createLoadJob(source, metadata).then(([resp]) => callback(null, resp, resp.metadata), err => callback(err));
636 }
637 /**
638 * @param {string | File | File[]} source
639 * @param {JobLoadMetadata} metadata
640 * @returns {Promise<JobResponse>}
641 * @private
642 */
643 async _createLoadJob(source, metadata) {
644 if (metadata.format) {
645 metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
646 delete metadata.format;
647 }
648 if (this.location) {
649 metadata.location = this.location;
650 }
651 if (typeof source === 'string') {
652 // A path to a file was given. If a sourceFormat wasn't specified, try to
653 // find a match from the file's extension.
654 const detectedFormat = FORMATS[path
655 .extname(source)
656 .substr(1)
657 .toLowerCase()];
658 if (!metadata.sourceFormat && detectedFormat) {
659 metadata.sourceFormat = detectedFormat;
660 }
661 // Read the file into a new write stream.
662 const jobWritable = fs
663 .createReadStream(source)
664 .pipe(this.createWriteStream_(metadata));
665 const jobResponse = (await (0, p_event_1.default)(jobWritable, 'job'));
666 return [jobResponse, jobResponse.metadata];
667 }
668 // eslint-disable-next-line @typescript-eslint/no-explicit-any
669 const body = {
670 configuration: {
671 load: {
672 destinationTable: {
673 projectId: this.bigQuery.projectId,
674 datasetId: this.dataset.id,
675 tableId: this.id,
676 },
677 },
678 },
679 };
680 if (metadata.jobPrefix) {
681 body.jobPrefix = metadata.jobPrefix;
682 delete metadata.jobPrefix;
683 }
684 if (metadata.location) {
685 body.location = metadata.location;
686 delete metadata.location;
687 }
688 if (metadata.jobId) {
689 body.jobId = metadata.jobId;
690 delete metadata.jobId;
691 }
692 extend(true, body.configuration.load, metadata, {
693 sourceUris: arrify(source).map(src => {
694 if (!common_1.util.isCustomType(src, 'storage/file')) {
695 throw new Error('Source must be a File object.');
696 }
697 // If no explicit format was provided, attempt to find a match from
698 // the file's extension. If no match, don't set, and default upstream
699 // to CSV.
700 const format = FORMATS[path
701 .extname(src.name)
702 .substr(1)
703 .toLowerCase()];
704 if (!metadata.sourceFormat && format) {
705 body.configuration.load.sourceFormat = format;
706 }
707 return 'gs://' + src.bucket.name + '/' + src.name;
708 }),
709 });
710 return this.bigQuery.createJob(body);
711 }
712 createQueryJob(options, callback) {
713 return this.dataset.createQueryJob(options, callback);
714 }
715 /**
716 * Run a query scoped to your dataset as a readable object stream.
717 *
718 * See {@link BigQuery#createQueryStream} for full documentation of this
719 * method.
720 *
721 * @param {object} query See {@link BigQuery#createQueryStream} for full
722 * documentation of this method.
723 * @returns {stream} See {@link BigQuery#createQueryStream} for full
724 * documentation of this method.
725 */
726 createQueryStream(query) {
727 return this.dataset.createQueryStream(query);
728 }
729 /**
730 * Creates a write stream. Unlike the public version, this will not
731 * automatically poll the underlying job.
732 *
733 * @private
734 *
735 * @param {string|object} [metadata] Metadata to set with the load operation.
736 * The metadata object should be in the format of the
737 * {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
738 * property of a Jobs resource. If a string is given, it will be used
739 * as the filetype.
740 * @param {string} [metadata.jobId] Custom job id.
741 * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
742 * @returns {WritableStream}
743 */
744 createWriteStream_(metadata) {
745 metadata = metadata || {};
746 if (typeof metadata === 'string') {
747 metadata = {
748 sourceFormat: FORMATS[metadata.toLowerCase()],
749 };
750 }
751 if (typeof metadata.schema === 'string') {
752 metadata.schema = Table.createSchemaFromString_(metadata.schema);
753 }
754 metadata = extend(true, {
755 destinationTable: {
756 projectId: this.bigQuery.projectId,
757 datasetId: this.dataset.id,
758 tableId: this.id,
759 },
760 }, metadata);
761 let jobId = metadata.jobId || uuid.v4();
762 if (metadata.jobId) {
763 delete metadata.jobId;
764 }
765 if (metadata.jobPrefix) {
766 jobId = metadata.jobPrefix + jobId;
767 delete metadata.jobPrefix;
768 }
769 const dup = streamEvents(duplexify());
770 dup.once('writing', () => {
771 common_1.util.makeWritableStream(dup, {
772 makeAuthenticatedRequest: this.bigQuery.makeAuthenticatedRequest,
773 metadata: {
774 configuration: {
775 load: metadata,
776 },
777 jobReference: {
778 jobId,
779 projectId: this.bigQuery.projectId,
780 location: this.location,
781 },
782 },
783 request: {
784 uri: `${this.bigQuery.apiEndpoint}/upload/bigquery/v2/projects/${this.bigQuery.projectId}/jobs`,
785 },
786 },
787 // eslint-disable-next-line @typescript-eslint/no-explicit-any
788 (data) => {
789 const job = this.bigQuery.job(data.jobReference.jobId, {
790 location: data.jobReference.location,
791 });
792 job.metadata = data;
793 dup.emit('job', job);
794 });
795 });
796 return dup;
797 }
798 /**
799 * Load data into your table from a readable stream of AVRO, CSV, JSON, ORC,
800 * or PARQUET data.
801 *
802 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert| Jobs: insert API Documentation}
803 *
804 * @param {string|object} [metadata] Metadata to set with the load operation.
805 * The metadata object should be in the format of the
806 * {@link https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad| `configuration.load`}
807 * property of a Jobs resource. If a string is given,
808 * it will be used as the filetype.
809 * @param {string} [metadata.jobId] Custom job id.
810 * @param {string} [metadata.jobPrefix] Prefix to apply to the job id.
811 * @returns {WritableStream}
812 *
813 * @throws {Error} If source format isn't recognized.
814 *
815 * @example
816 * ```
817 * const {BigQuery} = require('@google-cloud/bigquery');
818 * const bigquery = new BigQuery();
819 * const dataset = bigquery.dataset('my-dataset');
820 * const table = dataset.table('my-table');
821 *
822 * //-
823 * // Load data from a CSV file.
824 * //-
825 * const request = require('request');
826 *
827 * const csvUrl = 'http://goo.gl/kSE7z6';
828 *
829 * const metadata = {
830 * allowJaggedRows: true,
831 * skipLeadingRows: 1
832 * };
833 *
834 * request.get(csvUrl)
835 * .pipe(table.createWriteStream(metadata))
836 * .on('job', (job) => {
837 * // `job` is a Job object that can be used to check the status of the
838 * // request.
839 * })
840 * .on('complete', (job) => {
841 * // The job has completed successfully.
842 * });
843 *
844 * //-
845 * // Load data from a JSON file.
846 * //-
847 * const fs = require('fs');
848 *
849 * fs.createReadStream('./test/testdata/testfile.json')
850 * .pipe(table.createWriteStream('json'))
851 * .on('job', (job) => {
852 * // `job` is a Job object that can be used to check the status of the
853 * // request.
854 * })
855 * .on('complete', (job) => {
856 * // The job has completed successfully.
857 * });
858 * ```
859 */
860 createWriteStream(metadata) {
861 const stream = this.createWriteStream_(metadata);
862 stream.on('prefinish', () => {
863 stream.cork();
864 });
865 stream.on('job', (job) => {
866 job
867 .on('error', err => {
868 stream.destroy(err);
869 })
870 .on('complete', () => {
871 stream.emit('complete', job);
872 stream.uncork();
873 });
874 });
875 return stream;
876 }
877 extract(destination, optionsOrCallback, cb) {
878 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
879 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
880 this.createExtractJob(destination, options, (err, job, resp) => {
881 if (err) {
882 callback(err, resp);
883 return;
884 }
885 job.on('error', callback).on('complete', metadata => {
886 callback(null, metadata);
887 });
888 });
889 }
890 /**
891 * Retrieves table data from a specified set of rows. The rows are returned to
892 * your callback as an array of objects matching your table's schema.
893 *
894 * See {@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list| Tabledata: list API Documentation}
895 *
896 * @param {object} [options] The configuration object.
897 * @param {boolean} [options.autoPaginate=true] Have pagination handled
898 * automatically.
899 * @param {number} [options.maxApiCalls] Maximum number of API calls to make.
900 * @param {number} [options.maxResults] Maximum number of results to return.
901 * @param {boolean|IntegerTypeCastOptions} [options.wrapIntegers=false] Wrap values
902 * of 'INT64' type in {@link BigQueryInt} objects.
903 * If a `boolean`, this will wrap values in {@link BigQueryInt} objects.
904 * If an `object`, this will return a value returned by
905 * `wrapIntegers.integerTypeCastFunction`.
906 * @param {RowsCallback} [callback] The callback function. If `autoPaginate`
907 * is set to false a {@link ManualQueryResultsCallback} should be used.
908 * @param {?error} callback.err An error returned while making this request
909 * @param {array} callback.rows The table data from specified set of rows.
910 * @returns {Promise<RowsResponse>}
911 *
912 * @example
913 * ```
914 * const {BigQuery} = require('@google-cloud/bigquery');
915 * const bigquery = new BigQuery();
916 * const dataset = bigquery.dataset('my-dataset');
917 * const table = dataset.table('my-table');
918 *
919 * table.getRows((err, rows) => {
920 * if (!err) {
921 * // rows is an array of results.
922 * }
923 * });
924 *
925 * //-
926 * // To control how many API requests are made and page through the results
927 * // manually, set `autoPaginate` to `false`.
928 * //-
929 * function manualPaginationCallback(err, rows, nextQuery, apiResponse) {
930 * if (nextQuery) {
931 * // More results exist.
932 * table.getRows(nextQuery, manualPaginationCallback);
933 * }
934 * }
935 *
936 * table.getRows({
937 * autoPaginate: false
938 * }, manualPaginationCallback);
939 *
940 * //-
941 * // If the callback is omitted, we'll return a Promise.
942 * //-
943 * table.getRows().then((data) => {
944 * const rows = data[0];
945 * });
946 * ```
947 */
948 getRows(optionsOrCallback, cb) {
949 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
950 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
951 const wrapIntegers = options.wrapIntegers ? options.wrapIntegers : false;
952 delete options.wrapIntegers;
953 const onComplete = (err, rows, nextQuery, resp) => {
954 if (err) {
955 callback(err, null, null, resp);
956 return;
957 }
958 rows = _1.BigQuery.mergeSchemaWithRows_(this.metadata.schema, rows || [], wrapIntegers, options.selectedFields ? options.selectedFields.split(',') : []);
959 callback(null, rows, nextQuery, resp);
960 };
961 this.request({
962 uri: '/data',
963 qs: options,
964 }, (err, resp) => {
965 if (err) {
966 onComplete(err, null, null, resp);
967 return;
968 }
969 let nextQuery = null;
970 if (resp.pageToken) {
971 nextQuery = Object.assign({}, options, {
972 pageToken: resp.pageToken,
973 });
974 }
975 if (resp.rows && resp.rows.length > 0 && !this.metadata.schema) {
976 // We don't know the schema for this table yet. Do a quick stat.
977 this.getMetadata((err, metadata, apiResponse) => {
978 if (err) {
979 onComplete(err, null, null, apiResponse);
980 return;
981 }
982 onComplete(null, resp.rows, nextQuery, resp);
983 });
984 return;
985 }
986 onComplete(null, resp.rows, nextQuery, resp);
987 });
988 }
989 insert(rows, optionsOrCallback, cb) {
990 const options = typeof optionsOrCallback === 'object'
991 ? optionsOrCallback
992 : {};
993 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
994 const promise = this._insertAndCreateTable(rows, options);
995 if (callback) {
996 promise.then(resp => callback(null, resp), err => callback(err, null));
997 }
998 else {
999 return promise.then(r => [r]);
1000 }
1001 }
1002 /**
1003 * Insert rows with retries, but will create the table if not exists.
1004 *
1005 * @param {RowMetadata | RowMetadata[]} rows
1006 * @param {InsertRowsOptions} options
1007 * @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
1008 * @private
1009 */
1010 async _insertAndCreateTable(rows, options) {
1011 const { schema } = options;
1012 const delay = 60000;
1013 try {
1014 return await this._insertWithRetry(rows, options);
1015 }
1016 catch (err) {
1017 if (err.code !== 404 || !schema) {
1018 throw err;
1019 }
1020 }
1021 try {
1022 await this.create({ schema });
1023 }
1024 catch (err) {
1025 if (err.code !== 409) {
1026 throw err;
1027 }
1028 }
1029 // table creation after failed access is subject to failure caching and
1030 // eventual consistency, see:
1031 // https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
1032 await new Promise(resolve => setTimeout(resolve, delay));
1033 return this._insertAndCreateTable(rows, options);
1034 }
1035 /**
1036 * This method will attempt to insert rows while retrying any partial failures
1037 * that occur along the way. Because partial insert failures are returned
1038 * differently, we can't depend on our usual retry strategy.
1039 *
1040 * @private
1041 *
1042 * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
1043 * @param {InsertRowsOptions} options Insert options.
1044 * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
1045 */
1046 async _insertWithRetry(rows, options) {
1047 const { partialRetries = 3 } = options;
1048 let error;
1049 const maxAttempts = Math.max(partialRetries, 0) + 1;
1050 for (let attempts = 0; attempts < maxAttempts; attempts++) {
1051 try {
1052 return await this._insert(rows, options);
1053 }
1054 catch (e) {
1055 error = e;
1056 rows = (e.errors || [])
1057 .filter(err => !!err.row)
1058 .map(err => err.row);
1059 if (!rows.length) {
1060 break;
1061 }
1062 }
1063 }
1064 throw error;
1065 }
1066 /**
1067 * This method does the bulk of the work for processing options and making the
1068 * network request.
1069 *
1070 * @private
1071 *
1072 * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
1073 * @param {InsertRowsOptions} options Insert options.
1074 * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
1075 */
1076 async _insert(rows, options) {
1077 rows = arrify(rows);
1078 if (!rows.length) {
1079 throw new Error('You must provide at least 1 row to be inserted.');
1080 }
1081 const json = extend(true, {}, options, { rows });
1082 if (!options.raw) {
1083 json.rows = rows.map((row) => {
1084 const encoded = {
1085 json: Table.encodeValue_(row),
1086 };
1087 if (options.createInsertId !== false) {
1088 encoded.insertId = uuid.v4();
1089 }
1090 return encoded;
1091 });
1092 }
1093 delete json.createInsertId;
1094 delete json.partialRetries;
1095 delete json.raw;
1096 delete json.schema;
1097 const [resp] = await this.request({
1098 method: 'POST',
1099 uri: '/insertAll',
1100 json,
1101 });
1102 const partialFailures = (resp.insertErrors || []).map((insertError) => {
1103 return {
1104 errors: insertError.errors.map(error => {
1105 return {
1106 message: error.message,
1107 reason: error.reason,
1108 };
1109 }),
1110 // eslint-disable-next-line @typescript-eslint/no-explicit-any
1111 row: rows[insertError.index],
1112 };
1113 });
1114 if (partialFailures.length > 0) {
1115 throw new common_1.util.PartialFailureError({
1116 errors: partialFailures,
1117 response: resp,
1118 });
1119 }
1120 return resp;
1121 }
1122 createInsertStream(options) {
1123 options = typeof options === 'object' ? options : {};
1124 const dup = new stream_1.Duplex({ objectMode: true });
1125 dup._write = (chunk, encoding, cb) => {
1126 this.rowQueue.add(chunk, () => { });
1127 cb();
1128 };
1129 this.rowQueue = new rowQueue_1.RowQueue(this, dup, options);
1130 return dup;
1131 }
1132 load(source, metadataOrCallback, cb) {
1133 const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
1134 const callback = typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;
1135 this.createLoadJob(source, metadata, (err, job, resp) => {
1136 if (err) {
1137 callback(err, resp);
1138 return;
1139 }
1140 job.on('error', callback).on('complete', metadata => {
1141 callback(null, metadata);
1142 });
1143 });
1144 }
1145 query(query, callback) {
1146 if (typeof query === 'string') {
1147 query = {
1148 query,
1149 };
1150 }
1151 this.dataset.query(query, callback);
1152 }
1153 setMetadata(metadata, callback) {
1154 const body = Table.formatMetadata_(metadata);
1155 super.setMetadata(body, callback);
1156 }
1157 getIamPolicy(optionsOrCallback, cb) {
1158 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
1159 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
1160 if (typeof options.requestedPolicyVersion === 'number' &&
1161 options.requestedPolicyVersion !== 1) {
1162 throw new Error('Only IAM policy version 1 is supported.');
1163 }
1164 const json = extend(true, {}, { options });
1165 this.request({
1166 method: 'POST',
1167 uri: '/:getIamPolicy',
1168 json,
1169 }, (err, resp) => {
1170 if (err) {
1171 callback(err, null);
1172 return;
1173 }
1174 callback(null, resp);
1175 });
1176 }
1177 setIamPolicy(policy, optionsOrCallback, cb) {
1178 const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
1179 const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb;
1180 if (policy.version && policy.version !== 1) {
1181 throw new Error('Only IAM policy version 1 is supported.');
1182 }
1183 const json = extend(true, {}, options, { policy });
1184 this.request({
1185 method: 'POST',
1186 uri: '/:setIamPolicy',
1187 json,
1188 }, (err, resp) => {
1189 if (err) {
1190 callback(err, null);
1191 return;
1192 }
1193 callback(null, resp);
1194 });
1195 }
1196 testIamPermissions(permissions, callback) {
1197 permissions = arrify(permissions);
1198 const json = extend(true, {}, { permissions });
1199 this.request({
1200 method: 'POST',
1201 uri: '/:testIamPermissions',
1202 json,
1203 }, (err, resp) => {
1204 if (err) {
1205 callback(err, null);
1206 return;
1207 }
1208 callback(null, resp);
1209 });
1210 }
1211}
exports.Table = Table;
/*! Developer Documentation
 *
 * These methods can be auto-paginated.
 */
// Wires getRows into the paginator so page tokens are followed automatically.
paginator_1.paginator.extend(Table, ['getRows']);
/*! Developer Documentation
 *
 * All async methods (except for streams) will return a Promise in the event
 * that a callback is omitted.
 */
(0, promisify_1.promisifyAll)(Table);
//# sourceMappingURL=table.js.map
\No newline at end of file