HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Complete asynchronous API for high-performance async/await programming with native Python asyncio integration, supporting all RPC patterns, server/client implementations, and comprehensive async context management for scalable concurrent applications.
Asynchronous channel creation and management with full asyncio integration and context manager support.
# grpc.aio module functions
def insecure_channel(target: str, options=None, compression=None, interceptors=None) -> "aio.Channel":
    """Create an asynchronous insecure Channel to a server.

    Parameters:
    - target: The server address (e.g., 'localhost:50051')
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method
    - interceptors: Optional sequence of client interceptors applied to
      RPCs made on this channel

    Returns:
    aio.Channel: An asynchronous Channel object for making RPCs
    """
def secure_channel(target: str, credentials: "grpc.ChannelCredentials", options=None, compression=None, interceptors=None) -> "aio.Channel":
    """Create an asynchronous secure Channel to a server.

    Parameters:
    - target: The server address
    - credentials: A ChannelCredentials instance for authentication
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method
    - interceptors: Optional sequence of client interceptors applied to
      RPCs made on this channel

    Returns:
    aio.Channel: An asynchronous secure Channel object
    """

# Usage Examples:
import grpc.aio
import asyncio
async def async_client_example():
    """Send one unary RPC over an insecure channel scoped by ``async with``."""
    # Create async channel with context manager
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        # Make async RPC calls
        request = MyRequest(message="Hello async")
        response = await stub.UnaryMethod(request)
        print(f"Response: {response.reply}")
    # Channel automatically closed when exiting context
async def secure_async_client():
    """Send one RPC over a TLS channel and return the response."""
    credentials = grpc.ssl_channel_credentials()
    async with grpc.aio.secure_channel('secure-server.com:443', credentials) as channel:
        stub = MyServiceStub(channel)
        # Build the request locally; the example previously referenced an
        # undefined ``request`` name.
        request = MyRequest(message="Hello secure")
        response = await stub.SecureMethod(request)
        return response
# Manual channel lifecycle
async def manual_channel_management():
    """Send one RPC on a channel that is closed explicitly in ``finally``."""
    channel = grpc.aio.insecure_channel('localhost:50051')
    try:
        stub = MyServiceStub(channel)
        # Build the request locally; the example previously referenced an
        # undefined ``request`` name.
        request = MyRequest(message="Hello")
        response = await stub.UnaryMethod(request)
        return response
    finally:
        # Runs on success and on error, so the channel is never leaked.
        await channel.close()

# Asynchronous channel interface with comprehensive async context management and RPC method creation.
class Channel(abc.ABC):
    """Asynchronous channel interface for client-side RPC invocations."""

    # Annotations naming types that are not yet defined at this point are
    # quoted so the class body can be evaluated without a NameError.

    async def __aenter__(self) -> "Channel":
        """Enters the async runtime context for the channel."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exits the async runtime context and closes the channel."""

    def unary_unary(self, method: str, request_serializer=None, response_deserializer=None) -> "UnaryUnaryMultiCallable":
        """Creates an async UnaryUnaryMultiCallable for a unary-unary method."""

    def unary_stream(self, method: str, request_serializer=None, response_deserializer=None) -> "UnaryStreamMultiCallable":
        """Creates an async UnaryStreamMultiCallable for a unary-stream method."""

    def stream_unary(self, method: str, request_serializer=None, response_deserializer=None) -> "StreamUnaryMultiCallable":
        """Creates an async StreamUnaryMultiCallable for a stream-unary method."""

    def stream_stream(self, method: str, request_serializer=None, response_deserializer=None) -> "StreamStreamMultiCallable":
        """Creates an async StreamStreamMultiCallable for a stream-stream method."""

    async def close(self, grace=None):
        """Closes the channel and releases all resources asynchronously.

        Parameters:
        - grace: Optional time in seconds to let active RPCs finish before
          they are cancelled; None cancels them immediately
        """

    def get_state(self, try_to_connect=False) -> "grpc.ChannelConnectivity":
        """Gets the current connectivity state of the channel."""

    async def wait_for_state_change(self, last_observed_state: "grpc.ChannelConnectivity"):
        """Waits for the channel state to change from the last observed state."""

    async def channel_ready(self):
        """Waits until the channel is ready to conduct RPCs."""

# Asynchronous server creation and management with async handler support and graceful shutdown capabilities.
def server(migration_thread_pool=None, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None) -> "aio.Server":
    """Create an asynchronous Server for servicing RPCs.

    Parameters:
    - migration_thread_pool: Optional ThreadPoolExecutor for sync-to-async migration
    - handlers: Optional list of GenericRpcHandlers for initial service registration
    - interceptors: Optional list of async ServerInterceptors
    - options: Optional list of key-value pairs for server configuration
    - maximum_concurrent_rpcs: Maximum concurrent RPCs before returning RESOURCE_EXHAUSTED
    - compression: Default compression algorithm

    Returns:
    aio.Server: An asynchronous Server object
    """
