HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Runtime loading and compilation of Protocol Buffer definitions from .proto files, enabling dynamic service discovery, client generation without pre-compilation, and flexible protobuf-based service development with comprehensive type support.
Load and compile Protocol Buffer definitions at runtime without requiring pre-compilation, enabling dynamic service discovery and flexible development workflows.
def protos(protobuf_path: str):
    """
    Loads protobuf definitions from .proto files.

    Compiles the given .proto source at runtime, so no pre-generated
    *_pb2 modules are required.

    Parameters:
    - protobuf_path: Path to .proto files or directory containing .proto files

    Returns:
    Module-like object containing protobuf message classes and enums
    """
def services(protobuf_path: str):
    """
    Loads service definitions from .proto files.

    Compiles the given .proto source at runtime, so no pre-generated
    *_pb2_grpc modules are required.

    Parameters:
    - protobuf_path: Path to .proto files or directory containing .proto files

    Returns:
    Module-like object containing service stub classes and servicer base classes
    """
def protos_and_services(protobuf_path: str):
    """
    Loads both protobuf and service definitions from .proto files.

    Equivalent to calling protos() and services() on the same path.

    Parameters:
    - protobuf_path: Path to .proto files or directory containing .proto files

    Returns:
    Tuple of (protos_module, services_module)
    """

Usage Examples:
import grpc
# Load protobuf definitions from a single file
protos = grpc.protos("my_service.proto")
# Access message classes
request = protos.MyRequest(message="Hello", count=5)
print(f"Request: {request}")
# Load service definitions
services = grpc.services("my_service.proto")
# Create client stub
channel = grpc.insecure_channel('localhost:50051')
stub = services.MyServiceStub(channel)
# Make RPC call
response = stub.UnaryMethod(request)
print(f"Response: {response.reply}")
# Load both protos and services together
protos, services = grpc.protos_and_services("my_service.proto")
# Use both message classes and service stubs
request = protos.MyRequest(message="Hello")
stub = services.MyServiceStub(channel)
response = stub.UnaryMethod(request)
# Load from directory containing multiple .proto files
# NOTE(review): directory loading and per-file submodule access
# (protos.user_pb2) are assumed from this example; the real grpc.protos()
# documents a single .proto path resolvable on sys.path — confirm against
# the installed grpcio version.
protos = grpc.protos("proto_directory/")
services = grpc.services("proto_directory/")
# Access types from different proto files
user_request = protos.user_pb2.GetUserRequest(user_id="123")
order_request = protos.order_pb2.CreateOrderRequest(user_id="123", items=[])

Implement gRPC services dynamically using runtime-loaded protobuf definitions without requiring pre-generated code.
Usage Examples:
# Load service definitions
import time
from concurrent import futures

protos, services = grpc.protos_and_services("my_service.proto")


# Implement servicer using runtime-loaded definitions
class MyServiceServicer(services.MyServiceServicer):
    """Servicer implemented against runtime-loaded message/service classes."""

    def UnaryMethod(self, request, context):
        # Access request fields dynamically
        message = request.message
        count = request.count
        # Create response using runtime-loaded message class
        response = protos.MyResponse()
        response.reply = f"Processed: {message} (count: {count})"
        response.timestamp = int(time.time())
        return response

    def StreamingMethod(self, request, context):
        # Server-streaming RPC: yield one response per requested item.
        for i in range(request.count):
            response = protos.MyStreamResponse()
            response.index = i
            response.data = f"Item {i} for {request.message}"
            yield response

    def BidirectionalMethod(self, request_iterator, context):
        # Bidirectional-streaming RPC: echo each incoming request.
        for request in request_iterator:
            # Process each request
            response = protos.MyBidirectionalResponse()
            response.echo = f"Echo: {request.data}"
            response.processed_at = time.time()
            yield response


