tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/grpcio@1.74.x

To install, run

npx @tessl/cli install tessl/pypi-grpcio@1.74.0


gRPC Python (grpcio)

gRPC is a high-performance RPC framework that uses HTTP/2 for transport and Protocol Buffers as its interface description language. It lets client and server applications communicate across platforms and languages, and supports streaming, flow control, cancellation, timeouts, authentication, and load balancing.

Package Information

  • Package Name: grpcio
  • Language: Python
  • Installation: pip install grpcio

Core Imports

import grpc

For async functionality:

import grpc.aio

For experimental features:

import grpc.experimental

Basic Usage

Simple Client

import grpc
import my_service_pb2
import my_service_pb2_grpc

# Create channel
channel = grpc.insecure_channel('localhost:50051')

# Create stub
stub = my_service_pb2_grpc.MyServiceStub(channel)

# Make call
request = my_service_pb2.MyRequest(message="Hello")
response = stub.MyMethod(request)
print(response.reply)

# Clean up
channel.close()

Simple Server

import grpc
from concurrent import futures
import my_service_pb2
import my_service_pb2_grpc

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    def MyMethod(self, request, context):
        return my_service_pb2.MyResponse(reply=f"Echo: {request.message}")

# Create server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)

# Add port and start
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()

Async Client/Server

import asyncio
import grpc.aio
import my_service_pb2
import my_service_pb2_grpc

# Async client
async def run_client():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = my_service_pb2_grpc.MyServiceStub(channel)
        response = await stub.MyMethod(my_service_pb2.MyRequest(message="Hello"))
        print(response.reply)

# Async server (reuses MyServiceServicer from the Simple Server example)
async def serve():
    server = grpc.aio.server()
    my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)
    listen_addr = '[::]:50051'
    server.add_insecure_port(listen_addr)
    await server.start()
    await server.wait_for_termination()

# Run either coroutine with asyncio, for example: asyncio.run(serve())

Architecture

gRPC Python is built around several key components:

  • Channels: Client-side connection management with connectivity monitoring and load balancing
  • Stubs: Client-side objects for making RPC calls, supporting all four RPC patterns
  • Servers: Multi-threaded or async servers with configurable thread pools and interceptors
  • Servicers: Server-side service implementations with full context access
  • Interceptors: Middleware for both client and server sides with state passing via contextvars
  • Credentials: SSL/TLS, ALTS, OAuth2 access tokens, and custom authentication plugins
  • Metadata: Key-value pairs sent as request headers and as initial and trailing response metadata

The framework supports four RPC patterns: unary-unary (single request/response), unary-stream (single request/multiple responses), stream-unary (multiple requests/single response), and stream-stream (bidirectional streaming), all available in both synchronous and asynchronous modes.
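
The four patterns differ mainly in the shape of the handler: whether it receives one request or an iterator of requests, and whether it returns one response or yields several. The sketch below shows those shapes in plain Python, without a running server; the class name `PatternShapes` and the string payloads are illustrative assumptions, and real servicers receive protobuf messages and a `ServicerContext`.

```python
from typing import Iterable, Iterator


class PatternShapes:
    """Handler shapes for the four RPC patterns (plain Python; `context` is unused here)."""

    def unary_unary(self, request: str, context=None) -> str:
        # One request in, one response out.
        return request.upper()

    def unary_stream(self, request: str, context=None) -> Iterator[str]:
        # One request in, a stream of responses out (a generator).
        for word in request.split():
            yield word

    def stream_unary(self, request_iterator: Iterable[str], context=None) -> str:
        # A stream of requests in, one response out.
        return " ".join(request_iterator)

    def stream_stream(self, request_iterator: Iterable[str], context=None) -> Iterator[str]:
        # A stream in, a stream out; here each request is answered in turn.
        for request in request_iterator:
            yield request[::-1]


shapes = PatternShapes()
print(shapes.unary_unary("ping"))
print(list(shapes.unary_stream("a b c")))
print(shapes.stream_unary(iter(["x", "y"])))
print(list(shapes.stream_stream(iter(["ab", "cd"]))))
```

On the client side the same split applies: streaming responses are consumed by iterating, and streaming requests are supplied as an iterator.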

Capabilities

Channel Management

Client-side channel creation and management with support for secure and insecure connections, connectivity monitoring, interceptor chains, and configurable options for load balancing and connection parameters.

def insecure_channel(target: str, options=None, compression=None) -> Channel: ...
def secure_channel(target: str, credentials: ChannelCredentials, options=None, compression=None) -> Channel: ...
def intercept_channel(channel: Channel, *interceptors) -> Channel: ...
def channel_ready_future(channel: Channel) -> Future: ...
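
As a minimal sketch of how these functions fit together, the example below starts an empty local server, opens a channel with one option, and waits for the channel to become ready. The loopback address, the port chosen by the OS, and the 4 MiB message limit are assumptions made for illustration.

```python
from concurrent import futures

import grpc

# Start an empty local server so the channel has something to connect to.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
port = server.add_insecure_port("127.0.0.1:0")  # port 0 lets the OS pick a free port
server.start()

# Channel options are (name, value) pairs; this one caps inbound message size.
options = [("grpc.max_receive_message_length", 4 * 1024 * 1024)]
channel = grpc.insecure_channel(f"127.0.0.1:{port}", options=options)

try:
    # Block until the channel is READY; raises grpc.FutureTimeoutError otherwise.
    grpc.channel_ready_future(channel).result(timeout=5)
    connected = True
except grpc.FutureTimeoutError:
    connected = False
finally:
    channel.close()
    server.stop(grace=None)

print("connected:", connected)
```

Waiting on `channel_ready_future` is optional; without it, the first RPC triggers the connection attempt.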

See channel-management.md for details.

Server Implementation

Server creation and lifecycle management with support for multiple service registration, port configuration, graceful shutdown, and both synchronous and asynchronous execution models.

def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False) -> Server: ...

class Server(abc.ABC):
    def add_generic_rpc_handlers(self, generic_rpc_handlers): ...
    def add_insecure_port(self, address: str) -> int: ...
    def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int: ...
    def start(self): ...
    def stop(self, grace): ...
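
The lifecycle above can be sketched end to end without generated protobuf code by registering a generic handler that works on raw bytes. The service name `demo.Echo`, the method `Say`, and the loopback address are hypothetical and chosen only for this example.

```python
from concurrent import futures

import grpc


def say(request: bytes, context) -> bytes:
    # With no (de)serializers configured, requests and responses are raw bytes.
    return b"echo: " + request


# "demo.Echo" and "Say" are hypothetical service and method names.
handler = grpc.method_handlers_generic_handler(
    "demo.Echo", {"Say": grpc.unary_unary_rpc_method_handler(say)}
)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port("127.0.0.1:0")  # returns the bound port
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    reply = channel.unary_unary("/demo.Echo/Say")(b"hi", timeout=5)

# stop() returns a threading.Event that is set once shutdown completes.
stopped = server.stop(grace=1.0)
stopped.wait(timeout=5)
print(reply, stopped.is_set())
```

Passing a `grace` period lets in-flight RPCs finish before they are cancelled; `grace=None` cancels them immediately.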

See server-implementation.md for details.

Security and Authentication

Comprehensive security framework with SSL/TLS credentials, mutual authentication, OAuth2 integration, ALTS for GCP environments, and custom authentication plugins.

def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None) -> ChannelCredentials: ...
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False) -> ServerCredentials: ...
def metadata_call_credentials(metadata_plugin: AuthMetadataPlugin, name=None) -> CallCredentials: ...
def access_token_call_credentials(access_token: str) -> CallCredentials: ...
def composite_channel_credentials(channel_credentials: ChannelCredentials, *call_credentials) -> ChannelCredentials: ...
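
The following sketch shows one way these credential functions might be combined: TLS channel credentials plus two kinds of per-call credentials. The `ApiKeyPlugin` class, the `x-api-key` header, and the token values are hypothetical placeholders, and no connection is opened.

```python
import grpc


class ApiKeyPlugin(grpc.AuthMetadataPlugin):
    """Hypothetical plugin that attaches an API key to every call."""

    def __init__(self, api_key: str):
        self._api_key = api_key

    def __call__(self, context, callback):
        # Pass the metadata to send and an error (None on success).
        callback((("x-api-key", self._api_key),), None)


# TLS channel credentials using the default root certificates.
tls = grpc.ssl_channel_credentials()

# Two kinds of per-call credentials: a bearer token and a custom plugin.
token = grpc.access_token_call_credentials("example-token")  # placeholder value
api_key = grpc.metadata_call_credentials(ApiKeyPlugin("example-key"), name="api-key")

# Combine them; the result can be passed to grpc.secure_channel(target, combined).
combined = grpc.composite_channel_credentials(tls, token, api_key)
print(type(combined).__name__)
```

Call credentials are only sent over secure channels, which is why they are composed with channel credentials rather than used with `insecure_channel`.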

See security-authentication.md for details.

RPC Patterns and Multi-Callables

Support for all four RPC patterns with both synchronous and asynchronous invocation methods, including timeout handling, metadata passing, and credential specification.

class UnaryUnaryMultiCallable(abc.ABC):
    def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...
    def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...
    def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...

class UnaryStreamMultiCallable(abc.ABC):
    def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...
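
To illustrate the three invocation styles of a unary-unary multi-callable alongside a unary-stream call, the sketch below uses a local server with generic handlers and raw bytes in place of protobuf messages. The service `demo.Demo` and its methods are hypothetical.

```python
from concurrent import futures

import grpc


def say(request: bytes, context) -> bytes:
    return request.upper()


def count(request: bytes, context):
    # Unary-stream handler: yield one response per step.
    for i in range(int(request)):
        yield str(i).encode()


# Hypothetical service "demo.Demo"; raw bytes stand in for protobuf messages.
handler = grpc.method_handlers_generic_handler(
    "demo.Demo",
    {
        "Say": grpc.unary_unary_rpc_method_handler(say),
        "Count": grpc.unary_stream_rpc_method_handler(count),
    },
)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    say_rpc = channel.unary_unary("/demo.Demo/Say")
    count_rpc = channel.unary_stream("/demo.Demo/Count")

    direct = say_rpc(b"a", timeout=5)  # blocking, returns the response
    reply, call = say_rpc.with_call(b"b", timeout=5)  # response plus call object
    pending = say_rpc.future(b"c", timeout=5)  # non-blocking, returns a future
    streamed = list(count_rpc(b"3", timeout=5))  # iterate the response stream

    results = (direct, reply, call.code(), pending.result(), streamed)

server.stop(grace=None)
print(results)
```

Generated stubs expose the same multi-callables as attributes, so `stub.MyMethod.with_call(...)` and `stub.MyMethod.future(...)` follow the same pattern.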

See rpc-patterns.md for details.

Interceptors

Client and server-side middleware for cross-cutting concerns like logging, metrics, authentication, and request/response modification, with full support for all RPC patterns.

class UnaryUnaryClientInterceptor(abc.ABC):
    def intercept_unary_unary(self, continuation, client_call_details, request): ...

class ServerInterceptor(abc.ABC):
    def intercept_service(self, continuation, handler_call_details): ...
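
A minimal sketch of both interceptor kinds follows: each one records the method name and then delegates to `continuation`. The recorder classes, the `demo.Echo` service, and the module-level lists are assumptions made for the example.

```python
from concurrent import futures

import grpc

server_seen = []  # method names recorded on the server side
client_seen = []  # method names recorded on the client side


class ServerRecorder(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        # Record the full method name, then hand off to the next handler.
        server_seen.append(handler_call_details.method)
        return continuation(handler_call_details)


class ClientRecorder(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        # Record the method, then continue the call unchanged.
        client_seen.append(client_call_details.method)
        return continuation(client_call_details, request)


def say(request: bytes, context) -> bytes:
    return b"echo: " + request


# Hypothetical service "demo.Echo" operating on raw bytes.
handler = grpc.method_handlers_generic_handler(
    "demo.Echo", {"Say": grpc.unary_unary_rpc_method_handler(say)}
)
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=2), interceptors=(ServerRecorder(),)
)
server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    intercepted = grpc.intercept_channel(channel, ClientRecorder())
    reply = intercepted.unary_unary("/demo.Echo/Say")(b"hi", timeout=5)

server.stop(grace=None)
print(reply, client_seen, server_seen)
```

Returning something other than `continuation(...)` is how an interceptor short-circuits a call, for example to reject unauthenticated requests.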

See interceptors.md for details.

Async API

Complete asynchronous API for high-performance async/await programming with native Python asyncio integration, supporting all RPC patterns and server/client implementations.

# grpc.aio module
def insecure_channel(target: str, options=None, compression=None, interceptors=None) -> aio.Channel: ...
def server(migration_thread_pool=None, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None) -> aio.Server: ...

class Channel(abc.ABC):
    async def __aenter__(self) -> Channel: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...
    def unary_unary(self, method, request_serializer=None, response_deserializer=None) -> UnaryUnaryMultiCallable: ...
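
The sketch below exercises this API in a single event loop: an `async def` handler on a `grpc.aio` server and a client call through an `async with` channel. The `demo.Echo` service and the raw-bytes payloads are assumptions that keep the example free of generated code.

```python
import asyncio

import grpc
import grpc.aio


async def say(request: bytes, context) -> bytes:
    # Coroutine handlers run on the event loop rather than in a thread pool.
    return b"async " + request


async def main() -> bytes:
    # Hypothetical service "demo.Echo" operating on raw bytes.
    handler = grpc.method_handlers_generic_handler(
        "demo.Echo", {"Say": grpc.unary_unary_rpc_method_handler(say)}
    )
    server = grpc.aio.server()
    server.add_generic_rpc_handlers((handler,))
    port = server.add_insecure_port("127.0.0.1:0")
    await server.start()
    try:
        async with grpc.aio.insecure_channel(f"127.0.0.1:{port}") as channel:
            # Awaiting the call object yields the response.
            return await channel.unary_unary("/demo.Echo/Say")(b"hi", timeout=5)
    finally:
        await server.stop(grace=None)


reply = asyncio.run(main())
print(reply)
```

Server and channel are created inside the running event loop here, which keeps both bound to the same loop.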

See async-api.md for details.

Error Handling and Status

Comprehensive error handling with gRPC status codes, custom exceptions, detailed error information, and proper exception propagation for both sync and async contexts.

class StatusCode(enum.Enum):
    OK = ...
    CANCELLED = ...
    UNKNOWN = ...
    INVALID_ARGUMENT = ...
    DEADLINE_EXCEEDED = ...
    NOT_FOUND = ...
    ALREADY_EXISTS = ...
    PERMISSION_DENIED = ...
    RESOURCE_EXHAUSTED = ...
    FAILED_PRECONDITION = ...
    ABORTED = ...
    OUT_OF_RANGE = ...
    UNIMPLEMENTED = ...
    INTERNAL = ...
    UNAVAILABLE = ...
    DATA_LOSS = ...
    UNAUTHENTICATED = ...

class RpcError(Exception): ...

class ServicerContext(abc.ABC):
    def abort(self, code: StatusCode, details: str): ...
    def abort_with_status(self, status): ...
    def set_code(self, code: StatusCode): ...
    def set_details(self, details: str): ...
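
As an illustration of how a status set on the server reaches the client, the sketch below aborts an RPC with `NOT_FOUND` and reads the code and details from the raised `RpcError`. The `demo.Store` service, the `Lookup` method, and the key values are hypothetical.

```python
from concurrent import futures

import grpc


def lookup(request: bytes, context) -> bytes:
    if request != b"known":
        # abort() raises inside the handler and ends the RPC with this status.
        context.abort(grpc.StatusCode.NOT_FOUND, "no such key")
    return b"value"


# Hypothetical service "demo.Store" operating on raw bytes.
handler = grpc.method_handlers_generic_handler(
    "demo.Store", {"Lookup": grpc.unary_unary_rpc_method_handler(lookup)}
)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port("127.0.0.1:0")
server.start()

failure = None
with grpc.insecure_channel(f"127.0.0.1:{port}") as channel:
    lookup_rpc = channel.unary_unary("/demo.Store/Lookup")
    found = lookup_rpc(b"known", timeout=5)
    try:
        lookup_rpc(b"missing", timeout=5)
    except grpc.RpcError as err:
        # Errors raised by client calls also expose the call's status.
        failure = (err.code(), err.details())

server.stop(grace=None)
print(found, failure)
```

`set_code` and `set_details` are the non-raising alternative: the handler sets the status and then returns normally.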

See error-handling.md for details.

Protocol Buffers Integration

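Runtime loading and compilation of Protocol Buffer definitions from .proto files, so that message and service modules can be used without a separate protoc step. These functions require the grpcio-tools package to be installed, and protobuf_path is resolved relative to an entry on sys.path.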

def protos(protobuf_path: str): ...
def services(protobuf_path: str): ...
def protos_and_services(protobuf_path: str): ...

See protobuf-integration.md for details.

Experimental APIs

Experimental features in the grpc.experimental module providing additional functionality and simplified RPC calling patterns.

# grpc.experimental module
class ChannelOptions:
    SingleThreadedUnaryStream = ...  # Perform unary-stream RPCs on single thread

def insecure_channel_credentials() -> ChannelCredentials: ...
def experimental_api(f): ...
def wrap_server_method_handler(wrapper, handler): ...

# Simple stub functions (available in Python 3.7+)
def unary_unary(
    request,
    target: str,
    method: str,
    request_serializer=None,
    response_deserializer=None,
    options=(),
    channel_credentials=None,
    insecure: bool = False,
    call_credentials=None,
    compression=None,
    wait_for_ready=None,
    timeout=None,
    metadata=None
): ...

def unary_stream(
    request,
    target: str,
    method: str,
    request_serializer=None,
    response_deserializer=None,
    options=(),
    channel_credentials=None,
    insecure: bool = False,
    call_credentials=None,
    compression=None,
    wait_for_ready=None,
    timeout=None,
    metadata=None
): ...

def stream_unary(
    request_iterator,
    target: str,
    method: str,
    request_serializer=None,
    response_deserializer=None,
    options=(),
    channel_credentials=None,
    insecure: bool = False,
    call_credentials=None,
    compression=None,
    wait_for_ready=None,
    timeout=None,
    metadata=None
): ...

def stream_stream(
    request_iterator,
    target: str,
    method: str,
    request_serializer=None,
    response_deserializer=None,
    options=(),
    channel_credentials=None,
    insecure: bool = False,
    call_credentials=None,
    compression=None,
    wait_for_ready=None,
    timeout=None,
    metadata=None
): ...
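
A hedged sketch of the simple stub functions: `grpc.experimental.unary_unary` makes a one-off call without an explicit channel or stub. The local `demo.Echo` service and raw-bytes payloads are assumptions for the example, and the experimental-API warning is silenced only to keep the output clean.

```python
import warnings
from concurrent import futures

import grpc
import grpc.experimental


def say(request: bytes, context) -> bytes:
    return b"echo: " + request


# Hypothetical service "demo.Echo" operating on raw bytes.
handler = grpc.method_handlers_generic_handler(
    "demo.Echo", {"Say": grpc.unary_unary_rpc_method_handler(say)}
)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port("127.0.0.1:0")
server.start()

with warnings.catch_warnings():
    # Experimental functions emit ExperimentalApiWarning when called.
    warnings.simplefilter("ignore", grpc.experimental.ExperimentalApiWarning)
    reply = grpc.experimental.unary_unary(
        b"hi",
        f"127.0.0.1:{port}",
        "/demo.Echo/Say",
        insecure=True,  # plaintext; omit and pass channel_credentials for TLS
        timeout=5,
    )

server.stop(grace=None)
print(reply)
```

Because these functions manage channels internally, they suit scripts and one-off calls more than long-lived clients that need explicit channel control.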

class UsageError(Exception): ...
class ExperimentalApiWarning(Warning): ...

Version Information

import grpc
print(grpc.__version__)  # Package version string

Types

Core Types

class ChannelCredentials:
    def __init__(self, credentials): ...

class CallCredentials:
    def __init__(self, credentials): ...

class ServerCredentials:
    def __init__(self, credentials): ...

class Future(abc.ABC):
    def cancel(self) -> bool: ...
    def cancelled(self) -> bool: ...
    def running(self) -> bool: ...
    def done(self) -> bool: ...
    def result(self, timeout=None): ...
    def exception(self, timeout=None): ...
    def traceback(self, timeout=None): ...
    def add_done_callback(self, fn): ...

class ChannelConnectivity(enum.Enum):
    IDLE = ...
    CONNECTING = ...
    READY = ...
    TRANSIENT_FAILURE = ...
    SHUTDOWN = ...

class Compression(enum.IntEnum):
    NoCompression = ...
    Deflate = ...
    Gzip = ...

class LocalConnectionType(enum.Enum):
    UDS = ...
    LOCAL_TCP = ...
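
To show two of these types in use, the sketch below subscribes to a channel's connectivity changes and requests gzip compression for the channel. The callback name, the local empty server, and the five-second wait are assumptions made for illustration.

```python
import threading
from concurrent import futures

import grpc

# An empty local server gives the channel something to connect to.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
port = server.add_insecure_port("127.0.0.1:0")
server.start()

states = []  # every ChannelConnectivity value reported to the callback
ready = threading.Event()


def on_state(state: grpc.ChannelConnectivity) -> None:
    # Called from a gRPC-managed thread on each connectivity change.
    states.append(state)
    if state is grpc.ChannelConnectivity.READY:
        ready.set()


channel = grpc.insecure_channel(
    f"127.0.0.1:{port}", compression=grpc.Compression.Gzip
)
channel.subscribe(on_state, try_to_connect=True)
became_ready = ready.wait(timeout=5)

channel.unsubscribe(on_state)
channel.close()
server.stop(grace=None)
print(became_ready, [state.name for state in states])
```

The exact sequence of reported states can vary between runs, so code should react to individual states rather than expect a fixed order.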