class Server(abc.ABC):
    """Asynchronous server interface for serving RPCs."""

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Registers GenericRpcHandlers with this async Server."""

    def add_insecure_port(self, address: str) -> int:
        """Opens an insecure port for accepting RPCs."""

    def add_secure_port(self, address: str, server_credentials: "grpc.ServerCredentials") -> int:
        """Opens a secure port for accepting RPCs."""

    async def start(self):
        """Starts this async Server."""

    async def stop(self, grace: float = None):
        """Stops this async Server with optional grace period."""

    async def wait_for_termination(self, timeout: float = None):
        """Waits until the server stops serving.

        Parameters:
        - timeout: Optional maximum number of seconds to wait; None waits
          indefinitely
        """

# Usage Examples:
import grpc.aio
import asyncio
class AsyncMyServiceServicer(MyServiceServicer):
    """Example servicer implementing all four RPC patterns as coroutines."""

    async def async_process(self, request):
        """Placeholder per-message processing used by StreamStreamMethod.

        Added so the example is self-contained; replace with real logic.
        """
        await asyncio.sleep(0.01)  # Simulate async processing
        return f"Processed: {request.message}"

    async def UnaryMethod(self, request, context):
        """Unary-unary handler: return a single response."""
        # Async business logic
        await asyncio.sleep(0.1)  # Simulate async operation
        return MyResponse(reply=f"Async echo: {request.message}")

    async def UnaryStreamMethod(self, request, context):
        """Unary-stream handler: an async generator yielding responses."""
        for i in range(request.count):
            await asyncio.sleep(0.1)  # Simulate async processing
            yield MyResponse(reply=f"Item {i}")

    async def StreamUnaryMethod(self, request_iterator, context):
        """Stream-unary handler: consume all requests, return one response."""
        items = []
        async for request in request_iterator:
            items.append(request.data)
            await asyncio.sleep(0.01)  # Simulate async processing
        return MyResponse(reply=f"Processed {len(items)} items")

    async def StreamStreamMethod(self, request_iterator, context):
        """Stream-stream handler: yield one response per incoming request."""
        async for request in request_iterator:
            # Process each request asynchronously
            processed = await self.async_process(request)
            yield MyResponse(reply=processed)
async def serve():
    """Start the example server and stop it gracefully when serving ends."""
    server = grpc.aio.server()

    # Add servicer
    add_MyServiceServicer_to_server(AsyncMyServiceServicer(), server)

    # Add ports
    listen_addr = '[::]:50051'
    server.add_insecure_port(listen_addr)

    print(f"Starting async server on {listen_addr}")
    await server.start()

    try:
        await server.wait_for_termination()
    finally:
        # Use ``finally`` rather than ``except KeyboardInterrupt``: under
        # asyncio.run() Ctrl-C may reach this coroutine as a task
        # cancellation, and the graceful stop must still run.
        print("Shutting down server...")
        await server.stop(grace=5.0)
# Run the server
if __name__ == '__main__':
    asyncio.run(serve())

# Asynchronous multi-callable interfaces for all RPC patterns with native async/await support.
class UnaryUnaryMultiCallable(abc.ABC):
    """Async unary-unary multi-callable."""

    # A plain ``def``: invoking it returns a call object immediately. The
    # call object is awaitable and also exposes metadata/status accessors.
    def __call__(self, request, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> "UnaryUnaryCall":
        """Asynchronously invokes the underlying RPC.

        Parameters:
        - request: The request value for the RPC
        - timeout: Optional timeout in seconds
        - metadata: Optional metadata
        - credentials: Optional CallCredentials
        - wait_for_ready: Optional wait_for_ready flag
        - compression: Optional compression

        Returns:
        UnaryUnaryCall: Awaitable call object; awaiting it yields the
        response value for the RPC
        """
class UnaryStreamMultiCallable(abc.ABC):
    """Async unary-stream multi-callable."""

    def __call__(self, request, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> "UnaryStreamCall":
        """Returns an async iterator for the RPC responses.

        Parameters:
        - request: The request value for the RPC
        - timeout, metadata, credentials, wait_for_ready, compression:
          Optional keyword-only call settings

        Returns:
        UnaryStreamCall: Async iterator for response values
        """
class StreamUnaryMultiCallable(abc.ABC):
    """Async stream-unary multi-callable."""

    # A plain ``def`` returning the call object, so callers can either await
    # it directly or drive it with write()/done_writing().
    def __call__(self, request_iterator=None, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> "StreamUnaryCall":
        """Asynchronously invokes the underlying RPC with request stream.

        Parameters:
        - request_iterator: Optional async iterator yielding request values;
          omit it to send requests with write() on the returned call

        Returns:
        StreamUnaryCall: Awaitable call object; awaiting it yields the
        response value for the RPC
        """
class StreamStreamMultiCallable(abc.ABC):
    """Async stream-stream multi-callable."""

    def __call__(self, request_iterator=None, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> "StreamStreamCall":
        """Returns an async iterator for bidirectional streaming.

        Parameters:
        - request_iterator: Optional async iterator yielding request values;
          omit it to send requests with write() on the returned call

        Returns:
        StreamStreamCall: Async iterator for bidirectional streaming
        """

# Usage Examples:
async def async_rpc_examples():
    """Exercise each of the four RPC patterns against one channel."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        # Unary-unary
        request = MyRequest(message="Hello")
        response = await stub.UnaryMethod(request, timeout=5.0)
        print(f"Unary response: {response.reply}")

        # Unary-stream
        stream_request = MyStreamRequest(count=5)
        async for response in stub.UnaryStreamMethod(stream_request):
            print(f"Stream response: {response.reply}")

        # Stream-unary
        async def request_generator():
            for i in range(3):
                yield MyRequest(message=f"Item {i}")
                await asyncio.sleep(0.1)

        response = await stub.StreamUnaryMethod(request_generator())
        print(f"Stream-unary response: {response.reply}")

        # Stream-stream (bidirectional)
        async def bidirectional_example():
            async def request_stream():
                for i in range(5):
                    yield MyRequest(message=f"Message {i}")
                    await asyncio.sleep(0.5)

            call = stub.StreamStreamMethod(request_stream())
            async for response in call:
                print(f"Bidirectional response: {response.reply}")

        await bidirectional_example()
