CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems

Pending
Overview
Eval results
Files

async-api.mddocs/

Async API

Complete asynchronous API for high-performance async/await programming with native Python asyncio integration, supporting all RPC patterns, server/client implementations, and comprehensive async context management for scalable concurrent applications.

Capabilities

Async Channel Management

Asynchronous channel creation and management with full asyncio integration and context manager support.

# grpc.aio module functions
def insecure_channel(target: str, options=None, compression=None) -> aio.Channel:
    """
    Creates an asynchronous insecure Channel to a server.

    Parameters:
    - target: The server address (e.g., 'localhost:50051')
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method

    Returns:
    aio.Channel: An asynchronous Channel object for making RPCs
    """

def secure_channel(target: str, credentials: ChannelCredentials, options=None, compression=None) -> aio.Channel:
    """
    Creates an asynchronous secure Channel to a server.

    Parameters:
    - target: The server address
    - credentials: A ChannelCredentials instance for authentication
    - options: Optional list of key-value pairs for channel configuration
    - compression: Optional compression method

    Returns:
    aio.Channel: An asynchronous secure Channel object
    """

Usage Examples:

import grpc.aio
import asyncio

async def async_client_example():
    # Create async channel with context manager
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        # Make async RPC calls
        request = MyRequest(message="Hello async")
        response = await stub.UnaryMethod(request)
        print(f"Response: {response.reply}")
    
    # Channel automatically closed when exiting context

async def secure_async_client():
    credentials = grpc.ssl_channel_credentials()
    async with grpc.aio.secure_channel('secure-server.com:443', credentials) as channel:
        stub = MyServiceStub(channel)
        response = await stub.SecureMethod(request)
        return response

# Manual channel lifecycle
async def manual_channel_management():
    channel = grpc.aio.insecure_channel('localhost:50051')
    try:
        stub = MyServiceStub(channel)
        response = await stub.UnaryMethod(request)
        return response
    finally:
        await channel.close()

Async Channel Interface

Asynchronous channel interface with comprehensive async context management and RPC method creation.

class Channel(abc.ABC):
    """Asynchronous channel interface for client-side RPC invocations."""
    
    async def __aenter__(self) -> Channel:
        """Enters the async runtime context for the channel."""
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exits the async runtime context and closes the channel."""
    
    def unary_unary(self, method: str, request_serializer=None, response_deserializer=None) -> UnaryUnaryMultiCallable:
        """Creates an async UnaryUnaryMultiCallable for a unary-unary method."""
    
    def unary_stream(self, method: str, request_serializer=None, response_deserializer=None) -> UnaryStreamMultiCallable:
        """Creates an async UnaryStreamMultiCallable for a unary-stream method."""
    
    def stream_unary(self, method: str, request_serializer=None, response_deserializer=None) -> StreamUnaryMultiCallable:
        """Creates an async StreamUnaryMultiCallable for a stream-unary method."""
    
    def stream_stream(self, method: str, request_serializer=None, response_deserializer=None) -> StreamStreamMultiCallable:
        """Creates an async StreamStreamMultiCallable for a stream-stream method."""
    
    async def close(self):
        """Closes the channel and releases all resources asynchronously."""
    
    def get_state(self, try_to_connect=False) -> ChannelConnectivity:
        """Gets the current connectivity state of the channel."""
    
    async def wait_for_state_change(self, last_observed_state: ChannelConnectivity):
        """Waits for the channel state to change from the last observed state."""
    
    async def channel_ready(self):
        """Waits until the channel is ready to conduct RPCs."""

Async Server Implementation

Asynchronous server creation and management with async handler support and graceful shutdown capabilities.

def server(migration_thread_pool=None, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None) -> aio.Server:
    """
    Creates an asynchronous Server for servicing RPCs.

    Parameters:
    - migration_thread_pool: Optional ThreadPoolExecutor for sync-to-async migration
    - handlers: Optional list of GenericRpcHandlers for initial service registration
    - interceptors: Optional list of async ServerInterceptors
    - options: Optional list of key-value pairs for server configuration
    - maximum_concurrent_rpcs: Maximum concurrent RPCs before returning RESOURCE_EXHAUSTED
    - compression: Default compression algorithm

    Returns:
    aio.Server: An asynchronous Server object
    """

class Server(abc.ABC):
    """Asynchronous server interface for serving RPCs."""
    
    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Registers GenericRpcHandlers with this async Server."""
    
    def add_insecure_port(self, address: str) -> int:
        """Opens an insecure port for accepting RPCs."""
    
    def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int:
        """Opens a secure port for accepting RPCs."""
    
    async def start(self):
        """Starts this async Server."""
    
    async def stop(self, grace: float = None):
        """Stops this async Server with optional grace period."""
    
    async def wait_for_termination(self):
        """Waits until the server stops serving."""

