Data Export and Table Copy

This document covers extracting data from BigQuery tables to Google Cloud Storage and copying tables within BigQuery.

Table Extraction

Extractor Type

type Extractor struct {
    JobIDConfig
    ExtractConfig
}
func (t *Table) ExtractorTo(dst *GCSReference) *Extractor
func (m *Model) ExtractorTo(dst *GCSReference) *Extractor
func (e *Extractor) Run(ctx context.Context) (*Job, error)
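
Extractor embeds JobIDConfig, so job-identity fields can be set directly on the extractor before Run. A minimal sketch, assuming the standard JobIDConfig fields (the job ID value is illustrative):

extractor := table.ExtractorTo(gcsRef)
extractor.JobID = "nightly-export" // custom job ID; illustrative value
extractor.AddJobIDSuffix = true    // append a random suffix so reruns don't collide
job, err := extractor.Run(ctx)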

ExtractConfig Type

type ExtractConfig struct {
    // Src is the table from which data will be extracted
    Src *Table

    // SrcModel is the ML model from which data will be extracted
    SrcModel *Model

    // Dst is the destination into which data will be extracted
    Dst *GCSReference

    // DisableHeader disables the printing of a header row
    DisableHeader bool

    // Labels for the extract job
    Labels map[string]string

    // UseAvroLogicalTypes for Avro-based extracts
    UseAvroLogicalTypes bool

    // JobTimeout for the extract job
    JobTimeout time.Duration

    // Reservation for the extract job
    Reservation string

    // MaxSlots for slot usage limit
    MaxSlots int32
}
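
Extractor also embeds ExtractConfig, so the fields above are set directly on the extractor before Run. A short sketch using fields from the listing above (the bucket name is illustrative; JobTimeout uses the standard time package):

gcsRef := bigquery.NewGCSReference("gs://my-bucket/export/data-*.csv")
extractor := table.ExtractorTo(gcsRef)
extractor.DisableHeader = true                      // omit the CSV header row
extractor.Labels = map[string]string{"team": "etl"} // label the extract job
extractor.JobTimeout = 10 * time.Minute             // bound the job's runtime
job, err := extractor.Run(ctx)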

Basic Table Extract

gcsRef := bigquery.NewGCSReference("gs://my-bucket/export/data-*.csv")
gcsRef.DestinationFormat = bigquery.CSV
gcsRef.Compression = bigquery.Gzip

extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}

Export Formats

Supported Formats

const (
    CSV          DataFormat = "CSV"
    Avro         DataFormat = "AVRO"
    JSON         DataFormat = "NEWLINE_DELIMITED_JSON"
    Parquet      DataFormat = "PARQUET"
    // For ML models
    TFSavedModel   DataFormat = "ML_TF_SAVED_MODEL"
    XGBoostBooster DataFormat = "ML_XGBOOST_BOOSTER"
)

CSV Export

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.csv")
gcsRef.DestinationFormat = bigquery.CSV
gcsRef.FieldDelimiter = ","
gcsRef.Compression = bigquery.Gzip

extractor := table.ExtractorTo(gcsRef)
extractor.DisableHeader = false
job, err := extractor.Run(ctx)

Note: CSV exports do not support nested or repeated fields; use Avro, JSON, or Parquet for such tables.

JSON Export

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.json")
gcsRef.DestinationFormat = bigquery.JSON
gcsRef.Compression = bigquery.Gzip

extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)

Avro Export

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.avro")
gcsRef.DestinationFormat = bigquery.Avro
gcsRef.Compression = bigquery.Snappy

extractor := table.ExtractorTo(gcsRef)
extractor.UseAvroLogicalTypes = true
job, err := extractor.Run(ctx)

Parquet Export

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.parquet")
gcsRef.DestinationFormat = bigquery.Parquet
gcsRef.Compression = bigquery.Snappy

extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)

Compression Options

const (
    None    Compression = "NONE"
    Gzip    Compression = "GZIP"
    Deflate Compression = "DEFLATE" // Avro only
    Snappy  Compression = "SNAPPY"  // Avro and Parquet
)
gcsRef.Compression = bigquery.Gzip

Wildcard Exports

Export large tables into multiple files:

// Creates files: data-000000000000.csv, data-000000000001.csv, etc.
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data-*.csv")
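
NewGCSReference is variadic, so a single extract job can also write to several URI patterns; DestinationURIFileCounts (see Extract Statistics below) then reports one count per pattern. A brief sketch with illustrative bucket names:

// Output files are distributed across the listed patterns.
gcsRef := bigquery.NewGCSReference(
    "gs://my-bucket-us/data-*.csv",
    "gs://my-bucket-backup/data-*.csv",
)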

ML Model Export

model := dataset.Model("my_ml_model")
gcsRef := bigquery.NewGCSReference("gs://my-bucket/model")
gcsRef.DestinationFormat = bigquery.TFSavedModel

extractor := model.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)

Extract Statistics

type ExtractStatistics struct {
    DestinationURIFileCounts []int64
}

Get extract statistics:

job, err := extractor.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}

extractStats := status.Statistics.Details.(*bigquery.ExtractStatistics)

for i, count := range extractStats.DestinationURIFileCounts {
    fmt.Printf("URI %d: %d files\n", i, count)
}

Table Copy

Copier Type

type Copier struct {
    JobIDConfig
    CopyConfig
}
func (t *Table) CopierFrom(srcs ...*Table) *Copier
func (c *Copier) Run(ctx context.Context) (*Job, error)

Table Copy Operation Types

type TableCopyOperationType string
const (
    CopyOperation     TableCopyOperationType = "COPY"
    SnapshotOperation TableCopyOperationType = "SNAPSHOT"
    RestoreOperation  TableCopyOperationType = "RESTORE"
    CloneOperation    TableCopyOperationType = "CLONE"
)

The operation type selects which kind of copy the job performs:

  • CopyOperation - Normal table to table copying (default)
  • SnapshotOperation - Create an immutable snapshot from a regular table
  • RestoreOperation - Create/restore a table from a snapshot
  • CloneOperation - Create a table clone with copy-on-write semantics (billed based on difference from base table)

Complete examples for snapshot, restore, and clone operations appear under Table Copy Operations below.

CopyConfig Type

type CopyConfig struct {
    // Srcs are the tables from which data will be copied
    Srcs []*Table

    // Dst is the table into which data will be copied
    Dst *Table

    // CreateDisposition specifies when to create the destination table
    CreateDisposition TableCreateDisposition

    // WriteDisposition specifies how to handle existing data
    WriteDisposition TableWriteDisposition

    // Labels for the copy job
    Labels map[string]string

    // DestinationEncryptionConfig for destination table
    DestinationEncryptionConfig *EncryptionConfig

    // OperationType for copy operations
    OperationType TableCopyOperationType

    // JobTimeout for the copy job
    JobTimeout time.Duration

    // Reservation for the copy job
    Reservation string

    // MaxSlots for slot usage limit
    MaxSlots int32
}
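
As with the extractor, Copier embeds CopyConfig, so these fields are set directly on the copier before Run. A brief sketch using fields from the listing above (table handles as in the examples below; JobTimeout uses the standard time package):

copier := dstTable.CopierFrom(srcTable)
copier.CreateDisposition = bigquery.CreateIfNeeded // create the destination if it doesn't exist
copier.WriteDisposition = bigquery.WriteEmpty      // fail instead of overwriting existing data
copier.JobTimeout = 5 * time.Minute                // bound the copy job's runtime
job, err := copier.Run(ctx)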

Basic Table Copy

srcTable := dataset.Table("source_table")
dstTable := dataset.Table("destination_table")

copier := dstTable.CopierFrom(srcTable)
copier.WriteDisposition = bigquery.WriteTruncate

job, err := copier.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}

Copy Multiple Tables

src1 := dataset.Table("table1")
src2 := dataset.Table("table2")
src3 := dataset.Table("table3")
dst := dataset.Table("combined_table")

copier := dst.CopierFrom(src1, src2, src3)
copier.WriteDisposition = bigquery.WriteAppend

job, err := copier.Run(ctx)

Copy with Encryption

copier := dstTable.CopierFrom(srcTable)
copier.DestinationEncryptionConfig = &bigquery.EncryptionConfig{
    KMSKeyName: "projects/my-project/locations/us/keyRings/my-ring/cryptoKeys/my-key",
}

job, err := copier.Run(ctx)

Table Copy Operations

The examples below use the TableCopyOperationType constants shown earlier under Table Copy Operation Types.

Creating Table Snapshots

srcTable := dataset.Table("source_table")
snapshotTable := dataset.Table("snapshot_20240120")

copier := snapshotTable.CopierFrom(srcTable)
copier.OperationType = bigquery.SnapshotOperation

job, err := copier.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}

Restoring from Snapshot

snapshotTable := dataset.Table("snapshot_20240120")
restoredTable := dataset.Table("restored_table")

copier := restoredTable.CopierFrom(snapshotTable)
copier.OperationType = bigquery.RestoreOperation

job, err := copier.Run(ctx)

Cloning Tables

srcTable := dataset.Table("source_table")
cloneTable := dataset.Table("clone_table")

copier := cloneTable.CopierFrom(srcTable)
copier.OperationType = bigquery.CloneOperation

job, err := copier.Run(ctx)

Write Disposition

type TableWriteDisposition string

const (
    WriteEmpty    TableWriteDisposition = "WRITE_EMPTY"
    WriteTruncate TableWriteDisposition = "WRITE_TRUNCATE"
    WriteAppend   TableWriteDisposition = "WRITE_APPEND"
)
copier.WriteDisposition = bigquery.WriteTruncate // Replace table
copier.WriteDisposition = bigquery.WriteAppend   // Append to table
copier.WriteDisposition = bigquery.WriteEmpty    // Fail if table exists

Create Disposition

type TableCreateDisposition string

const (
    CreateIfNeeded TableCreateDisposition = "CREATE_IF_NEEDED"
    CreateNever    TableCreateDisposition = "CREATE_NEVER"
)
copier.CreateDisposition = bigquery.CreateIfNeeded // Create if doesn't exist
copier.CreateDisposition = bigquery.CreateNever    // Fail if doesn't exist

Complete Export Example

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/bigquery"
)

func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    dataset := client.Dataset("analytics")
    table := dataset.Table("events")

    // Export to CSV
    exportToCSV(ctx, table)

    // Export to Parquet
    exportToParquet(ctx, table)

    // Copy table
    copyTable(ctx, dataset)

    // Create snapshot
    createSnapshot(ctx, dataset)
}

func exportToCSV(ctx context.Context, table *bigquery.Table) {
    gcsRef := bigquery.NewGCSReference("gs://my-bucket/exports/events-*.csv.gz")
    gcsRef.DestinationFormat = bigquery.CSV
    gcsRef.Compression = bigquery.Gzip
    gcsRef.FieldDelimiter = ","

    extractor := table.ExtractorTo(gcsRef)
    extractor.DisableHeader = false
    extractor.Labels = map[string]string{
        "export-type": "csv",
        "date":        "2024-01-20",
    }

    job, err := extractor.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }

    if err := status.Err(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("CSV export completed")
}

func exportToParquet(ctx context.Context, table *bigquery.Table) {
    gcsRef := bigquery.NewGCSReference("gs://my-bucket/exports/events-*.parquet")
    gcsRef.DestinationFormat = bigquery.Parquet
    gcsRef.Compression = bigquery.Snappy

    extractor := table.ExtractorTo(gcsRef)
    job, err := extractor.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if err := status.Err(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Parquet export completed")
}

func copyTable(ctx context.Context, dataset *bigquery.Dataset) {
    srcTable := dataset.Table("events")
    dstTable := dataset.Table("events_backup")

    copier := dstTable.CopierFrom(srcTable)
    copier.WriteDisposition = bigquery.WriteTruncate
    copier.Labels = map[string]string{
        "backup": "daily",
    }

    job, err := copier.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if err := status.Err(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Table copy completed")
}

func createSnapshot(ctx context.Context, dataset *bigquery.Dataset) {
    srcTable := dataset.Table("events")
    snapshotTable := dataset.Table("events_snapshot_20240120")

    copier := snapshotTable.CopierFrom(srcTable)
    copier.OperationType = bigquery.SnapshotOperation

    job, err := copier.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if err := status.Err(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Snapshot created")
}