# Advanced async patterns
async def concurrent_requests():
    """Issue ten unary RPCs concurrently and print every response."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        # Make multiple concurrent requests. The stub returns an awaitable
        # call object rather than a coroutine, so it cannot be handed to
        # asyncio.create_task(); asyncio.gather() accepts any awaitable.
        calls = [
            stub.UnaryMethod(MyRequest(message=f"Concurrent request {i}"))
            for i in range(10)
        ]

        # Wait for all to complete
        responses = await asyncio.gather(*calls)

        for i, response in enumerate(responses):
            print(f"Response {i}: {response.reply}")
async def async_streaming_with_timeout():
    """Consume a server stream, giving up after ten seconds overall."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        # Build the request locally; the example previously referenced an
        # undefined ``request`` name.
        request = MyStreamRequest(count=5)

        try:
            async with asyncio.timeout(10.0):  # Python 3.11+
                async for response in stub.UnaryStreamMethod(request):
                    print(f"Response: {response.reply}")
        except asyncio.TimeoutError:
            print("Stream timed out")

# Asynchronous call objects providing access to RPC metadata, status, and control operations.
class Call(abc.ABC):
    """Async call interface providing RPC metadata and control."""

    def cancelled(self) -> bool:
        """Returns True if the RPC was cancelled."""

    def done(self) -> bool:
        """Returns True if the RPC has completed."""

    async def initial_metadata(self):
        """Returns the initial metadata sent by the server."""

    async def trailing_metadata(self):
        """Returns the trailing metadata sent by the server."""

    # Quoted so the class can be defined without ``StatusCode`` in scope.
    async def code(self) -> "grpc.StatusCode":
        """Returns the status code sent by the server."""

    async def details(self) -> str:
        """Returns the details sent by the server."""

    def cancel(self) -> bool:
        """Cancels the RPC. Returns True if cancellation was successful."""

    def add_done_callback(self, callback):
        """Adds a callback to be called when the RPC completes."""
class UnaryUnaryCall(Call):
    """Async unary-unary call."""

    def __await__(self):
        """Enables direct awaiting of the call for the response."""
class UnaryStreamCall(Call):
    """Async unary-stream call providing async iteration."""

    def __aiter__(self):
        """Returns async iterator for response values."""

    async def __anext__(self):
        """Returns next response value or raises StopAsyncIteration."""

    async def read(self):
        """Reads one response, or returns EOF once the stream has ended.

        Explicit alternative to ``async for``; do not mix the two styles on
        the same call.
        """
class StreamUnaryCall(Call):
    """Async stream-unary call."""

    async def write(self, request):
        """Writes a request to the stream."""

    async def done_writing(self):
        """Signals that no more requests will be written."""

    def __await__(self):
        """Enables direct awaiting of the call for the final response."""
