tessl/pypi-google-api-core

Google API client core library providing common helpers, utilities, and components for Python client libraries

Bidirectional Streaming

Helpers for consuming bidirectional streaming gRPC operations in Google API client libraries. The module maps gRPC's built-in request/response iterator interface onto a socket-like send/recv pattern, which is easier to work with for long-running or asymmetric streams, and it can optionally recover from transient errors automatically.

Capabilities

Core Bidirectional RPC

Socket-like interface for bidirectional streaming RPCs with explicit control over stream lifecycle, request queuing, and response consumption.

class BidiRpc:
    def __init__(self, start_rpc, initial_request=None, metadata=None): ...
    
    def add_done_callback(self, callback): ...
    def open(self): ...
    def close(self): ...
    def send(self, request): ...
    def recv(self): ...
    
    @property
    def is_active(self): ...
    
    @property
    def pending_requests(self): ...

Usage Examples

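from google.api_core import bidi

# `client` is a placeholder for an object exposing a stream-stream gRPC method.
# BidiRpc calls start_rpc with the request iterator and a `metadata` keyword,
# so a wrapper function must accept and forward both.
def start_stream(requests, metadata=None):
    return client.bidirectional_method(requests, metadata=metadata)

# Create and open the bidirectional RPC
rpc = bidi.BidiRpc(start_stream)
rpc.open()

# Send a request (`my_request` is a placeholder request message)
rpc.send(my_request)

# Receive the next response (blocks until one is available)
response = rpc.recv()

# Clean up
rpc.close()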

Resumable Bidirectional RPC

Enhanced bidirectional RPC with automatic recovery from transient errors and configurable retry logic.

class ResumableBidiRpc(BidiRpc):
    def __init__(self, start_rpc, should_recover, should_terminate=None, initial_request=None, metadata=None, throttle_reopen=False): ...
    
    # Inherits all BidiRpc methods with enhanced error handling

Error Recovery Configuration

from google.api_core import exceptions
from google.api_core import bidi

# Define recovery predicate
def should_recover(exception):
    return isinstance(exception, (
        exceptions.InternalServerError,
        exceptions.ServiceUnavailable,
        exceptions.DeadlineExceeded
    ))

# Create resumable RPC with recovery
rpc = bidi.ResumableBidiRpc(
    start_stream,
    should_recover=should_recover,
    throttle_reopen=True  # Rate limit reconnections
)
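
To make the role of the recovery predicate concrete, here is a minimal, stdlib-only sketch of a receive loop that reopens a stream when the predicate accepts the error. It is an illustration, not the library's implementation: `TransientError`, `FlakyStream`, `recv_with_recovery`, and the `max_reopens` budget are all hypothetical names introduced for this example.

```python
class TransientError(Exception):
    """Hypothetical stand-in for a retryable error such as ServiceUnavailable."""


class FlakyStream:
    """Hypothetical stream whose recv() fails while a shared failure budget remains."""

    def __init__(self, responses, failures):
        self._responses = list(responses)
        self._failures = failures  # one-element list shared across reopened streams

    def recv(self):
        if self._failures[0] > 0:
            self._failures[0] -= 1
            raise TransientError("connection reset")
        return self._responses.pop(0)


def recv_with_recovery(open_stream, should_recover, max_reopens=3):
    """Receive one response, reopening the stream on recoverable errors."""
    stream = open_stream()
    reopens = 0
    while True:
        try:
            return stream.recv(), reopens
        except Exception as exc:
            # Re-raise when the predicate rejects the error or the budget is spent.
            if not should_recover(exc) or reopens >= max_reopens:
                raise
            reopens += 1
            stream = open_stream()  # start a fresh stream and try again


failures = [2]  # the first two recv() calls fail
response, reopens = recv_with_recovery(
    lambda: FlakyStream(["hello"], failures),
    should_recover=lambda exc: isinstance(exc, TransientError),
)
print(response, reopens)  # hello 2
```

The predicate only decides whether an error is worth retrying; how often and how quickly the stream is reopened is a separate concern.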

Background Stream Consumer

Runs bidirectional stream consumption in a separate background thread with callback-based response handling.

class BackgroundConsumer:
    def __init__(self, bidi_rpc, on_response, on_fatal_exception=None): ...
    
    def start(self): ...
    def stop(self): ...
    def pause(self): ...
    def resume(self): ...
    
    @property
    def is_active(self): ...
    
    @property
    def is_paused(self): ...

Background Processing Example

# Response handler
def handle_response(response):
    print(f"Received: {response}")

# Error handler
def handle_error(exception):
    print(f"Fatal error: {exception}")

# Setup background consumer
consumer = bidi.BackgroundConsumer(
    rpc,
    on_response=handle_response,
    on_fatal_exception=handle_error
)

# Start background processing
consumer.start()

# Send requests while responses are processed in background
rpc.send(request1)
rpc.send(request2)

# Control flow
consumer.pause()   # Pause response processing
consumer.resume()  # Resume response processing
consumer.stop()    # Stop and cleanup
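
The callback-driven pattern above can be sketched with the standard library alone. The following is a simplified illustration of a background consumer, not the code in `google.api_core.bidi`: `MiniConsumer` is a hypothetical class, the `recv` callable stands in for a stream's blocking receive, and `None` is used here as an assumed end-of-stream sentinel.

```python
import queue
import threading


class MiniConsumer:
    """Illustrative background consumer built on a thread and an Event."""

    def __init__(self, recv, on_response):
        self._recv = recv                  # blocking callable: returns a response or None
        self._on_response = on_response
        self._resumed = threading.Event()  # set while the consumer is allowed to run
        self._resumed.set()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            self._resumed.wait()           # blocks here while paused
            response = self._recv()
            if response is None:           # sentinel: the stream has finished
                return
            self._on_response(response)

    def pause(self):
        self._resumed.clear()              # takes effect before the next recv()

    def resume(self):
        self._resumed.set()

    def join(self, timeout=None):
        self._thread.join(timeout)


incoming = queue.Queue()
received = []
mini = MiniConsumer(incoming.get, received.append)
mini.start()
for item in ("a", "b", None):
    incoming.put(item)
mini.join(timeout=5)
print(received)  # ['a', 'b']
```

In this sketch a pause only takes effect between responses; a receive that is already blocked will still deliver its response before the consumer waits.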

Request Queue Management

Internal queue-based request management with RPC lifecycle coordination and graceful shutdown handling.

# Internal helper for request generation
class _RequestQueueGenerator:
    def __init__(self, queue, period=1, initial_request=None): ...
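
As a rough illustration of how a queue can back a gRPC request iterator, the sketch below yields an optional initial request, then drains a queue until it sees a shutdown sentinel or notices the call is no longer active. The function name `request_generator`, the `is_active` callable, and the use of `None` as the sentinel are assumptions made for this example; the internal helper's actual behavior may differ in detail.

```python
import queue


def request_generator(requests, is_active, period=0.05, initial_request=None):
    """Yield queued requests until a None sentinel or the call goes inactive."""
    if initial_request is not None:
        # Accept either a request object or a zero-argument factory.
        yield initial_request() if callable(initial_request) else initial_request
    while True:
        try:
            item = requests.get(timeout=period)
        except queue.Empty:
            if not is_active():
                return  # the RPC ended while we were waiting for a request
            continue    # still active: keep polling
        if item is None:
            return      # sentinel: clean shutdown requested
        yield item


q = queue.Queue()
for item in ("r1", "r2", None):
    q.put(item)
sent = list(request_generator(q, is_active=lambda: True, initial_request="hello"))
print(sent)  # ['hello', 'r1', 'r2']
```

Polling with a timeout, rather than blocking forever, is what lets the generator notice that the RPC has ended even when no further requests arrive.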

Rate Limiting

Thread-safe rate limiting for stream operations using sliding time windows.

class _Throttle:
    def __init__(self, access_limit, time_window): ...
    
    def __enter__(self): ...
    def __exit__(self, *_): ...
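
A sliding-window limiter with the same constructor shape can be sketched as follows. This is an illustrative reimplementation under stated assumptions, not the library's `_Throttle`: the class name `SlidingWindowThrottle` is hypothetical, and `time_window` is assumed to be a `datetime.timedelta`.

```python
import collections
import datetime
import threading
import time


class SlidingWindowThrottle:
    """Illustrative limiter: at most access_limit entries per time_window."""

    def __init__(self, access_limit, time_window):
        if access_limit < 1:
            raise ValueError("access_limit must be at least 1")
        self._limit = access_limit
        self._window = time_window.total_seconds()
        self._entries = collections.deque()  # monotonic timestamps of recent entries
        self._lock = threading.Lock()

    def __enter__(self):
        with self._lock:
            now = time.monotonic()
            # Forget entries that have already left the window.
            while self._entries and now - self._entries[0] >= self._window:
                self._entries.popleft()
            if len(self._entries) >= self._limit:
                # Wait until the oldest entry leaves the window, then drop it.
                time.sleep(self._window - (now - self._entries[0]))
                self._entries.popleft()
            self._entries.append(time.monotonic())
        return self

    def __exit__(self, *_):
        return False  # never suppress exceptions raised inside the block


throttle = SlidingWindowThrottle(
    access_limit=2, time_window=datetime.timedelta(seconds=0.2)
)
start = time.monotonic()
for _ in range(3):
    with throttle:
        pass  # e.g. reopen a stream here
elapsed = time.monotonic() - start  # the third entry waits for the window to slide
```

Holding the lock while sleeping serializes waiting threads, which keeps the sketch simple at the cost of making every caller queue behind the one that is currently waiting.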

Import Patterns

from google.api_core import bidi

# For basic bidirectional streaming
rpc = bidi.BidiRpc(start_rpc_func)

# For resumable streaming with recovery
rpc = bidi.ResumableBidiRpc(start_rpc_func, should_recover_func)

# For background consumption
consumer = bidi.BackgroundConsumer(rpc, response_handler)

Types

from typing import Any, Callable, Optional, Sequence, Tuple, Union
import datetime
import queue as queue_module
import grpc

# Type aliases
StartRpcCallable = grpc.StreamStreamMultiCallable
ResponseCallback = Callable[[Any], None]
ErrorCallback = Callable[[Exception], None]
RecoveryPredicate = Callable[[Exception], bool]
TerminationPredicate = Callable[[Exception], bool]
DoneCallback = Callable[[grpc.Future], None]

# Common parameters
InitialRequest = Union[Any, Callable[[], Any]]  # protobuf.Message or callable
Metadata = Sequence[Tuple[str, str]]
TimeWindow = datetime.timedelta

Error Handling

The module handles errors in streaming operations in several ways:

  • Transient Error Recovery: Automatic retry on recoverable errors in ResumableBidiRpc
  • User-Defined Recovery Logic: Custom should_recover and should_terminate functions
  • Rate Limiting: Throttling of reconnection attempts to prevent overwhelming services
  • Thread Safety: All operations are thread-safe with proper locking mechanisms
  • Graceful Shutdown: Proper cleanup and resource management on errors and normal termination
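
One way the two user-defined predicates might be combined is sketched below. The function `classify` and its three outcome labels are hypothetical, and the ordering (termination checked before recovery) is an assumption of this sketch rather than a documented guarantee.

```python
def classify(exc, should_recover, should_terminate=lambda exc: False):
    """Decide what to do with a stream error using the two predicates."""
    if should_terminate(exc):
        return "terminate"  # stop for good, even if the error looks recoverable
    if should_recover(exc):
        return "recover"    # reopen the stream and carry on
    return "fatal"          # surface the error to the caller


def is_transient(exc):
    return isinstance(exc, (ConnectionError, TimeoutError))


def is_cancelled(exc):
    return isinstance(exc, KeyboardInterrupt)


print(classify(TimeoutError(), is_transient, is_cancelled))       # recover
print(classify(KeyboardInterrupt(), is_transient, is_cancelled))  # terminate
print(classify(ValueError("bad"), is_transient, is_cancelled))    # fatal
```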

Stream Lifecycle

  1. Initialization: Create a BidiRpc or ResumableBidiRpc with a gRPC stream-stream callable
  2. Opening: Call open() to establish the stream
  3. Communication: Use send() and recv() for bidirectional communication
  4. Error Handling: Automatic recovery (if using ResumableBidiRpc) or manual error handling
  5. Cleanup: Call close() to properly terminate the stream

For background processing, BackgroundConsumer manages the lifecycle: start() begins consumption, pause() and resume() control it, and stop() performs cleanup.
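
The open/communicate/close sequence lends itself to a context manager, so that close() runs even when an exception interrupts communication. The helper `opened` and the `FakeRpc` stand-in below are hypothetical and exist only to make the sketch runnable without a gRPC connection; any object exposing open(), send(), recv(), and close() would work in its place.

```python
import contextlib


@contextlib.contextmanager
def opened(rpc):
    """Open the RPC on entry and always close it on exit."""
    rpc.open()
    try:
        yield rpc
    finally:
        rpc.close()


class FakeRpc:
    """Hypothetical stand-in with the same open/send/recv/close surface."""

    def __init__(self):
        self.events = []
        self._outbox = []

    def open(self):
        self.events.append("open")

    def send(self, request):
        self.events.append("send")
        self._outbox.append(request)

    def recv(self):
        self.events.append("recv")
        return f"echo:{self._outbox.pop(0)}"

    def close(self):
        self.events.append("close")


fake = FakeRpc()
with opened(fake) as stream:
    stream.send("ping")
    reply = stream.recv()
print(reply, fake.events)  # echo:ping ['open', 'send', 'recv', 'close']
```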

Install with Tessl CLI

npx tessl i tessl/pypi-google-api-core
