Google API client core library providing common helpers, utilities, and components for Python client libraries
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utilities for gRPC and REST transport protocols, including channel creation, method wrapping, and streaming support. These helpers provide consistent interfaces for different transport protocols used by Google APIs.
Functions for creating and configuring gRPC channels with authentication and transport options.
def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
    """
    Create a gRPC channel with authentication and configuration.

    Args:
        target (str): Target server address (host:port).
        credentials: Google Auth credentials object.
        scopes (List[str], optional): OAuth 2.0 scopes for authentication.
        ssl_credentials: gRPC SSL credentials.
        credentials_file (str, optional): Path to service account file.
        quota_project_id (str, optional): Project ID for quota attribution.
        default_scopes (List[str], optional): Default scopes if none provided.
        default_host (str, optional): Default host for the service.
        compression: gRPC compression algorithm, e.g.
            ``grpc.Compression.Gzip``.
        **kwargs: Additional channel arguments, e.g. ``options=[...]`` with
            ``(name, value)`` gRPC channel option tuples.

    Returns:
        grpc.Channel: Configured gRPC channel.
    """
async def create_channel_async(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
    """
    Create an async gRPC channel with authentication and configuration.

    NOTE(review): this stub is declared ``async def``, which means callers
    would have to ``await`` it to obtain the channel. Confirm against the
    library whether the helper really is a coroutine, and confirm its public
    name (it may be exposed as ``grpc_helpers_async.create_channel``).

    Args:
        target (str): Target server address (host:port).
        credentials: Google Auth credentials object.
        scopes (List[str], optional): OAuth 2.0 scopes for authentication.
        ssl_credentials: gRPC SSL credentials.
        credentials_file (str, optional): Path to service account file.
        quota_project_id (str, optional): Project ID for quota attribution.
        default_scopes (List[str], optional): Default scopes if none provided.
        default_host (str, optional): Default host for the service.
        compression: gRPC compression algorithm.
        **kwargs: Additional channel arguments.

    Returns:
        grpc.aio.Channel: Configured async gRPC channel.
    """

Functions for applying retry, timeout, and other decorators to gRPC and REST methods.
def wrap_method(func, default_retry=None, default_timeout=None, client_info=None):
    """
    Apply retry, timeout, and client info to a gRPC method.

    Args:
        func (Callable): gRPC method to wrap.
        default_retry (Retry, optional): Default retry configuration.
        default_timeout (Timeout, optional): Default timeout configuration.
        client_info (ClientInfo, optional): Client information for headers.

    Returns:
        Callable: Wrapped method with applied decorators.
    """
async def wrap_method_async(func, default_retry=None, default_timeout=None, client_info=None):
    """
    Apply async retry, timeout, and client info to an async gRPC method.

    NOTE(review): this stub is declared ``async def``, so calling it would
    yield a coroutine rather than the wrapped callable directly. Confirm
    against the library whether wrapping really has to be awaited.

    Args:
        func (Callable): Async gRPC method to wrap.
        default_retry (AsyncRetry, optional): Default async retry configuration.
        default_timeout (Timeout, optional): Default timeout configuration.
        client_info (ClientInfo, optional): Client information for headers.

    Returns:
        Callable: Wrapped async method with applied decorators.
    """

Classes and utilities for handling gRPC streaming responses.
class GrpcStream:
    """
    Wrapper for gRPC streaming responses with additional functionality.

    This is an interface stub: apart from ``__iter__``, the method bodies
    are omitted here.

    Args:
        wrapped: Original gRPC stream iterator.
    """

    def __init__(self, wrapped): ...

    def __iter__(self):
        """Return the stream itself; the wrapper is its own iterator."""
        return self

    def __next__(self):
        """Get next item from stream."""

    def cancel(self):
        """Cancel the streaming operation."""

    @property
    def cancelled(self):
        """Check if stream was cancelled (read as an attribute, not called)."""
class _StreamingResponseIterator:
    """Iterator wrapper for gRPC streaming responses.

    Interface stub only: every method body is elided (``...``).
    """

    def __init__(self, wrapped): ...

    def __iter__(self): ...

    def __next__(self): ...

Constants indicating availability of optional transport dependencies.
# Boolean indicating if grpc_gcp is available.
# The literal below is illustrative; the real value depends on the installation.
HAS_GRPC_GCP = True # or False based on installation
# Protobuf version string.
# NOTE(review): presumably mirrors the installed protobuf runtime version
# rather than being fixed at this value -- confirm against the library.
PROTOBUF_VERSION = "4.25.1"

from google.api_core import grpc_helpers
# `grpc` must be imported here: the last example below references
# grpc.Compression.Gzip.
import grpc
from google.auth import default

# Create channel with default credentials.
# default() is unpacked into the credentials object and the project.
credentials, project = default()
channel = grpc_helpers.create_channel(
    target="googleapis.com:443",
    credentials=credentials,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# Create channel with service account: credentials are loaded from the
# given file instead of being passed in as an object.
channel = grpc_helpers.create_channel(
    target="googleapis.com:443",
    credentials_file="/path/to/service-account.json",
    quota_project_id="my-project-id",
)

# Create channel with custom options: `options` is not a named parameter of
# create_channel, so it travels through **kwargs as additional channel
# arguments.
channel = grpc_helpers.create_channel(
    target="googleapis.com:443",
    credentials=credentials,
    compression=grpc.Compression.Gzip,
    options=[
        ("grpc.keepalive_time_ms", 30000),
        ("grpc.keepalive_timeout_ms", 5000),
        ("grpc.http2.max_pings_without_data", 0),
    ],
)

from google.api_core import grpc_helpers
from google.api_core import exceptions
from google.api_core import gapic_v1
from google.api_core import retry
from google.api_core import timeout
# wrap_method needs the gapic_v1 flavour of ClientInfo, which can render
# itself as gRPC metadata.
from google.api_core.gapic_v1 import client_info
import grpc

# Create retry and timeout configurations.
# if_exception_type() matches on exception *classes*, so list the
# google.api_core exceptions that the UNAVAILABLE and DEADLINE_EXCEEDED
# status codes are translated to, not the grpc.StatusCode members.
retry_config = retry.Retry(
    predicate=retry.if_exception_type(
        exceptions.ServiceUnavailable,
        exceptions.DeadlineExceeded,
    ),
    initial=1.0,
    maximum=60.0,
    multiplier=2.0,
)
# The overall time budget is passed as `timeout`.
timeout_config = timeout.TimeToDeadlineTimeout(timeout=300.0)
# ClientInfo has no `client_library_name` argument; the library name goes
# into the user-agent prefix instead.
client_info_obj = client_info.ClientInfo(
    client_library_version="1.0.0",
    user_agent="my-client/1.0.0",
)


# Wrap a gRPC method
def create_wrapped_method(stub_method):
    """Return `stub_method` wrapped with the retry, timeout and client info above."""
    return gapic_v1.method.wrap_method(
        stub_method,
        default_retry=retry_config,
        default_timeout=timeout_config,
        client_info=client_info_obj,
    )


# Use with a gRPC client.
# `credentials` and `request` are placeholders assumed to be defined by the
# surrounding application.
from my_service_pb2_grpc import MyServiceStub
channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
stub = MyServiceStub(channel)

# Wrap methods with retry/timeout
wrapped_method = create_wrapped_method(stub.GetData)

# Call wrapped method - retry and timeout applied automatically.
# Wrapped calls surface failures as google.api_core exceptions (including
# RetryError once retries are exhausted); grpc.RpcError is kept as a
# fallback for anything that escapes untranslated.
try:
    response = wrapped_method(request)
    print("Success:", response)
except (exceptions.GoogleAPIError, grpc.RpcError) as e:
    print("gRPC error:", e)

import asyncio
# `grpc` is imported because the error handler below references grpc.RpcError.
import grpc
from google.api_core import exceptions
from google.api_core import gapic_v1
from google.api_core import grpc_helpers_async
from google.api_core import retry
from google.auth import default


async def async_grpc_example():
    """Open an async channel, call one wrapped unary method, and close the channel.

    `request` is a placeholder assumed to be defined by the surrounding
    application.
    """
    # Load default credentials (a regular, non-async call).
    credentials, project = default()

    # Create async gRPC channel. create_channel is an ordinary function that
    # returns the grpc.aio channel directly, so it must not be awaited.
    channel = grpc_helpers_async.create_channel(
        target="googleapis.com:443",
        credentials=credentials,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    # Create async retry configuration. The predicate matches exception
    # classes: these are the google.api_core equivalents of the UNAVAILABLE
    # and INTERNAL status codes.
    async_retry = retry.AsyncRetry(
        predicate=retry.if_exception_type(
            exceptions.ServiceUnavailable,
            exceptions.InternalServerError,
        ),
        initial=1.0,
        maximum=30.0,
    )

    # Use async stub with wrapped methods
    from my_service_pb2_grpc import MyServiceStub
    stub = MyServiceStub(channel)
    wrapped_method = gapic_v1.method_async.wrap_method(
        stub.GetDataAsync,
        default_retry=async_retry,
    )

    try:
        response = await wrapped_method(request)
        print("Async response:", response)
    except (exceptions.GoogleAPIError, grpc.RpcError) as e:
        # Wrapped calls raise google.api_core exceptions; grpc.RpcError is
        # kept as a fallback for untranslated errors.
        print("Async gRPC error:", e)
    finally:
        # Always release the channel, whether or not the call succeeded.
        await channel.close()


asyncio.run(async_grpc_example())

from google.api_core import grpc_helpers
import grpc
from google.api_core import exceptions

# Create streaming client.
# `credentials`, `MyServiceStub`, `StreamRequest`, `BidiRequest` and
# `some_cancellation_condition` are placeholders assumed to be defined by the
# surrounding application.
channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
stub = MyServiceStub(channel)


# Server streaming
def handle_server_stream():
    """Consume a server-streaming call, cancelling early when asked to."""
    request = StreamRequest(query="data")
    stream = stub.StreamData(request)
    # Wrap stream for additional functionality
    wrapped_stream = grpc_helpers.GrpcStream(stream)
    try:
        for response in wrapped_stream:
            print("Streamed data:", response.data)
            # Check for cancellation
            if some_cancellation_condition():
                wrapped_stream.cancel()
                break
    except (exceptions.GoogleAPIError, grpc.RpcError) as e:
        # The stream wrapper re-raises gRPC failures as google.api_core
        # exceptions, so catch those as well as raw grpc.RpcError.
        if wrapped_stream.cancelled:
            print("Stream was cancelled")
        else:
            print("Stream error:", e)


# Bidirectional streaming
def handle_bidirectional_stream():
    """Send ten requests over a bidirectional stream and print each response."""
    def request_generator():
        for i in range(10):
            yield BidiRequest(id=i, data=f"request_{i}")

    stream = stub.BidirectionalStream(request_generator())
    wrapped_stream = grpc_helpers.GrpcStream(stream)
    try:
        for response in wrapped_stream:
            print("Bidirectional response:", response.result)
    except (exceptions.GoogleAPIError, grpc.RpcError) as e:
        # Same reasoning as above: errors may arrive already translated.
        print("Bidirectional stream error:", e)


handle_server_stream()
handle_bidirectional_stream()

from google.api_core import grpc_helpers
import grpc
def create_custom_channel():
"""Create gRPC channel with custom configuration."""
# Custom gRPC options for performance tuning
options = [
# Connection keepalive settings
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 5000),
("grpc.keepalive_permit_without_calls", True),
# Message size limits
("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB
("grpc.max_receive_message_length", 100 * 1024 * 1024), # 100MB
# Connection pool settings
("grpc.http2.max_pings_without_data", 0),
("grpc.http2.min_time_between_pings_ms", 10000),
]
# Create channel with custom options
channel = grpc_helpers.create_channel(
target="api.example.com:443",
credentials=credentials,
compression=grpc.Compression.Gzip,
options=options
)
return channel
# Use custom channel
custom_channel = create_custom_channel()
stub = MyServiceStub(custom_channel)Install with Tessl CLI
npx tessl i tessl/pypi-google-api-core