tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs
This document covers using the BigQuery Storage Write API's Managed Writer for high-throughput streaming writes with exactly-once semantics.
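The Managed Writer provides a client for opening managed streams (default, pending, or committed), appending serialized protobuf rows with optional explicit offsets, finalizing and committing pending streams, and limiting in-flight requests and bytes.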
import "cloud.google.com/go/bigquery/storage/managedwriter"
import "cloud.google.com/go/bigquery/storage/managedwriter/adapt"func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)type Client struct{}func (c *Client) Close() errorCreate a managed writer client:
client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
return err
}
defer client.Close()func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error)type ManagedStream struct{}func (ms *ManagedStream) Close() error// Write to default stream (automatically committed)
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
)
if err != nil {
return err
}
defer stream.Close()// Write to pending stream (requires manual commit)
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
managedwriter.WithType(managedwriter.PendingStream),
)

// Write to committed stream (rows are visible to readers as soon as each append succeeds)
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
managedwriter.WithType(managedwriter.CommittedStream),
)type WriterOption func(*ManagedStream)func WithDestinationTable(table string) WriterOption
func WithType(t StreamType) WriterOption
func WithSchemaDescriptor(descriptor *descriptorpb.DescriptorProto) WriterOption
func WithDataOrigin(origin string) WriterOption
func WithTraceID(traceID string) WriterOption
func WithMaxInflightRequests(n int) WriterOption
func WithMaxInflightBytes(n int) WriterOption
func WithAppendRowsCallOption(opts ...gax.CallOption) WriterOptiontype StreamType string
const (
DefaultStream StreamType = "DEFAULT"
PendingStream StreamType = "PENDING"
CommittedStream StreamType = "COMMITTED"
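)

func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error)

type AppendResult struct{}

// GetResult blocks until the append completes and returns the offset of the append, if any.
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error)
// FullResponse blocks until the append completes and returns the complete server response.
func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error)
func (ar *AppendResult) Ready() <-chan struct{}

import (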
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)
// Define your data schema as proto
message LogEntry {
string timestamp = 1;
int64 user_id = 2;
string action = 3;
}
// Create descriptor
descriptor, err := adapt.NormalizeDescriptor(logEntry.ProtoReflect().Descriptor())
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable(tableRef),
managedwriter.WithSchemaDescriptor(descriptor),
)
// Create proto message
entry := &LogEntry{
Timestamp: "2024-01-20T10:00:00Z",
UserId: 12345,
Action: "login",
}
// Marshal to bytes
data, err := proto.Marshal(entry)
// Append
result, err := stream.AppendRows(ctx, [][]byte{data})
if err != nil {
return err
}
// Wait for result
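_, err = result.GetResult(ctx)

func StorageSchemaToProto2Descriptor(schema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error)
func BQSchemaToStorageTableSchema(schema bigquery.Schema) (*storagepb.TableSchema, error)
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error)

import "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
// Convert BigQuery schema to storage schema
bqSchema := bigquery.Schema{
{Name: "timestamp", Type: bigquery.TimestampFieldType},
{Name: "user_id", Type: bigquery.IntegerFieldType},
{Name: "action", Type: bigquery.StringFieldType},
}
storageSchema, err := adapt.BQSchemaToStorageTableSchema(bqSchema)
if err != nil {
return err
}
descriptor, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
if err != nil {
return err
}
// WithSchemaDescriptor expects a self-contained *descriptorpb.DescriptorProto,
// so convert the reflective descriptor before creating the stream.
messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
if !ok {
return fmt.Errorf("adapted descriptor has unexpected type %T", descriptor)
}
descriptorProto, err := adapt.NormalizeDescriptor(messageDescriptor)
if err != nil {
return err
}

type AppendOption func(*pendingWrite)

func WithOffset(offset int64) AppendOption

Set explicit offset for exactly-once semantics: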
result, err := stream.AppendRows(ctx, data,
managedwriter.WithOffset(currentOffset),
)func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error)Finalize a stream (no more appends allowed):
rowCount, err := stream.Finalize(ctx)
if err != nil {
return err
}
fmt.Printf("Finalized stream with %d rows\n", rowCount)For pending streams, commit the writes:
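func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error)

// A pending stream must be finalized before it can be committed.
streamName := stream.StreamName()
resp, err := client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
Parent: "projects/my-project/datasets/my_dataset/tables/my_table",
WriteStreams: []string{streamName},
})
if err != nil {
return err
}
if streamErrs := resp.GetStreamErrors(); len(streamErrs) > 0 {
return fmt.Errorf("commit failed: %v", streamErrs)
}

func (ms *ManagedStream) StreamName() string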
func (ms *ManagedStream) StreamType() StreamType
func (ms *ManagedStream) TableName() stringfmt.Printf("Stream: %s\n", stream.StreamName())
fmt.Printf("Type: %s\n", stream.StreamType())
fmt.Printf("Table: %s\n", stream.TableName())result, err := stream.AppendRows(ctx, data)
if err != nil {
// Network or client error
return err
}
// Wait for the full server response
resp, err := result.FullResponse(ctx)
if err != nil {
// Server rejected the append
return err
}
if resp.GetError() != nil {
// Request-level error status carried in the response
fmt.Printf("Append error: %v\n", resp.GetError())
}

Check for row-level errors:

resp, err := result.FullResponse(ctx)
if err != nil {
return err
}
if rowErrors := resp.GetRowErrors(); len(rowErrors) > 0 {
for _, rowErr := range rowErrors {
fmt.Printf("Row %d error: %v\n", rowErr.Index, rowErr.Message)
}
}

Limit in-flight requests and bytes:
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable(tableRef),
managedwriter.WithMaxInflightRequests(10),
managedwriter.WithMaxInflightBytes(10 * 1024 * 1024), // 10 MB
)package main
import (
"context"
"fmt"
"log"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
func main() {
ctx := context.Background()
projectID := "my-project"
// Create managed writer client
client, err := managedwriter.NewClient(ctx, projectID)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Define table reference
tableRef := "projects/my-project/datasets/analytics/tables/events"
// Define schema
bqSchema := bigquery.Schema{
{Name: "event_timestamp", Type: bigquery.TimestampFieldType},
{Name: "user_id", Type: bigquery.IntegerFieldType},
{Name: "event_type", Type: bigquery.StringFieldType},
{Name: "metadata", Type: bigquery.JSONFieldType},
}
// Convert to proto descriptor
storageSchema, err := adapt.BQSchemaToStorageTableSchema(bqSchema)
if err != nil {
log.Fatal(err)
}
descriptor, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
if err != nil {
log.Fatal(err)
}
// Create managed stream
stream, err := client.NewManagedStream(ctx,
managedwriter.WithDestinationTable(tableRef),
managedwriter.WithType(managedwriter.DefaultStream),
managedwriter.WithSchemaDescriptor(descriptor),
)
if err != nil {
log.Fatal(err)
}
defer stream.Close()
// Create message descriptor for dynamic messages
msgDesc, err := protodesc.NewMessage(descriptor, nil)
if err != nil {
log.Fatal(err)
}
// Write data
for i := 0; i < 1000; i++ {
// Create dynamic message
msg := dynamicpb.NewMessage(msgDesc)
msg.Set(
msgDesc.Fields().ByName("event_timestamp"),
protoreflect.ValueOfString(time.Now().Format(time.RFC3339)),
)
msg.Set(
msgDesc.Fields().ByName("user_id"),
protoreflect.ValueOfInt64(int64(i)),
)
msg.Set(
msgDesc.Fields().ByName("event_type"),
protoreflect.ValueOfString("page_view"),
)
msg.Set(
msgDesc.Fields().ByName("metadata"),
protoreflect.ValueOfString(`{"page": "/home"}`),
)
// Marshal message
data, err := proto.Marshal(msg)
if err != nil {
log.Fatal(err)
}
// Append to stream
result, err := stream.AppendRows(ctx, [][]byte{data})
if err != nil {
log.Fatal(err)
}
// Wait for result (optional, can batch these)
_, err = result.GetResult(ctx)
if err != nil {
log.Printf("Append error: %v\n", err)
continue
}
if (i+1)%100 == 0 {
fmt.Printf("Written %d rows\n", i+1)
}
}
fmt.Println("All rows written successfully")
}| Feature | Streaming Inserts | Managed Writer |
|---|---|---|
| Throughput | Lower | Higher |
| Cost | Higher per GB | Lower per GB |
| Latency | Seconds | Seconds |
| Deduplication | Best-effort | Exactly-once |
| Schema updates | Manual | Automatic |
| Complexity | Lower | Higher |
| Best for | Simple use cases | High-volume production |