
tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems


Protocol Buffers Integration

gRPC Python can load and compile Protocol Buffer definitions from .proto files at runtime, so clients and servers can be written without running protoc ahead of time. These functions are experimental and require the grpcio-tools package to be installed alongside grpcio.

Capabilities

Runtime Proto Loading

Load and compile a .proto file on first use instead of generating code beforehand. The returned modules are equivalent to the _pb2.py and _pb2_grpc.py files that protoc would generate.
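The runtime loaders described in this section (`grpc.protos`, `grpc.services`, `grpc.protos_and_services`) delegate to grpcio-tools and raise `NotImplementedError` when that package is missing. A minimal sketch of a pre-flight check follows; `runtime_protos_available` is a hypothetical helper name, and the only assumption is that grpcio-tools provides the `grpc_tools` module.

```python
import importlib.util


def runtime_protos_available() -> bool:
    """Return True when the grpc_tools module (from grpcio-tools) can be found."""
    return importlib.util.find_spec("grpc_tools") is not None


if not runtime_protos_available():
    # Hypothetical message; adapt it to your own logging.
    print("grpcio-tools is not installed; runtime proto loading will fail")
```

Running the check once at startup gives a clearer failure than an exception raised from the first RPC-related call.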

def protos(protobuf_path: str):
    """
    Loads the message definitions from a single .proto file.

    Parameters:
    - protobuf_path: Path to a .proto file. The path is resolved relative to an
      entry on sys.path, as are all of the file's transitive imports.

    Returns:
    Module containing the message classes and enums, equivalent to a
    generated _pb2.py module
    """

def services(protobuf_path: str):
    """
    Loads the service definitions from a single .proto file.

    Parameters:
    - protobuf_path: Path to a .proto file. The path is resolved relative to an
      entry on sys.path, as are all of the file's transitive imports.

    Returns:
    Module containing the stub classes, servicer base classes, and
    add_*_to_server functions, equivalent to a generated _pb2_grpc.py module
    """

def protos_and_services(protobuf_path: str):
    """
    Loads both message and service definitions from a single .proto file.

    Parameters:
    - protobuf_path: Path to a .proto file. The path is resolved relative to an
      entry on sys.path, as are all of the file's transitive imports.

    Returns:
    Tuple of (protos_module, services_module)
    """

Usage Examples:

import grpc

# Load protobuf definitions from a single file
# (the path is resolved against the entries of sys.path)
protos = grpc.protos("my_service.proto")

# Access message classes
request = protos.MyRequest(message="Hello", count=5)
print(f"Request: {request}")

# Load service definitions
services = grpc.services("my_service.proto")

# Create client stub
channel = grpc.insecure_channel('localhost:50051')
stub = services.MyServiceStub(channel)

# Make RPC call
response = stub.UnaryMethod(request)
print(f"Response: {response.reply}")

# Load both protos and services together
protos, services = grpc.protos_and_services("my_service.proto")

# Use both message classes and service stubs
request = protos.MyRequest(message="Hello")
stub = services.MyServiceStub(channel)
response = stub.UnaryMethod(request)

# Load several .proto files from one directory; each call takes a single file
user_protos = grpc.protos("proto_directory/user.proto")
order_protos = grpc.protos("proto_directory/order.proto")

# Access types from different proto files
user_request = user_protos.GetUserRequest(user_id="123")
order_request = order_protos.CreateOrderRequest(user_id="123", items=[])
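The path passed to these functions is looked up against each entry of sys.path, so a proto directory that lives outside the project has to be made visible first. One way to do that is sketched below with a hypothetical `proto_root` context manager, which is not part of grpc.

```python
import contextlib
import os
import sys


@contextlib.contextmanager
def proto_root(directory):
    """Temporarily put `directory` at the front of sys.path."""
    root = os.path.abspath(directory)
    sys.path.insert(0, root)
    try:
        yield root
    finally:
        # Drop the first matching entry, which is the one inserted above
        # unless sys.path was reordered in the meantime.
        try:
            sys.path.remove(root)
        except ValueError:
            pass


# Hypothetical usage, assuming /opt/schemas/my_service.proto exists:
# with proto_root("/opt/schemas"):
#     protos, services = grpc.protos_and_services("my_service.proto")
```

Modules imported inside the `with` block stay cached in `sys.modules`, so they remain usable after the path entry is removed.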

Dynamic Service Implementation

Implement gRPC services dynamically using runtime-loaded protobuf definitions without requiring pre-generated code.

Usage Examples:

import time
from concurrent import futures

import grpc

# Load service definitions
protos, services = grpc.protos_and_services("my_service.proto")

# Implement servicer using runtime-loaded definitions
class MyServiceServicer(services.MyServiceServicer):
    def UnaryMethod(self, request, context):
        # Access request fields dynamically
        message = request.message
        count = request.count
        
        # Create response using runtime-loaded message class
        response = protos.MyResponse()
        response.reply = f"Processed: {message} (count: {count})"
        response.timestamp = int(time.time())
        
        return response
    
    def StreamingMethod(self, request, context):
        for i in range(request.count):
            response = protos.MyStreamResponse()
            response.index = i
            response.data = f"Item {i} for {request.message}"
            yield response
    
    def BidirectionalMethod(self, request_iterator, context):
        for request in request_iterator:
            # Process each request
            response = protos.MyBidirectionalResponse()
            response.echo = f"Echo: {request.data}"
            response.processed_at = time.time()
            yield response

# Set up server with dynamic servicer
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Add servicer to server using runtime-loaded service definition
    services.add_MyServiceServicer_to_server(MyServiceServicer(), server)
    
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started with dynamically loaded service")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
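Servicer methods are ordinary Python methods, so their logic can be exercised without starting a server. The sketch below makes no use of grpc itself: `FakeRequest`, `FakeResponse`, and `EchoServicer` are hypothetical stand-ins for the runtime-loaded classes, and `context` is passed as `None` because these handlers never touch it.

```python
from dataclasses import dataclass


@dataclass
class FakeRequest:
    """Stand-in for protos.MyRequest (hypothetical)."""
    message: str = ""
    count: int = 0


@dataclass
class FakeResponse:
    """Stand-in for protos.MyResponse (hypothetical)."""
    reply: str = ""


class EchoServicer:
    """Same method shapes as a servicer subclass, minus the generated base class."""

    def UnaryMethod(self, request, context):
        return FakeResponse(reply=f"Processed: {request.message} (count: {request.count})")

    def StreamingMethod(self, request, context):
        # Server-streaming handlers are generators: one response per yield.
        for i in range(request.count):
            yield FakeResponse(reply=f"Item {i} for {request.message}")


servicer = EchoServicer()
unary = servicer.UnaryMethod(FakeRequest(message="Hello", count=2), context=None)
streamed = list(servicer.StreamingMethod(FakeRequest(message="Hello", count=2), context=None))
print(unary.reply)
print([r.reply for r in streamed])
```

Handlers that call methods on `context` would need a stub object in its place.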

Multi-File Proto Projects

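Handle projects that span multiple .proto files with imports between them. Each file is loaded with its own call, and every file it imports must also be resolvable from an entry on sys.path.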

Example Proto Structure:

protos/
  ├── common/
  │   ├── types.proto
  │   └── errors.proto
  ├── user/
  │   └── user_service.proto
  └── order/
      └── order_service.proto

Usage Examples:

import time

import grpc

channel = grpc.insecure_channel('localhost:50051')

# Load each .proto file separately; the directory that contains protos/
# must be on sys.path so that these paths can be resolved
common_types = grpc.protos("protos/common/types.proto")
user_protos, user_services = grpc.protos_and_services("protos/user/user_service.proto")
order_protos, order_services = grpc.protos_and_services("protos/order/order_service.proto")

# Assuming types.proto defines a Timestamp message
common_timestamp = common_types.Timestamp()
common_timestamp.seconds = int(time.time())

# User service types and stubs
user_request = user_protos.GetUserRequest(user_id="123")
user_stub = user_services.UserServiceStub(channel)

# Order service types and stubs
order_request = order_protos.CreateOrderRequest(
    user_id="123",
    timestamp=common_timestamp
)
order_stub = order_services.OrderServiceStub(channel)

# Make calls to different services
user = user_stub.GetUser(user_request)
order = order_stub.CreateOrder(order_request)
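Each loader call takes a single .proto file, so a project laid out like the tree above needs one call per file. A sketch of collecting those paths follows; `list_proto_files` is a hypothetical helper, and it assumes the parent directory of the proto root is the entry placed on sys.path.

```python
import os


def list_proto_files(root):
    """Return paths of all .proto files under `root`, relative to root's parent.

    The returned paths use the platform path separator.
    """
    root = os.path.abspath(root)
    base = os.path.dirname(root)
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(".proto"):
                full = os.path.join(dirpath, name)
                found.append(os.path.relpath(full, base))
    return sorted(found)


# Hypothetical usage, with the parent directory of protos/ on sys.path:
# for path in list_proto_files("protos"):
#     protos, services = grpc.protos_and_services(path)
```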

Dynamic Type Inspection

Inspect and work with protobuf message types dynamically at runtime.

Usage Examples:

import time

import grpc

# Load protos
protos = grpc.protos("my_service.proto")

# Get message class
MyRequest = protos.MyRequest

# Inspect message fields (field.type is an integer FieldDescriptor.TYPE_* constant)
print("MyRequest fields:")
for field in MyRequest.DESCRIPTOR.fields:
    print(f"  {field.name}: {field.type}")

# Create message with dynamic field access
request = MyRequest()

# Set fields dynamically
for field in MyRequest.DESCRIPTOR.fields:
    if field.name == "message":
        setattr(request, field.name, "Dynamic message")
    elif field.name == "count":
        setattr(request, field.name, 42)

print(f"Dynamic request: {request}")

# Check whether the message type declares a field
if 'optional_field' in MyRequest.DESCRIPTOR.fields_by_name:
    print(f"Optional field value: {request.optional_field}")

# Handle repeated fields
if 'items' in MyRequest.DESCRIPTOR.fields_by_name:
    request.items.extend(["item1", "item2", "item3"])

# Handle nested messages
if 'metadata' in MyRequest.DESCRIPTOR.fields_by_name:
    request.metadata.created_by = "dynamic_client"
    request.metadata.created_at = int(time.time())
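`field.type` is an integer constant defined on `FieldDescriptor`, which is hard to read in printed output. A small sketch of mapping it back to a name follows. It uses the well-known `Timestamp` message that ships with the protobuf package so that it runs without a .proto file; `type_name` is a hypothetical helper.

```python
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.timestamp_pb2 import Timestamp

# Build {integer value: "TYPE_..."} from the constants on FieldDescriptor.
_TYPE_NAMES = {
    getattr(FieldDescriptor, name): name
    for name in dir(FieldDescriptor)
    if name.startswith("TYPE_")
}


def type_name(field):
    """Return the TYPE_* constant name for a field descriptor."""
    return _TYPE_NAMES.get(field.type, f"UNKNOWN({field.type})")


for field in Timestamp.DESCRIPTOR.fields:
    print(f"{field.name}: {type_name(field)}")
```

The same helper should work on the descriptors of runtime-loaded message classes, since they expose the same `DESCRIPTOR.fields` interface.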

Protobuf Serialization and Deserialization

Work with protobuf serialization at runtime for custom transport or storage scenarios.

Usage Examples:

# Load protos
protos = grpc.protos("my_service.proto")

# Create message
request = protos.MyRequest(message="Hello", count=5)

# Serialize to bytes
serialized_data = request.SerializeToString()
print(f"Serialized size: {len(serialized_data)} bytes")

# Deserialize from bytes
new_request = protos.MyRequest()
new_request.ParseFromString(serialized_data)
print(f"Deserialized: {new_request}")

# JSON serialization (json_format ships with the protobuf package)
from google.protobuf import json_format

# Convert to JSON
json_data = json_format.MessageToJson(request)
print(f"JSON: {json_data}")

# Convert from JSON
json_request = protos.MyRequest()
json_format.Parse(json_data, json_request)
print(f"From JSON: {json_request}")

# Custom serialization for storage
def store_message(message, filename):
    with open(filename, 'wb') as f:
        f.write(message.SerializeToString())

def load_message(message_class, filename):
    message = message_class()
    with open(filename, 'rb') as f:
        message.ParseFromString(f.read())
    return message

# Store and load messages
store_message(request, "request.bin")
loaded_request = load_message(protos.MyRequest, "request.bin")

Error Handling with Dynamic Protos

Handle errors and edge cases when working with runtime-loaded protobuf definitions.

Usage Examples:

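import os
import sys

import grpc

def load_protos_safely(proto_path):
    """Load protobuf definitions, exiting with a message on failure."""
    try:
        # The loader resolves proto_path against the entries of sys.path,
        # so check those locations rather than only the current directory
        if not any(os.path.isfile(os.path.join(entry, proto_path)) for entry in sys.path):
            raise FileNotFoundError(f"Proto path not found on sys.path: {proto_path}")

        # Load protos and services
        protos, services = grpc.protos_and_services(proto_path)
        return protos, services

    except Exception as e:
        print(f"Error loading protos from {proto_path}: {e}")
        sys.exit(1)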

def validate_proto_structure(protos, expected_messages):
    """Validate that expected message types are available."""
    missing_messages = []
    
    for message_name in expected_messages:
        if not hasattr(protos, message_name):
            missing_messages.append(message_name)
    
    if missing_messages:
        raise ValueError(f"Missing expected messages: {missing_messages}")

# Safe proto loading
try:
    protos, services = load_protos_safely("my_service.proto")
    
    # Validate expected structure
    expected_messages = ["MyRequest", "MyResponse", "StreamRequest"]
    validate_proto_structure(protos, expected_messages)
    
    print("Protos loaded and validated successfully")
    
except Exception as e:
    print(f"Failed to load or validate protos: {e}")
    sys.exit(1)

# Handle missing fields gracefully
def create_request_safely(protos, **kwargs):
    """Create request message with safe field setting."""
    request = protos.MyRequest()
    
    # Get available fields
    available_fields = {field.name for field in request.DESCRIPTOR.fields}
    
    # Set only available fields
    for field_name, value in kwargs.items():
        if field_name in available_fields:
            setattr(request, field_name, value)
        else:
            print(f"Warning: Field '{field_name}' not available in MyRequest")
    
    return request

# Usage
request = create_request_safely(
    protos,
    message="Hello",
    count=5,
    unknown_field="ignored"  # Will be ignored with warning
)

Advanced Proto Features

Work with protobuf features beyond scalar fields: oneof groups, maps, repeated fields, and nested messages.

Usage Examples:

# Load protos with advanced features
protos = grpc.protos("advanced_service.proto")

# Handle oneof fields
request = protos.AdvancedRequest()

# Set oneof field (only one can be set at a time)
request.text_data = "Hello, world!"
# request.binary_data = b"Binary data"  # Would clear text_data

# Check which oneof field is set
which_data = request.WhichOneof('data')
print(f"Active data field: {which_data}")

# Handle map fields
if hasattr(request, 'metadata'):
    request.metadata['key1'] = 'value1'
    request.metadata['key2'] = 'value2'
    
    # Iterate over map
    for key, value in request.metadata.items():
        print(f"Metadata: {key} = {value}")

# Handle repeated fields
if hasattr(request, 'tags'):
    request.tags.extend(['tag1', 'tag2', 'tag3'])
    
    # Modify repeated field
    request.tags.append('tag4')
    del request.tags[0]  # Remove first tag

# Handle nested messages
if hasattr(request, 'config'):
    request.config.timeout = 30
    request.config.retries = 3
    request.config.debug = True

print(f"Advanced request: {request}")
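The oneof and map behaviour shown here can also be tried without a custom .proto file by using `google.protobuf.struct_pb2`, which ships with the protobuf package: `Value` holds a oneof named `kind`, and `Struct.fields` is a map from strings to `Value` messages. This is only an illustration with well-known types and makes no assumption about the fields of `AdvancedRequest`.

```python
from google.protobuf.struct_pb2 import Struct, Value

value = Value()
value.string_value = "Hello, world!"
first = value.WhichOneof("kind")

# Setting another member of the same oneof clears the previous one.
value.number_value = 3.5
second = value.WhichOneof("kind")

# Map fields with message values create their entries on first access.
struct = Struct()
struct.fields["key1"].string_value = "value1"
struct.fields["key2"].bool_value = True
kinds = {key: struct.fields[key].WhichOneof("kind") for key in sorted(struct.fields)}

print(first, second, value.string_value == "")
print(kinds)
```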

Integration Patterns

Service Discovery Pattern

import grpc

def discover_services(proto_path):
    """Discover the services defined in a single .proto file."""
    protos, services = grpc.protos_and_services(proto_path)

    discovered_services = {}

    # Inspect services module for available stubs
    for attr_name in dir(services):
        if attr_name.endswith('Stub'):
            service_name = attr_name[:-4]  # Remove 'Stub' suffix
            stub_class = getattr(services, attr_name)
            servicer_name = f"{service_name}Servicer"

            if hasattr(services, servicer_name):
                servicer_class = getattr(services, servicer_name)
                discovered_services[service_name] = {
                    'stub': stub_class,
                    'servicer': servicer_class,
                    # Public attributes of the servicer base class are its RPC methods
                    'methods': [method for method in dir(servicer_class)
                                if not method.startswith('_')]
                }

    return discovered_services, protos, services

# Usage
services_info, protos, services = discover_services("protos/user/user_service.proto")
for service_name, info in services_info.items():
    print(f"Service: {service_name}")
    print(f"  Methods: {info['methods']}")

Configuration-Driven Client

import grpc
import yaml  # third-party package: PyYAML

def create_clients_from_config(config_file):
    """Create gRPC clients based on configuration file."""
    with open(config_file, 'r') as f:
        config = yaml.safe_load(f)
    
    clients = {}
    
    for service_config in config['services']:
        service_name = service_config['name']
        proto_path = service_config['proto_path']
        server_address = service_config['address']
        
        # Load protos for this service
        _, services = grpc.protos_and_services(proto_path)
        
        # Create channel
        if service_config.get('secure', False):
            credentials = grpc.ssl_channel_credentials()
            channel = grpc.secure_channel(server_address, credentials)
        else:
            channel = grpc.insecure_channel(server_address)
        
        # Create stub
        stub_name = f"{service_name}Stub"
        if hasattr(services, stub_name):
            stub_class = getattr(services, stub_name)
            clients[service_name] = stub_class(channel)
    
    return clients

# Example config.yaml:
# services:
#   - name: UserService
#     proto_path: protos/user_service.proto
#     address: localhost:50051
#     secure: false
#   - name: OrderService
#     proto_path: protos/order_service.proto
#     address: secure-orders.example.com:443
#     secure: true

# Usage
clients = create_clients_from_config("config.yaml")
user_client = clients['UserService']
order_client = clients['OrderService']
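A malformed configuration file would otherwise surface as a `KeyError` halfway through client creation. The sketch below checks the parsed config up front. `validate_service_configs` and `REQUIRED_KEYS` are hypothetical names, and the expected keys are taken from the example config.yaml above.

```python
REQUIRED_KEYS = ("name", "proto_path", "address")


def validate_service_configs(config):
    """Return a list of human-readable problems; an empty list means usable."""
    problems = []
    services = config.get("services") if isinstance(config, dict) else None
    if not isinstance(services, list):
        return ["top-level 'services' must be a list"]
    for index, entry in enumerate(services):
        if not isinstance(entry, dict):
            problems.append(f"services[{index}] must be a mapping")
            continue
        for key in REQUIRED_KEYS:
            # A required key must be present and hold a non-empty string.
            if not isinstance(entry.get(key), str) or not entry[key]:
                problems.append(f"services[{index}] is missing '{key}'")
        if not isinstance(entry.get("secure", False), bool):
            problems.append(f"services[{index}].secure must be true or false")
    return problems


sample = {
    "services": [
        {"name": "UserService", "proto_path": "protos/user_service.proto",
         "address": "localhost:50051", "secure": False},
        {"name": "OrderService", "address": "secure-orders.example.com:443",
         "secure": "yes"},
    ]
}
print(validate_service_configs(sample))
```

Calling the validator on the result of `yaml.safe_load` before the loop in `create_clients_from_config` lets all problems be reported at once.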

Types

# protos() and services() return modules equivalent to the generated
# _pb2.py and _pb2_grpc.py files for the loaded .proto file

# Example of what's available after loading:
# protos = grpc.protos("my_service.proto")
# - protos.MyRequest (message class)
# - protos.MyResponse (message class)  
# - protos.StatusEnum (enum class)

# services = grpc.services("my_service.proto")
# - services.MyServiceStub (client stub class)
# - services.MyServiceServicer (server servicer base class)
# - services.add_MyServiceServicer_to_server (function to add servicer to server)

# The exact contents depend on what's defined in the .proto files

Install with Tessl CLI

npx tessl i tessl/pypi-grpcio
