Data Loading

This document covers loading data into BigQuery tables: batch loading from sources such as Google Cloud Storage and local files, and streaming inserts for real-time ingestion.

Overview

BigQuery supports multiple ways to load data:

  • Batch loading from Google Cloud Storage or local files
  • Streaming inserts for real-time data ingestion
  • Loading from other BigQuery tables (see data-export.md for copying)

Batch Loading from Google Cloud Storage

GCSReference Type

type GCSReference struct {
    URIs              []string
    FileConfig        FileConfig
    DestinationFormat DataFormat
    Compression       Compression
}
func NewGCSReference(uri ...string) *GCSReference

Create a reference to GCS files:

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv")

Multiple files or wildcards:

gcsRef := bigquery.NewGCSReference(
    "gs://my-bucket/data/file1.csv",
    "gs://my-bucket/data/file2.csv",
)

// Or with wildcards
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv")

Loader Type

type Loader struct {
    JobIDConfig
    LoadConfig
}
func (t *Table) LoaderFrom(src LoadSource) *Loader
type LoadSource interface {
    // Has unexported methods
}

Implementations of LoadSource:

  • *GCSReference - Load from Google Cloud Storage
  • *ReaderSource - Load from io.Reader

Basic GCS Load

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/users.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.SkipLeadingRows = 1

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}
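
After the job finishes, the load statistics can be read from the returned status. This is a minimal sketch; the Statistics and LoadStatistics types belong to the job APIs (see jobs.md) rather than to the loading API itself:

if stats := status.Statistics; stats != nil {
    if loadStats, ok := stats.Details.(*bigquery.LoadStatistics); ok {
        // Rows and bytes written to the destination table by this load job.
        fmt.Printf("loaded %d rows (%d bytes)\n", loadStats.OutputRows, loadStats.OutputBytes)
    }
}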

Load Configuration

LoadConfig Type

type LoadConfig struct {
    Src                         LoadSource
    Dst                         *Table
    CreateDisposition           TableCreateDisposition
    WriteDisposition            TableWriteDisposition
    Labels                      map[string]string
    TimePartitioning            *TimePartitioning
    RangePartitioning           *RangePartitioning
    Clustering                  *Clustering
    DestinationEncryptionConfig *EncryptionConfig
    SchemaUpdateOptions         []string
    UseAvroLogicalTypes         bool
    ProjectionFields            []string
    HivePartitioningOptions     *HivePartitioningOptions
    DecimalTargetTypes          []DecimalTargetType
    JobTimeout                  time.Duration
    ReferenceFileSchemaURI      string
    CreateSession               bool
    ConnectionProperties        []*ConnectionProperty
    ColumnNameCharacterMap      ColumnNameCharacterMap
    Reservation                 string
    MaxSlots                    int32
}

Configuring a Loader

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.json")
gcsRef.SourceFormat = bigquery.JSON

loader := table.LoaderFrom(gcsRef)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate
loader.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
loader.Labels = map[string]string{
    "load-type": "daily-import",
}

job, err := loader.Run(ctx)

Data Formats

DataFormat Type

type DataFormat string

const (
    CSV             DataFormat = "CSV"
    Avro            DataFormat = "AVRO"
    JSON            DataFormat = "NEWLINE_DELIMITED_JSON"
    DatastoreBackup DataFormat = "DATASTORE_BACKUP"
    GoogleSheets    DataFormat = "GOOGLE_SHEETS"
    Bigtable        DataFormat = "BIGTABLE"
    Parquet         DataFormat = "PARQUET"
    ORC             DataFormat = "ORC"
    Iceberg         DataFormat = "ICEBERG"
)

CSV Loading

CSVOptions Type

type CSVOptions struct {
    AllowJaggedRows                 bool
    AllowQuotedNewlines             bool
    Encoding                        Encoding
    FieldDelimiter                  string
    Quote                           string
    ForceZeroQuote                  bool
    SkipLeadingRows                 int64
    NullMarker                      string
    NullMarkers                     []string
    PreserveASCIIControlCharacters  bool
    SourceColumnMatch               SourceColumnMatch
}

Loading CSV Files

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.AllowJaggedRows = true
gcsRef.AllowQuotedNewlines = true
gcsRef.FieldDelimiter = ","
gcsRef.SkipLeadingRows = 1
gcsRef.Quote = "\""
gcsRef.NullMarker = "\\N"
gcsRef.Schema = schema

loader := table.LoaderFrom(gcsRef)

job, err := loader.Run(ctx)

CSV with Auto-detection

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.SkipLeadingRows = 1
gcsRef.AutoDetect = true // Auto-detect schema

loader := table.LoaderFrom(gcsRef)

job, err := loader.Run(ctx)

Encoding

type Encoding string

const (
    UTF_8      Encoding = "UTF-8"
    ISO_8859_1 Encoding = "ISO-8859-1"
)
gcsRef.Encoding = bigquery.UTF_8

JSON Loading

Loading JSON Files

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.json")
gcsRef.SourceFormat = bigquery.JSON
gcsRef.Schema = schema
gcsRef.IgnoreUnknownValues = true
gcsRef.MaxBadRecords = 100

loader := table.LoaderFrom(gcsRef)

job, err := loader.Run(ctx)

Avro Loading

AvroOptions Type

type AvroOptions struct {
    UseAvroLogicalTypes bool
}

Loading Avro Files

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.avro")
gcsRef.SourceFormat = bigquery.Avro

loader := table.LoaderFrom(gcsRef)
loader.UseAvroLogicalTypes = true

job, err := loader.Run(ctx)
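
These options can also be attached to the source reference, since FileConfig carries an AvroOptions pointer. A sketch equivalent to the flag above, reusing the same gcsRef:

gcsRef.AvroOptions = &bigquery.AvroOptions{
    UseAvroLogicalTypes: true,
}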

Parquet Loading

ParquetOptions Type

type ParquetOptions struct {
    EnumAsString        bool
    EnableListInference bool
    MapTargetType       ParquetMapTargetType
}
type ParquetMapTargetType string

const (
    ParquetArrayMapTargetType  ParquetMapTargetType = "ARRAY_OF_STRUCT"
)

Loading Parquet Files

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.parquet")
gcsRef.SourceFormat = bigquery.Parquet
gcsRef.ParquetOptions = &bigquery.ParquetOptions{
    EnumAsString:        true,
    EnableListInference: true,
}

loader := table.LoaderFrom(gcsRef)

job, err := loader.Run(ctx)

ORC Loading

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.orc")
gcsRef.SourceFormat = bigquery.ORC

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)

Compression

Compression Type

type Compression string

const (
    None    Compression = "NONE"
    Gzip    Compression = "GZIP"
    Deflate Compression = "DEFLATE"
    Snappy  Compression = "SNAPPY"
)

Loading Compressed Files

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv.gz")
gcsRef.Compression = bigquery.Gzip
gcsRef.SourceFormat = bigquery.CSV

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)

Loading from io.Reader

ReaderSource Type

type ReaderSource struct {
    FileConfig
}
func NewReaderSource(r io.Reader) *ReaderSource

Loading from Local File

import "os"

file, err := os.Open("data.json")
if err != nil {
    return err
}
defer file.Close()

rs := bigquery.NewReaderSource(file)
rs.SourceFormat = bigquery.JSON
rs.Schema = schema

loader := table.LoaderFrom(rs)

job, err := loader.Run(ctx)
if err != nil {
    return err
}

status, err := job.Wait(ctx)

Loading from Bytes

import "bytes"

data := []byte(`{"name":"Alice","age":30}
{"name":"Bob","age":25}`)

rs := bigquery.NewReaderSource(bytes.NewReader(data))
rs.SourceFormat = bigquery.JSON
rs.AutoDetect = true

loader := table.LoaderFrom(rs)

job, err := loader.Run(ctx)

Streaming Inserts

Inserter Type

type Inserter struct {
    SkipInvalidRows     bool
    IgnoreUnknownValues bool
    TableTemplateSuffix string
}
func (t *Table) Inserter() *Inserter
func (u *Inserter) Put(ctx context.Context, src interface{}) error

Note: The deprecated Uploader() method is an alias for Inserter(). Use Inserter() in new code:

// Deprecated
uploader := table.Uploader()

// Preferred
inserter := table.Inserter()

Creating an Inserter

inserter := table.Inserter()
inserter.SkipInvalidRows = false
inserter.IgnoreUnknownValues = false
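
The TableTemplateSuffix field uses the legacy template-tables feature: when set, rows are written to a table named after the base table plus the suffix, created on demand from the base table's schema. A minimal sketch, assuming a per-day suffix:

// Rows go to a table named <base table>_20240101, created from the
// base table's schema if it does not already exist.
inserter.TableTemplateSuffix = "_20240101"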

Streaming with ValueSaver

type ValueSaver interface {
    Save() (row map[string]Value, insertID string, err error)
}

Implement ValueSaver:

type User struct {
    ID    int
    Name  string
    Email string
}

func (u *User) Save() (map[string]bigquery.Value, string, error) {
    return map[string]bigquery.Value{
        "id":    u.ID,
        "name":  u.Name,
        "email": u.Email,
    }, fmt.Sprintf("user-%d", u.ID), nil
}

inserter := table.Inserter()
users := []*User{
    {ID: 1, Name: "Alice", Email: "alice@example.com"},
    {ID: 2, Name: "Bob", Email: "bob@example.com"},
}

if err := inserter.Put(ctx, users); err != nil {
    return err
}

Streaming with StructSaver

type StructSaver struct {
    Schema   Schema
    InsertID string
    Struct   interface{}
}
type Product struct {
    ID    int
    Name  string
    Price float64
}

schema, err := bigquery.InferSchema(Product{})
if err != nil {
    return err
}

inserter := table.Inserter()
savers := []*bigquery.StructSaver{
    {
        Schema:   schema,
        InsertID: "product-1",
        Struct:   Product{ID: 1, Name: "Widget", Price: 9.99},
    },
    {
        Schema:   schema,
        InsertID: "product-2",
        Struct:   Product{ID: 2, Name: "Gadget", Price: 19.99},
    },
}

if err := inserter.Put(ctx, savers); err != nil {
    return err
}

Streaming with Struct Inference

type Event struct {
    Timestamp time.Time
    UserID    int
    Action    string
}

inserter := table.Inserter()
events := []*Event{
    {Timestamp: time.Now(), UserID: 1, Action: "login"},
    {Timestamp: time.Now(), UserID: 2, Action: "purchase"},
}

// Schema is inferred automatically
if err := inserter.Put(ctx, events); err != nil {
    return err
}

Insert IDs and Deduplication

Use insert IDs for best-effort deduplication:

func (u *User) Save() (map[string]bigquery.Value, string, error) {
    row := map[string]bigquery.Value{
        "id":   u.ID,
        "name": u.Name,
    }
    insertID := fmt.Sprintf("user-%d-%d", u.ID, time.Now().Unix())
    return row, insertID, nil
}

Opt out of deduplication for higher throughput:

const NoDedupeID = "NoDedupeID"
func (u *User) Save() (map[string]bigquery.Value, string, error) {
    row := map[string]bigquery.Value{
        "id":   u.ID,
        "name": u.Name,
    }
    return row, bigquery.NoDedupeID, nil
}

Handling Insert Errors

type PutMultiError []RowInsertionError
type RowInsertionError struct {
    InsertID string
    RowIndex int
    Errors   []error
}
err := inserter.Put(ctx, rows)
if err != nil {
    if multiErr, ok := err.(bigquery.PutMultiError); ok {
        for _, rowErr := range multiErr {
            fmt.Printf("Row %d failed:\n", rowErr.RowIndex)
            for _, err := range rowErr.Errors {
                fmt.Printf("  %v\n", err)
            }
        }
    }
    return err
}
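
Individual row errors are typically *bigquery.Error values carrying a Reason, Location, and Message. A sketch for surfacing those details inside the loop above:

for _, err := range rowErr.Errors {
    if bqErr, ok := err.(*bigquery.Error); ok {
        fmt.Printf("  reason=%s location=%s: %s\n", bqErr.Reason, bqErr.Location, bqErr.Message)
    }
}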

Schema Update Options

loader := table.LoaderFrom(gcsRef)
loader.SchemaUpdateOptions = []string{
    "ALLOW_FIELD_ADDITION",
    "ALLOW_FIELD_RELAXATION",
}

Available options:

  • "ALLOW_FIELD_ADDITION" - Allow adding new fields
  • "ALLOW_FIELD_RELAXATION" - Allow relaxing REQUIRED to NULLABLE

Partitioning and Clustering on Load

Creating Partitioned Table on Load

gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV

loader := table.LoaderFrom(gcsRef)
loader.TimePartitioning = &bigquery.TimePartitioning{
    Type:  bigquery.DayPartitioningType,
    Field: "timestamp",
}
loader.Clustering = &bigquery.Clustering{
    Fields: []string{"user_id", "country"},
}
loader.CreateDisposition = bigquery.CreateIfNeeded

job, err := loader.Run(ctx)
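
Integer-range partitioning can be configured on load in the same way, via LoadConfig's RangePartitioning field (instead of TimePartitioning). A sketch, assuming the data has an integer customer_id column:

loader.RangePartitioning = &bigquery.RangePartitioning{
    Field: "customer_id",
    Range: &bigquery.RangePartitioningRange{
        Start:    0,
        End:      100000,
        Interval: 1000,
    },
}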

Complete Loading Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "cloud.google.com/go/bigquery"
)

type LogEntry struct {
    Timestamp time.Time `bigquery:"timestamp"`
    UserID    int       `bigquery:"user_id"`
    Action    string    `bigquery:"action"`
    Details   string    `bigquery:"details"`
}

func (e *LogEntry) Save() (map[string]bigquery.Value, string, error) {
    return map[string]bigquery.Value{
        "timestamp": e.Timestamp,
        "user_id":   e.UserID,
        "action":    e.Action,
        "details":   e.Details,
    }, fmt.Sprintf("log-%d-%d", e.UserID, e.Timestamp.Unix()), nil
}

func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    dataset := client.Dataset("logs")
    table := dataset.Table("events")

    // Batch load from GCS
    batchLoad(ctx, table)

    // Streaming inserts
    streamingInsert(ctx, table)
}

func batchLoad(ctx context.Context, table *bigquery.Table) {
    gcsRef := bigquery.NewGCSReference("gs://my-bucket/logs/*.json")
    gcsRef.SourceFormat = bigquery.JSON
    gcsRef.Compression = bigquery.Gzip
    gcsRef.AutoDetect = true
    gcsRef.MaxBadRecords = 1000

    loader := table.LoaderFrom(gcsRef)
    loader.WriteDisposition = bigquery.WriteAppend
    loader.TimePartitioning = &bigquery.TimePartitioning{
        Type:  bigquery.DayPartitioningType,
        Field: "timestamp",
    }
    loader.Labels = map[string]string{
        "source": "gcs",
        "type":   "batch",
    }

    job, err := loader.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }

    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }

    if err := status.Err(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Batch load completed successfully")
}

func streamingInsert(ctx context.Context, table *bigquery.Table) {
    inserter := table.Inserter()
    inserter.SkipInvalidRows = false

    logs := []*LogEntry{
        {
            Timestamp: time.Now(),
            UserID:    1,
            Action:    "login",
            Details:   "User logged in from web",
        },
        {
            Timestamp: time.Now(),
            UserID:    2,
            Action:    "purchase",
            Details:   "Purchased product #123",
        },
    }

    if err := inserter.Put(ctx, logs); err != nil {
        if multiErr, ok := err.(bigquery.PutMultiError); ok {
            for _, rowErr := range multiErr {
                log.Printf("Row %d failed: %v", rowErr.RowIndex, rowErr.Errors)
            }
        }
        log.Fatal(err)
    }

    fmt.Println("Streaming insert completed successfully")
}