or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/bigquery@v1.72.0

docs

advanced-features.mdclient-setup.mddata-export.mddata-loading.mddatasets.mdindex.mdjobs.mdqueries.mdstorage-read.mdstorage-write.mdtables.md
tile.json

tessl/golang-cloud-google-com--go--bigquery

tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs

storage-write.mddocs/

BigQuery Storage Write API (Managed Writer)

This document covers using the BigQuery Storage Write API's Managed Writer for high-throughput streaming writes with exactly-once semantics.

Overview

The Managed Writer provides:

  • High-throughput streaming writes (higher than streaming inserts)
  • Exactly-once delivery semantics when appends carry explicit offsets (writes to the default stream are at-least-once)
  • Automatic schema updates
  • Lower cost per GB than streaming inserts
  • Better performance for large-scale data ingestion

Package Import

import "cloud.google.com/go/bigquery/storage/managedwriter"
import "cloud.google.com/go/bigquery/storage/managedwriter/adapt"

Client Creation

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)
type Client struct{}
func (c *Client) Close() error

Create a managed writer client:

client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
    return err
}
defer client.Close()

Creating Managed Streams

func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)
type ManagedStream struct{}
func (ms *ManagedStream) Close() error

Default Stream

// Write to default stream (automatically committed)
stream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
)
if err != nil {
    return err
}
defer stream.Close()

Pending Stream

// Write to pending stream (requires manual commit)
stream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
    managedwriter.WithType(managedwriter.PendingStream),
)

Committed Stream

// Write to a committed stream: rows become readable as soon as each append is
// acknowledged, and the stream accepts explicit offsets for exactly-once writes
stream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
    managedwriter.WithType(managedwriter.CommittedStream),
)

Writer Options

// WriterOption configures a ManagedStream at construction time.
type WriterOption func(*ManagedStream)

// Destination and stream identity.
func WithDestinationTable(table string) WriterOption
func WithType(t StreamType) WriterOption
func WithSchemaDescriptor(descriptor *descriptorpb.DescriptorProto) WriterOption

// Diagnostics.
func WithDataOrigin(origin string) WriterOption
func WithTraceID(traceID string) WriterOption

// Flow control.
func WithMaxInflightRequests(n int) WriterOption
func WithMaxInflightBytes(n int) WriterOption

// WithAppendRowsCallOption takes a single call option; repeat the writer
// option to supply several.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption

Stream Types

type StreamType string

const (
    DefaultStream   StreamType = "DEFAULT"
    PendingStream   StreamType = "PENDING"
    CommittedStream StreamType = "COMMITTED"
)

Writing Data

AppendRows

// AppendRows queues serialized rows for writing and returns immediately with a
// handle that tracks the outcome of the append.
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error)

// AppendResult tracks the outcome of a single AppendRows call.
type AppendResult struct{}

// GetResult blocks until the append completes and returns the offset of the
// write (when one applies) together with any error.
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)

// FullResponse blocks until the append completes and returns the complete
// server response, which is where status details and row errors are found.
func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error)

// Ready returns a channel that is closed once the result is available.
func (ar *AppendResult) Ready() <-chan struct{}

Using Protocol Buffers

import (
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/descriptorpb"
)

// Define your data schema as proto
message LogEntry {
    string timestamp = 1;
    int64 user_id = 2;
    string action = 3;
}

// Create descriptor
descriptor, err := adapt.NormalizeDescriptor(logEntry.ProtoReflect().Descriptor())

stream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable(tableRef),
    managedwriter.WithSchemaDescriptor(descriptor),
)

// Create proto message
entry := &LogEntry{
    Timestamp: "2024-01-20T10:00:00Z",
    UserId:    12345,
    Action:    "login",
}

// Marshal to bytes
data, err := proto.Marshal(entry)

// Append
result, err := stream.AppendRows(ctx, [][]byte{data})
if err != nil {
    return err
}

// Wait for result
_, err = result.GetResult(ctx)

Schema Adaptation

Convert BigQuery Schema to Proto Descriptor

