Google Cloud BigQuery Storage API client library for high-performance streaming read/write access to BigQuery data
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive type system for BigQuery Storage operations including data format specifications, schema definitions, session configuration, stream management, and error handling. All types are based on Protocol Buffers and provide strong typing for BigQuery Storage API interactions.
Core data format types for serializing and deserializing BigQuery data in different formats.
class DataFormat(enum.Enum):
"""Supported data serialization formats."""
DATA_FORMAT_UNSPECIFIED = 0
AVRO = 1 # Apache Avro format
ARROW = 2 # Apache Arrow format
PROTO = 3 # Protocol Buffer format

Types for working with Apache Avro serialized data.
class AvroSchema:
"""Avro schema definition for BigQuery data."""
schema: str # JSON schema string
class AvroRows:
"""Avro-encoded row data."""
serialized_binary_rows: bytes # Avro binary data
row_count: int # Number of rows encoded
class AvroSerializationOptions:
"""Options for Avro serialization."""
enable_display_name_attribute: bool # Use display names in schema

Types for working with Apache Arrow serialized data.
class ArrowSchema:
"""Arrow schema definition for BigQuery data."""
serialized_schema: bytes # Serialized Arrow schema
class ArrowRecordBatch:
"""Arrow record batch data."""
serialized_record_batch: bytes # Serialized Arrow record batch
row_count: int # Number of rows in batch
class ArrowSerializationOptions:
"""Options for Arrow serialization."""
buffer_compression: ArrowSerializationOptions.CompressionCodec
class ArrowSerializationOptions.CompressionCodec(enum.Enum):
"""Arrow compression codecs."""
COMPRESSION_UNSPECIFIED = 0
LZ4_FRAME = 1 # LZ4 frame compression
ZSTD = 2 # Zstandard compression

Types for working with Protocol Buffer serialized data.
class ProtoSchema:
"""Protocol Buffer schema definition."""
proto_descriptor: DescriptorProto # Protocol buffer descriptor
class ProtoRows:
"""Protocol Buffer encoded rows."""
serialized_rows: List[bytes] # List of serialized row messages

Types for configuring and managing read/write sessions and streams.
class ReadSession:
"""Configuration and state for a BigQuery read session."""
name: str # Session resource name
table: str # Source table path
data_format: DataFormat # Output data format
read_options: ReadSession.TableReadOptions
streams: List[ReadStream] # Available read streams
estimated_total_bytes_scanned: int
estimated_row_count: int
avro_schema: AvroSchema # Schema for Avro format
arrow_schema: ArrowSchema # Schema for Arrow format
table_modifiers: ReadSession.TableModifiers
class ReadSession.TableReadOptions:
"""Options for reading table data."""
selected_fields: List[str] # Column names to read
row_restriction: str # SQL WHERE clause filter
arrow_serialization_options: ArrowSerializationOptions
avro_serialization_options: AvroSerializationOptions
sample_percentage: float # Percentage of data to sample
class ReadSession.TableModifiers:
"""Modifiers for table access."""
snapshot_time: Timestamp # Point-in-time snapshot
class ReadStream:
"""Individual read stream within a session."""
name: str # Stream resource name
class WriteStream:
"""Configuration and state for a BigQuery write stream."""
name: str # Stream resource name
type_: WriteStream.Type # Stream type
create_time: Timestamp # Creation timestamp
commit_time: Timestamp # Commit timestamp (if committed)
table_schema: TableSchema # Target table schema
state: WriteStream.State # Current stream state
location: str # Geographic location
class WriteStream.Type(enum.Enum):
"""Write stream types."""
TYPE_UNSPECIFIED = 0
COMMITTED = 1 # Default stream, auto-commits
PENDING = 2 # Pending stream, requires explicit commit
BUFFERED = 3 # Buffered stream for batch processing
class WriteStream.State(enum.Enum):
"""Write stream states."""
STATE_UNSPECIFIED = 0
CREATED = 1 # Stream created but not active
RUNNING = 2 # Stream accepting data
FINALIZED = 3 # Stream finalized, ready for commit
COMMITTED = 4 # Stream data committed to table
ABORTED = 5 # Stream aborted, data discarded
class WriteStream.WriteMode(enum.Enum):
"""Write stream modes."""
WRITE_MODE_UNSPECIFIED = 0
INSERT = 1 # Insert mode for appending rows
class WriteStreamView(enum.Enum):
"""Views for write stream information."""
WRITE_STREAM_VIEW_UNSPECIFIED = 0
BASIC = 1 # Basic stream information
FULL = 2 # Full stream details including schema

