```
tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0
```

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs.
This document covers loading data into BigQuery tables from various sources, including Google Cloud Storage, local files, and streaming inserts.

BigQuery supports multiple ways to load data:

- Batch load jobs from Google Cloud Storage (`GCSReference`)
- Load jobs from local files or any `io.Reader` (`ReaderSource`)
- Streaming inserts of individual rows (`Inserter`)
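The snippets below assume a BigQuery client and a table handle already exist, created along these lines (the project, dataset, and table names are placeholders):

```go
ctx := context.Background()
client, err := bigquery.NewClient(ctx, "my-project") // placeholder project ID
if err != nil {
    return err
}
defer client.Close()

// Placeholder dataset and table names.
table := client.Dataset("my_dataset").Table("my_table")
```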
```go
type GCSReference struct {
    URIs []string
    FileConfig
    DestinationFormat DataFormat
    Compression Compression
}

func NewGCSReference(uri ...string) *GCSReference
```

Create a reference to GCS files:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv")
```

Multiple files or wildcards:
```go
gcsRef := bigquery.NewGCSReference(
    "gs://my-bucket/data/file1.csv",
    "gs://my-bucket/data/file2.csv",
)

// Or with wildcards
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv")
```

```go
type Loader struct {
    JobIDConfig
    LoadConfig
}

func (t *Table) LoaderFrom(src LoadSource) *Loader

type LoadSource interface {
    // Has unexported methods
}
```

Implementations of LoadSource:
- `*GCSReference` - Load from Google Cloud Storage
- `*ReaderSource` - Load from an `io.Reader`

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/users.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.SkipLeadingRows = 1

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
if err != nil {
    return err
}
status, err := job.Wait(ctx)
if err != nil {
    return err
}
if err := status.Err(); err != nil {
    return err
}
```
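Loader also embeds JobIDConfig, so the job's identity can be set before Run. A brief sketch, assuming JobIDConfig's JobID and AddJobIDSuffix fields (the job ID shown is arbitrary):

```go
loader := table.LoaderFrom(gcsRef)
loader.JobID = "users-load"  // assumed JobIDConfig field; arbitrary ID prefix
loader.AddJobIDSuffix = true // assumed field: append a random suffix so each run is unique
job, err := loader.Run(ctx)
```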
LoadConfig holds the configuration for a load job:

```go
type LoadConfig struct {
    Src LoadSource
    Dst *Table
    CreateDisposition TableCreateDisposition
    WriteDisposition TableWriteDisposition
    Labels map[string]string
    TimePartitioning *TimePartitioning
    RangePartitioning *RangePartitioning
    Clustering *Clustering
    DestinationEncryptionConfig *EncryptionConfig
    SchemaUpdateOptions []string
    UseAvroLogicalTypes bool
    ProjectionFields []string
    HivePartitioningOptions *HivePartitioningOptions
    DecimalTargetTypes []DecimalTargetType
    JobTimeout time.Duration
    ReferenceFileSchemaURI string
    CreateSession bool
    ConnectionProperties []*ConnectionProperty
    ColumnNameCharacterMap ColumnNameCharacterMap
    Reservation string
    MaxSlots int32
}
```
```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.json")
gcsRef.SourceFormat = bigquery.JSON

loader := table.LoaderFrom(gcsRef)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate
loader.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
loader.Labels = map[string]string{
    "load-type": "daily-import",
}
job, err := loader.Run(ctx)
```
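Other LoadConfig fields are set the same way on the Loader before Run; for example, a per-job timeout (a sketch; the duration is arbitrary and the time package is required):

```go
loader := table.LoaderFrom(gcsRef)
loader.JobTimeout = 10 * time.Minute // arbitrary timeout for the load job
job, err := loader.Run(ctx)
```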
Supported source data formats:

```go
type DataFormat string

const (
    CSV             DataFormat = "CSV"
    Avro            DataFormat = "AVRO"
    JSON            DataFormat = "NEWLINE_DELIMITED_JSON"
    DatastoreBackup DataFormat = "DATASTORE_BACKUP"
    GoogleSheets    DataFormat = "GOOGLE_SHEETS"
    Bigtable        DataFormat = "BIGTABLE"
    Parquet         DataFormat = "PARQUET"
    ORC             DataFormat = "ORC"
    Iceberg         DataFormat = "ICEBERG"
)
```
CSV-specific options:

```go
type CSVOptions struct {
    AllowJaggedRows bool
    AllowQuotedNewlines bool
    Encoding Encoding
    FieldDelimiter string
    Quote string
    ForceZeroQuote bool
    SkipLeadingRows int64
    NullMarker string
    NullMarkers []string
    PreserveASCIIControlCharacters bool
    SourceColumnMatch SourceColumnMatch
}
```
```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.AllowJaggedRows = true
gcsRef.AllowQuotedNewlines = true
gcsRef.FieldDelimiter = ","
gcsRef.SkipLeadingRows = 1
gcsRef.Quote = "\""
gcsRef.NullMarker = "\\N"
gcsRef.Schema = schema

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```
```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.SkipLeadingRows = 1
gcsRef.AutoDetect = true // Auto-detect schema

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```
Supported character encodings:

```go
type Encoding string

const (
    UTF_8      Encoding = "UTF-8"
    ISO_8859_1 Encoding = "ISO-8859-1"
)
```

```go
gcsRef.Encoding = bigquery.UTF_8
```
```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.json")
gcsRef.SourceFormat = bigquery.JSON
gcsRef.Schema = schema
gcsRef.IgnoreUnknownValues = true
gcsRef.MaxBadRecords = 100

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```
Avro-specific options:

```go
type AvroOptions struct {
    UseAvroLogicalTypes bool
}
```

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.avro")
gcsRef.SourceFormat = bigquery.Avro

loader := table.LoaderFrom(gcsRef)
loader.UseAvroLogicalTypes = true
job, err := loader.Run(ctx)
```
Parquet-specific options:

```go
type ParquetOptions struct {
    EnumAsString bool
    EnableListInference bool
    MapTargetType ParquetMapTargetType
}

type ParquetMapTargetType string

const (
    ParquetArrayMapTargetType ParquetMapTargetType = "ARRAY_OF_STRUCT"
)
```
```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.parquet")
gcsRef.SourceFormat = bigquery.Parquet
gcsRef.ParquetOptions = &bigquery.ParquetOptions{
    EnumAsString:        true,
    EnableListInference: true,
}

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.orc")
gcsRef.SourceFormat = bigquery.ORC

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```
Supported compression types for source data:

```go
type Compression string

const (
    None    Compression = "NONE"
    Gzip    Compression = "GZIP"
    Deflate Compression = "DEFLATE"
    Snappy  Compression = "SNAPPY"
)
```

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data/*.csv.gz")
gcsRef.Compression = bigquery.Gzip
gcsRef.SourceFormat = bigquery.CSV

loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
```
Load from a local file or any io.Reader with ReaderSource:

```go
type ReaderSource struct {
    FileConfig
}

func NewReaderSource(r io.Reader) *ReaderSource
```

```go
import "os"

file, err := os.Open("data.json")
if err != nil {
    return err
}
defer file.Close()

rs := bigquery.NewReaderSource(file)
rs.SourceFormat = bigquery.JSON
rs.Schema = schema

loader := table.LoaderFrom(rs)
job, err := loader.Run(ctx)
if err != nil {
    return err
}
status, err := job.Wait(ctx)
```
```go
import "bytes"

data := []byte(`{"name":"Alice","age":30}
{"name":"Bob","age":25}`)

rs := bigquery.NewReaderSource(bytes.NewReader(data))
rs.SourceFormat = bigquery.JSON
rs.AutoDetect = true

loader := table.LoaderFrom(rs)
job, err := loader.Run(ctx)
```
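Because ReaderSource embeds FileConfig, the same format options shown above for GCSReference apply to local data as well. A minimal sketch, assuming a local CSV file named data.csv:

```go
f, err := os.Open("data.csv") // hypothetical local CSV file
if err != nil {
    return err
}
defer f.Close()

rs := bigquery.NewReaderSource(f)
rs.SourceFormat = bigquery.CSV
rs.SkipLeadingRows = 1 // CSVOptions fields are available via the embedded FileConfig
rs.AutoDetect = true

loader := table.LoaderFrom(rs)
job, err := loader.Run(ctx)
```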
Stream individual rows with the Inserter:

```go
type Inserter struct {
    SkipInvalidRows bool
    IgnoreUnknownValues bool
    TableTemplateSuffix string
}

func (t *Table) Inserter() *Inserter

func (u *Inserter) Put(ctx context.Context, src interface{}) error
```

Note: The deprecated Uploader() method is an alias for Inserter(). Use Inserter() in new code:

```go
// Deprecated
uploader := table.Uploader()

// Preferred
inserter := table.Inserter()
```
```go
inserter := table.Inserter()
inserter.SkipInvalidRows = false
inserter.IgnoreUnknownValues = false
```

```go
type ValueSaver interface {
    Save() (row map[string]Value, insertID string, err error)
}
```

Implement ValueSaver:
```go
type User struct {
    ID    int
    Name  string
    Email string
}

func (u *User) Save() (map[string]bigquery.Value, string, error) {
    return map[string]bigquery.Value{
        "id":    u.ID,
        "name":  u.Name,
        "email": u.Email,
    }, fmt.Sprintf("user-%d", u.ID), nil
}

inserter := table.Inserter()
users := []*User{
    {ID: 1, Name: "Alice", Email: "alice@example.com"},
    {ID: 2, Name: "Bob", Email: "bob@example.com"},
}
if err := inserter.Put(ctx, users); err != nil {
    return err
}
```
Use StructSaver for explicit control over the schema and insert ID:

```go
type StructSaver struct {
    Schema   Schema
    InsertID string
    Struct   interface{}
}
```
```go
type Product struct {
    ID    int
    Name  string
    Price float64
}

schema, err := bigquery.InferSchema(Product{})
if err != nil {
    return err
}

inserter := table.Inserter()
savers := []*bigquery.StructSaver{
    {
        Schema:   schema,
        InsertID: "product-1",
        Struct:   Product{ID: 1, Name: "Widget", Price: 9.99},
    },
    {
        Schema:   schema,
        InsertID: "product-2",
        Struct:   Product{ID: 2, Name: "Gadget", Price: 19.99},
    },
}
if err := inserter.Put(ctx, savers); err != nil {
    return err
}
```
Structs can also be passed directly to Put; the schema is inferred from the struct type:

```go
type Event struct {
    Timestamp time.Time
    UserID    int
    Action    string
}

inserter := table.Inserter()
events := []*Event{
    {Timestamp: time.Now(), UserID: 1, Action: "login"},
    {Timestamp: time.Now(), UserID: 2, Action: "purchase"},
}
// Schema is inferred automatically
if err := inserter.Put(ctx, events); err != nil {
    return err
}
```

Use insert IDs for best-effort deduplication:
```go
func (u *User) Save() (map[string]bigquery.Value, string, error) {
    row := map[string]bigquery.Value{
        "id":   u.ID,
        "name": u.Name,
    }
    insertID := fmt.Sprintf("user-%d-%d", u.ID, time.Now().Unix())
    return row, insertID, nil
}
```

Opt out of deduplication for higher throughput:

```go
const NoDedupeID = "NoDedupeID"
```

```go
func (u *User) Save() (map[string]bigquery.Value, string, error) {
    row := map[string]bigquery.Value{
        "id":   u.ID,
        "name": u.Name,
    }
    return row, bigquery.NoDedupeID, nil
}
```
Put may return a PutMultiError describing per-row failures:

```go
type PutMultiError []RowInsertionError

type RowInsertionError struct {
    InsertID string
    RowIndex int
    Errors   []error
}
```
```go
err := inserter.Put(ctx, rows)
if err != nil {
    if multiErr, ok := err.(bigquery.PutMultiError); ok {
        for _, rowErr := range multiErr {
            fmt.Printf("Row %d failed:\n", rowErr.RowIndex)
            for _, err := range rowErr.Errors {
                fmt.Printf("  %v\n", err)
            }
        }
    }
    return err
}
```
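Equivalently, the same check can be written with errors.As from the standard errors package (a sketch):

```go
var pme bigquery.PutMultiError
if errors.As(err, &pme) {
    for _, rowErr := range pme {
        fmt.Printf("Row %d failed: %v\n", rowErr.RowIndex, rowErr.Errors)
    }
}
```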
Allow schema updates during a load job:

```go
loader := table.LoaderFrom(gcsRef)
loader.SchemaUpdateOptions = []string{
    "ALLOW_FIELD_ADDITION",
    "ALLOW_FIELD_RELAXATION",
}
```

Available options:

- `"ALLOW_FIELD_ADDITION"` - Allow adding new fields
- `"ALLOW_FIELD_RELAXATION"` - Allow relaxing REQUIRED fields to NULLABLE
Load into a time-partitioned, clustered table:

```go
gcsRef := bigquery.NewGCSReference("gs://my-bucket/data.csv")
gcsRef.SourceFormat = bigquery.CSV

loader := table.LoaderFrom(gcsRef)
loader.TimePartitioning = &bigquery.TimePartitioning{
    Type:  bigquery.DayPartitioningType,
    Field: "timestamp",
}
loader.Clustering = &bigquery.Clustering{
    Fields: []string{"user_id", "country"},
}
loader.CreateDisposition = bigquery.CreateIfNeeded
job, err := loader.Run(ctx)
```
A complete example combining a batch load from GCS with streaming inserts:

```go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "cloud.google.com/go/bigquery"
)

type LogEntry struct {
    Timestamp time.Time `bigquery:"timestamp"`
    UserID    int       `bigquery:"user_id"`
    Action    string    `bigquery:"action"`
    Details   string    `bigquery:"details"`
}

func (e *LogEntry) Save() (map[string]bigquery.Value, string, error) {
    return map[string]bigquery.Value{
        "timestamp": e.Timestamp,
        "user_id":   e.UserID,
        "action":    e.Action,
        "details":   e.Details,
    }, fmt.Sprintf("log-%d-%d", e.UserID, e.Timestamp.Unix()), nil
}

func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    dataset := client.Dataset("logs")
    table := dataset.Table("events")

    // Batch load from GCS
    batchLoad(ctx, table)

    // Streaming inserts
    streamingInsert(ctx, table)
}

func batchLoad(ctx context.Context, table *bigquery.Table) {
    gcsRef := bigquery.NewGCSReference("gs://my-bucket/logs/*.json")
    gcsRef.SourceFormat = bigquery.JSON
    gcsRef.Compression = bigquery.Gzip
    gcsRef.AutoDetect = true
    gcsRef.MaxBadRecords = 1000

    loader := table.LoaderFrom(gcsRef)
    loader.WriteDisposition = bigquery.WriteAppend
    loader.TimePartitioning = &bigquery.TimePartitioning{
        Type:  bigquery.DayPartitioningType,
        Field: "timestamp",
    }
    loader.Labels = map[string]string{
        "source": "gcs",
        "type":   "batch",
    }

    job, err := loader.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if err := status.Err(); err != nil {
        log.Fatal(err)
    }
    fmt.Println("Batch load completed successfully")
}

func streamingInsert(ctx context.Context, table *bigquery.Table) {
    inserter := table.Inserter()
    inserter.SkipInvalidRows = false

    logs := []*LogEntry{
        {
            Timestamp: time.Now(),
            UserID:    1,
            Action:    "login",
            Details:   "User logged in from web",
        },
        {
            Timestamp: time.Now(),
            UserID:    2,
            Action:    "purchase",
            Details:   "Purchased product #123",
        },
    }

    if err := inserter.Put(ctx, logs); err != nil {
        if multiErr, ok := err.(bigquery.PutMultiError); ok {
            for _, rowErr := range multiErr {
                log.Printf("Row %d failed: %v", rowErr.RowIndex, rowErr.Errors)
            }
        }
        log.Fatal(err)
    }
    fmt.Println("Streaming insert completed successfully")
}
```