Usage Examples:

import grpc.aio
import asyncio

class AsyncMyServiceServicer(MyServiceServicer):
    async def UnaryMethod(self, request, context):
        # Async business logic
        await asyncio.sleep(0.1)  # Simulate async operation
        return MyResponse(reply=f"Async echo: {request.message}")
    
    async def UnaryStreamMethod(self, request, context):
        for i in range(request.count):
            await asyncio.sleep(0.1)  # Simulate async processing
            yield MyResponse(reply=f"Item {i}")
    
    async def StreamUnaryMethod(self, request_iterator, context):
        items = []
        async for request in request_iterator:
            items.append(request.data)
            await asyncio.sleep(0.01)  # Simulate async processing
        
        return MyResponse(reply=f"Processed {len(items)} items")
    
    async def StreamStreamMethod(self, request_iterator, context):
        async for request in request_iterator:
            # Process each request asynchronously
            processed = await self.async_process(request)
            yield MyResponse(reply=processed)

async def serve():
    server = grpc.aio.server()
    
    # Add servicer
    add_MyServiceServicer_to_server(AsyncMyServiceServicer(), server)
    
    # Add ports
    listen_addr = '[::]:50051'
    server.add_insecure_port(listen_addr)
    
    print(f"Starting async server on {listen_addr}")
    await server.start()
    
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        print("Shutting down server...")
        await server.stop(grace=5.0)

# Run the server
if __name__ == '__main__':
    asyncio.run(serve())

Async Multi-Callables

Asynchronous multi-callable interfaces for all RPC patterns with native async/await support.

