Google API client core library providing common helpers, utilities, and components for Python client libraries
—
Support for consuming bidirectional streaming gRPC operations in Google API client libraries. This module transforms gRPC's built-in request/response iterator interface into a more socket-like send/recv pattern, making it easier to handle long-running or asymmetric streams with automatic recovery capabilities.
Socket-like interface for bidirectional streaming RPCs with explicit control over stream lifecycle, request queuing, and response consumption.
class BidiRpc:
def __init__(self, start_rpc, initial_request=None, metadata=None): ...
def add_done_callback(self, callback): ...
def open(self): ...
def close(self): ...
def send(self, request): ...
def recv(self): ...
@property
def is_active(self): ...
@property
def pending_requests(self): ...from google.api_core import bidi
import grpc
# Basic bidirectional streaming
def start_stream():
return client.bidirectional_method()
# Create and use bidirectional RPC
rpc = bidi.BidiRpc(start_stream)
rpc.open()
# Send requests
rpc.send(my_request)
# Receive responses
response = rpc.recv()
# Clean up
rpc.close()Enhanced bidirectional RPC with automatic recovery from transient errors and configurable retry logic.
class ResumableBidiRpc(BidiRpc):
def __init__(self, start_rpc, should_recover, should_terminate=None, initial_request=None, metadata=None, throttle_reopen=False): ...
# Inherits all BidiRpc methods with enhanced error handlingfrom google.api_core import exceptions
from google.api_core import bidi
# Define recovery predicate
def should_recover(exception):
return isinstance(exception, (
exceptions.InternalServerError,
exceptions.ServiceUnavailable,
exceptions.DeadlineExceeded
))
# Create resumable RPC with recovery
rpc = bidi.ResumableBidiRpc(
start_stream,
should_recover=should_recover,
throttle_reopen=True # Rate limit reconnections
)Runs bidirectional stream consumption in a separate background thread with callback-based response handling.
class BackgroundConsumer:
def __init__(self, bidi_rpc, on_response, on_fatal_exception=None): ...
def start(self): ...
def stop(self): ...
def pause(self): ...
def resume(self): ...
@property
def is_active(self): ...
@property
def is_paused(self): ...# Response handler
def handle_response(response):
print(f"Received: {response}")
# Error handler
def handle_error(exception):
print(f"Fatal error: {exception}")
# Setup background consumer
consumer = bidi.BackgroundConsumer(
rpc,
on_response=handle_response,
on_fatal_exception=handle_error
)
# Start background processing
consumer.start()
# Send requests while responses are processed in background
rpc.send(request1)
rpc.send(request2)
# Control flow
consumer.pause() # Pause response processing
consumer.resume() # Resume response processing
consumer.stop() # Stop and cleanupInternal queue-based request management with RPC lifecycle coordination and graceful shutdown handling.
# Internal helper for request generation
class _RequestQueueGenerator:
def __init__(self, queue, period=1, initial_request=None): ...Thread-safe rate limiting for stream operations using sliding time windows.
class _Throttle:
def __init__(self, access_limit, time_window): ...
def __enter__(self): ...
def __exit__(self, *_): ...from google.api_core import bidi
# For basic bidirectional streaming
rpc = bidi.BidiRpc(start_rpc_func)
# For resumable streaming with recovery
rpc = bidi.ResumableBidiRpc(start_rpc_func, should_recover_func)
# For background consumption
consumer = bidi.BackgroundConsumer(rpc, response_handler)from typing import Callable, Optional, Sequence, Tuple, Union
import datetime
import queue as queue_module
import grpc
# Type aliases
StartRpcCallable = grpc.StreamStreamMultiCallable
ResponseCallback = Callable[[Any], None]
ErrorCallback = Callable[[Exception], None]
RecoveryPredicate = Callable[[Exception], bool]
TerminationPredicate = Callable[[Exception], bool]
DoneCallback = Callable[[grpc.Future], None]
# Common parameters
InitialRequest = Union[Any, Callable[[], Any]] # protobuf.Message or callable
Metadata = Sequence[Tuple[str, str]]
TimeWindow = datetime.timedeltaThe module provides comprehensive error handling for streaming operations:
ResumableBidiRpcshould_recover and should_terminate functionsBidiRpc or ResumableBidiRpc with gRPC methodopen() to establish the streamsend() and recv() for bidirectional communicationResumableBidiRpc) or manual error handlingclose() to properly terminate the streamFor background processing, the lifecycle is managed by BackgroundConsumer with start(), pause/resume controls, and stop() for cleanup.
Install with Tessl CLI
npx tessl i tessl/pypi-google-api-core