HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Channel management provides client-side connection handling with support for secure and insecure connections, connectivity monitoring, interceptor chains, and comprehensive configuration options for production deployments.
Creates channels for client-side RPC communication with support for insecure and secure connections, including SSL/TLS, ALTS, and custom credentials.
def insecure_channel(target: str, options=None, compression=None) -> Channel:
"""
Creates an insecure Channel to a server.
Parameters:
- target: The server address (e.g., 'localhost:50051', 'dns:///example.com:443')
- options: Optional list of key-value pairs for channel configuration
- compression: Optional compression method (grpc.compression.Gzip, etc.)
Returns:
Channel: A thread-safe Channel object for making RPCs
"""
def secure_channel(target: str, credentials: ChannelCredentials, options=None, compression=None) -> Channel:
"""
Creates a secure Channel to a server.
Parameters:
- target: The server address
- credentials: A ChannelCredentials instance for authentication
- options: Optional list of key-value pairs for channel configuration
- compression: Optional compression method
Returns:
Channel: A thread-safe secure Channel object
Raises:
ValueError: If called with insecure credentials
"""Usage Examples:
# Basic insecure channel
channel = grpc.insecure_channel('localhost:50051')
# Secure channel with SSL
credentials = grpc.ssl_channel_credentials()
secure_chan = grpc.secure_channel('secure-server.com:443', credentials)
# Channel with custom options
options = [
('grpc.keepalive_time_ms', 30000),
('grpc.max_receive_message_length', 4 * 1024 * 1024),
]
channel = grpc.insecure_channel('localhost:50051', options=options)
# Channel with compression
channel = grpc.insecure_channel('localhost:50051', compression=grpc.compression.Gzip)Applies interceptor chains to channels for cross-cutting concerns like logging, metrics, authentication, and request/response transformation.
def intercept_channel(channel: Channel, *interceptors) -> Channel:
"""
Intercepts a channel through a set of interceptors.
Parameters:
- channel: A Channel object to intercept
- interceptors: Zero or more interceptor objects
Returns:
Channel: A Channel that applies interceptors to each invocation
Raises:
TypeError: If interceptor doesn't implement required interfaces
"""Usage Example:
class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print(f"Calling {client_call_details.method}")
response = continuation(client_call_details, request)
print(f"Received response")
return response
# Apply interceptor to channel
channel = grpc.insecure_channel('localhost:50051')
intercepted_channel = grpc.intercept_channel(channel, LoggingInterceptor())Monitors channel connectivity state and provides futures for readiness notification, useful for connection pooling and health checking.
def channel_ready_future(channel: Channel) -> Future:
"""
Creates a Future that tracks when a Channel is ready.
Parameters:
- channel: A Channel object to monitor
Returns:
Future: A Future that completes when channel reaches READY state
Note:
Cancelling the Future does not affect the channel's state machine
"""Usage Example:
channel = grpc.insecure_channel('localhost:50051')
ready_future = grpc.channel_ready_future(channel)
# Wait for channel to be ready (blocking)
try:
ready_future.result(timeout=5.0)
print("Channel is ready")
except grpc.FutureTimeoutError:
print("Channel not ready within timeout")Core channel interface providing RPC invocation methods and connectivity management.
class Channel(abc.ABC):
"""
Affords RPC invocation via generic methods on client-side.
Supports context manager protocol for automatic cleanup.
"""
def subscribe(self, callback, try_to_connect=False):
"""
Subscribe to this Channel's connectivity state machine.
Parameters:
- callback: Callable invoked with ChannelConnectivity argument
- try_to_connect: Whether channel should attempt immediate connection
"""
def unsubscribe(self, callback):
"""
Unsubscribes a callback from connectivity notifications.
Parameters:
- callback: Previously registered callback function
"""
def unary_unary(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> UnaryUnaryMultiCallable:
"""
Creates a UnaryUnaryMultiCallable for a unary-unary method.
Parameters:
- method: The name of the RPC method (e.g., '/service.Method')
- request_serializer: Optional serializer for request messages
- response_deserializer: Optional deserializer for response messages
- _registered_method: Internal parameter for registered methods
Returns:
UnaryUnaryMultiCallable: Object for making unary-unary calls
"""
def unary_stream(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> UnaryStreamMultiCallable:
"""Creates a UnaryStreamMultiCallable for a unary-stream method."""
def stream_unary(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> StreamUnaryMultiCallable:
"""Creates a StreamUnaryMultiCallable for a stream-unary method."""
def stream_stream(self, method: str, request_serializer=None, response_deserializer=None, _registered_method=False) -> StreamStreamMultiCallable:
"""Creates a StreamStreamMultiCallable for a stream-stream method."""
def close(self):
"""
Closes this Channel and releases all resources.
Terminates all active RPCs and prevents new ones.
This method is idempotent.
"""
def __enter__(self):
"""Enters the runtime context for context manager usage."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exits the runtime context and closes the channel."""Usage Examples:
# Context manager usage (recommended)
with grpc.insecure_channel('localhost:50051') as channel:
stub = MyServiceStub(channel)
response = stub.MyMethod(request)
# Channel automatically closed
# Manual lifecycle management
channel = grpc.insecure_channel('localhost:50051')
try:
stub = MyServiceStub(channel)
response = stub.MyMethod(request)
finally:
channel.close()
# Connectivity monitoring
def connectivity_callback(connectivity):
print(f"Channel state: {connectivity}")
channel = grpc.insecure_channel('localhost:50051')
channel.subscribe(connectivity_callback, try_to_connect=True)Common channel configuration options for production deployments:
# Keepalive settings
('grpc.keepalive_time_ms', 30000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
# Message size limits
('grpc.max_send_message_length', 4 * 1024 * 1024),
('grpc.max_receive_message_length', 4 * 1024 * 1024),
# Connection settings
('grpc.max_connection_idle_ms', 300000),
('grpc.max_connection_age_ms', 600000),
('grpc.http2.max_pings_without_data', 0),
# Load balancing
('grpc.lb_policy_name', 'round_robin'),
('grpc.service_config', '{"loadBalancingConfig": [{"round_robin":{}}]}'),class ChannelConnectivity(enum.Enum):
"""Channel connectivity states."""
IDLE = ... # Channel is idle
CONNECTING = ... # Channel is connecting
READY = ... # Channel is ready to conduct RPCs
TRANSIENT_FAILURE = ... # Channel has seen a recoverable failure
SHUTDOWN = ... # Channel has seen an unrecoverable failureInstall with Tessl CLI
npx tessl i tessl/pypi-grpcio