CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-bigquery-storage

Google Cloud BigQuery Storage API client library for high-performance streaming read/write access to BigQuery data

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/types-schemas.md

Types and Schemas

Comprehensive type system for BigQuery Storage operations including data format specifications, schema definitions, session configuration, stream management, and error handling. All types are based on Protocol Buffers and provide strong typing for BigQuery Storage API interactions.

Capabilities

Data Format Types

Core data format types for serializing and deserializing BigQuery data in different formats.

class DataFormat(enum.Enum):
    """Supported data serialization formats."""
    DATA_FORMAT_UNSPECIFIED = 0
    AVRO = 1      # Apache Avro format
    ARROW = 2     # Apache Arrow format  
    PROTO = 3     # Protocol Buffer format

Avro Format Types

Types for working with Apache Avro serialized data.

class AvroSchema:
    """Avro schema definition for BigQuery data."""
    schema: str  # JSON schema string

class AvroRows:
    """Avro-encoded row data."""
    serialized_binary_rows: bytes  # Avro binary data
    row_count: int                 # Number of rows encoded

class AvroSerializationOptions:
    """Options for Avro serialization."""
    enable_display_name_attribute: bool  # Use display names in schema

Arrow Format Types

Types for working with Apache Arrow serialized data.

class ArrowSchema:
    """Arrow schema definition for BigQuery data."""
    serialized_schema: bytes  # Serialized Arrow schema

class ArrowRecordBatch:
    """Arrow record batch data."""
    serialized_record_batch: bytes  # Serialized Arrow record batch
    row_count: int                  # Number of rows in batch

class ArrowSerializationOptions:
    """Options for Arrow serialization."""
    buffer_compression: ArrowSerializationOptions.CompressionCodec
    
class ArrowSerializationOptions.CompressionCodec(enum.Enum):
    """Arrow compression codecs."""
    COMPRESSION_UNSPECIFIED = 0
    LZ4_FRAME = 1  # LZ4 frame compression
    ZSTD = 2       # Zstandard compression

Protocol Buffer Types

Types for working with Protocol Buffer serialized data.

class ProtoSchema:
    """Protocol Buffer schema definition."""
    proto_descriptor: DescriptorProto  # Protocol buffer descriptor

class ProtoRows:
    """Protocol Buffer encoded rows."""
    serialized_rows: List[bytes]  # List of serialized row messages

Session and Stream Types

Types for configuring and managing read/write sessions and streams.

class ReadSession:
    """Configuration and state for a BigQuery read session."""
    name: str                    # Session resource name
    table: str                   # Source table path  
    data_format: DataFormat      # Output data format
    read_options: ReadSession.TableReadOptions
    streams: List[ReadStream]    # Available read streams
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema      # Schema for Avro format
    arrow_schema: ArrowSchema    # Schema for Arrow format
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    """Options for reading table data."""
    selected_fields: List[str]   # Column names to read
    row_restriction: str         # SQL WHERE clause filter
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions
    sample_percentage: float     # Percentage of data to sample

class ReadSession.TableModifiers:
    """Modifiers for table access."""
    snapshot_time: Timestamp     # Point-in-time snapshot

class ReadStream:
    """Individual read stream within a session."""
    name: str                    # Stream resource name

class WriteStream:
    """Configuration and state for a BigQuery write stream."""
    name: str                    # Stream resource name
    type_: WriteStream.Type      # Stream type
    create_time: Timestamp       # Creation timestamp
    commit_time: Timestamp       # Commit timestamp (if committed)
    table_schema: TableSchema    # Target table schema
    state: WriteStream.State     # Current stream state
    location: str               # Geographic location

class WriteStream.Type(enum.Enum):
    """Write stream types."""
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1    # Default stream, auto-commits
    PENDING = 2      # Pending stream, requires explicit commit  
    BUFFERED = 3     # Buffered stream for batch processing

class WriteStream.State(enum.Enum):
    """Write stream states."""
    STATE_UNSPECIFIED = 0
    CREATED = 1      # Stream created but not active
    RUNNING = 2      # Stream accepting data
    FINALIZED = 3    # Stream finalized, ready for commit
    COMMITTED = 4    # Stream data committed to table
    ABORTED = 5      # Stream aborted, data discarded

class WriteStream.WriteMode(enum.Enum):
    """Write stream modes."""
    WRITE_MODE_UNSPECIFIED = 0
    INSERT = 1       # Insert mode for appending rows

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1        # Basic stream information
    FULL = 2         # Full stream details including schema

Table Schema Types

Types for representing BigQuery table schemas and field definitions.

class TableSchema:
    """BigQuery table schema definition."""
    fields: List[TableFieldSchema]  # Table field definitions

