tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems


Channel Management

Channel management covers client-side connection handling: creating secure and insecure channels, monitoring connectivity, chaining interceptors, and tuning channel options such as keepalive, message size limits, and load balancing.

Capabilities

Channel Creation

Creates channels for client-side RPC communication with support for insecure and secure connections, including SSL/TLS, ALTS, and custom credentials.

def insecure_channel(target: str, options=None, compression=None) -> Channel:
    """
    Creates an insecure Channel to a server.

    Parameters:
    - target: The server address (e.g., 'localhost:50051', 'dns:///example.com:443')
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method (a grpc.Compression member, e.g. grpc.Compression.Gzip)

    Returns:
    Channel: A thread-safe Channel object for making RPCs
    """

def secure_channel(target: str, credentials: ChannelCredentials, options=None, compression=None) -> Channel:
    """
    Creates a secure Channel to a server.

    Parameters:
    - target: The server address
    - credentials: A ChannelCredentials instance for authentication
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method

    Returns:
    Channel: A thread-safe secure Channel object
    
    Raises:
    ValueError: If called with insecure credentials
    """

Usage Examples:

import grpc

# Basic insecure channel
channel = grpc.insecure_channel('localhost:50051')

# Secure channel with SSL
credentials = grpc.ssl_channel_credentials()
secure_chan = grpc.secure_channel('secure-server.com:443', credentials)

# Channel with custom options
options = [
    ('grpc.keepalive_time_ms', 30000),
    ('grpc.max_receive_message_length', 4 * 1024 * 1024),
]
channel = grpc.insecure_channel('localhost:50051', options=options)

# Channel with compression (grpc.Compression is an enum: NoCompression, Deflate, Gzip)
channel = grpc.insecure_channel('localhost:50051', compression=grpc.Compression.Gzip)
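
The choice between `secure_channel` and `insecure_channel` is often driven by configuration. The following is a minimal sketch of one way to wrap that decision; `open_channel` and its parameters are hypothetical names, not part of grpcio, and `grpc` is imported inside the function only so the snippet can be loaded where grpcio is not installed.

```python
def open_channel(target, use_tls=True, root_certificates=None, options=None):
    """Open a channel to `target`, with TLS unless `use_tls` is False.

    Hypothetical helper, not part of grpcio.
    - root_certificates: optional PEM-encoded bytes; None lets gRPC fall back
      to its default root certificates.
    - options: optional list of (key, value) channel arguments.
    """
    import grpc  # deferred import; assumes grpcio is installed at call time

    if use_tls:
        credentials = grpc.ssl_channel_credentials(
            root_certificates=root_certificates
        )
        return grpc.secure_channel(target, credentials, options=options)
    return grpc.insecure_channel(target, options=options)


# Usage (requires grpcio and a reachable server):
#   with open_channel('secure-server.com:443') as channel:
#       ...
```

Keeping the TLS decision in one place makes it harder to open an insecure channel in production by accident, since plaintext has to be requested explicitly.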

Channel Interception

Applies interceptor chains to channels for cross-cutting concerns like logging, metrics, authentication, and request/response transformation.

def intercept_channel(channel: Channel, *interceptors) -> Channel:
    """
    Intercepts a channel through a set of interceptors.

    Parameters:
    - channel: A Channel object to intercept
    - interceptors: Zero or more client interceptor objects, given control in the order they are listed

    Returns:
    Channel: A Channel that applies interceptors to each invocation

    Raises:
    TypeError: If an interceptor does not derive from one of the client interceptor classes (for example, grpc.UnaryUnaryClientInterceptor)
    """

Usage Example:

class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print(f"Calling {client_call_details.method}")
        # continuation returns a call object (also a Future), not the response message itself
        call = continuation(client_call_details, request)
        # code() blocks until the RPC has terminated
        print(f"Finished with status {call.code()}")
        return call

# Apply interceptor to channel
channel = grpc.insecure_channel('localhost:50051')
intercepted_channel = grpc.intercept_channel(channel, LoggingInterceptor())

Channel Readiness

Monitors channel connectivity state and provides futures for readiness notification, useful for connection pooling and health checking.

def channel_ready_future(channel: Channel) -> Future:
    """
    Creates a Future that tracks when a Channel is ready.

    Parameters:
    - channel: A Channel object to monitor

    Returns:
    Future: A Future that completes when channel reaches READY state
    
    Note:
    Cancelling the Future does not affect the channel's state machine
    """

Usage Example:

channel = grpc.insecure_channel('localhost:50051')
ready_future = grpc.channel_ready_future(channel)

# Wait for channel to be ready (blocking)
try:
    ready_future.result(timeout=5.0)
    print("Channel is ready")
except grpc.FutureTimeoutError:
    print("Channel not ready within timeout")
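
Because cancelling the readiness Future leaves the channel's state machine untouched, a caller can stop waiting and still reuse the same channel later. The following is a minimal sketch of that pattern; `wait_until_ready` is a hypothetical helper name, and `grpc` is imported inside the function only so the snippet can be loaded where grpcio is not installed.

```python
def wait_until_ready(channel, timeout_s=5.0):
    """Return True once `channel` is READY, False if `timeout_s` elapses first.

    Hypothetical helper, not part of grpcio.
    """
    import grpc  # deferred import; assumes grpcio is installed at call time

    ready_future = grpc.channel_ready_future(channel)
    try:
        ready_future.result(timeout=timeout_s)
        return True
    except grpc.FutureTimeoutError:
        # Stop tracking readiness; the channel itself is left as it was.
        ready_future.cancel()
        return False
```

Returning a boolean instead of raising lets the caller decide whether an unreachable server is fatal, for example when probing several backends in turn.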

Channel Interface

Core channel interface providing RPC invocation methods and connectivity management.

class Channel(abc.ABC):
    """
    Affords RPC invocation via generic methods on client-side.
    Supports context manager protocol for automatic cleanup.
    """
    
    def subscribe(self, callback, try_to_connect=False):
        """
        Subscribe to this Channel's connectivity state machine.

        Parameters:
        - callback: Callable invoked with ChannelConnectivity argument
        - try_to_connect: Whether channel should attempt immediate connection
        """
    
    def unsubscribe(self, callback):
        """
        Unsubscribes a callback from connectivity notifications.

        Parameters:
        - callback: Previously registered callback function
        """
    
    def unary_unary(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> UnaryUnaryMultiCallable:
        """
        Creates a UnaryUnaryMultiCallable for a unary-unary method.

        Parameters:
        - method: The name of the RPC method (e.g., '/service.Method')
        - request_serializer: Optional serializer for request messages
        - response_deserializer: Optional deserializer for response messages
        - _registered_method: Internal parameter for registered methods

        Returns:
        UnaryUnaryMultiCallable: Object for making unary-unary calls
        """
    
    def unary_stream(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for a unary-stream method."""
    
    def stream_unary(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> StreamUnaryMultiCallable:
        """Creates a StreamUnaryMultiCallable for a stream-unary method."""
    
    def stream_stream(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> StreamStreamMultiCallable:
        """Creates a StreamStreamMultiCallable for a stream-stream method."""
    
    def close(self):
        """
        Closes this Channel and releases all resources.
        Terminates all active RPCs and prevents new ones.
        This method is idempotent.
        """
    
    def __enter__(self):
        """Enters the runtime context for context manager usage."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exits the runtime context and closes the channel."""

Usage Examples:

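# Context manager usage (recommended)
# MyServiceStub and request stand in for your generated stub and request message
with grpc.insecure_channel('localhost:50051') as channel:
    stub = MyServiceStub(channel)
    response = stub.MyMethod(request)
# Channel is closed automatically when the with block exits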

# Manual lifecycle management
channel = grpc.insecure_channel('localhost:50051')
try:
    stub = MyServiceStub(channel)
    response = stub.MyMethod(request)
finally:
    channel.close()

# Connectivity monitoring
def connectivity_callback(connectivity):
    print(f"Channel state: {connectivity}")

channel = grpc.insecure_channel('localhost:50051')
channel.subscribe(connectivity_callback, try_to_connect=True)
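
A callback that only prints is hard to act on from application code. The sketch below records each reported state and lets another thread block until a given state has been seen. `ConnectivityTracker` is a hypothetical helper, not part of grpcio; it compares states by enum member name (for example `"READY"`) so that it needs no grpc import, and it assumes callbacks may arrive on a thread other than the caller's.

```python
import threading


class ConnectivityTracker:
    """Records connectivity states delivered through Channel.subscribe.

    Hypothetical helper, not part of grpcio.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self.history = []  # state names in the order they were reported

    def __call__(self, connectivity):
        # Guard shared state: the callback may run on a different thread.
        with self._cond:
            self.history.append(connectivity.name)
            self._cond.notify_all()

    def wait_for(self, state_name, timeout_s=None):
        """Block until `state_name` has been observed; False on timeout."""
        with self._cond:
            return self._cond.wait_for(
                lambda: state_name in self.history, timeout=timeout_s
            )


# Usage (requires grpcio and an existing channel):
#   tracker = ConnectivityTracker()
#   channel.subscribe(tracker, try_to_connect=True)
#   if not tracker.wait_for("READY", timeout_s=5.0):
#       print("states seen so far:", tracker.history)
#   channel.unsubscribe(tracker)
```

Passing the same tracker object to `unsubscribe` stops further notifications once monitoring is no longer needed.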

Channel Options

Common channel configuration options for production deployments:

# Keepalive settings
('grpc.keepalive_time_ms', 30000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', 1),  # integer flag: 1 allows keepalive pings with no active calls

# Message size limits
('grpc.max_send_message_length', 4 * 1024 * 1024),
('grpc.max_receive_message_length', 4 * 1024 * 1024),

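# Connection settings
# Note: max_connection_idle_ms and max_connection_age_ms are enforced by the server;
# pass them to grpc.server(options=...) rather than to a client channel.
('grpc.max_connection_idle_ms', 300000),
('grpc.max_connection_age_ms', 600000),
# Client side: 0 removes the limit on pings sent without data
('grpc.http2.max_pings_without_data', 0),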

# Load balancing
('grpc.lb_policy_name', 'round_robin'),
('grpc.service_config', '{"loadBalancingConfig": [{"round_robin":{}}]}'),
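
The tuples above are fragments; in practice they are collected into one list and passed as `options=`. The following is a minimal sketch that assembles such a list and builds the service config with `json.dumps` rather than a hand-written string. `build_channel_options` and its parameter names are hypothetical; the option keys are the ones listed above.

```python
import json


def build_channel_options(keepalive_time_ms=30000,
                          keepalive_timeout_ms=5000,
                          max_message_bytes=4 * 1024 * 1024,
                          lb_policy="round_robin"):
    """Return a list of (key, value) pairs suitable for `options=`.

    Hypothetical helper, not part of grpcio.
    """
    # Serialize the service config from a dict to avoid JSON quoting mistakes.
    service_config = json.dumps({"loadBalancingConfig": [{lb_policy: {}}]})
    return [
        ("grpc.keepalive_time_ms", keepalive_time_ms),
        ("grpc.keepalive_timeout_ms", keepalive_timeout_ms),
        ("grpc.max_send_message_length", max_message_bytes),
        ("grpc.max_receive_message_length", max_message_bytes),
        ("grpc.service_config", service_config),
    ]


# Usage (requires grpcio):
#   channel = grpc.insecure_channel('localhost:50051',
#                                   options=build_channel_options())
```

Using one value for both message size limits keeps send and receive symmetric; split the parameter if the two directions need different limits.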

Types

class ChannelConnectivity(enum.Enum):
    """Channel connectivity states."""
    IDLE = ...  # Channel is idle
    CONNECTING = ...  # Channel is connecting
    READY = ...  # Channel is ready to conduct RPCs
    TRANSIENT_FAILURE = ...  # Channel has seen a recoverable failure
    SHUTDOWN = ...  # Channel has seen an unrecoverable failure

Install with Tessl CLI

npx tessl i tessl/pypi-grpcio
