HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
npx @tessl/cli install tessl/pypi-grpcio@1.74.0

A modern, high-performance RPC framework that uses HTTP/2 for transport and Protocol Buffers as the interface description language. gRPC enables efficient communication between client and server applications across different platforms and languages, with features including streaming, flow control, cancellation, timeouts, authentication, and load balancing.
pip install grpcio

import grpc

For async functionality:
import grpc.aio

For experimental features:
import grpc.experimental

import grpc
import my_service_pb2
import my_service_pb2_grpc
# Create channel
channel = grpc.insecure_channel('localhost:50051')
# Create stub
stub = my_service_pb2_grpc.MyServiceStub(channel)
# Make call
request = my_service_pb2.MyRequest(message="Hello")
response = stub.MyMethod(request)
print(response.reply)
# Clean up
channel.close()

import grpc
from concurrent import futures
import my_service_pb2_grpc
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Example servicer that answers ``MyMethod`` by echoing the request text."""

    def MyMethod(self, request, context):
        """Return a ``MyResponse`` whose ``reply`` is ``"Echo: "`` plus ``request.message``.

        ``context`` is accepted but not used by this example.
        """
        # NOTE(review): this uses ``my_service_pb2``, but the import lines of
        # this server example only bring in ``my_service_pb2_grpc`` -- confirm
        # that ``my_service_pb2`` is imported wherever this snippet is run.
        return my_service_pb2.MyResponse(reply=f"Echo: {request.message}")
# Create server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)
# Add port and start
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()

import grpc.aio
# Async client
async def run_client():
    """Call ``MyMethod`` once over an insecure async channel and print the reply."""
    # The channel is entered with ``async with``, so its ``__aexit__`` runs
    # when the block is left.
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = my_service_pb2_grpc.MyServiceStub(channel)
        response = await stub.MyMethod(my_service_pb2.MyRequest(message="Hello"))
        print(response.reply)
# Async server
async def serve():
server = grpc.aio.server()
my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)
listen_addr = '[::]:50051'
server.add_insecure_port(listen_addr)
await server.start()
await server.wait_for_termination()

gRPC Python is built around several key components:
The framework supports four RPC patterns: unary-unary (single request/response), unary-stream (single request/multiple responses), stream-unary (multiple requests/single response), and stream-stream (bidirectional streaming), all available in both synchronous and asynchronous modes.
Client-side channel creation and management with support for secure and insecure connections, connectivity monitoring, interceptor chains, and configurable options for load balancing and connection parameters.
# Create a Channel to `target`; this signature takes no credentials argument.
def insecure_channel(target: str, options=None, compression=None) -> Channel: ...
# Create a Channel to `target` using the supplied ChannelCredentials.
def secure_channel(target: str, credentials: ChannelCredentials, options=None, compression=None) -> Channel: ...
# Return a Channel built from `channel` and any number of interceptors
# (the "interceptor chains" mentioned in the section text).
def intercept_channel(channel: Channel, *interceptors) -> Channel: ...
def channel_ready_future(channel: Channel) -> Future: ...

Server creation and lifecycle management with support for multiple service registration, port configuration, graceful shutdown, and both synchronous and asynchronous execution models.
# Server factory. `thread_pool` is the only required argument; the server
# example in this document passes a futures.ThreadPoolExecutor for it.
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False) -> Server: ...
class Server(abc.ABC):
def add_generic_rpc_handlers(self, generic_rpc_handlers): ...
def add_insecure_port(self, address: str) -> int: ...
def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int: ...
def start(self): ...
def stop(self, grace): ...

Comprehensive security framework with SSL/TLS credentials, mutual authentication, OAuth2 integration, ALTS for GCP environments, and custom authentication plugins.
# Build ChannelCredentials for SSL/TLS; every argument is optional.
def ssl_channel_credentials(root_certificates=None, private_key=None, certificate_chain=None) -> ChannelCredentials: ...
# Build ServerCredentials from private-key / certificate-chain pairs.
# NOTE(review): `require_client_auth` presumably enables the mutual
# authentication mentioned in the section text -- confirm against the gRPC docs.
def ssl_server_credentials(private_key_certificate_chain_pairs, root_certificates=None, require_client_auth=False) -> ServerCredentials: ...
# Build CallCredentials from an AuthMetadataPlugin.
def metadata_call_credentials(metadata_plugin: AuthMetadataPlugin, name=None) -> CallCredentials: ...
# Build CallCredentials from an access-token string.
def access_token_call_credentials(access_token: str) -> CallCredentials: ...
def composite_channel_credentials(channel_credentials: ChannelCredentials, *call_credentials) -> ChannelCredentials: ...

Support for all four RPC patterns with both synchronous and asynchronous invocation methods, including timeout handling, metadata passing, and credential specification.
class UnaryUnaryMultiCallable(abc.ABC):
    """Signature listing for invoking a unary-unary RPC (single request, single response).

    The three invocation forms take the same parameters: the request plus
    optional ``timeout``, ``metadata``, ``credentials``, ``wait_for_ready``
    and ``compression``. Return values are not shown in this listing.
    """

    def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...

    def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...

    def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...
class UnaryStreamMultiCallable(abc.ABC):
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): ...

Client and server-side middleware for cross-cutting concerns like logging, metrics, authentication, and request/response modification, with full support for all RPC patterns.
class UnaryUnaryClientInterceptor(abc.ABC):
    """Signature listing for a client-side interceptor of unary-unary RPCs."""

    # NOTE(review): the roles of `continuation` and `client_call_details` are
    # not shown in this listing -- see the gRPC interceptor documentation.
    def intercept_unary_unary(self, continuation, client_call_details, request): ...
class ServerInterceptor(abc.ABC):
def intercept_service(self, continuation, handler_call_details): ...

Complete asynchronous API for high-performance async/await programming with native Python asyncio integration, supporting all RPC patterns and server/client implementations.
# grpc.aio module
# Create an aio.Channel to `target`; the async client example in this
# document enters it with `async with`.
def insecure_channel(target: str, options=None, compression=None) -> aio.Channel: ...
# aio.Server factory; every argument is optional (the async server example
# calls it with none).
def server(migration_thread_pool=None, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None) -> aio.Server: ...
class Channel(abc.ABC):
async def __aenter__(self) -> Channel: ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...
def unary_unary(self, method, request_serializer=None, response_deserializer=None) -> UnaryUnaryMultiCallable: ...

Comprehensive error handling with gRPC status codes, custom exceptions, detailed error information, and proper exception propagation for both sync and async contexts.
class StatusCode(enum.Enum):
    """Listing of the gRPC status code names.

    The member values are elided (``...``) in this listing. Evaluated as
    written, every name after ``OK`` would therefore be an enum alias of
    ``OK``; treat this as a name reference, not as the real definition.
    """

    OK = ...
    CANCELLED = ...
    UNKNOWN = ...
    INVALID_ARGUMENT = ...
    DEADLINE_EXCEEDED = ...
    NOT_FOUND = ...
    ALREADY_EXISTS = ...
    PERMISSION_DENIED = ...
    RESOURCE_EXHAUSTED = ...
    FAILED_PRECONDITION = ...
    ABORTED = ...
    OUT_OF_RANGE = ...
    UNIMPLEMENTED = ...
    INTERNAL = ...
    UNAVAILABLE = ...
    DATA_LOSS = ...
    UNAUTHENTICATED = ...
class RpcError(Exception):
    """Exception type listed with the status codes for RPC error handling."""
class ServicerContext(abc.ABC):
def abort(self, code: StatusCode, details: str): ...
def abort_with_status(self, status): ...
def set_code(self, code: StatusCode): ...
def set_details(self, details: str): ...

Runtime loading and compilation of Protocol Buffer definitions from .proto files, enabling dynamic service discovery and client generation without pre-compilation.
# Runtime .proto loading helpers: each takes the path of a .proto file.
# Return values are not shown in this listing.
def protos(protobuf_path: str): ...
def services(protobuf_path: str): ...
def protos_and_services(protobuf_path: str): ...

Experimental features in the grpc.experimental module providing additional functionality and simplified RPC calling patterns.
# grpc.experimental module
class ChannelOptions:
    """Listing of experimental channel option names (values elided as ``...``)."""

    SingleThreadedUnaryStream = ...  # Perform unary-stream RPCs on single thread
# Takes no arguments and returns ChannelCredentials.
def insecure_channel_credentials() -> ChannelCredentials: ...
# NOTE(review): takes a single callable `f`; presumably a decorator that marks
# `f` as experimental -- confirm against the grpc.experimental documentation.
def experimental_api(f): ...
# Takes a `wrapper` and a `handler`; the result is not shown in this listing.
def wrap_server_method_handler(wrapper, handler): ...
# Simple stub functions (available in Python 3.7+)
# Simplified call signature for the unary-unary pattern (single request,
# single response). The call is addressed by `target` and `method` strings
# rather than through a pre-built stub; every parameter after `method` has a
# default, and `insecure` defaults to False.
def unary_unary(
request,
target: str,
method: str,
request_serializer=None,
response_deserializer=None,
options=(),
channel_credentials=None,
insecure: bool = False,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None
): ...
# Simplified call signature for the unary-stream pattern (single request,
# multiple responses). Takes the same parameters as the other simple stub
# functions: a `request`, `target` and `method` strings, then optional settings.
def unary_stream(
request,
target: str,
method: str,
request_serializer=None,
response_deserializer=None,
options=(),
channel_credentials=None,
insecure: bool = False,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None
): ...
# Simplified call signature for the stream-unary pattern (multiple requests,
# single response). The first parameter is `request_iterator` rather than a
# single request; the remaining parameters are optional settings.
def stream_unary(
request_iterator,
target: str,
method: str,
request_serializer=None,
response_deserializer=None,
options=(),
channel_credentials=None,
insecure: bool = False,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None
): ...
# Simplified call signature for the stream-stream (bidirectional streaming)
# pattern. The first parameter is `request_iterator`; the remaining
# parameters are optional settings.
def stream_stream(
request_iterator,
target: str,
method: str,
request_serializer=None,
response_deserializer=None,
options=(),
channel_credentials=None,
insecure: bool = False,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None
): ...
class UsageError(Exception):
    """Exception type listed among the grpc.experimental names."""
class ExperimentalApiWarning(Warning): ...

import grpc
print(grpc.__version__) # Package version string

class ChannelCredentials:
def __init__(self, credentials): ...
class CallCredentials:
    """Signature listing for the call-credentials type; built from a `credentials` value."""

    def __init__(self, credentials): ...
class ServerCredentials:
    """Signature listing for the server-credentials type; built from a `credentials` value."""

    def __init__(self, credentials): ...
class Future(abc.ABC):
    """Signature listing for the Future interface (the type returned by ``channel_ready_future``).

    ``cancel``, ``cancelled``, ``running`` and ``done`` are annotated as
    returning ``bool``; ``result``, ``exception`` and ``traceback`` accept an
    optional ``timeout``.
    """

    def cancel(self) -> bool: ...

    def cancelled(self) -> bool: ...

    def running(self) -> bool: ...

    def done(self) -> bool: ...

    def result(self, timeout=None): ...

    def exception(self, timeout=None): ...

    def traceback(self, timeout=None): ...

    def add_done_callback(self, fn): ...
class ChannelConnectivity(enum.Enum):
    """Listing of channel connectivity state names (values elided as ``...``)."""

    IDLE = ...
    CONNECTING = ...
    READY = ...
    TRANSIENT_FAILURE = ...
    SHUTDOWN = ...
class Compression(enum.IntEnum):
    """Listing of compression option names (values elided as ``...``)."""

    # NOTE(review): an IntEnum needs integer member values, so this listing is
    # illustrative only; it cannot be executed with the ``...`` placeholders.
    NoCompression = ...
    Deflate = ...
    Gzip = ...
class LocalConnectionType(enum.Enum):
    """Listing of local connection type names (values elided as ``...``)."""

    UDS = ...
    LOCAL_TCP = ...