CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-bigquery-storage

Google Cloud Bigquery Storage API client library for high-performance streaming read/write access to BigQuery data

Pending
Overview
Eval results
Files

reading-data.mddocs/

Reading Data

High-performance streaming reads from BigQuery tables using the BigQuery Storage API. Supports parallel processing, column selection, row filtering, and multiple data formats with direct conversion to pandas DataFrames and Apache Arrow.

Capabilities

BigQuery Read Client

Main client for reading data from BigQuery tables with streaming capabilities and format flexibility.

class BigQueryReadClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Client.
        
        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery.
        
        Parameters: 
        - parent: Project ID in format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)
        
        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream.
        
        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)
        
        Returns:
        ReadRowsStream iterator for processing messages
        """

    def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing.
        
        Parameters:
        - name: Stream name to split
        - fraction: Split point as fraction (0.0 to 1.0)
        
        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """

### BigQuery Read Async Client

Async version of BigQueryReadClient with async methods for non-blocking operations.

```python { .api }
class BigQueryReadAsyncClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Async Client.
        
        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    async def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery (async).
        
        Parameters: 
        - parent: Project ID in format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)
        
        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream (sync method on async client).
        
        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)
        
        Returns:
        ReadRowsStream iterator for processing messages
        """

    async def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing (async).
        
        Parameters:
        - name: Stream name to split
        - fraction: Split point as fraction (0.0 to 1.0)
        
        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """

Read Rows Stream

Helper class that wraps read stream responses and provides convenient data parsing methods.

class ReadRowsStream:
    def __iter__(self) -> Iterator[ReadRowsResponse]:
        """Iterate over ReadRowsResponse messages."""

    def rows(self, read_session: ReadSession = None) -> Iterator[dict]:
        """
        Parse stream messages into row dictionaries.
        
        Parameters:
        - read_session: ReadSession for schema information (required for Avro)
        
        Returns:
        Iterator of row dictionaries
        
        Note: Requires fastavro for Avro format support
        """

    def to_arrow(self, read_session: ReadSession = None):
        """
        Convert stream to Apache Arrow format.
        
        Parameters:
        - read_session: ReadSession for schema information
        
        Returns:
        Apache Arrow Table
        
        Note: Requires pyarrow for Arrow format support
        """

    def to_dataframe(
        self,
        read_session: ReadSession = None,
        dtypes: dict = None
    ):
        """
        Convert stream to pandas DataFrame.
        
        Parameters:
        - read_session: ReadSession for schema information
        - dtypes: Column data type specifications
        
        Returns:
        pandas DataFrame
        
        Note: Requires pandas for DataFrame support
        """

Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

class BigQueryReadClient:
    @staticmethod
    def read_session_path(project: str, location: str, session: str) -> str:
        """Construct read session resource path."""

    @staticmethod
    def parse_read_session_path(path: str) -> dict:
        """Parse read session path into components."""

    @staticmethod
    def read_stream_path(
        project: str, 
        location: str, 
        session: str, 
        stream: str
    ) -> str:
        """Construct read stream resource path."""

    @staticmethod
    def parse_read_stream_path(path: str) -> dict:
        """Parse read stream path into components."""

    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""

Usage Examples

Basic Read Session

from google.cloud.bigquery_storage import BigQueryReadClient, types

# Create client
client = BigQueryReadClient()

# Configure table and session
table = "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"
requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

# Create session with single stream
session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read data
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
    print(f"Name: {row['name']}, State: {row['state']}")

Column Selection and Filtering

from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/your-project/datasets/your_dataset/tables/your_table"

# Configure read options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["name", "age", "city"],
    row_restriction='age > 18 AND city = "New York"'
)

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=read_options
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session
)

# Process all streams in parallel
for stream in session.streams:
    reader = client.read_rows(stream.name)
    for row in reader.rows(session):
        print(row)

Convert to DataFrame

import pandas as pd
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/bigquery-public-data/datasets/new_york_trees/tables/tree_species"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read into pandas DataFrame using to_dataframe method
reader = client.read_rows(session.streams[0].name)
dataframe = reader.to_dataframe(session)

print(dataframe.head())

Parallel Stream Processing

import concurrent.futures
from google.cloud.bigquery_storage import BigQueryReadClient, types

def process_stream(client, stream_name, session):
    """Process a single stream."""
    reader = client.read_rows(stream_name)
    rows = []
    for row in reader.rows(session):
        rows.append(row)
    return rows

client = BigQueryReadClient()
table = "projects/your-project/datasets/large_dataset/tables/big_table"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=4  # Request multiple streams
)

# Process streams in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for stream in session.streams:
        future = executor.submit(process_stream, client, stream.name, session)
        futures.append(future)
    
    # Collect results
    all_rows = []
    for future in concurrent.futures.as_completed(futures):
        stream_rows = future.result()
        all_rows.extend(stream_rows)

print(f"Processed {len(all_rows)} total rows")

Stream Splitting

from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
# ... create session with single stream ...

original_stream = session.streams[0]

# Split stream at 50% mark
split_response = client.split_read_stream(
    name=original_stream.name,
    fraction=0.5
)

# Process both streams
if split_response.primary_stream:
    reader1 = client.read_rows(split_response.primary_stream.name)
    # Process first half...

if split_response.remainder_stream:
    reader2 = client.read_rows(split_response.remainder_stream.name)
    # Process second half...

Error Handling

from google.cloud.bigquery_storage import BigQueryReadClient
from google.api_core import exceptions

client = BigQueryReadClient()

try:
    session = client.create_read_session(
        parent="projects/your-project",
        read_session=requested_session
    )
    
    reader = client.read_rows(session.streams[0].name)
    for row in reader.rows(session):
        # Process row
        pass
        
except exceptions.NotFound:
    print("Table not found")
except exceptions.PermissionDenied:
    print("Access denied to table or project")
except exceptions.ResourceExhausted:
    print("Quota exceeded")
except Exception as e:
    print(f"Unexpected error: {e}")

Types

ReadSession

class ReadSession:
    name: str
    table: str
    data_format: DataFormat
    read_options: TableReadOptions
    streams: List[ReadStream]
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema
    arrow_schema: ArrowSchema
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    selected_fields: List[str]
    row_restriction: str
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions

class ReadSession.TableModifiers:
    snapshot_time: Timestamp

ReadStream

class ReadStream:
    name: str

Request/Response Types

class CreateReadSessionRequest:
    parent: str
    read_session: ReadSession
    max_stream_count: int

class ReadRowsRequest:
    read_stream: str
    offset: int

class ReadRowsResponse:
    avro_rows: AvroRows
    arrow_record_batch: ArrowRecordBatch
    row_count: int
    stats: StreamStats
    throttle_state: ThrottleState

class SplitReadStreamRequest:
    name: str
    fraction: float

class SplitReadStreamResponse:
    primary_stream: ReadStream
    remainder_stream: ReadStream

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery-storage

docs

index.md

metastore-services.md

reading-data.md

types-schemas.md

writing-data.md

tile.json