CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-api-core

Google API client core library providing common helpers, utilities, and components for Python client libraries

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

transport.mddocs/

Transport Helpers

Utilities for gRPC and REST transport protocols, including channel creation, method wrapping, and streaming support. These helpers provide consistent interfaces for different transport protocols used by Google APIs.

Capabilities

gRPC Channel Management

Functions for creating and configuring gRPC channels with authentication and transport options.

def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
    """
    Create a gRPC channel with authentication and configuration.
    
    Args:
        target (str): Target server address (host:port)
        credentials: Google Auth credentials object
        scopes (List[str], optional): OAuth 2.0 scopes for authentication
        ssl_credentials: gRPC SSL credentials
        credentials_file (str, optional): Path to service account file
        quota_project_id (str, optional): Project ID for quota attribution
        default_scopes (List[str], optional): Default scopes if none provided
        default_host (str, optional): Default host for the service
        compression: gRPC compression algorithm
        **kwargs: Additional channel arguments
        
    Returns:
        grpc.Channel: Configured gRPC channel
    """

async def create_channel_async(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
    """
    Create an async gRPC channel with authentication and configuration.
    
    Args:
        target (str): Target server address (host:port)
        credentials: Google Auth credentials object
        scopes (List[str], optional): OAuth 2.0 scopes for authentication
        ssl_credentials: gRPC SSL credentials
        credentials_file (str, optional): Path to service account file
        quota_project_id (str, optional): Project ID for quota attribution
        default_scopes (List[str], optional): Default scopes if none provided
        default_host (str, optional): Default host for the service
        compression: gRPC compression algorithm
        **kwargs: Additional channel arguments
        
    Returns:
        grpc.aio.Channel: Configured async gRPC channel
    """

Method Wrapping

Functions for applying retry, timeout, and other decorators to gRPC and REST methods.

def wrap_method(func, default_retry=None, default_timeout=None, client_info=None):
    """
    Apply retry, timeout, and client info to a gRPC method.
    
    Args:
        func (Callable): gRPC method to wrap
        default_retry (Retry, optional): Default retry configuration
        default_timeout (Timeout, optional): Default timeout configuration
        client_info (ClientInfo, optional): Client information for headers
        
    Returns:
        Callable: Wrapped method with applied decorators
    """

async def wrap_method_async(func, default_retry=None, default_timeout=None, client_info=None):
    """
    Apply async retry, timeout, and client info to an async gRPC method.
    
    Args:
        func (Callable): Async gRPC method to wrap
        default_retry (AsyncRetry, optional): Default async retry configuration
        default_timeout (Timeout, optional): Default timeout configuration
        client_info (ClientInfo, optional): Client information for headers
        
    Returns:
        Callable: Wrapped async method with applied decorators
    """

Streaming Support

Classes and utilities for handling gRPC streaming responses.

class GrpcStream:
    """
    Wrapper for gRPC streaming responses with additional functionality.
    
    Args:
        wrapped: Original gRPC stream iterator
    """
    def __init__(self, wrapped): ...
    
    def __iter__(self):
        """Return iterator for stream."""
        return self
    
    def __next__(self):
        """Get next item from stream."""
    
    def cancel(self):
        """Cancel the streaming operation."""
    
    @property
    def cancelled(self):
        """Check if stream was cancelled."""

class _StreamingResponseIterator:
    """Iterator wrapper for gRPC streaming responses."""
    def __init__(self, wrapped): ...
    def __iter__(self): ...
    def __next__(self): ...

Transport Constants

Constants indicating availability of optional transport dependencies.

# Boolean indicating if grpc_gcp is available
HAS_GRPC_GCP = True  # or False based on installation

# Protobuf version string  
PROTOBUF_VERSION = "4.25.1"

Usage Examples

Creating gRPC Channels

from google.api_core import grpc_helpers
from google.auth import default

# Create channel with default credentials
credentials, project = default()
channel = grpc_helpers.create_channel(
    target="googleapis.com:443",
    credentials=credentials,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Create channel with service account
channel = grpc_helpers.create_channel(
    target="googleapis.com:443", 
    credentials_file="/path/to/service-account.json",
    quota_project_id="my-project-id"
)

# Create channel with custom options
channel = grpc_helpers.create_channel(
    target="googleapis.com:443",
    credentials=credentials,
    compression=grpc.Compression.Gzip,
    options=[
        ("grpc.keepalive_time_ms", 30000),
        ("grpc.keepalive_timeout_ms", 5000),
        ("grpc.http2.max_pings_without_data", 0)
    ]
)

Method Wrapping with Retry and Timeout

from google.api_core import grpc_helpers
from google.api_core import retry
from google.api_core import timeout
from google.api_core import client_info
import grpc

# Create retry and timeout configurations
retry_config = retry.Retry(
    predicate=retry.if_exception_type(
        grpc.StatusCode.UNAVAILABLE,
        grpc.StatusCode.DEADLINE_EXCEEDED
    ),
    initial=1.0,
    maximum=60.0,
    multiplier=2.0
)

timeout_config = timeout.TimeToDeadlineTimeout(deadline=300.0)

client_info_obj = client_info.ClientInfo(
    client_library_name="my-client",
    client_library_version="1.0.0"
)

# Wrap a gRPC method
def create_wrapped_method(stub_method):
    return grpc_helpers.wrap_method(
        stub_method,
        default_retry=retry_config,
        default_timeout=timeout_config,
        client_info=client_info_obj
    )

# Use with a gRPC client
from my_service_pb2_grpc import MyServiceStub

channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
stub = MyServiceStub(channel)

# Wrap methods with retry/timeout
wrapped_method = create_wrapped_method(stub.GetData)

# Call wrapped method - retry and timeout applied automatically
try:
    response = wrapped_method(request)
    print("Success:", response)
except grpc.RpcError as e:
    print("gRPC error:", e)

Async gRPC Usage

import asyncio
from google.api_core import grpc_helpers_async
from google.api_core import retry
from google.auth import default

async def async_grpc_example():
    # Create async credentials
    credentials, project = default()
    
    # Create async gRPC channel
    channel = await grpc_helpers_async.create_channel(
        target="googleapis.com:443",
        credentials=credentials,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    
    # Create async retry configuration
    async_retry = retry.AsyncRetry(
        predicate=retry.if_exception_type(
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.INTERNAL
        ),
        initial=1.0,
        maximum=30.0
    )
    
    # Use async stub with wrapped methods
    from my_service_pb2_grpc import MyServiceStub
    
    stub = MyServiceStub(channel)
    wrapped_method = grpc_helpers_async.wrap_method(
        stub.GetDataAsync,
        default_retry=async_retry
    )
    
    try:
        response = await wrapped_method(request)
        print("Async response:", response)
    except grpc.RpcError as e:
        print("Async gRPC error:", e)
    finally:
        await channel.close()

asyncio.run(async_grpc_example())

Streaming Operations

from google.api_core import grpc_helpers
import grpc

# Create streaming client
channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
stub = MyServiceStub(channel)

# Server streaming
def handle_server_stream():
    request = StreamRequest(query="data")
    stream = stub.StreamData(request)
    
    # Wrap stream for additional functionality
    wrapped_stream = grpc_helpers.GrpcStream(stream)
    
    try:
        for response in wrapped_stream:
            print("Streamed data:", response.data)
            
            # Check for cancellation
            if some_cancellation_condition():
                wrapped_stream.cancel()
                break
                
    except grpc.RpcError as e:
        if wrapped_stream.cancelled:
            print("Stream was cancelled")
        else:
            print("Stream error:", e)

# Bidirectional streaming
def handle_bidirectional_stream():
    def request_generator():
        for i in range(10):
            yield BidiRequest(id=i, data=f"request_{i}")
    
    stream = stub.BidirectionalStream(request_generator())
    wrapped_stream = grpc_helpers.GrpcStream(stream)
    
    try:
        for response in wrapped_stream:
            print("Bidirectional response:", response.result)
    except grpc.RpcError as e:
        print("Bidirectional stream error:", e)

handle_server_stream()
handle_bidirectional_stream()

Custom Transport Configuration

from google.api_core import grpc_helpers
import grpc

def create_custom_channel():
    """Create gRPC channel with custom configuration."""
    
    # Custom gRPC options for performance tuning
    options = [
        # Connection keepalive settings
        ("grpc.keepalive_time_ms", 30000),
        ("grpc.keepalive_timeout_ms", 5000),
        ("grpc.keepalive_permit_without_calls", True),
        
        # Message size limits
        ("grpc.max_send_message_length", 100 * 1024 * 1024),  # 100MB
        ("grpc.max_receive_message_length", 100 * 1024 * 1024),  # 100MB
        
        # Connection pool settings
        ("grpc.http2.max_pings_without_data", 0),
        ("grpc.http2.min_time_between_pings_ms", 10000),
    ]
    
    # Create channel with custom options
    channel = grpc_helpers.create_channel(
        target="api.example.com:443",
        credentials=credentials,
        compression=grpc.Compression.Gzip,
        options=options
    )
    
    return channel

# Use custom channel
custom_channel = create_custom_channel()
stub = MyServiceStub(custom_channel)

Install with Tessl CLI

npx tessl i tessl/pypi-google-api-core

docs

bidirectional-streaming.md

client-config.md

datetime.md

exceptions.md

gapic-framework.md

iam-policies.md

index.md

operations.md

page-iteration.md

path-templates.md

protobuf-helpers.md

retry.md

timeout.md

transport.md

universe-domain.md

tile.json