Writing Data

Streaming write operations to BigQuery tables using the BigQuery Storage API. Supports transactional semantics through multiple write stream types, batch commit operations, and two ingestion formats: Protocol Buffers and Arrow.

Capabilities

BigQuery Write Client

Main client for writing data to BigQuery tables with streaming capabilities and transactional guarantees.

class BigQueryWriteClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Write Client.
        
        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream:
        """
        Create a new write stream for appending data to BigQuery.
        
        Parameters:
        - parent: Table path in format "projects/{project}/datasets/{dataset}/tables/{table}"
        - write_stream: WriteStream configuration with type and options
        
        Returns:
        WriteStream with stream name and metadata
        """

    def append_rows(
        self,
        requests: Iterator[AppendRowsRequest],
        **kwargs
    ) -> Iterator[AppendRowsResponse]:
        """
        Append rows to a write stream (bidirectional streaming RPC).
        
        Parameters:
        - requests: Iterator of AppendRowsRequest messages with serialized data
        
        Returns:
        Iterator of AppendRowsResponse messages with append results
        """

    def get_write_stream(self, name: str, **kwargs) -> WriteStream:
        """
        Get write stream information and current state.
        
        Parameters:
        - name: Write stream name
        
        Returns:
        WriteStream with current state and metadata
        """

    def finalize_write_stream(
        self,
        name: str,
        **kwargs
    ) -> FinalizeWriteStreamResponse:
        """
        Finalize a write stream so that no further rows can be appended.
        Required before committing a PENDING stream.
        
        Parameters:
        - name: Write stream name
        
        Returns:
        FinalizeWriteStreamResponse with the final row count
        """

    def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse:
        """
        Atomically commit multiple write streams.
        
        Parameters:
        - parent: Table path
        - write_streams: List of write stream names to commit
        
        Returns:
        BatchCommitWriteStreamsResponse with commit timestamp and errors
        """

    def flush_rows(
        self,
        write_stream: str,
        offset: Optional[int] = None,
        **kwargs
    ) -> FlushRowsResponse:
        """
        Flush buffered rows in a write stream, making them visible to queries.
        Only valid for streams of type BUFFERED.
        
        Parameters:
        - write_stream: Write stream name
        - offset: Flush rows up to and including this offset (optional)
        
        Returns:
        FlushRowsResponse with the offset up to which rows were flushed
        """

BigQuery Write Async Client

Async version of BigQueryWriteClient with the same methods, using the async/await pattern; append_rows consumes an async iterator of requests and yields responses asynchronously.

class BigQueryWriteAsyncClient:
    async def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream: ...

    async def append_rows(
        self,
        requests: AsyncIterator[AppendRowsRequest],
        **kwargs
    ) -> AsyncIterator[AppendRowsResponse]: ...

    async def get_write_stream(self, name: str, **kwargs) -> WriteStream: ...

    async def finalize_write_stream(
        self, 
        name: str, 
        **kwargs
    ) -> FinalizeWriteStreamResponse: ...

    async def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse: ...

    async def flush_rows(
        self,
        write_stream: str,
        offset: Optional[int] = None,
        **kwargs
    ) -> FlushRowsResponse: ...
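
A minimal usage sketch of the async client under an asyncio event loop. The request iterator is assumed to be built elsewhere (for proto data, its first item must carry the writer schema), and the batch commit uses the request-object form:

import asyncio
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

async def write_pending(parent: str, requests) -> None:
    # Mirrors the sync flow: create stream, append, finalize, commit
    client = bigquery_storage_v1.BigQueryWriteAsyncClient()

    stream = await client.create_write_stream(
        parent=parent,
        write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
    )

    # append_rows takes an async iterator of requests; awaiting the call
    # returns an async iterable of responses
    responses = await client.append_rows(requests)
    async for response in responses:
        print(f"Appended at offset: {response.append_result.offset}")

    await client.finalize_write_stream(name=stream.name)
    await client.batch_commit_write_streams(
        types.BatchCommitWriteStreamsRequest(
            parent=parent, write_streams=[stream.name]
        )
    )

# asyncio.run(write_pending(parent, request_iterator))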

Append Rows Stream

Helper class (in google.cloud.bigquery_storage_v1.writer) that manages the bidirectional append_rows connection. It is constructed with a client and an initial AppendRowsRequest template, and provides convenient data appending methods.

class AppendRowsStream:
    def send(self, request: AppendRowsRequest) -> AppendRowsFuture:
        """
        Send append request and get future for response.
        
        Parameters:
        - request: AppendRowsRequest with serialized row data
        
        Returns:
        AppendRowsFuture for tracking append result
        """

    def close(self, reason: Exception = None):
        """
        Close the write stream.
        
        Parameters:
        - reason: Optional exception explaining why the stream is closing;
          None indicates an intentional shutdown
        """

    def is_active(self) -> bool:
        """Check if the write stream is still active."""

    def add_close_callback(self, callback: Callable):
        """
        Add callback to be called when stream closes.
        
        Parameters:
        - callback: Function to call on stream close
        """

Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

class BigQueryWriteClient:
    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""

    @staticmethod
    def write_stream_path(
        project: str,
        dataset: str,
        table: str,
        stream: str
    ) -> str:
        """Construct write stream resource path."""

    @staticmethod
    def parse_write_stream_path(path: str) -> dict:
        """Parse write stream path into components."""

Usage Examples

Basic Write Stream (Pending Mode)

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

# Create client
write_client = bigquery_storage_v1.BigQueryWriteClient()

# Create write stream
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request with protocol buffer data
request = types.AppendRowsRequest()
request.write_stream = stream.name

# Add serialized row data; the first request on a connection must also set
# proto_data.writer_schema so BigQuery can decode the rows
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows.serialized_rows = [serialized_row_data]  # Your serialized rows
request.proto_rows = proto_data

# Append rows
response_stream = write_client.append_rows([request])
for response in response_stream:
    if "error" in response:  # proto-plus field-presence check
        print(f"Error: {response.error}")
    else:
        print(f"Rows appended at offset: {response.append_result.offset}")

# Finalize and commit
write_client.finalize_write_stream(name=stream.name)
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=[stream.name]
)
print(f"Committed at: {commit_response.commit_time}")

Committed Stream (Immediate Visibility)

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create a COMMITTED stream (rows become visible as soon as they are appended)
write_stream = types.WriteStream(type_=types.WriteStream.Type.COMMITTED)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Append data (no finalize/commit step is needed)
request = types.AppendRowsRequest(write_stream=stream.name)
# ... configure with data ...

response_stream = write_client.append_rows([request])
for response in response_stream:
    if "append_result" in response:
        print(f"Rows visible at offset: {response.append_result.offset}")

Batch Commit Multiple Streams

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create multiple pending streams
streams = []
for i in range(3):
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    streams.append(stream)
    
    # Append data to each stream
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... add data for stream i ...
    list(write_client.append_rows([request]))  # drain responses so the append completes

# Finalize all streams
for stream in streams:
    write_client.finalize_write_stream(name=stream.name)

# Atomic batch commit
stream_names = [stream.name for stream in streams]
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=stream_names
)

if commit_response.stream_errors:
    print("Some streams failed to commit")
else:
    print(f"All streams committed at: {commit_response.commit_time}")

Using AppendRowsStream Helper

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import writer, types

# Create write stream
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Use the helper class; it is constructed with the client and an initial
# request template that names the stream (and, for proto rows, carries the
# writer schema in proto_rows)
request_template = types.AppendRowsRequest(write_stream=stream.name)
append_stream = writer.AppendRowsStream(write_client, request_template)

# Send data using the helper; fields set here are merged with the template
request = types.AppendRowsRequest()
# ... configure request with serialized row data ...

future = append_stream.send(request)
try:
    response = future.result(timeout=30)
    print(f"Append successful: {response.append_result.offset}")
except Exception as e:
    print(f"Append failed: {e}")

# Clean up
append_stream.close()

Arrow Format Writing

import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create Arrow schema and data
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64())
])

# Create Arrow table
data = pa.table([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"]),
    pa.array([10.5, 20.3, 30.1])
], schema=schema)

# Convert to record batch
record_batch = data.to_batches()[0]

# Create write stream
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request; the first request on a connection must include the
# serialized Arrow schema alongside the record batch
request = types.AppendRowsRequest(write_stream=stream.name)
arrow_data = types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = schema.serialize().to_pybytes()
arrow_data.rows.serialized_record_batch = record_batch.serialize().to_pybytes()
request.arrow_rows = arrow_data

# Append (drain the response stream), then finalize and commit
for response in write_client.append_rows([request]):
    if "error" in response:
        raise RuntimeError(f"Append failed: {response.error}")
write_client.finalize_write_stream(name=stream.name)
write_client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])

Error Handling and Retry

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.api_core import exceptions
import time

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

def append_with_retry(request, max_retries=3):
    for attempt in range(max_retries):
        try:
            response_stream = write_client.append_rows([request])
            for response in response_stream:
                if "error" in response:  # proto-plus field-presence check
                    raise Exception(f"Append error: {response.error}")
                return response
                
        except exceptions.ResourceExhausted:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except exceptions.Aborted:
            if attempt < max_retries - 1:
                print(f"Request aborted, retrying attempt {attempt + 1}")
                time.sleep(1)
            else:
                raise

# Use retry wrapper
try:
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... configure request ...
    
    response = append_with_retry(request)
    print(f"Successfully appended rows: {response.append_result.offset}")
    
except Exception as e:
    print(f"Final append failure: {e}")

Types

WriteStream

class WriteStream:
    name: str
    type_: WriteStream.Type
    create_time: Timestamp
    commit_time: Timestamp
    table_schema: TableSchema
    write_mode: WriteStream.WriteMode
    location: str

class WriteStream.Type(enum.Enum):
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1  # Rows are visible as soon as they are appended
    PENDING = 2    # Rows are buffered until the stream is finalized and committed
    BUFFERED = 3   # Rows become visible when flushed with flush_rows

class WriteStream.WriteMode(enum.Enum):
    WRITE_MODE_UNSPECIFIED = 0
    INSERT = 1  # Data is appended to the table

Request/Response Types

class CreateWriteStreamRequest:
    parent: str
    write_stream: WriteStream

class AppendRowsRequest:
    write_stream: str
    offset: int
    proto_rows: AppendRowsRequest.ProtoData  # oneof with arrow_rows
    arrow_rows: AppendRowsRequest.ArrowData  # oneof with proto_rows
    trace_id: str

class AppendRowsRequest.ProtoData:
    writer_schema: ProtoSchema
    rows: ProtoRows  # ProtoRows.serialized_rows is a List[bytes]

class AppendRowsRequest.ArrowData:
    writer_schema: ArrowSchema
    rows: ArrowRecordBatch  # ArrowRecordBatch.serialized_record_batch is bytes

class AppendRowsResponse:
    append_result: AppendRowsResponse.AppendResult
    error: Status
    updated_schema: TableSchema
    row_errors: List[RowError]

class AppendRowsResponse.AppendResult:
    offset: int

class GetWriteStreamRequest:
    name: str
    view: WriteStreamView

class FinalizeWriteStreamRequest:
    name: str

class FinalizeWriteStreamResponse:
    row_count: int

class BatchCommitWriteStreamsRequest:
    parent: str
    write_streams: List[str]

class BatchCommitWriteStreamsResponse:
    commit_time: Timestamp
    stream_errors: List[StorageError]

class FlushRowsRequest:
    write_stream: str
    offset: int

class FlushRowsResponse:
    offset: int

Error Types

class StorageError:
    code: StorageError.StorageErrorCode
    entity: str
    error_message: str

class StorageError.StorageErrorCode(enum.Enum):
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1
    STREAM_ALREADY_COMMITTED = 2
    STREAM_NOT_FOUND = 3
    INVALID_STREAM_TYPE = 4
    INVALID_STREAM_STATE = 5
    STREAM_FINALIZED = 6

class RowError:
    index: int
    code: RowError.RowErrorCode
    message: str

class RowError.RowErrorCode(enum.Enum):
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1
    UNKNOWN_ERROR = 2
    FIELDS_ERROR = 3

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1        # Basic stream information
    FULL = 2         # Full stream details including schema
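
A sketch of requesting the FULL view, which includes the table schema; the stream path is a placeholder, and the request-object form is used since only name is flattened in the generated method:

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

client = bigquery_storage_v1.BigQueryWriteClient()
stream = client.get_write_stream(
    request=types.GetWriteStreamRequest(
        name="projects/my-project/datasets/my_dataset/tables/my_table/streams/my-stream",
        view=types.WriteStreamView.FULL,
    )
)
print(stream.table_schema)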

class AppendRowsFuture:
    """Future object for tracking append operation results."""
    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.
        
        Parameters:
        - timeout: Maximum time to wait for result
        
        Returns:
        AppendRowsResponse with operation result
        """
    
    def exception(self, timeout: float = None) -> Exception:
        """Get exception if operation failed."""
    
    def done(self) -> bool:
        """Check if operation is complete."""
