CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-betterproto

A better Protobuf / gRPC generator & library

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

grpc-services.mddocs/

gRPC Service Integration

Async gRPC client stub generation with support for unary and streaming calls, timeout handling, metadata, and deadline management using grpclib.

Capabilities

ServiceStub Base Class

The ServiceStub class serves as the base for all generated gRPC service client stubs, providing async communication with gRPC servers.

class ServiceStub:
    """Base class for async gRPC service stubs."""
    
    def __init__(
        self,
        channel: Channel,
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> None:
        """
        Initialize the service stub.
        
        Args:
            channel: grpclib Channel for server communication
            timeout: Default timeout in seconds for requests
            deadline: Default deadline for requests
            metadata: Default metadata to send with requests
        """
    
    async def _unary_unary(
        self,
        route: str,
        request: IProtoMessage,
        response_type: Type[T],
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> T:
        """
        Make a unary request and return the response.
        
        Args:
            route: Service method route (e.g., "/package.Service/Method")
            request: Request message instance
            response_type: Response message class
            timeout: Request timeout override
            deadline: Request deadline override  
            metadata: Request metadata override
            
        Returns:
            Response message instance
        """
    
    async def _unary_stream(
        self,
        route: str,
        request: IProtoMessage,
        response_type: Type[T],
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> AsyncGenerator[T, None]:
        """
        Make a unary request and return the stream response iterator.
        
        Args:
            route: Service method route
            request: Request message instance
            response_type: Response message class
            timeout: Request timeout override
            deadline: Request deadline override
            metadata: Request metadata override
            
        Yields:
            Response message instances from the stream
        """

Usage Examples

Basic gRPC Client

import asyncio
import betterproto
from grpclib.client import Channel
from dataclasses import dataclass

# Define request/response messages
@dataclass
class HelloRequest(betterproto.Message):
    name: str = betterproto.string_field(1)

@dataclass  
class HelloReply(betterproto.Message):
    message: str = betterproto.string_field(1)

# Define service stub
class GreeterServiceStub(betterproto.ServiceStub):
    async def say_hello(
        self,
        request: HelloRequest,
        *,
        timeout: Optional[float] = None,
        deadline: Optional[betterproto.Deadline] = None,
        metadata: Optional[betterproto._MetadataLike] = None,
    ) -> HelloReply:
        return await self._unary_unary(
            "/helloworld.Greeter/SayHello",
            request,
            HelloReply,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

# Use the service
async def main():
    async with Channel("localhost", 50051) as channel:
        greeter = GreeterServiceStub(channel)
        
        # Make a simple request
        request = HelloRequest(name="World")
        reply = await greeter.say_hello(request)
        print(f"Greeting: {reply.message}")

# Run the client
asyncio.run(main())

Service with Default Configuration

class ConfiguredGreeterStub(betterproto.ServiceStub):
    def __init__(self, channel: Channel):
        # Set default timeout and metadata for all requests
        super().__init__(
            channel,
            timeout=30.0,  # 30 second default timeout
            metadata=[("user-agent", "my-client/1.0")]
        )
    
    async def say_hello(self, request: HelloRequest) -> HelloReply:
        # Uses default timeout and metadata from __init__
        return await self._unary_unary(
            "/helloworld.Greeter/SayHello",
            request,
            HelloReply,
        )

async def main():
    async with Channel("localhost", 50051) as channel:
        greeter = ConfiguredGreeterStub(channel)
        reply = await greeter.say_hello(HelloRequest(name="World"))
        print(reply.message)

Streaming gRPC Service

@dataclass
class StreamRequest(betterproto.Message):
    count: int = betterproto.int32_field(1)

@dataclass
class StreamResponse(betterproto.Message):
    value: int = betterproto.int32_field(1)
    timestamp: str = betterproto.string_field(2)

class StreamingServiceStub(betterproto.ServiceStub):
    async def get_stream(
        self,
        request: StreamRequest,
        *,
        timeout: Optional[float] = None,
    ) -> AsyncGenerator[StreamResponse, None]:
        async for response in self._unary_stream(
            "/streaming.StreamingService/GetStream",
            request,
            StreamResponse,
            timeout=timeout,
        ):
            yield response

# Use streaming service
async def main():
    async with Channel("localhost", 50051) as channel:
        service = StreamingServiceStub(channel)
        
        request = StreamRequest(count=5)
        async for response in service.get_stream(request):
            print(f"Received: {response.value} at {response.timestamp}")

Error Handling and Metadata

from grpclib.exceptions import GRPCError
import grpclib.const

class RobustServiceStub(betterproto.ServiceStub):
    async def call_with_retry(self, request: HelloRequest) -> HelloReply:
        # Add custom metadata
        metadata = [
            ("authorization", "Bearer my-token"),
            ("request-id", "req-12345")
        ]
        
        try:
            return await self._unary_unary(
                "/helloworld.Greeter/SayHello",
                request,
                HelloReply,
                timeout=10.0,
                metadata=metadata,
            )
        except GRPCError as e:
            if e.status == grpclib.const.Status.UNAVAILABLE:
                print("Service unavailable, retrying...")
                # Implement retry logic
                await asyncio.sleep(1)
                return await self._unary_unary(
                    "/helloworld.Greeter/SayHello",
                    request,
                    HelloReply,
                    timeout=15.0,
                    metadata=metadata,
                )
            else:
                raise

async def main():
    async with Channel("localhost", 50051) as channel:
        service = RobustServiceStub(channel)
        try:
            reply = await service.call_with_retry(HelloRequest(name="World"))
            print(reply.message)
        except GRPCError as e:
            print(f"gRPC error: {e.status} - {e.message}")

Multiple Services

class UserServiceStub(betterproto.ServiceStub):
    async def get_user(self, request: GetUserRequest) -> User:
        return await self._unary_unary(
            "/users.UserService/GetUser",
            request,
            User,
        )

class OrderServiceStub(betterproto.ServiceStub):
    async def create_order(self, request: CreateOrderRequest) -> Order:
        return await self._unary_unary(
            "/orders.OrderService/CreateOrder", 
            request,
            Order,
        )

# Use multiple services with the same channel
async def main():
    async with Channel("localhost", 50051) as channel:
        user_service = UserServiceStub(channel)
        order_service = OrderServiceStub(channel)
        
        # Get user information  
        user = await user_service.get_user(GetUserRequest(user_id="123"))
        
        # Create an order for the user
        order = await order_service.create_order(
            CreateOrderRequest(user_id=user.id, product_id="prod-456")
        )
        
        print(f"Created order {order.id} for user {user.name}")

Types

# Type aliases for gRPC metadata
_Value = Union[str, bytes]
_MetadataLike = Union[Mapping[str, _Value], Collection[Tuple[str, _Value]]]

# External types from grpclib (imported when needed)
from grpclib.client import Channel
from grpclib.metadata import Deadline
from grpclib._protocols import IProtoMessage

Install with Tessl CLI

npx tessl i tessl/pypi-betterproto

docs

code-generation.md

enumerations.md

grpc-services.md

index.md

message-fields.md

serialization.md

utilities.md

tile.json