```sh
tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0
```

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs.
This document covers extracting data from BigQuery tables to Google Cloud Storage and copying tables within BigQuery.
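The snippets below assume a client, dataset, and table handle have already been created, along these lines (the project, dataset, and table names are placeholders):

```go
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "my-project") // placeholder project ID
if err != nil {
	return err
}
defer client.Close()

dataset := client.Dataset("my_dataset") // placeholder dataset
table := dataset.Table("my_table")      // placeholder table
```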
```go
type Extractor struct {
	JobIDConfig
	ExtractConfig
}

func (t *Table) ExtractorTo(dst *GCSReference) *Extractor
func (m *Model) ExtractorTo(dst *GCSReference) *Extractor
func (e *Extractor) Run(ctx context.Context) (*Job, error)

type ExtractConfig struct {
	// Src is the table from which data will be extracted
	Src *Table
	// SrcModel is the ML model from which data will be extracted
	SrcModel *Model
	// Dst is the destination into which data will be extracted
	Dst *GCSReference
	// DisableHeader disables the printing of a header row
	DisableHeader bool
	// Labels for the extract job
	Labels map[string]string
	// UseAvroLogicalTypes for Avro-based extracts
	UseAvroLogicalTypes bool
	// JobTimeout for the extract job
	JobTimeout time.Duration
	// Reservation for the extract job
	Reservation string
	// MaxSlots for slot usage limit
	MaxSlots int32
}
```

Run a basic extract to Cloud Storage and wait for the job to complete:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/export/data-*.csv")
gcsRef.DestinationFormat = bigquery.CSV
gcsRef.Compression = bigquery.Gzip

extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}
if err := status.Err(); err != nil {
	return err
}
```
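Because ExtractConfig is embedded in Extractor, its fields can be set directly on the extractor before running the job. A minimal sketch (the label value and timeout are illustrative):

```go
extractor := table.ExtractorTo(gcsRef)
extractor.DisableHeader = true
extractor.JobTimeout = 10 * time.Minute // illustrative timeout; requires the time package
extractor.Labels = map[string]string{"team": "analytics"} // illustrative label
job, err := extractor.Run(ctx)
```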
Supported destination formats:

```go
const (
	CSV     DataFormat = "CSV"
	Avro    DataFormat = "AVRO"
	JSON    DataFormat = "NEWLINE_DELIMITED_JSON"
	Parquet DataFormat = "PARQUET"

	// For ML models
	TFSavedModel   DataFormat = "ML_TF_SAVED_MODEL"
	XGBoostBooster DataFormat = "ML_XGBOOST_BOOSTER"
)
```

Export to CSV:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.csv")
gcsRef.DestinationFormat = bigquery.CSV
gcsRef.FieldDelimiter = ","
gcsRef.Compression = bigquery.Gzip
extractor := table.ExtractorTo(gcsRef)
extractor.DisableHeader = false
job, err := extractor.Run(ctx)
```

Note: CSV format does not support nested/repeated fields.

Export to newline-delimited JSON:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.json")
gcsRef.DestinationFormat = bigquery.JSON
gcsRef.Compression = bigquery.Gzip
extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
```

Export to Avro:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.avro")
gcsRef.DestinationFormat = bigquery.Avro
gcsRef.Compression = bigquery.Snappy
extractor := table.ExtractorTo(gcsRef)
extractor.UseAvroLogicalTypes = true
job, err := extractor.Run(ctx)
```

Export to Parquet:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.parquet")
gcsRef.DestinationFormat = bigquery.Parquet
gcsRef.Compression = bigquery.Snappy
extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
```

Supported compression types:

```go
const (
	None    Compression = "NONE"
	Gzip    Compression = "GZIP"
	Deflate Compression = "DEFLATE" // Avro only
	Snappy  Compression = "SNAPPY"  // Avro and Parquet
)
```

Set the compression on the GCS reference:

```go
gcsRef.Compression = bigquery.Gzip
```

Export large tables into multiple files:

```go
// Creates files: data-000000000000.csv, data-000000000001.csv, etc.
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.csv")
```
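NewGCSReference is variadic, so a single extract job can also target several destination URI patterns; a sketch (the bucket and prefixes are placeholders):

```go
// Spread the export over two URI patterns; each pattern may itself contain a wildcard.
gcsRef := bigquery.NewGCSReference(
	"gs://my-bucket/shard-a/data-*.csv",
	"gs://my-bucket/shard-b/data-*.csv",
)
gcsRef.DestinationFormat = bigquery.CSV
job, err := table.ExtractorTo(gcsRef).Run(ctx)
```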
Export an ML model:

```go
model := dataset.Model("my_ml_model")
gcsRef := bigquery.NewGCSReference("gs://my-bucket/model")
gcsRef.DestinationFormat = bigquery.TFSavedModel
extractor := model.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
```

Extract job statistics are reported through ExtractStatistics:

```go
type ExtractStatistics struct {
	DestinationURIFileCounts []int64
}
```

Get extract statistics:

```go
job, err := extractor.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}
extractStats := status.Statistics.Details.(*bigquery.ExtractStatistics)
for i, count := range extractStats.DestinationURIFileCounts {
	fmt.Printf("URI %d: %d files\n", i, count)
}
```

```go
type Copier struct {
	JobIDConfig
	CopyConfig
}

func (t *Table) CopierFrom(srcs ...*Table) *Copier
func (c *Copier) Run(ctx context.Context) (*Job, error)

type TableCopyOperationType string

const (
	CopyOperation     TableCopyOperationType = "COPY"
	SnapshotOperation TableCopyOperationType = "SNAPSHOT"
	RestoreOperation  TableCopyOperationType = "RESTORE"
	CloneOperation    TableCopyOperationType = "CLONE"
)
```

Table copy operation types specify the kind of copy operation to perform:

- CopyOperation - Normal table to table copying (default)
- SnapshotOperation - Create an immutable snapshot from a regular table
- RestoreOperation - Create/restore a table from a snapshot
- CloneOperation - Create a table clone with copy-on-write semantics (billed based on difference from base table)

```go
// Create a snapshot
copier := snapshotTable.CopierFrom(srcTable)
copier.OperationType = bigquery.SnapshotOperation
job, err := copier.Run(ctx)
```

```go
// Restore from snapshot
copier := restoredTable.CopierFrom(snapshotTable)
copier.OperationType = bigquery.RestoreOperation
job, err := copier.Run(ctx)
```

```go
// Create a clone
copier := cloneTable.CopierFrom(srcTable)
copier.OperationType = bigquery.CloneOperation
job, err := copier.Run(ctx)
```

```go
type CopyConfig struct {
	// Srcs are the tables from which data will be copied
	Srcs []*Table
	// Dst is the table into which data will be copied
	Dst *Table
	// CreateDisposition specifies when to create the destination table
	CreateDisposition TableCreateDisposition
	// WriteDisposition specifies how to handle existing data
	WriteDisposition TableWriteDisposition
	// Labels for the copy job
	Labels map[string]string
	// DestinationEncryptionConfig for destination table
	DestinationEncryptionConfig *EncryptionConfig
	// OperationType for copy operations
	OperationType TableCopyOperationType
	// JobTimeout for the copy job
	JobTimeout time.Duration
	// Reservation for the copy job
	Reservation string
	// MaxSlots for slot usage limit
	MaxSlots int32
}
```

Copy one table to another:

```go
srcTable := dataset.Table("source_table")
dstTable := dataset.Table("destination_table")
copier := dstTable.CopierFrom(srcTable)
copier.WriteDisposition = bigquery.WriteTruncate
job, err := copier.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}
if err := status.Err(); err != nil {
	return err
}
```
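Copies are not limited to a single dataset; a sketch of a cross-project copy using DatasetInProject (the project, dataset, and table names are placeholders):

```go
// Copy a table from another project's dataset into the current dataset.
src := client.DatasetInProject("other-project", "other_dataset").Table("source_table")
dst := dataset.Table("imported_table")

copier := dst.CopierFrom(src)
copier.CreateDisposition = bigquery.CreateIfNeeded
copier.WriteDisposition = bigquery.WriteTruncate
job, err := copier.Run(ctx)
```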
Copy multiple source tables into one destination table:

```go
src1 := dataset.Table("table1")
src2 := dataset.Table("table2")
src3 := dataset.Table("table3")
dst := dataset.Table("combined_table")
copier := dst.CopierFrom(src1, src2, src3)
copier.WriteDisposition = bigquery.WriteAppend
job, err := copier.Run(ctx)
```

Use a customer-managed encryption key for the destination table:

```go
copier := dstTable.CopierFrom(srcTable)
copier.DestinationEncryptionConfig = &bigquery.EncryptionConfig{
	KMSKeyName: "projects/my-project/locations/us/keyRings/my-ring/cryptoKeys/my-key",
}
job, err := copier.Run(ctx)
```

Create a table snapshot:

```go
srcTable := dataset.Table("source_table")
snapshotTable := dataset.Table("snapshot_20240120")
copier := snapshotTable.CopierFrom(srcTable)
copier.OperationType = bigquery.SnapshotOperation
job, err := copier.Run(ctx)
if err != nil {
	return err
}
status, err := job.Wait(ctx)
if err != nil {
	return err
}
if err := status.Err(); err != nil {
	return err
}
```
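A snapshot is read-only, but its metadata can still be changed; for example, an expiration can be set with Table.Update so the snapshot is cleaned up automatically. A minimal sketch (the 30-day retention is illustrative and requires the time package):

```go
// Expire the snapshot 30 days from now (illustrative retention period).
if _, err := snapshotTable.Update(ctx, bigquery.TableMetadataToUpdate{
	ExpirationTime: time.Now().Add(30 * 24 * time.Hour),
}, ""); err != nil {
	return err
}
```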
Restore a table from a snapshot:

```go
snapshotTable := dataset.Table("snapshot_20240120")
restoredTable := dataset.Table("restored_table")
copier := restoredTable.CopierFrom(snapshotTable)
copier.OperationType = bigquery.RestoreOperation
job, err := copier.Run(ctx)
```

Create a table clone:

```go
srcTable := dataset.Table("source_table")
cloneTable := dataset.Table("clone_table")
copier := cloneTable.CopierFrom(srcTable)
copier.OperationType = bigquery.CloneOperation
job, err := copier.Run(ctx)
```

Write dispositions control how existing data in the destination table is handled:

```go
type TableWriteDisposition string

const (
	WriteEmpty    TableWriteDisposition = "WRITE_EMPTY"
	WriteTruncate TableWriteDisposition = "WRITE_TRUNCATE"
	WriteAppend   TableWriteDisposition = "WRITE_APPEND"
)
```

```go
copier.WriteDisposition = bigquery.WriteTruncate // Replace table
copier.WriteDisposition = bigquery.WriteAppend   // Append to table
copier.WriteDisposition = bigquery.WriteEmpty    // Fail if table exists
```

Create dispositions control whether the destination table is created when it does not already exist:

```go
type TableCreateDisposition string

const (
	CreateIfNeeded TableCreateDisposition = "CREATE_IF_NEEDED"
	CreateNever    TableCreateDisposition = "CREATE_NEVER"
)
```

```go
copier.CreateDisposition = bigquery.CreateIfNeeded // Create if doesn't exist
copier.CreateDisposition = bigquery.CreateNever    // Fail if doesn't exist
```

A complete example that exports a table, copies it, and creates a snapshot:

```go
package main
import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	dataset := client.Dataset("analytics")
	table := dataset.Table("events")

	// Export to CSV
	exportToCSV(ctx, table)

	// Export to Parquet
	exportToParquet(ctx, table)

	// Copy table
	copyTable(ctx, dataset)

	// Create snapshot
	createSnapshot(ctx, dataset)
}

func exportToCSV(ctx context.Context, table *bigquery.Table) {
	gcsRef := bigquery.NewGCSReference("gs://my-bucket/exports/events-*.csv.gz")
	gcsRef.DestinationFormat = bigquery.CSV
	gcsRef.Compression = bigquery.Gzip
	gcsRef.FieldDelimiter = ","

	extractor := table.ExtractorTo(gcsRef)
	extractor.DisableHeader = false
	extractor.Labels = map[string]string{
		"export-type": "csv",
		"date":        "2024-01-20",
	}

	job, err := extractor.Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("CSV export completed")
}

func exportToParquet(ctx context.Context, table *bigquery.Table) {
	gcsRef := bigquery.NewGCSReference("gs://my-bucket/exports/events-*.parquet")
	gcsRef.DestinationFormat = bigquery.Parquet
	gcsRef.Compression = bigquery.Snappy

	extractor := table.ExtractorTo(gcsRef)
	job, err := extractor.Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Parquet export completed")
}

func copyTable(ctx context.Context, dataset *bigquery.Dataset) {
	srcTable := dataset.Table("events")
	dstTable := dataset.Table("events_backup")

	copier := dstTable.CopierFrom(srcTable)
	copier.WriteDisposition = bigquery.WriteTruncate
	copier.Labels = map[string]string{
		"backup": "daily",
	}

	job, err := copier.Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Table copy completed")
}

func createSnapshot(ctx context.Context, dataset *bigquery.Dataset) {
	srcTable := dataset.Table("events")
	snapshotTable := dataset.Table("events_snapshot_20240120")

	copier := snapshotTable.CopierFrom(srcTable)
	copier.OperationType = bigquery.SnapshotOperation

	job, err := copier.Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Snapshot created")
}
```