class TableFieldSchema:
    """Individual table field schema."""
    name: str                       # Field name
    type_: TableFieldSchema.Type    # Field data type
    mode: TableFieldSchema.Mode     # Field mode (nullable, required, repeated)
    fields: List[TableFieldSchema]  # Nested field schemas (for RECORD type)
    description: str                # Field description
    max_length: int                # Maximum length for STRING/BYTES
    precision: int                 # Precision for NUMERIC/BIGNUMERIC
    scale: int                     # Scale for NUMERIC/BIGNUMERIC
    default_value_expression: str  # Default value expression

class TableFieldSchema.Type(enum.Enum):
    """BigQuery field data types."""
    TYPE_UNSPECIFIED = 0
    STRING = 1
    INT64 = 2  
    DOUBLE = 3
    NUMERIC = 4
    BOOL = 5
    TIMESTAMP = 6
    DATE = 7
    TIME = 8
    DATETIME = 9
    GEOGRAPHY = 10
    RECORD = 11      # Nested record/struct
    BYTES = 12
    JSON = 13
    BIGNUMERIC = 14
    INTERVAL = 15
    RANGE = 16

class TableFieldSchema.Mode(enum.Enum):
    """BigQuery field modes."""
    MODE_UNSPECIFIED = 0
    NULLABLE = 1     # Field can be null
    REQUIRED = 2     # Field cannot be null
    REPEATED = 3     # Field is an array

Request and Response Types

Message types for BigQuery Storage API operations.

class CreateReadSessionRequest:
    """Request to create a read session."""
    parent: str                    # Parent resource name, e.g. "projects/{project_id}"
    read_session: ReadSession      # Session configuration
    max_stream_count: int         # Maximum parallel streams

class ReadRowsRequest:
    """Request to read rows from a stream."""
    read_stream: str              # Stream name
    offset: int                   # Starting offset

class ReadRowsResponse:
    """Response containing row data from a stream."""
    avro_rows: AvroRows           # Avro format data
    arrow_record_batch: ArrowRecordBatch  # Arrow format data
    row_count: int                # Number of rows in response
    stats: StreamStats            # Stream statistics
    throttle_state: ThrottleState # Throttling information

class SplitReadStreamRequest:
    """Request to split a read stream."""
    name: str                     # Stream to split
    fraction: float               # Split point (0.0 to 1.0)

class SplitReadStreamResponse:
    """Response with split stream information."""
    primary_stream: ReadStream    # First part of split
    remainder_stream: ReadStream  # Second part of split

class CreateWriteStreamRequest:
    """Request to create a write stream."""
    parent: str                   # Table path
    write_stream: WriteStream     # Stream configuration

class AppendRowsRequest:
    """Request to append rows to a write stream."""
    write_stream: str             # Stream name
    offset: int                   # Append offset
    proto_rows: AppendRowsRequest.ProtoData    # Protocol buffer data
    arrow_rows: AppendRowsRequest.ArrowData    # Arrow format data
    trace_id: str                 # Request trace ID

class AppendRowsRequest.ProtoData:
    """Protocol buffer row data."""
    writer_schema: ProtoSchema    # Schema for data
    serialized_rows: List[bytes]  # Serialized row messages

class AppendRowsRequest.ArrowData:
    """Arrow format row data."""
    writer_schema: ArrowSchema    # Schema for data
    serialized_record_batch: bytes # Serialized record batch

class AppendRowsResponse:
    """Response to append rows request."""
    append_result: AppendRowsResponse.AppendResult  # Success result
    error: Status                 # Error information
    updated_schema: TableSchema   # Updated table schema
    row_errors: List[RowError]    # Individual row errors

class AppendRowsResponse.AppendResult:
    """Successful append result."""
    offset: int                   # Offset of appended data

Error and Status Types

Types for error handling and operation status reporting.

class StorageError:
    """Storage operation error information."""
    code: StorageError.StorageErrorCode  # Error code
    entity: str                         # Affected entity
    error_message: str                  # Error description

class StorageError.StorageErrorCode(enum.Enum):
    """Storage error codes."""
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1           # Table does not exist
    STREAM_ALREADY_COMMITTED = 2  # Stream already committed
    STREAM_NOT_FOUND = 3          # Stream does not exist
    INVALID_STREAM_TYPE = 4       # Invalid stream type for operation
    INVALID_STREAM_STATE = 5      # Stream in wrong state
    STREAM_FINALIZED = 6          # Stream already finalized

class RowError:
    """Error information for individual rows."""
    index: int                    # Row index with error
    code: RowError.RowErrorCode   # Error code
    message: str                  # Error message

class RowError.RowErrorCode(enum.Enum):
    """Row-level error codes."""
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1           # Row parsing error
    UNKNOWN_ERROR = 2             # Unknown error
    FIELDS_ERROR = 3              # Field validation error

class StreamStats:
    """Statistics for stream operations."""
    progress: StreamStats.Progress  # Progress information

class StreamStats.Progress:
    """Stream progress information."""
    at_response_start: float      # Progress at response start
    at_response_end: float        # Progress at response end

class ThrottleState:
    """Throttling state information."""
    throttle_percent: int         # Throttle percentage (0-100)

Utility Types

