Google Cloud Bigquery Storage API client library for high-performance streaming read/write access to BigQuery data
—
High-performance streaming reads from BigQuery tables using the BigQuery Storage API. Supports parallel processing, column selection, row filtering, and multiple data formats with direct conversion to pandas DataFrames and Apache Arrow.
Main client for reading data from BigQuery tables with streaming capabilities and format flexibility.
class BigQueryReadClient:
def __init__(self, **kwargs):
"""
Initialize BigQuery Read Client.
Parameters:
- credentials: Google Cloud credentials
- project: Default project ID
- client_info: Client library information
"""
def create_read_session(
self,
parent: str,
read_session: ReadSession,
max_stream_count: int = None,
**kwargs
) -> ReadSession:
"""
Create a new read session for streaming data from BigQuery.
Parameters:
- parent: Project ID in format "projects/{project_id}"
- read_session: ReadSession configuration with table and options
- max_stream_count: Maximum number of parallel streams (optional)
Returns:
ReadSession with stream information and metadata
"""
def read_rows(
self,
name: str,
offset: int = 0,
**kwargs
) -> ReadRowsStream:
"""
Read rows from a specific stream.
Parameters:
- name: Stream name from ReadSession.streams[].name
- offset: Starting offset for reading (optional)
Returns:
ReadRowsStream iterator for processing messages
"""
def split_read_stream(
self,
name: str,
fraction: float = None,
**kwargs
) -> SplitReadStreamResponse:
"""
Split a read stream into two streams for parallel processing.
Parameters:
- name: Stream name to split
- fraction: Split point as fraction (0.0 to 1.0)
Returns:
SplitReadStreamResponse with primary and remainder streams
"""
### BigQuery Read Async Client
Async version of BigQueryReadClient with async methods for non-blocking operations.
```python { .api }
class BigQueryReadAsyncClient:
def __init__(self, **kwargs):
"""
Initialize BigQuery Read Async Client.
Parameters:
- credentials: Google Cloud credentials
- project: Default project ID
- client_info: Client library information
"""
async def create_read_session(
self,
parent: str,
read_session: ReadSession,
max_stream_count: int = None,
**kwargs
) -> ReadSession:
"""
Create a new read session for streaming data from BigQuery (async).
Parameters:
- parent: Project ID in format "projects/{project_id}"
- read_session: ReadSession configuration with table and options
- max_stream_count: Maximum number of parallel streams (optional)
Returns:
ReadSession with stream information and metadata
"""
def read_rows(
self,
name: str,
offset: int = 0,
**kwargs
) -> ReadRowsStream:
"""
Read rows from a specific stream (sync method on async client).
Parameters:
- name: Stream name from ReadSession.streams[].name
- offset: Starting offset for reading (optional)
Returns:
ReadRowsStream iterator for processing messages
"""
async def split_read_stream(
self,
name: str,
fraction: float = None,
**kwargs
) -> SplitReadStreamResponse:
"""
Split a read stream into two streams for parallel processing (async).
Parameters:
- name: Stream name to split
- fraction: Split point as fraction (0.0 to 1.0)
Returns:
SplitReadStreamResponse with primary and remainder streams
"""Helper class that wraps read stream responses and provides convenient data parsing methods.
class ReadRowsStream:
def __iter__(self) -> Iterator[ReadRowsResponse]:
"""Iterate over ReadRowsResponse messages."""
def rows(self, read_session: ReadSession = None) -> Iterator[dict]:
"""
Parse stream messages into row dictionaries.
Parameters:
- read_session: ReadSession for schema information (required for Avro)
Returns:
Iterator of row dictionaries
Note: Requires fastavro for Avro format support
"""
def to_arrow(self, read_session: ReadSession = None):
"""
Convert stream to Apache Arrow format.
Parameters:
- read_session: ReadSession for schema information
Returns:
Apache Arrow Table
Note: Requires pyarrow for Arrow format support
"""
def to_dataframe(
self,
read_session: ReadSession = None,
dtypes: dict = None
):
"""
Convert stream to pandas DataFrame.
Parameters:
- read_session: ReadSession for schema information
- dtypes: Column data type specifications
Returns:
pandas DataFrame
Note: Requires pandas for DataFrame support
"""Utilities for constructing and parsing BigQuery resource paths.
class BigQueryReadClient:
@staticmethod
def read_session_path(project: str, location: str, session: str) -> str:
"""Construct read session resource path."""
@staticmethod
def parse_read_session_path(path: str) -> dict:
"""Parse read session path into components."""
@staticmethod
def read_stream_path(
project: str,
location: str,
session: str,
stream: str
) -> str:
"""Construct read stream resource path."""
@staticmethod
def parse_read_stream_path(path: str) -> dict:
"""Parse read stream path into components."""
@staticmethod
def table_path(project: str, dataset: str, table: str) -> str:
"""Construct BigQuery table resource path."""
@staticmethod
def parse_table_path(path: str) -> dict:
"""Parse table path into project, dataset, table components."""from google.cloud.bigquery_storage import BigQueryReadClient, types
# Create client
client = BigQueryReadClient()
# Configure table and session
table = "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"
requested_session = types.ReadSession(
table=table,
data_format=types.DataFormat.AVRO
)
# Create session with single stream
session = client.create_read_session(
parent="projects/your-project",
read_session=requested_session,
max_stream_count=1
)
# Read data
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
print(f"Name: {row['name']}, State: {row['state']}")from google.cloud.bigquery_storage import BigQueryReadClient, types
client = BigQueryReadClient()
table = "projects/your-project/datasets/your_dataset/tables/your_table"
# Configure read options
read_options = types.ReadSession.TableReadOptions(
selected_fields=["name", "age", "city"],
row_restriction='age > 18 AND city = "New York"'
)
requested_session = types.ReadSession(
table=table,
data_format=types.DataFormat.ARROW,
read_options=read_options
)
session = client.create_read_session(
parent="projects/your-project",
read_session=requested_session
)
# Process all streams in parallel
for stream in session.streams:
reader = client.read_rows(stream.name)
for row in reader.rows(session):
print(row)

import pandas as pd
from google.cloud.bigquery_storage import BigQueryReadClient, types
client = BigQueryReadClient()
table = "projects/bigquery-public-data/datasets/new_york_trees/tables/tree_species"
requested_session = types.ReadSession(
table=table,
data_format=types.DataFormat.ARROW,
read_options=types.ReadSession.TableReadOptions(
selected_fields=["species_common_name", "fall_color"]
)
)
session = client.create_read_session(
parent="projects/your-project",
read_session=requested_session,
max_stream_count=1
)
# Read into pandas DataFrame using to_dataframe method
reader = client.read_rows(session.streams[0].name)
dataframe = reader.to_dataframe(session)
print(dataframe.head())

import concurrent.futures
from google.cloud.bigquery_storage import BigQueryReadClient, types
def process_stream(client, stream_name, session):
"""Process a single stream."""
reader = client.read_rows(stream_name)
rows = []
for row in reader.rows(session):
rows.append(row)
return rows
client = BigQueryReadClient()
table = "projects/your-project/datasets/large_dataset/tables/big_table"
requested_session = types.ReadSession(
table=table,
data_format=types.DataFormat.AVRO
)
session = client.create_read_session(
parent="projects/your-project",
read_session=requested_session,
max_stream_count=4 # Request multiple streams
)
# Process streams in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for stream in session.streams:
future = executor.submit(process_stream, client, stream.name, session)
futures.append(future)
# Collect results
all_rows = []
for future in concurrent.futures.as_completed(futures):
stream_rows = future.result()
all_rows.extend(stream_rows)
print(f"Processed {len(all_rows)} total rows")from google.cloud.bigquery_storage import BigQueryReadClient, types
client = BigQueryReadClient()
# ... create session with single stream ...
original_stream = session.streams[0]
# Split stream at 50% mark
split_response = client.split_read_stream(
name=original_stream.name,
fraction=0.5
)
# Process both streams
if split_response.primary_stream:
reader1 = client.read_rows(split_response.primary_stream.name)
# Process first half...
if split_response.remainder_stream:
reader2 = client.read_rows(split_response.remainder_stream.name)
# Process second half...

from google.cloud.bigquery_storage import BigQueryReadClient
from google.api_core import exceptions
client = BigQueryReadClient()
try:
session = client.create_read_session(
parent="projects/your-project",
read_session=requested_session
)
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
# Process row
pass
except exceptions.NotFound:
print("Table not found")
except exceptions.PermissionDenied:
print("Access denied to table or project")
except exceptions.ResourceExhausted:
print("Quota exceeded")
except Exception as e:
print(f"Unexpected error: {e}")class ReadSession:
name: str
table: str
data_format: DataFormat
read_options: TableReadOptions
streams: List[ReadStream]
estimated_total_bytes_scanned: int
estimated_row_count: int
avro_schema: AvroSchema
arrow_schema: ArrowSchema
table_modifiers: ReadSession.TableModifiers
class ReadSession.TableReadOptions:
selected_fields: List[str]
row_restriction: str
arrow_serialization_options: ArrowSerializationOptions
avro_serialization_options: AvroSerializationOptions
class ReadSession.TableModifiers:
snapshot_time: Timestamp

class ReadStream:
name: str

class CreateReadSessionRequest:
parent: str
read_session: ReadSession
max_stream_count: int
class ReadRowsRequest:
read_stream: str
offset: int
class ReadRowsResponse:
avro_rows: AvroRows
arrow_record_batch: ArrowRecordBatch
row_count: int
stats: StreamStats
throttle_state: ThrottleState
class SplitReadStreamRequest:
name: str
fraction: float
class SplitReadStreamResponse:
primary_stream: ReadStream
remainder_stream: ReadStream

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery-storage