Types for representing BigQuery table schemas and field definitions.
class TableSchema:
"""BigQuery table schema definition."""
fields: List[TableFieldSchema] # Table field definitions
class TableFieldSchema:
"""Individual table field schema."""
name: str # Field name
type_: TableFieldSchema.Type # Field data type
mode: TableFieldSchema.Mode # Field mode (nullable, required, repeated)
fields: List[TableFieldSchema] # Nested field schemas (for RECORD type)
description: str # Field description
max_length: int # Maximum length for STRING/BYTES
precision: int # Precision for NUMERIC/BIGNUMERIC
scale: int # Scale for NUMERIC/BIGNUMERIC
default_value_expression: str # Default value expression
class TableFieldSchema.Type(enum.Enum):
"""BigQuery field data types."""
TYPE_UNSPECIFIED = 0
STRING = 1
INT64 = 2
DOUBLE = 3
NUMERIC = 4
BOOL = 5
TIMESTAMP = 6
DATE = 7
TIME = 8
DATETIME = 9
GEOGRAPHY = 10
RECORD = 11 # Nested record/struct
BYTES = 12
JSON = 13
BIGNUMERIC = 14
INTERVAL = 15
RANGE = 16
class TableFieldSchema.Mode(enum.Enum):
"""BigQuery field modes."""
MODE_UNSPECIFIED = 0
NULLABLE = 1 # Field can be null
REQUIRED = 2 # Field cannot be null
REPEATED = 3 # Field is an array

Message types for BigQuery Storage API operations.
class CreateReadSessionRequest:
"""Request to create a read session."""
parent: str # Project ID
read_session: ReadSession # Session configuration
max_stream_count: int # Maximum parallel streams
class ReadRowsRequest:
"""Request to read rows from a stream."""
read_stream: str # Stream name
offset: int # Starting offset
class ReadRowsResponse:
"""Response containing row data from a stream."""
avro_rows: AvroRows # Avro format data
arrow_record_batch: ArrowRecordBatch # Arrow format data
row_count: int # Number of rows in response
stats: StreamStats # Stream statistics
throttle_state: ThrottleState # Throttling information
class SplitReadStreamRequest:
"""Request to split a read stream."""
name: str # Stream to split
fraction: float # Split point (0.0 to 1.0)
class SplitReadStreamResponse:
"""Response with split stream information."""
primary_stream: ReadStream # First part of split
remainder_stream: ReadStream # Second part of split
class CreateWriteStreamRequest:
"""Request to create a write stream."""
parent: str # Table path
write_stream: WriteStream # Stream configuration
class AppendRowsRequest:
"""Request to append rows to a write stream."""
write_stream: str # Stream name
offset: int # Append offset
proto_rows: AppendRowsRequest.ProtoData # Protocol buffer data
arrow_rows: AppendRowsRequest.ArrowData # Arrow format data
trace_id: str # Request trace ID
class AppendRowsRequest.ProtoData:
"""Protocol buffer row data."""
writer_schema: ProtoSchema # Schema for data
serialized_rows: List[bytes] # Serialized row messages
class AppendRowsRequest.ArrowData:
"""Arrow format row data."""
writer_schema: ArrowSchema # Schema for data
serialized_record_batch: bytes # Serialized record batch
class AppendRowsResponse:
"""Response to append rows request."""
append_result: AppendRowsResponse.AppendResult # Success result
error: Status # Error information
updated_schema: TableSchema # Updated table schema
row_errors: List[RowError] # Individual row errors
class AppendRowsResponse.AppendResult:
"""Successful append result."""
offset: int # Offset of appended data

Types for error handling and operation status reporting.
class StorageError:
"""Storage operation error information."""
code: StorageError.StorageErrorCode # Error code
entity: str # Affected entity
error_message: str # Error description
class StorageError.StorageErrorCode(enum.Enum):
"""Storage error codes."""
STORAGE_ERROR_CODE_UNSPECIFIED = 0
TABLE_NOT_FOUND = 1 # Table does not exist
STREAM_ALREADY_COMMITTED = 2 # Stream already committed
STREAM_NOT_FOUND = 3 # Stream does not exist
INVALID_STREAM_TYPE = 4 # Invalid stream type for operation
INVALID_STREAM_STATE = 5 # Stream in wrong state
STREAM_FINALIZED = 6 # Stream already finalized
class RowError:
"""Error information for individual rows."""
index: int # Row index with error
code: RowError.RowErrorCode # Error code
message: str # Error message
class RowError.RowErrorCode(enum.Enum):
"""Row-level error codes."""
ROW_ERROR_CODE_UNSPECIFIED = 0
ROW_PARSE_ERROR = 1 # Row parsing error
UNKNOWN_ERROR = 2 # Unknown error
FIELDS_ERROR = 3 # Field validation error
class StreamStats:
"""Statistics for stream operations."""
progress: StreamStats.Progress # Progress information
class StreamStats.Progress:
"""Stream progress information."""
at_response_start: float # Progress at response start
at_response_end: float # Progress at response end
class ThrottleState:
"""Throttling state information."""
throttle_percent: int # Throttle percentage (0-100)

