HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Server implementation provides comprehensive server creation and lifecycle management with support for multiple service registration, port configuration, thread pool management, graceful shutdown, and both synchronous and asynchronous execution models.
Creates gRPC servers with configurable thread pools, interceptors, and advanced options for production deployments.
def server(thread_pool, handlers=None, interceptors=None, options=None, maximum_concurrent_rpcs=None, compression=None, xds=False) -> Server:
"""
Creates a Server with which RPCs can be serviced.
Parameters:
- thread_pool: A futures.ThreadPoolExecutor for executing RPC handlers
- handlers: Optional list of GenericRpcHandlers for initial service registration
- interceptors: Optional list of ServerInterceptors for middleware
- options: Optional list of key-value pairs for server configuration
- maximum_concurrent_rpcs: Maximum concurrent RPCs before returning RESOURCE_EXHAUSTED
- compression: Default compression algorithm for the server lifetime
- xds: If True, retrieves server configuration via xDS (EXPERIMENTAL)
Returns:
Server: A Server object ready for service registration and startup
"""Usage Examples:
from concurrent import futures
# Basic server with thread pool
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# Server with interceptors
class LoggingInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
print(f"Handling {handler_call_details.method}")
return continuation(handler_call_details)
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[LoggingInterceptor()]
)
# Server with advanced options
options = [
('grpc.keepalive_time_ms', 30000),
('grpc.max_concurrent_streams', 100),
]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=20),
options=options,
maximum_concurrent_rpcs=1000,
compression=grpc.compression.Gzip
)Core server interface providing service registration, port management, and lifecycle control.
class Server(abc.ABC):
"""Services RPCs with comprehensive lifecycle management."""
def add_generic_rpc_handlers(self, generic_rpc_handlers):
"""
Registers GenericRpcHandlers with this Server.
Must be called before server is started.
Parameters:
- generic_rpc_handlers: Iterable of GenericRpcHandlers
"""
def add_registered_method_handlers(self, service_name: str, method_handlers):
"""
Registers method handlers for a specific service.
Registered handlers take precedence over generic handlers.
Parameters:
- service_name: The service name
- method_handlers: Dictionary mapping method names to RpcMethodHandlers
"""
def add_insecure_port(self, address: str) -> int:
"""
Opens an insecure port for accepting RPCs.
Must be called before starting the server.
Parameters:
- address: Address to bind (e.g., '[::]:50051', 'localhost:0')
Returns:
int: The actual port number where server will accept requests
"""
def add_secure_port(self, address: str, server_credentials: ServerCredentials) -> int:
"""
Opens a secure port for accepting RPCs.
Must be called before starting the server.
Parameters:
- address: Address to bind
- server_credentials: ServerCredentials object for SSL/TLS
Returns:
int: The actual port number where server will accept requests
"""
def start(self):
"""
Starts this Server.
May only be called once (not idempotent).
"""
def stop(self, grace) -> threading.Event:
"""
Stops this Server with optional grace period.
Parameters:
- grace: Duration in seconds to wait for active RPCs, or None for immediate stop
Returns:
threading.Event: Event that will be set when server completely stops
"""
def wait_for_termination(self, timeout=None) -> bool:
"""
Block current thread until the server stops.
Parameters:
- timeout: Optional timeout in seconds
Returns:
bool: True if server stopped normally, False if timeout occurred
"""Usage Examples:
from concurrent import futures
import time
# Complete server lifecycle
def run_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# Register services
add_MyServiceServicer_to_server(MyServiceServicer(), server)
# Add ports
insecure_port = server.add_insecure_port('[::]:0')
print(f"Server will listen on insecure port: {insecure_port}")
# Start server
server.start()
print("Server started")
try:
# Keep server running
server.wait_for_termination()
except KeyboardInterrupt:
print("Shutting down server...")
server.stop(grace=5.0).wait()
print("Server stopped")
# Secure server setup
def run_secure_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# Load SSL credentials
with open('server.key', 'rb') as f:
private_key = f.read()
with open('server.crt', 'rb') as f:
certificate_chain = f.read()
credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
# Add secure port
secure_port = server.add_secure_port('[::]:50051', credentials)
print(f"Secure server listening on port: {secure_port}")
server.start()
server.wait_for_termination()Provides RPC-specific context and control for server-side method implementations.
class ServicerContext(RpcContext):
"""Context object passed to method implementations."""
def invocation_metadata(self):
"""
Accesses the metadata sent by the client.
Returns:
Metadata: The invocation metadata key-value pairs
"""
def peer(self) -> str:
"""
Identifies the peer that invoked the RPC.
Returns:
str: String identifying the peer (format determined by gRPC runtime)
"""
def peer_identities(self):
"""
Gets one or more peer identity(s).
Returns:
Iterable of bytes or None: Peer identities if authenticated, None otherwise
"""
def peer_identity_key(self):
"""
The auth property used to identify the peer.
Returns:
str or None: Auth property name or None if not authenticated
"""
def auth_context(self):
"""
Gets the auth context for the call.
Returns:
dict: Map of auth properties to iterables of bytes
"""
def send_initial_metadata(self, initial_metadata):
"""
Sends the initial metadata value to the client.
Parameters:
- initial_metadata: The initial metadata key-value pairs
"""
def set_trailing_metadata(self, trailing_metadata):
"""
Sets the trailing metadata for the RPC.
Parameters:
- trailing_metadata: The trailing metadata key-value pairs
"""
def abort(self, code: StatusCode, details: str):
"""
Raises an exception to terminate the RPC with a non-OK status.
Parameters:
- code: A StatusCode object (must not be StatusCode.OK)
- details: A UTF-8-encodable string for the client
Raises:
Exception: Always raised to signal RPC abortion
"""
def abort_with_status(self, status):
"""
Raises an exception to terminate the RPC with a status object.
Parameters:
- status: A grpc.Status object (EXPERIMENTAL)
Raises:
Exception: Always raised to signal RPC abortion
"""
def set_code(self, code: StatusCode):
"""
Sets the value to be used as status code upon RPC completion.
Parameters:
- code: A StatusCode object to be sent to the client
"""
def set_details(self, details: str):
"""
Sets the value to be used as detail string upon RPC completion.
Parameters:
- details: A UTF-8-encodable string to be sent to the client
"""
def set_compression(self, compression):
"""
Set the compression algorithm to be used for the entire call.
Parameters:
- compression: An element of grpc.compression (e.g., grpc.compression.Gzip)
"""
def disable_next_message_compression(self):
"""
Disables compression for the next response message.
Overrides any compression configuration.
"""Usage Examples:
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
def UnaryMethod(self, request, context):
# Access client metadata
metadata = dict(context.invocation_metadata())
user_agent = metadata.get('user-agent', 'unknown')
# Check authentication
if not self.is_authenticated(context):
context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Authentication required')
# Send initial metadata
context.send_initial_metadata([('server-version', '1.0')])
# Process request
try:
result = self.process_request(request)
# Set trailing metadata
context.set_trailing_metadata([('processed-count', str(len(result)))])
return my_service_pb2.MyResponse(data=result)
except ValueError as e:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(f'Invalid request: {str(e)}')
raise
except Exception as e:
context.abort(grpc.StatusCode.INTERNAL, 'Internal server error')
def StreamingMethod(self, request_iterator, context):
try:
for request in request_iterator:
# Check if client cancelled
if not context.is_active():
break
# Process and yield response
response = self.process_item(request)
yield my_service_pb2.MyResponse(data=response)
except Exception as e:
context.abort(grpc.StatusCode.INTERNAL, f'Processing error: {str(e)}')Creates RPC method handlers for different RPC patterns with serialization support.
def unary_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
"""
Creates an RpcMethodHandler for a unary-unary RPC method.
Parameters:
- behavior: Implementation accepting one request and returning one response
- request_deserializer: Optional deserializer for request deserialization
- response_serializer: Optional serializer for response serialization
Returns:
RpcMethodHandler: Handler object for use with grpc.Server
"""
def unary_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
"""Creates an RpcMethodHandler for a unary-stream RPC method."""
def stream_unary_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
"""Creates an RpcMethodHandler for a stream-unary RPC method."""
def stream_stream_rpc_method_handler(behavior, request_deserializer=None, response_serializer=None) -> RpcMethodHandler:
"""Creates an RpcMethodHandler for a stream-stream RPC method."""
def method_handlers_generic_handler(service: str, method_handlers) -> GenericRpcHandler:
"""
Creates a GenericRpcHandler from RpcMethodHandlers.
Parameters:
- service: The name of the service implemented by the method handlers
- method_handlers: Dictionary mapping method names to RpcMethodHandlers
Returns:
GenericRpcHandler: Handler for adding to grpc.Server
"""Usage Example:
def my_unary_unary(request, context):
return my_service_pb2.MyResponse(message=f"Echo: {request.message}")
def my_unary_stream(request, context):
for i in range(request.count):
yield my_service_pb2.MyResponse(message=f"Item {i}")
# Create method handlers
handlers = {
'UnaryMethod': grpc.unary_unary_rpc_method_handler(
my_unary_unary,
request_deserializer=my_service_pb2.MyRequest.FromString,
response_serializer=my_service_pb2.MyResponse.SerializeToString,
),
'StreamMethod': grpc.unary_stream_rpc_method_handler(
my_unary_stream,
request_deserializer=my_service_pb2.MyRequest.FromString,
response_serializer=my_service_pb2.MyResponse.SerializeToString,
),
}
# Create generic handler
generic_handler = grpc.method_handlers_generic_handler('MyService', handlers)
# Add to server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server.add_generic_rpc_handlers([generic_handler])class RpcMethodHandler(abc.ABC):
"""
An implementation of a single RPC method.
Attributes:
- request_streaming: Whether RPC supports multiple request messages
- response_streaming: Whether RPC supports multiple response messages
- request_deserializer: Callable for request deserialization
- response_serializer: Callable for response serialization
- unary_unary: Business logic for unary-unary pattern
- unary_stream: Business logic for unary-stream pattern
- stream_unary: Business logic for stream-unary pattern
- stream_stream: Business logic for stream-stream pattern
"""
class HandlerCallDetails(abc.ABC):
"""
Describes an RPC that has just arrived for service.
Attributes:
- method: The method name of the RPC
- invocation_metadata: The metadata sent by the client
"""
class GenericRpcHandler(abc.ABC):
"""An implementation of arbitrarily many RPC methods."""
def service(self, handler_call_details: HandlerCallDetails):
"""
Returns the handler for servicing the RPC.
Parameters:
- handler_call_details: HandlerCallDetails describing the RPC
Returns:
RpcMethodHandler or None: Handler if this implementation services the RPC
"""
class ServiceRpcHandler(GenericRpcHandler):
"""
An implementation of RPC methods belonging to a service.
Handles RPCs with structured names: '/Service.Name/Service.Method'
"""
def service_name(self) -> str:
"""Returns this service's name."""Install with Tessl CLI
npx tessl i tessl/pypi-grpcio