func StorageSchemaToProto2Descriptor(schema *storagepb.TableSchema, scope string) (*descriptorpb.DescriptorProto, error)
func BQSchemaToStorageTableSchema(schema bigquery.Schema) (*storagepb.TableSchema, error)
import "cloud.google.com/go/bigquery/storage/managedwriter/adapt"

// Convert BigQuery schema to storage schema
bqSchema := bigquery.Schema{
    {Name: "timestamp", Type: bigquery.TimestampFieldType},
    {Name: "user_id", Type: bigquery.IntegerFieldType},
    {Name: "action", Type: bigquery.StringFieldType},
}

storageSchema, err := adapt.BQSchemaToStorageTableSchema(bqSchema)
if err != nil {
    return err
}

descriptor, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
if err != nil {
    return err
}

Append Options

type AppendOption func(*pendingWrite)
func WithOffset(offset int64) AppendOption

Set explicit offset for exactly-once semantics:

result, err := stream.AppendRows(ctx, data,
    managedwriter.WithOffset(currentOffset),
)

Finalize and Commit

Finalize Stream

func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error)

Finalize a stream (no more appends allowed):

rowCount, err := stream.Finalize(ctx)
if err != nil {
    return err
}
fmt.Printf("Finalized stream with %d rows\n", rowCount)

Batch Commit

For pending streams, commit the writes:

func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)

// Pending streams must be finalized before they can be committed.
streamName := stream.StreamName()
resp, err := client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
    Parent:       "projects/my-project/datasets/my_dataset/tables/my_table",
    WriteStreams: []string{streamName},
})
if err != nil {
    return err
}
// A nil error does not guarantee every stream committed; per-stream failures
// are reported inside the response.
if streamErrs := resp.GetStreamErrors(); len(streamErrs) > 0 {
    return fmt.Errorf("commit reported %d stream error(s): %v", len(streamErrs), streamErrs)
}

Stream Information

func (ms *ManagedStream) StreamName() string
func (ms *ManagedStream) StreamType() StreamType
func TableParentFromStreamName(streamName string) string

fmt.Printf("Stream: %s\n", stream.StreamName())
fmt.Printf("Type: %s\n", stream.StreamType())
// The destination table path is derived from the stream name.
fmt.Printf("Table: %s\n", managedwriter.TableParentFromStreamName(stream.StreamName()))

Error Handling

Append Errors

result, err := stream.AppendRows(ctx, data)
if err != nil {
    // The append could not be submitted (client-side or connection error)
    return err
}

// Wait for the server response. FullResponse exposes the AppendRowsResponse;
// GetResult only returns the write offset.
resp, err := result.FullResponse(ctx)
if err != nil {
    // The server rejected the append; the response, when present, carries
    // the status describing why. Generated getters are nil-safe.
    if status := resp.GetError(); status != nil {
        fmt.Printf("Append error: %v\n", status)
    }
    return err
}

Stream Errors

Check for row-level errors reported with a rejected append:

// FullResponse returns the AppendRowsResponse; GetResult only yields the offset.
resp, err := result.FullResponse(ctx)

// Row errors accompany a failed append, so report them before returning the
// error. GetRowErrors is safe to call on a nil response.
for _, rowErr := range resp.GetRowErrors() {
    fmt.Printf("Row %d error: %v\n", rowErr.Index, rowErr.Message)
}

if err != nil {
    return err
}

Flow Control

Limit in-flight requests and bytes:

stream, err := client.NewManagedStream(ctx,
    managedwriter.WithDestinationTable(tableRef),
    managedwriter.WithMaxInflightRequests(10),
    managedwriter.WithMaxInflightBytes(10 * 1024 * 1024), // 10 MB
)

Complete Managed Writer Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/bigquery/storage/managedwriter"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/reflect/protoreflect"
    "google.golang.org/protobuf/types/dynamicpb"
)

// main runs the example and exits non-zero on failure. The work lives in run
// so that deferred Close calls execute before the process exits; log.Fatal
// would skip them.
func main() {
    if err := run(context.Background()); err != nil {
        log.Fatal(err)
    }
}