class StreamStreamCall(Call):
    """Async stream-stream call providing bidirectional streaming."""

    async def write(self, request):
        """Writes a request to the stream."""

    async def done_writing(self):
        """Signals that no more requests will be written."""

    def __aiter__(self):
        """Returns async iterator for response values."""

    async def __anext__(self):
        """Returns next response value or raises StopAsyncIteration."""

    async def read(self):
        """Reads one response, or returns EOF once the stream has ended.

        Explicit alternative to ``async for``; do not mix the two styles on
        the same call.
        """

# Usage Examples:
async def call_control_examples():
    """Read metadata, response and final status from a unary call object."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        # Unary call with metadata access
        call = stub.UnaryMethod(MyRequest(message="Hello"))

        # Access initial metadata (available immediately after call starts)
        initial_md = await call.initial_metadata()
        print(f"Initial metadata: {dict(initial_md)}")

        # Get response
        response = await call
        print(f"Response: {response.reply}")

        # Access trailing metadata and status
        trailing_md = await call.trailing_metadata()
        code = await call.code()
        details = await call.details()
        print(f"Status: {code} - {details}")
        print(f"Trailing metadata: {dict(trailing_md)}")
async def streaming_with_cancellation():
    """Cancel a server stream after five responses and report its status."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        call = stub.UnaryStreamMethod(MyStreamRequest(count=100))

        # Process responses with cancellation logic
        count = 0
        async for response in call:
            print(f"Response {count}: {response.reply}")
            count += 1

            # Cancel after 5 responses
            if count >= 5:
                success = call.cancel()
                print(f"Cancellation successful: {success}")
                break

        # Check final status
        if call.cancelled():
            print("Call was cancelled")
        elif call.done():
            code = await call.code()
            print(f"Call completed with status: {code}")
async def bidirectional_streaming():
    """Write requests from a background task while reading responses."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        # Start bidirectional stream
        call = stub.StreamStreamMethod()

        # Send requests in background task
        async def send_requests():
            try:
                for i in range(10):
                    await call.write(MyRequest(message=f"Message {i}"))
                    await asyncio.sleep(0.5)
                await call.done_writing()
            except Exception as e:
                print(f"Error sending requests: {e}")

        send_task = asyncio.create_task(send_requests())

        # Receive responses
        try:
            async for response in call:
                print(f"Received: {response.reply}")
        except grpc.aio.AioRpcError as e:
            print(f"Stream error: {e}")
        finally:
            # Stop the writer and wait for it, so it is not left running
            # once the read side has finished.
            send_task.cancel()
            try:
                await send_task
            except asyncio.CancelledError:
                pass

# Runtime initialization and shutdown for the async gRPC environment.
def init_grpc_aio():
    """Initialize gRPC async runtime. Called automatically on first use."""
def shutdown_grpc_aio():
    """Shutdown gRPC async runtime. Should be called before program exit."""

# Async-specific error types and handling patterns.
class AioRpcError(grpc.RpcError):
    """Async RPC error exception."""


# BaseError is defined before the errors that derive from it, so a single
# ``except BaseError`` catches every error below.
class BaseError(Exception):
    """Base async error."""


class AbortError(BaseError):
    """Async abort error."""


class UsageError(BaseError):
    """Async usage error."""


class InternalError(BaseError):
    """Async internal error."""


EOF = object()  # End of file marker for streams

# Usage Examples:
async def error_handling_examples():
    """Distinguish RPC failures by status code from unexpected errors."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)

        try:
            response = await stub.UnaryMethod(MyRequest(message="test"))
        except grpc.aio.AioRpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")
            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                print("Request timed out")
            elif e.code() == grpc.StatusCode.UNAVAILABLE:
                print("Service unavailable")
        except Exception as e:
            # Top-level boundary of the example: report anything that is
            # not an RPC status error.
            print(f"Unexpected error: {e}")
# Proper async runtime cleanup
async def main():
    """Run the application, then shut down the async gRPC runtime."""
    try:
        await run_async_application()
    finally:
        # Clean shutdown of async gRPC runtime
        grpc.aio.shutdown_grpc_aio()
if __name__ == '__main__':
    asyncio.run(main())

# All async types are in grpc.aio module
class Channel(abc.ABC):
    """Async channel interface."""


class Server(abc.ABC):
    """Async server interface."""


class ServicerContext(abc.ABC):
    """Async servicer context."""


class Call(abc.ABC):
    """Base async call interface."""


class UnaryUnaryCall(Call):
    """Async unary-unary call."""


class UnaryStreamCall(Call):
    """Async unary-stream call."""


class StreamUnaryCall(Call):
    """Async stream-unary call."""


class StreamStreamCall(Call):
    """Async stream-stream call."""


class Metadata:
    """Async metadata handling."""

# Install with Tessl CLI
npx tessl i tessl/pypi-grpcio