A better Protobuf / gRPC generator & library
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Async gRPC client stub generation with support for unary and streaming calls, timeout handling, metadata, and deadline management using grpclib.
The ServiceStub class serves as the base for all generated gRPC service client stubs, providing async communication with gRPC servers.
class ServiceStub:
"""Base class for async gRPC service stubs."""
def __init__(
self,
channel: Channel,
*,
timeout: Optional[float] = None,
deadline: Optional[Deadline] = None,
metadata: Optional[_MetadataLike] = None,
) -> None:
"""
Initialize the service stub.
Args:
channel: grpclib Channel for server communication
timeout: Default timeout in seconds for requests
deadline: Default deadline for requests
metadata: Default metadata to send with requests
"""
async def _unary_unary(
self,
route: str,
request: IProtoMessage,
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional[Deadline] = None,
metadata: Optional[_MetadataLike] = None,
) -> T:
"""
Make a unary request and return the response.
Args:
route: Service method route (e.g., "/package.Service/Method")
request: Request message instance
response_type: Response message class
timeout: Request timeout override
deadline: Request deadline override
metadata: Request metadata override
Returns:
Response message instance
"""
async def _unary_stream(
self,
route: str,
request: IProtoMessage,
response_type: Type[T],
*,
timeout: Optional[float] = None,
deadline: Optional[Deadline] = None,
metadata: Optional[_MetadataLike] = None,
) -> AsyncGenerator[T, None]:
"""
Make a unary request and return the stream response iterator.
Args:
route: Service method route
request: Request message instance
response_type: Response message class
timeout: Request timeout override
deadline: Request deadline override
metadata: Request metadata override
Yields:
Response message instances from the stream
"""import asyncio
import betterproto
from grpclib.client import Channel
from dataclasses import dataclass
# Define request/response messages
@dataclass
class HelloRequest(betterproto.Message):
name: str = betterproto.string_field(1)
@dataclass
class HelloReply(betterproto.Message):
message: str = betterproto.string_field(1)
# Define service stub
class GreeterServiceStub(betterproto.ServiceStub):
async def say_hello(
self,
request: HelloRequest,
*,
timeout: Optional[float] = None,
deadline: Optional[betterproto.Deadline] = None,
metadata: Optional[betterproto._MetadataLike] = None,
) -> HelloReply:
return await self._unary_unary(
"/helloworld.Greeter/SayHello",
request,
HelloReply,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
# Use the service
async def main():
async with Channel("localhost", 50051) as channel:
greeter = GreeterServiceStub(channel)
# Make a simple request
request = HelloRequest(name="World")
reply = await greeter.say_hello(request)
print(f"Greeting: {reply.message}")
# Run the client
asyncio.run(main())class ConfiguredGreeterStub(betterproto.ServiceStub):
def __init__(self, channel: Channel):
# Set default timeout and metadata for all requests
super().__init__(
channel,
timeout=30.0, # 30 second default timeout
metadata=[("user-agent", "my-client/1.0")]
)
async def say_hello(self, request: HelloRequest) -> HelloReply:
# Uses default timeout and metadata from __init__
return await self._unary_unary(
"/helloworld.Greeter/SayHello",
request,
HelloReply,
)
async def main():
async with Channel("localhost", 50051) as channel:
greeter = ConfiguredGreeterStub(channel)
reply = await greeter.say_hello(HelloRequest(name="World"))
print(reply.message)@dataclass
class StreamRequest(betterproto.Message):
count: int = betterproto.int32_field(1)
@dataclass
class StreamResponse(betterproto.Message):
value: int = betterproto.int32_field(1)
timestamp: str = betterproto.string_field(2)
class StreamingServiceStub(betterproto.ServiceStub):
async def get_stream(
self,
request: StreamRequest,
*,
timeout: Optional[float] = None,
) -> AsyncGenerator[StreamResponse, None]:
async for response in self._unary_stream(
"/streaming.StreamingService/GetStream",
request,
StreamResponse,
timeout=timeout,
):
yield response
# Use streaming service
async def main():
async with Channel("localhost", 50051) as channel:
service = StreamingServiceStub(channel)
request = StreamRequest(count=5)
async for response in service.get_stream(request):
print(f"Received: {response.value} at {response.timestamp}")from grpclib.exceptions import GRPCError
import grpclib.const
class RobustServiceStub(betterproto.ServiceStub):
async def call_with_retry(self, request: HelloRequest) -> HelloReply:
# Add custom metadata
metadata = [
("authorization", "Bearer my-token"),
("request-id", "req-12345")
]
try:
return await self._unary_unary(
"/helloworld.Greeter/SayHello",
request,
HelloReply,
timeout=10.0,
metadata=metadata,
)
except GRPCError as e:
if e.status == grpclib.const.Status.UNAVAILABLE:
print("Service unavailable, retrying...")
# Implement retry logic
await asyncio.sleep(1)
return await self._unary_unary(
"/helloworld.Greeter/SayHello",
request,
HelloReply,
timeout=15.0,
metadata=metadata,
)
else:
raise
async def main():
async with Channel("localhost", 50051) as channel:
service = RobustServiceStub(channel)
try:
reply = await service.call_with_retry(HelloRequest(name="World"))
print(reply.message)
except GRPCError as e:
print(f"gRPC error: {e.status} - {e.message}")class UserServiceStub(betterproto.ServiceStub):
async def get_user(self, request: GetUserRequest) -> User:
return await self._unary_unary(
"/users.UserService/GetUser",
request,
User,
)
class OrderServiceStub(betterproto.ServiceStub):
async def create_order(self, request: CreateOrderRequest) -> Order:
return await self._unary_unary(
"/orders.OrderService/CreateOrder",
request,
Order,
)
# Use multiple services with the same channel
async def main():
async with Channel("localhost", 50051) as channel:
user_service = UserServiceStub(channel)
order_service = OrderServiceStub(channel)
# Get user information
user = await user_service.get_user(GetUserRequest(user_id="123"))
# Create an order for the user
order = await order_service.create_order(
CreateOrderRequest(user_id=user.id, product_id="prod-456")
)
print(f"Created order {order.id} for user {user.name}")# Type aliases for gRPC metadata
_Value = Union[str, bytes]
_MetadataLike = Union[Mapping[str, _Value], Collection[Tuple[str, _Value]]]
# External types from grpclib (imported when needed)
from grpclib.client import Channel
from grpclib.metadata import Deadline
from grpclib._protocols import IProtoMessageInstall with Tessl CLI
npx tessl i tessl/pypi-betterproto