Common utility types used across BigQuery Storage operations.
class Timestamp:
"""Timestamp representation."""
seconds: int # Seconds since Unix epoch
nanos: int # Nanoseconds within second
def FromMilliseconds(self, millis: int):
"""Set timestamp from milliseconds."""
def ToMilliseconds(self) -> int:
"""Convert timestamp to milliseconds."""
class Status:
"""Operation status information."""
code: int # Status code
message: str # Status message
details: List[Any] # Additional details
class AppendRowsFuture:
"""Future object for tracking append operation results."""
def result(self, timeout: float = None) -> AppendRowsResponse:
"""
Get the append operation result.
Parameters:
- timeout: Maximum time to wait for result
Returns:
AppendRowsResponse with operation result
"""
def exception(self, timeout: float = None) -> Exception:
"""Get exception if operation failed."""
def done(self) -> bool:
"""Check if operation is complete."""
class StreamClosedError(Exception):
"""Exception raised when operations are attempted on closed streams."""from google.cloud.bigquery_storage import types
# Define table schema
schema = types.TableSchema(
fields=[
types.TableFieldSchema(
name="id",
type_=types.TableFieldSchema.Type.INT64,
mode=types.TableFieldSchema.Mode.REQUIRED
),
types.TableFieldSchema(
name="name",
type_=types.TableFieldSchema.Type.STRING,
mode=types.TableFieldSchema.Mode.NULLABLE,
max_length=100
),
types.TableFieldSchema(
name="scores",
type_=types.TableFieldSchema.Type.DOUBLE,
mode=types.TableFieldSchema.Mode.REPEATED
),
types.TableFieldSchema(
name="metadata",
type_=types.TableFieldSchema.Type.RECORD,
mode=types.TableFieldSchema.Mode.NULLABLE,
fields=[
types.TableFieldSchema(
name="created_at",
type_=types.TableFieldSchema.Type.TIMESTAMP,
mode=types.TableFieldSchema.Mode.REQUIRED
),
types.TableFieldSchema(
name="tags",
type_=types.TableFieldSchema.Type.STRING,
mode=types.TableFieldSchema.Mode.REPEATED
)
]
)
]
)

from google.cloud.bigquery_storage import types
# Arrow serialization with compression
arrow_options = types.ArrowSerializationOptions(
buffer_compression=types.ArrowSerializationOptions.CompressionCodec.ZSTD
)
# Avro serialization with display names
avro_options = types.AvroSerializationOptions(
enable_display_name_attribute=True
)
# Read session with format options
read_options = types.ReadSession.TableReadOptions(
selected_fields=["id", "name", "metadata.created_at"],
row_restriction='id > 1000 AND name IS NOT NULL',
arrow_serialization_options=arrow_options,
sample_percentage=10.0 # Sample 10% of data
)
requested_session = types.ReadSession(
table="projects/my-project/datasets/my_dataset/tables/my_table",
data_format=types.DataFormat.ARROW,
read_options=read_options
)

from google.cloud.bigquery_storage import types
# Create pending write stream
write_stream = types.WriteStream(
type_=types.WriteStream.Type.PENDING
)
# Check stream state
if write_stream.state == types.WriteStream.State.RUNNING:
print("Stream is accepting data")
elif write_stream.state == types.WriteStream.State.FINALIZED:
print("Stream is ready for commit")
# Create append request with proto data
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.serialized_rows = [serialized_row_1, serialized_row_2]
request = types.AppendRowsRequest(
write_stream=stream_name,
proto_rows=proto_data,
trace_id="my-trace-123" # For debugging
)

from google.cloud.bigquery_storage import types
from google.cloud import bigquery_storage
try:
# Perform append operation
response = client.append_rows([request])
except Exception as e:
# Handle storage errors
if hasattr(e, 'details'):
for detail in e.details:
if isinstance(detail, types.StorageError):
if detail.code == types.StorageError.StorageErrorCode.TABLE_NOT_FOUND:
print(f"Table not found: {detail.entity}")
elif detail.code == types.StorageError.StorageErrorCode.STREAM_FINALIZED:
print(f"Stream already finalized: {detail.entity}")
# Check for row-level errors in response
for response in response_stream:
if response.row_errors:
for row_error in response.row_errors:
print(f"Row {row_error.index} error: {row_error.message}")from google.cloud.bigquery_storage import types
import time
# Create timestamp for snapshot
snapshot_time = types.Timestamp()
current_millis = int(time.time() * 1000)
snapshot_time.FromMilliseconds(current_millis)
# Use in table modifiers
table_modifiers = types.ReadSession.TableModifiers(
snapshot_time=snapshot_time
)
read_session = types.ReadSession(
table=table_path,
data_format=types.DataFormat.AVRO,
table_modifiers=table_modifiers
)

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery-storage