class UnaryUnaryMultiCallable(abc.ABC):
    """Async unary-unary multi-callable."""
    
    async def __call__(self, request, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Asynchronously invokes the underlying RPC.

        Parameters:
        - request: The request value for the RPC
        - timeout: Optional timeout in seconds
        - metadata: Optional metadata
        - credentials: Optional CallCredentials
        - wait_for_ready: Optional wait_for_ready flag
        - compression: Optional compression

        Returns:
        The response value for the RPC
        """

class UnaryStreamMultiCallable(abc.ABC):
    """Async unary-stream multi-callable."""
    
    def __call__(self, request, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> UnaryStreamCall:
        """
        Returns an async iterator for the RPC responses.

        Returns:
        UnaryStreamCall: Async iterator for response values
        """

class StreamUnaryMultiCallable(abc.ABC):
    """Async stream-unary multi-callable."""
    
    async def __call__(self, request_iterator, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Asynchronously invokes the underlying RPC with request stream.

        Parameters:
        - request_iterator: Async iterator yielding request values

        Returns:
        The response value for the RPC
        """

class StreamStreamMultiCallable(abc.ABC):
    """Async stream-stream multi-callable."""
    
    def __call__(self, request_iterator, *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None) -> StreamStreamCall:
        """
        Returns an async iterator for bidirectional streaming.

        Parameters:
        - request_iterator: Async iterator yielding request values

        Returns:
        StreamStreamCall: Async iterator for bidirectional streaming
        """

Usage Examples:

async def async_rpc_examples():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        # Unary-unary
        request = MyRequest(message="Hello")
        response = await stub.UnaryMethod(request, timeout=5.0)
        print(f"Unary response: {response.reply}")
        
        # Unary-stream
        stream_request = MyStreamRequest(count=5)
        async for response in stub.UnaryStreamMethod(stream_request):
            print(f"Stream response: {response.reply}")
        
        # Stream-unary
        async def request_generator():
            for i in range(3):
                yield MyRequest(message=f"Item {i}")
                await asyncio.sleep(0.1)
        
        response = await stub.StreamUnaryMethod(request_generator())
        print(f"Stream-unary response: {response.reply}")
        
        # Stream-stream (bidirectional)
        async def bidirectional_example():
            async def request_stream():
                for i in range(5):
                    yield MyRequest(message=f"Message {i}")
                    await asyncio.sleep(0.5)
            
            call = stub.StreamStreamMethod(request_stream())
            async for response in call:
                print(f"Bidirectional response: {response.reply}")
        
        await bidirectional_example()

# Advanced async patterns
async def concurrent_requests():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        # Make multiple concurrent requests
        tasks = []
        for i in range(10):
            request = MyRequest(message=f"Concurrent request {i}")
            task = asyncio.create_task(stub.UnaryMethod(request))
            tasks.append(task)
        
        # Wait for all to complete
        responses = await asyncio.gather(*tasks)
        for i, response in enumerate(responses):
            print(f"Response {i}: {response.reply}")

async def async_streaming_with_timeout():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        try:
            async with asyncio.timeout(10.0):  # Python 3.11+
                async for response in stub.UnaryStreamMethod(request):
                    print(f"Response: {response.reply}")
        except asyncio.TimeoutError:
            print("Stream timed out")

Async Call Objects

Asynchronous call objects providing access to RPC metadata, status, and control operations.

class Call(abc.ABC):
    """Async call interface providing RPC metadata and control."""
    
    def cancelled(self) -> bool:
        """Returns True if the RPC was cancelled."""
    
    def done(self) -> bool:
        """Returns True if the RPC has completed."""
    
    async def initial_metadata(self):
        """Returns the initial metadata sent by the server."""
    
    async def trailing_metadata(self):
        """Returns the trailing metadata sent by the server."""
    
    async def code(self) -> StatusCode:
        """Returns the status code sent by the server."""
    
    async def details(self) -> str:
        """Returns the details sent by the server."""
    
    def cancel(self) -> bool:
        """Cancels the RPC. Returns True if cancellation was successful."""
    
    def add_done_callback(self, callback):
        """Adds a callback to be called when the RPC completes."""

class UnaryUnaryCall(Call):
    """Async unary-unary call."""
    
    def __await__(self):
        """Enables direct awaiting of the call for the response."""

class UnaryStreamCall(Call):
    """Async unary-stream call providing async iteration."""
    
    def __aiter__(self):
        """Returns async iterator for response values."""
    
    async def __anext__(self):
        """Returns next response value or raises StopAsyncIteration."""

class StreamUnaryCall(Call):
    """Async stream-unary call."""
    
    async def write(self, request):
        """Writes a request to the stream."""
    
    async def done_writing(self):
        """Signals that no more requests will be written."""
    
    def __await__(self):
        """Enables direct awaiting of the call for the final response."""

class StreamStreamCall(Call):
    """Async stream-stream call providing bidirectional streaming."""
    
    async def write(self, request):
        """Writes a request to the stream."""
    
    async def done_writing(self):
        """Signals that no more requests will be written."""
    
    def __aiter__(self):
        """Returns async iterator for response values."""
    
    async def __anext__(self):
        """Returns next response value or raises StopAsyncIteration."""

Usage Examples:

async def call_control_examples():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        # Unary call with metadata access
        call = stub.UnaryMethod(MyRequest(message="Hello"))
        
        # Access initial metadata (available immediately after call starts)
        initial_md = await call.initial_metadata()
        print(f"Initial metadata: {dict(initial_md)}")
        
        # Get response
        response = await call
        print(f"Response: {response.reply}")
        
        # Access trailing metadata and status
        trailing_md = await call.trailing_metadata()
        code = await call.code()
        details = await call.details()
        print(f"Status: {code} - {details}")
        print(f"Trailing metadata: {dict(trailing_md)}")

async def streaming_with_cancellation():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        call = stub.UnaryStreamMethod(MyStreamRequest(count=100))
        
        # Process responses with cancellation logic
        count = 0
        async for response in call:
            print(f"Response {count}: {response.reply}")
            count += 1
            
            # Cancel after 5 responses
            if count >= 5:
                success = call.cancel()
                print(f"Cancellation successful: {success}")
                break
        
        # Check final status
        if call.cancelled():
            print("Call was cancelled")
        elif call.done():
            code = await call.code()
            print(f"Call completed with status: {code}")

async def bidirectional_streaming():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        # Start bidirectional stream
        call = stub.StreamStreamMethod()
        
        # Send requests in background task
        async def send_requests():
            try:
                for i in range(10):
                    await call.write(MyRequest(message=f"Message {i}"))
                    await asyncio.sleep(0.5)
                await call.done_writing()
            except Exception as e:
                print(f"Error sending requests: {e}")
        
        send_task = asyncio.create_task(send_requests())
        
        # Receive responses
        try:
            async for response in call:
                print(f"Received: {response.reply}")
        except grpc.aio.AioRpcError as e:
            print(f"Stream error: {e}")
        finally:
            send_task.cancel()
            try:
                await send_task
            except asyncio.CancelledError:
                pass

Async Runtime Control

Runtime initialization and shutdown for the async gRPC environment.

def init_grpc_aio():
    """Initialize gRPC async runtime. Called automatically on first use."""

def shutdown_grpc_aio():
    """Shutdown gRPC async runtime. Should be called before program exit."""

Async Error Handling

Async-specific error types and handling patterns.

class AioRpcError(grpc.RpcError):
    """Async RPC error exception."""

class AbortError(Exception):
    """Async abort error."""

class BaseError(Exception):
    """Base async error."""

class UsageError(Exception):
    """Async usage error."""

class InternalError(Exception):
    """Async internal error."""

EOF = object()  # End of file marker for streams

Usage Examples:

async def error_handling_examples():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = MyServiceStub(channel)
        
        try:
            response = await stub.UnaryMethod(MyRequest(message="test"))
        except grpc.aio.AioRpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")
            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                print("Request timed out")
            elif e.code() == grpc.StatusCode.UNAVAILABLE:
                print("Service unavailable")
        except Exception as e:
            print(f"Unexpected error: {e}")

# Proper async runtime cleanup
async def main():
    try:
        await run_async_application()
    finally:
        # Clean shutdown of async gRPC runtime
        grpc.aio.shutdown_grpc_aio()

if __name__ == '__main__':
    asyncio.run(main())

Types

# All async types are in grpc.aio module
class Channel(abc.ABC):
    """Async channel interface."""

class Server(abc.ABC):
    """Async server interface."""

class ServicerContext(abc.ABC):
    """Async servicer context."""

class Call(abc.ABC):
    """Base async call interface."""

class UnaryUnaryCall(Call):
    """Async unary-unary call."""

class UnaryStreamCall(Call):
    """Async unary-stream call."""

class StreamUnaryCall(Call):
    """Async stream-unary call."""

class StreamStreamCall(Call):
    """Async stream-stream call."""

class Metadata:
    """Async metadata handling."""

Install with Tessl CLI

npx tessl i tessl/pypi-grpcio

docs

async-api.md

channel-management.md

error-handling.md

index.md

interceptors.md

protobuf-integration.md

rpc-patterns.md

security-authentication.md

server-implementation.md

tile.json