// run streams 1000 synthetic events into a BigQuery table through the default
// stream of the managed writer. It returns an error if setup fails, if an
// append cannot be submitted, or if any append is rejected by the service.
func run(ctx context.Context) error {
    const rowCount = 1000

    projectID := "my-project"

    // Create managed writer client
    client, err := managedwriter.NewClient(ctx, projectID)
    if err != nil {
        return fmt.Errorf("creating managed writer client: %w", err)
    }
    defer client.Close()

    // Define table reference
    tableRef := "projects/my-project/datasets/analytics/tables/events"

    // Define schema
    bqSchema := bigquery.Schema{
        {Name: "event_timestamp", Type: bigquery.TimestampFieldType},
        {Name: "user_id", Type: bigquery.IntegerFieldType},
        {Name: "event_type", Type: bigquery.StringFieldType},
        {Name: "metadata", Type: bigquery.JSONFieldType},
    }

    // Convert to a proto descriptor. The adapt package returns a generic
    // protoreflect.Descriptor, which is narrowed to a MessageDescriptor (used
    // to build dynamic messages) and then normalized into the self-contained
    // DescriptorProto that the writer sends to the service.
    storageSchema, err := adapt.BQSchemaToStorageTableSchema(bqSchema)
    if err != nil {
        return fmt.Errorf("converting schema: %w", err)
    }

    rawDescriptor, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
    if err != nil {
        return fmt.Errorf("building proto descriptor: %w", err)
    }
    msgDesc, ok := rawDescriptor.(protoreflect.MessageDescriptor)
    if !ok {
        return fmt.Errorf("unexpected descriptor type %T", rawDescriptor)
    }

    descriptorProto, err := adapt.NormalizeDescriptor(msgDesc)
    if err != nil {
        return fmt.Errorf("normalizing descriptor: %w", err)
    }

    // Create managed stream
    stream, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable(tableRef),
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.WithSchemaDescriptor(descriptorProto),
    )
    if err != nil {
        return fmt.Errorf("creating managed stream: %w", err)
    }
    defer stream.Close()

    // Resolve field descriptors once instead of on every iteration.
    fields := msgDesc.Fields()
    tsField := fields.ByName("event_timestamp")
    userField := fields.ByName("user_id")
    typeField := fields.ByName("event_type")
    metaField := fields.ByName("metadata")

    // Write data
    failed := 0
    for i := 0; i < rowCount; i++ {
        // Create dynamic message
        msg := dynamicpb.NewMessage(msgDesc)
        // TIMESTAMP columns are carried as int64 microseconds since the Unix
        // epoch; setting a string here would panic with a type mismatch.
        msg.Set(tsField, protoreflect.ValueOfInt64(time.Now().UnixMicro()))
        msg.Set(userField, protoreflect.ValueOfInt64(int64(i)))
        msg.Set(typeField, protoreflect.ValueOfString("page_view"))
        // JSON columns are carried as their string encoding.
        msg.Set(metaField, protoreflect.ValueOfString(`{"page": "/home"}`))

        // Marshal message
        data, err := proto.Marshal(msg)
        if err != nil {
            return fmt.Errorf("marshaling row %d: %w", i, err)
        }

        // Append to stream
        result, err := stream.AppendRows(ctx, [][]byte{data})
        if err != nil {
            return fmt.Errorf("appending row %d: %w", i, err)
        }

        // Wait for result (optional, can batch these)
        if _, err := result.GetResult(ctx); err != nil {
            log.Printf("Append error: %v\n", err)
            failed++
            continue
        }

        if (i+1)%100 == 0 {
            fmt.Printf("Written %d rows\n", i+1)
        }
    }

    // Only claim success when every append was acknowledged.
    if failed > 0 {
        return fmt.Errorf("%d of %d rows failed to write", failed, rowCount)
    }

    fmt.Println("All rows written successfully")
    return nil
}

Comparison: Streaming Inserts vs Managed Writer

| Feature | Streaming Inserts | Managed Writer |
| --- | --- | --- |
| Throughput | Lower | Higher |
| Cost | Higher per GB | Lower per GB |
| Latency | Seconds | Seconds |
| Deduplication | Best-effort | Exactly-once |
| Schema updates | Manual | Automatic |
| Complexity | Lower | Higher |
| Best for | Simple use cases | High-volume production |