Google Cloud BigQuery Storage API client library for high-performance streaming read/write access to BigQuery data
—
Streaming write operations to BigQuery tables using the BigQuery Storage API. Supports transactional semantics with multiple write stream types, batch commit operations, and multiple data formats including Protocol Buffers, Avro, and Arrow.
Main client for writing data to BigQuery tables with streaming capabilities and transactional guarantees.
class BigQueryWriteClient:
    """Main client for writing data to BigQuery tables with streaming
    capabilities and transactional guarantees.

    Method bodies are documentation stubs; the real implementation lives in
    ``google.cloud.bigquery_storage_v1``. Type annotations are quoted because
    the referenced message types are declared elsewhere in the package.
    """

    def __init__(self, **kwargs):
        """
        Initialize BigQuery Write Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_write_stream(
        self,
        parent: str,
        write_stream: "WriteStream",
        **kwargs,
    ) -> "WriteStream":
        """
        Create a new write stream for appending data to BigQuery.

        Parameters:
        - parent: Table path in format "projects/{project}/datasets/{dataset}/tables/{table}"
        - write_stream: WriteStream configuration with type and options

        Returns:
        WriteStream with stream name and metadata
        """

    def append_rows(
        self,
        requests: "Iterator[AppendRowsRequest]",
        **kwargs,
    ) -> "Iterator[AppendRowsResponse]":
        """
        Append rows to a write stream (bidirectional streaming RPC).

        Parameters:
        - requests: Iterator of AppendRowsRequest messages with serialized data

        Returns:
        Iterator of AppendRowsResponse messages with append results
        """

    def get_write_stream(self, name: str, **kwargs) -> "WriteStream":
        """
        Get write stream information and current state.

        Parameters:
        - name: Write stream name

        Returns:
        WriteStream with current state and metadata
        """

    def finalize_write_stream(
        self,
        name: str,
        **kwargs,
    ) -> "FinalizeWriteStreamResponse":
        """
        Finalize a write stream to prepare it for commit.

        Parameters:
        - name: Write stream name

        Returns:
        FinalizeWriteStreamResponse with row count and state
        """

    def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: "List[str]",
        **kwargs,
    ) -> "BatchCommitWriteStreamsResponse":
        """
        Atomically commit multiple write streams.

        Parameters:
        - parent: Table path
        - write_streams: List of write stream names to commit

        Returns:
        BatchCommitWriteStreamsResponse with commit timestamp and errors
        """

    def flush_rows(
        self,
        write_stream: str,
        offset: "int | None" = None,
        **kwargs,
    ) -> "FlushRowsResponse":
        """
        Flush buffered rows in a write stream.

        Parameters:
        - write_stream: Write stream name
        - offset: Offset to flush up to (optional)

        Returns:
        FlushRowsResponse with flush offset
        """


# Async version of BigQueryWriteClient with same methods using async/await pattern.
class BigQueryWriteAsyncClient:
    """Async variant of BigQueryWriteClient exposing the same RPC surface as
    coroutines (and an async iterator pair for the bidirectional append_rows).

    Bodies are documentation stubs; annotations are quoted because the
    referenced message types are declared elsewhere in the package.
    """

    async def create_write_stream(
        self,
        parent: str,
        write_stream: "WriteStream",
        **kwargs,
    ) -> "WriteStream": ...

    async def append_rows(
        self,
        requests: "AsyncIterator[AppendRowsRequest]",
        **kwargs,
    ) -> "AsyncIterator[AppendRowsResponse]": ...

    async def get_write_stream(self, name: str, **kwargs) -> "WriteStream": ...

    async def finalize_write_stream(
        self,
        name: str,
        **kwargs,
    ) -> "FinalizeWriteStreamResponse": ...

    async def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: "List[str]",
        **kwargs,
    ) -> "BatchCommitWriteStreamsResponse": ...

    async def flush_rows(
        self,
        write_stream: str,
        offset: "int | None" = None,
        **kwargs,
    ) -> "FlushRowsResponse": ...


# Helper class that wraps write stream operations and provides convenient data appending methods.
class AppendRowsStream:
    """Convenience wrapper around the AppendRows bidirectional stream:
    send requests, receive futures, and manage stream shutdown.

    Bodies are documentation stubs; annotations are quoted because the
    referenced types are declared elsewhere in the package.
    """

    def send(self, request: "AppendRowsRequest") -> "AppendRowsFuture":
        """
        Send append request and get future for response.

        Parameters:
        - request: AppendRowsRequest with serialized row data

        Returns:
        AppendRowsFuture for tracking append result
        """

    def close(self, reason: "str | None" = None):
        """
        Close the write stream.

        Parameters:
        - reason: Optional reason for closing
        """

    def is_active(self) -> bool:
        """Check if the write stream is still active."""

    def add_close_callback(self, callback: "Callable"):
        """
        Add callback to be called when stream closes.

        Parameters:
        - callback: Function to call on stream close
        """


# Utilities for constructing and parsing BigQuery resource paths.
class BigQueryWriteClient:
@staticmethod
def table_path(project: str, dataset: str, table: str) -> str:
"""Construct BigQuery table resource path."""
@staticmethod
def parse_table_path(path: str) -> dict:
"""Parse table path into project, dataset, table components."""
@staticmethod
def write_stream_path(
project: str,
dataset: str,
table: str,
stream: str
) -> str:
"""Construct write stream resource path."""
@staticmethod
def parse_write_stream_path(path: str) -> dict:
"""Parse write stream path into components."""from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

# Create client
write_client = bigquery_storage_v1.BigQueryWriteClient()

# Create write stream
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request with protocol buffer data
request = types.AppendRowsRequest()
request.write_stream = stream.name

# Add serialized row data (requires protocol buffer schema)
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.serialized_rows = [serialized_row_data]  # Your serialized data
request.proto_rows = proto_data

# Append rows
response_stream = write_client.append_rows([request])
for response in response_stream:
    if response.HasField('error'):
        print(f"Error: {response.error}")
    else:
        # BUGFIX: append_result.offset is a scalar offset (int), not a
        # sequence — calling len() on it would raise TypeError.
        print(f"Rows appended at offset: {response.append_result.offset}")

# Finalize and commit
write_client.finalize_write_stream(name=stream.name)
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=[stream.name]
)
print(f"Committed at: {commit_response.commit_time}")

# --- Example: committed-stream (auto-commit) workflow ---
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create a COMMITTED-type stream: appended rows become visible as soon as
# each append succeeds, with no separate finalize/commit step.
write_stream = types.WriteStream(type_=types.WriteStream.Type.COMMITTED)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Append data (automatically committed)
request = types.AppendRowsRequest(write_stream=stream.name)
# ... configure with data ...
response_stream = write_client.append_rows([request])
for response in response_stream:
    if response.append_result:
        print(f"Data committed at offset: {response.append_result.offset}")

# --- Example: multiple pending streams with atomic batch commit ---
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create multiple pending streams.
# BUGFIX: the per-stream append belongs inside the loop — otherwise only the
# last stream created would receive data.
streams = []
for i in range(3):
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    streams.append(stream)

    # Append data to each stream
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... add data for stream i ...
    write_client.append_rows([request])

# Finalize all streams
for stream in streams:
    write_client.finalize_write_stream(name=stream.name)

# Atomic batch commit
stream_names = [stream.name for stream in streams]
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=stream_names
)
if commit_response.stream_errors:
    print("Some streams failed to commit")
else:
    print(f"All streams committed at: {commit_response.commit_time}")

# --- Example: AppendRowsStream helper ---
from google.cloud.bigquery_storage_v1 import writer, types
# Create write stream
# BUGFIX: bigquery_storage_v1 is referenced below but was never imported in
# this example — import it explicitly.
from google.cloud import bigquery_storage_v1

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Use helper class
# NOTE(review): the real writer.AppendRowsStream takes an initial request
# template as its second argument — verify against the installed version.
append_stream = writer.AppendRowsStream(write_client, stream.name)

# Send data using helper
request = types.AppendRowsRequest()
# ... configure request ...
future = append_stream.send(request)
try:
    response = future.result(timeout=30)
    print(f"Append successful: {response.append_result.offset}")
except Exception as e:
    print(f"Append failed: {e}")

# Clean up
append_stream.close()

# --- Example: writing Arrow data ---
import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create Arrow schema and data
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64()),
])

# Create Arrow table
data = pa.table([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"]),
    pa.array([10.5, 20.3, 30.1]),
], schema=schema)

# Convert to record batch
record_batch = data.to_batches()[0]

# Create write stream
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request with the serialized Arrow record batch
request = types.AppendRowsRequest(write_stream=stream.name)
arrow_data = types.AppendRowsRequest.ArrowData()
arrow_data.serialized_record_batch = record_batch.serialize().to_pybytes()
request.arrow_rows = arrow_data

# Append and commit
write_client.append_rows([request])
write_client.finalize_write_stream(name=stream.name)
write_client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])

# --- Example: error handling with retry ---
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.api_core import exceptions, retry
import time

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")


def append_with_retry(request, max_retries=3):
    """Append rows with simple retry on rate limiting and aborted requests.

    Parameters:
    - request: AppendRowsRequest to send
    - max_retries: Maximum number of attempts before giving up

    Returns:
    The last AppendRowsResponse received (None if the server sent none).

    Raises:
    Exception when a response carries an error; the underlying API error
    once retries are exhausted.
    """
    for attempt in range(max_retries):
        try:
            response_stream = write_client.append_rows([request])
            # BUGFIX: pre-bind response so an empty response stream cannot
            # cause a NameError on the return below.
            response = None
            for response in response_stream:
                if response.HasField('error'):
                    raise Exception(f"Append error: {response.error}")
            return response
        except exceptions.ResourceExhausted:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except exceptions.Aborted:
            if attempt < max_retries - 1:
                print(f"Request aborted, retrying attempt {attempt + 1}")
                time.sleep(1)
            else:
                raise


# Use retry wrapper
try:
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... configure request ...
    response = append_with_retry(request)
    print(f"Successfully appended rows: {response.append_result.offset}")
except Exception as e:
    print(f"Final append failure: {e}")


# ---- Message / resource type reference ----

class WriteStream:
    """A stream for writing data to a BigQuery table."""
name: str
type_: WriteStream.Type
create_time: Timestamp
commit_time: Timestamp
table_schema: TableSchema
state: WriteStream.State
location: str
class WriteStream.Type(enum.Enum):
TYPE_UNSPECIFIED = 0
COMMITTED = 1 # Default stream, auto-commits
PENDING = 2 # Pending stream, requires explicit commit
BUFFERED = 3 # Buffered stream, for batch processing
class WriteStream.State(enum.Enum):
STATE_UNSPECIFIED = 0
CREATED = 1
RUNNING = 2
FINALIZED = 3
COMMITTED = 4
ABORTED = 5class CreateWriteStreamRequest:
parent: str
write_stream: WriteStream
class AppendRowsRequest:
write_stream: str
offset: int
proto_rows: AppendRowsRequest.ProtoData
arrow_rows: AppendRowsRequest.ArrowData
trace_id: str
class AppendRowsRequest.ProtoData:
writer_schema: ProtoSchema
serialized_rows: List[bytes]
class AppendRowsRequest.ArrowData:
writer_schema: ArrowSchema
serialized_record_batch: bytes
class AppendRowsResponse:
append_result: AppendRowsResponse.AppendResult
error: Status
updated_schema: TableSchema
row_errors: List[RowError]
class AppendRowsResponse.AppendResult:
offset: int
class GetWriteStreamRequest:
name: str
view: WriteStreamView
class FinalizeWriteStreamRequest:
name: str
class FinalizeWriteStreamResponse:
row_count: int
class BatchCommitWriteStreamsRequest:
parent: str
write_streams: List[str]
class BatchCommitWriteStreamsResponse:
commit_time: Timestamp
stream_errors: List[StorageError]
class FlushRowsRequest:
write_stream: str
offset: int
class FlushRowsResponse:
offset: intclass StorageError:
code: StorageError.StorageErrorCode
entity: str
error_message: str
class StorageError.StorageErrorCode(enum.Enum):
STORAGE_ERROR_CODE_UNSPECIFIED = 0
TABLE_NOT_FOUND = 1
STREAM_ALREADY_COMMITTED = 2
STREAM_NOT_FOUND = 3
INVALID_STREAM_TYPE = 4
INVALID_STREAM_STATE = 5
STREAM_FINALIZED = 6
class RowError:
index: int
code: RowError.RowErrorCode
message: str
class RowError.RowErrorCode(enum.Enum):
ROW_ERROR_CODE_UNSPECIFIED = 0
ROW_PARSE_ERROR = 1
UNKNOWN_ERROR = 2
FIELDS_ERROR = 3
class WriteStreamView(enum.Enum):
"""Views for write stream information."""
WRITE_STREAM_VIEW_UNSPECIFIED = 0
BASIC = 1 # Basic stream information
FULL = 2 # Full stream details including schema
class AppendRowsFuture:
"""Future object for tracking append operation results."""
def result(self, timeout: float = None) -> AppendRowsResponse:
"""
Get the append operation result.
Parameters:
- timeout: Maximum time to wait for result
Returns:
AppendRowsResponse with operation result
"""
def exception(self, timeout: float = None) -> Exception:
"""Get exception if operation failed."""
def done(self) -> bool:
"""Check if operation is complete."""Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery-storage