Common utility types used across BigQuery Storage operations.

class Timestamp:
    """Timestamp representation."""
    seconds: int                  # Seconds since Unix epoch
    nanos: int                   # Nanoseconds within second
    
    def FromMilliseconds(self, millis: int):
        """Set timestamp from milliseconds."""
    
    def ToMilliseconds(self) -> int:
        """Convert timestamp to milliseconds."""

class Status:
    """Operation status information."""
    code: int                     # Status code
    message: str                  # Status message
    details: List[Any]           # Additional details

class AppendRowsFuture:
    """Future object for tracking append operation results."""
    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.
        
        Parameters:
        - timeout: Maximum time to wait for result
        
        Returns:
        AppendRowsResponse with operation result
        """
    
    def exception(self, timeout: float = None) -> Exception:
        """Get exception if operation failed."""
    
    def done(self) -> bool:
        """Check if operation is complete."""

class StreamClosedError(Exception):
    """Exception raised when operations are attempted on closed streams."""

Usage Examples

Working with Schema Types

from google.cloud.bigquery_storage import types

# Define table schema
schema = types.TableSchema(
    fields=[
        types.TableFieldSchema(
            name="id",
            type_=types.TableFieldSchema.Type.INT64,
            mode=types.TableFieldSchema.Mode.REQUIRED
        ),
        types.TableFieldSchema(
            name="name", 
            type_=types.TableFieldSchema.Type.STRING,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            max_length=100
        ),
        types.TableFieldSchema(
            name="scores",
            type_=types.TableFieldSchema.Type.DOUBLE,
            mode=types.TableFieldSchema.Mode.REPEATED
        ),
        types.TableFieldSchema(
            name="metadata",
            type_=types.TableFieldSchema.Type.RECORD,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            fields=[
                types.TableFieldSchema(
                    name="created_at",
                    type_=types.TableFieldSchema.Type.TIMESTAMP,
                    mode=types.TableFieldSchema.Mode.REQUIRED
                ),
                types.TableFieldSchema(
                    name="tags",
                    type_=types.TableFieldSchema.Type.STRING,
                    mode=types.TableFieldSchema.Mode.REPEATED
                )
            ]
        )
    ]
)

Configuring Data Formats

from google.cloud.bigquery_storage import types

# Arrow serialization with compression
arrow_options = types.ArrowSerializationOptions(
    buffer_compression=types.ArrowSerializationOptions.CompressionCodec.ZSTD
)

# Avro serialization with display names
avro_options = types.AvroSerializationOptions(
    enable_display_name_attribute=True
)

# Read session with format options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["id", "name", "metadata.created_at"],
    row_restriction='id > 1000 AND name IS NOT NULL',
    arrow_serialization_options=arrow_options,
    sample_percentage=10.0  # Sample 10% of data
)

requested_session = types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=types.DataFormat.ARROW,
    read_options=read_options
)

Working with Write Stream Types

from google.cloud.bigquery_storage import types

# Create pending write stream
write_stream = types.WriteStream(
    type_=types.WriteStream.Type.PENDING
)

# Check stream state
if write_stream.state == types.WriteStream.State.RUNNING:
    print("Stream is accepting data")
elif write_stream.state == types.WriteStream.State.FINALIZED:
    print("Stream is ready for commit")

# Create append request with proto data
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.serialized_rows = [serialized_row_1, serialized_row_2]

request = types.AppendRowsRequest(
    write_stream=stream_name,
    proto_rows=proto_data,
    trace_id="my-trace-123"  # For debugging
)

Error Handling with Types

from google.cloud.bigquery_storage import types
from google.cloud import bigquery_storage

try:
    # Perform append operation (returns an iterable stream of responses)
    response_stream = client.append_rows([request])
    
except Exception as e:
    # Handle storage errors
    if hasattr(e, 'details'):
        for detail in e.details:
            if isinstance(detail, types.StorageError):
                if detail.code == types.StorageError.StorageErrorCode.TABLE_NOT_FOUND:
                    print(f"Table not found: {detail.entity}")
                elif detail.code == types.StorageError.StorageErrorCode.STREAM_FINALIZED:
                    print(f"Stream already finalized: {detail.entity}")

# Check for row-level errors in response
for response in response_stream:
    if response.row_errors:
        for row_error in response.row_errors:
            print(f"Row {row_error.index} error: {row_error.message}")

Time-based Operations

from google.cloud.bigquery_storage import types
import time

# Create timestamp for snapshot
snapshot_time = types.Timestamp()
current_millis = int(time.time() * 1000)
snapshot_time.FromMilliseconds(current_millis)

# Use in table modifiers
table_modifiers = types.ReadSession.TableModifiers(
    snapshot_time=snapshot_time
)

read_session = types.ReadSession(
    table=table_path,
    data_format=types.DataFormat.AVRO,
    table_modifiers=table_modifiers
)

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery-storage

docs

index.md

metastore-services.md

reading-data.md

types-schemas.md

writing-data.md

tile.json