# Set up server with dynamic servicer
def serve():
    """Start a gRPC server hosting the dynamically loaded service."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # Add servicer to server using runtime-loaded service definition
    services.add_MyServiceServicer_to_server(MyServiceServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started with dynamically loaded service")
    server.wait_for_termination()
if __name__ == '__main__':
    serve()

Handle complex protobuf projects with multiple .proto files, imports, and dependencies.
Example Proto Structure:
protos/
├── common/
│ ├── types.proto
│ └── errors.proto
├── user/
│ └── user_service.proto
└── order/
    └── order_service.proto

Usage Examples:
# Load entire proto directory structure
# NOTE(review): nested package-style access (protos.common.types_pb2) and
# directory loading are assumed from this example — confirm against the
# installed grpcio version.
import time

protos, services = grpc.protos_and_services("protos/")

# Access types from different proto files
# Assuming types.proto defines common types
common_timestamp = protos.common.types_pb2.Timestamp()
common_timestamp.seconds = int(time.time())

# User service types and stubs
user_request = protos.user.user_service_pb2.GetUserRequest(user_id="123")
user_stub = services.user.user_service_pb2_grpc.UserServiceStub(channel)

# Order service types and stubs
order_request = protos.order.order_service_pb2.CreateOrderRequest(
    user_id="123",
    timestamp=common_timestamp,
)
order_stub = services.order.order_service_pb2_grpc.OrderServiceStub(channel)

# Make calls to different services
user = user_stub.GetUser(user_request)
order = order_stub.CreateOrder(order_request)

Inspect and work with protobuf message types dynamically at runtime.
Usage Examples:
# Load protos
protos = grpc.protos("my_service.proto")

# Get message class
MyRequest = protos.MyRequest

# Inspect message fields via the generated descriptor
print("MyRequest fields:")
for field in MyRequest.DESCRIPTOR.fields:
    print(f" {field.name}: {field.type}")

# Create message with dynamic field access
request = MyRequest()

# Set fields dynamically
for field in MyRequest.DESCRIPTOR.fields:
    if field.name == "message":
        setattr(request, field.name, "Dynamic message")
    elif field.name == "count":
        setattr(request, field.name, 42)
print(f"Dynamic request: {request}")

# Check if field exists
# NOTE(review): hasattr() on a protobuf message is True for every declared
# field; to test whether a field is declared at all, prefer
# DESCRIPTOR.fields_by_name — confirm the intended semantics here.
if hasattr(request, 'optional_field'):
    print(f"Optional field value: {request.optional_field}")

# Handle repeated fields
if hasattr(request, 'items'):
    request.items.extend(["item1", "item2", "item3"])

# Handle nested messages
if hasattr(request, 'metadata'):
    request.metadata.created_by = "dynamic_client"
    request.metadata.created_at = int(time.time())

Work with protobuf serialization at runtime for custom transport or storage scenarios.
Usage Examples:
# Load protos
protos = grpc.protos("my_service.proto")
# Create message
request = protos.MyRequest(message="Hello", count=5)
# Serialize to bytes (compact protobuf wire format)
serialized_data = request.SerializeToString()
print(f"Serialized size: {len(serialized_data)} bytes")
# Deserialize from bytes into a fresh message instance
new_request = protos.MyRequest()
new_request.ParseFromString(serialized_data)
print(f"Deserialized: {new_request}")
# JSON serialization (requires protobuf JSON support)
from google.protobuf import json_format
# Convert to JSON
json_data = json_format.MessageToJson(request)
print(f"JSON: {json_data}")
# Convert from JSON
json_request = protos.MyRequest()
json_format.Parse(json_data, json_request)
print(f"From JSON: {json_request}")
# Custom serialization for storage
def store_message(message, filename):
    """Write *message* to *filename* in binary protobuf wire format."""
    with open(filename, 'wb') as f:
        f.write(message.SerializeToString())


def load_message(message_class, filename):
    """Read a message of type *message_class* back from *filename*.

    Returns the populated message instance.
    """
    message = message_class()
    with open(filename, 'rb') as f:
        message.ParseFromString(f.read())
    return message
# Store and load messages: round-trip a request through a binary file.
store_message(request, "request.bin")
loaded_request = load_message(protos.MyRequest, "request.bin")

Handle errors and edge cases when working with runtime-loaded protobuf definitions.
Usage Examples:
import os
import sys


def load_protos_safely(proto_path):
    """Safely load protobuf definitions with error handling.

    Returns (protos, services) on success; exits the process with
    status 1 on any failure.
    """
    try:
        if not os.path.exists(proto_path):
            raise FileNotFoundError(f"Proto path not found: {proto_path}")
        # Load protos and services
        protos, services = grpc.protos_and_services(proto_path)
        return protos, services
    except Exception as e:
        # NOTE(review): exiting from a helper hides the original traceback;
        # consider re-raising a domain error instead.
        print(f"Error loading protos from {proto_path}: {e}")
        sys.exit(1)
def validate_proto_structure(protos, expected_messages):
    """Validate that expected message types are available.

    Raises ValueError listing every expected message name that is not
    present as an attribute of *protos*.
    """
    missing_messages = [
        name for name in expected_messages if not hasattr(protos, name)
    ]
    if missing_messages:
        raise ValueError(f"Missing expected messages: {missing_messages}")
# Safe proto loading: exit early if the expected schema is not present.
try:
    protos, services = load_protos_safely("my_service.proto")
    # Validate expected structure
    expected_messages = ["MyRequest", "MyResponse", "StreamRequest"]
    validate_proto_structure(protos, expected_messages)
    print("Protos loaded and validated successfully")
except Exception as e:
    print(f"Failed to load or validate protos: {e}")
    sys.exit(1)
# Handle missing fields gracefully
def create_request_safely(protos, **kwargs):
    """Create request message with safe field setting.

    Keyword arguments naming fields that do not exist on MyRequest are
    skipped with a printed warning instead of raising, so callers may
    pass a superset of fields.
    """
    request = protos.MyRequest()
    # Get available fields from the message descriptor
    available_fields = {field.name for field in request.DESCRIPTOR.fields}
    # Set only available fields
    for field_name, value in kwargs.items():
        if field_name in available_fields:
            setattr(request, field_name, value)
        else:
            print(f"Warning: Field '{field_name}' not available in MyRequest")
    return request
# Usage: unknown keyword arguments are dropped with a printed warning.
request = create_request_safely(
protos,
message="Hello",
count=5,
unknown_field="ignored" # Will be ignored with warning
)

Work with advanced protobuf features like oneof fields, maps, and any types.
Usage Examples:
# Load protos with advanced features
protos = grpc.protos("advanced_service.proto")

# Handle oneof fields
request = protos.AdvancedRequest()
# Set oneof field (only one can be set at a time)
request.text_data = "Hello, world!"
# request.binary_data = b"Binary data" # Would clear text_data

# Check which oneof field is set
which_data = request.WhichOneof('data')
print(f"Active data field: {which_data}")

# Handle map fields
if hasattr(request, 'metadata'):
    request.metadata['key1'] = 'value1'
    request.metadata['key2'] = 'value2'
    # Iterate over map
    for key, value in request.metadata.items():
        print(f"Metadata: {key} = {value}")

# Handle repeated fields
if hasattr(request, 'tags'):
    request.tags.extend(['tag1', 'tag2', 'tag3'])
    # Modify repeated field
    request.tags.append('tag4')
    del request.tags[0] # Remove first tag

# Handle nested messages
if hasattr(request, 'config'):
    request.config.timeout = 30
    request.config.retries = 3
    request.config.debug = True
print(f"Advanced request: {request}")def discover_services(proto_directory):
"""Discover all available services from proto directory."""
protos, services = grpc.protos_and_services(proto_directory)
discovered_services = {}
# Inspect services module for available stubs
for attr_name in dir(services):
if attr_name.endswith('Stub'):
service_name = attr_name[:-4] # Remove 'Stub' suffix
stub_class = getattr(services, attr_name)
servicer_name = f"{service_name}Servicer"
if hasattr(services, servicer_name):
servicer_class = getattr(services, servicer_name)
discovered_services[service_name] = {
'stub': stub_class,
'servicer': servicer_class,
'methods': [method for method in dir(servicer_class)
if not method.startswith('_')]
}
return discovered_services, protos, services
# Usage: enumerate every discovered service and its methods.
services_info, protos, services = discover_services("protos/")
for service_name, info in services_info.items():
    print(f"Service: {service_name}")
print(f" Methods: {info['methods']}")import yaml
def create_clients_from_config(config_file):
    """Create gRPC clients based on configuration file.

    Each entry under config['services'] must provide: name, proto_path,
    address; 'secure' (bool) is optional and defaults to False.
    Returns a dict mapping service name -> connected stub.
    """
    with open(config_file, 'r') as f:
        config = yaml.safe_load(f)
    clients = {}
    for service_config in config['services']:
        service_name = service_config['name']
        proto_path = service_config['proto_path']
        server_address = service_config['address']
        # Load protos for this service
        _, services = grpc.protos_and_services(proto_path)
        # Create channel (TLS when configured as secure)
        if service_config.get('secure', False):
            credentials = grpc.ssl_channel_credentials()
            channel = grpc.secure_channel(server_address, credentials)
        else:
            channel = grpc.insecure_channel(server_address)
        # Create stub
        stub_name = f"{service_name}Stub"
        if hasattr(services, stub_name):
            stub_class = getattr(services, stub_name)
            clients[service_name] = stub_class(channel)
    return clients
# Example config.yaml:
# services:
#   - name: UserService
#     proto_path: protos/user_service.proto
#     address: localhost:50051
#     secure: false
#   - name: OrderService
#     proto_path: protos/order_service.proto
#     address: secure-orders.example.com:443
#     secure: true
# Usage: build one stub per configured service.
clients = create_clients_from_config("config.yaml")
user_client = clients['UserService']
order_client = clients['OrderService']

# The protos() and services() functions return module-like objects
# that contain the dynamically loaded protobuf classes and service definitions
# Example of what's available after loading:
# protos = grpc.protos("my_service.proto")
# - protos.MyRequest (message class)
# - protos.MyResponse (message class)
# - protos.StatusEnum (enum class)
# services = grpc.services("my_service.proto")
# - services.MyServiceStub (client stub class)
# - services.MyServiceServicer (server servicer base class)
# - services.add_MyServiceServicer_to_server (function to add servicer to server)
# The exact contents depend on what's defined in the .proto files

Install with Tessl CLI
npx tessl i tessl/pypi-grpcio