0
# gRPC Service Integration
1
2
Async gRPC client stub generation with support for unary and streaming calls, timeout handling, metadata, and deadline management using grpclib.
3
4
## Capabilities
5
6
### ServiceStub Base Class
7
8
The `ServiceStub` class serves as the base for all generated gRPC service client stubs, providing async communication with gRPC servers.
9
10
```python { .api }
11
class ServiceStub:
    """Common base for asynchronous gRPC service client stubs."""

    def __init__(
        self,
        channel: Channel,
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> None:
        """
        Set up the stub with a channel and optional stub-wide defaults.

        Args:
            channel: The grpclib Channel used to communicate with the server.
            timeout: Default request timeout, in seconds.
            deadline: Default deadline applied to requests.
            metadata: Default metadata sent along with requests.
        """

    async def _unary_unary(
        self,
        route: str,
        request: IProtoMessage,
        response_type: Type[T],
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> T:
        """
        Send a single request and return the single response.

        Args:
            route: Route of the service method,
                e.g. "/package.Service/Method".
            request: The request message instance to send.
            response_type: The message class of the response.
            timeout: Timeout override for this request.
            deadline: Deadline override for this request.
            metadata: Metadata override for this request.

        Returns:
            An instance of the response message.
        """

    async def _unary_stream(
        self,
        route: str,
        request: IProtoMessage,
        response_type: Type[T],
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[_MetadataLike] = None,
    ) -> AsyncGenerator[T, None]:
        """
        Send a single request and iterate over the streamed responses.

        Args:
            route: Route of the service method.
            request: The request message instance to send.
            response_type: The message class of each response.
            timeout: Timeout override for this request.
            deadline: Deadline override for this request.
            metadata: Metadata override for this request.

        Yields:
            Response message instances as they arrive on the stream.
        """
81
```
82
83
## Usage Examples
84
85
### Basic gRPC Client
86
87
```python
88
import asyncio
from dataclasses import dataclass
from typing import Optional

import betterproto
from grpclib.client import Channel
# Deadline is a grpclib type; import it from grpclib.metadata rather than
# relying on a betterproto re-export being available at runtime.
from grpclib.metadata import Deadline


# Define request/response messages
@dataclass
class HelloRequest(betterproto.Message):
    """Request message with one string field, ``name`` (field number 1)."""

    name: str = betterproto.string_field(1)


@dataclass
class HelloReply(betterproto.Message):
    """Reply message with one string field, ``message`` (field number 1)."""

    message: str = betterproto.string_field(1)


# Define service stub
class GreeterServiceStub(betterproto.ServiceStub):
    """Client stub exposing the SayHello unary call."""

    async def say_hello(
        self,
        request: HelloRequest,
        *,
        timeout: Optional[float] = None,
        deadline: Optional[Deadline] = None,
        metadata: Optional[betterproto._MetadataLike] = None,
    ) -> HelloReply:
        """
        Send ``request`` to "/helloworld.Greeter/SayHello".

        Args:
            request: The HelloRequest to send.
            timeout: Forwarded unchanged to ``_unary_unary``.
            deadline: Forwarded unchanged to ``_unary_unary``.
            metadata: Forwarded unchanged to ``_unary_unary``.

        Returns:
            The HelloReply produced by the call.
        """
        return await self._unary_unary(
            "/helloworld.Greeter/SayHello",
            request,
            HelloReply,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


# Use the service
async def main():
    """Open a channel to localhost:50051, greet "World", print the reply."""
    async with Channel("localhost", 50051) as channel:
        greeter = GreeterServiceStub(channel)

        # Make a simple request
        request = HelloRequest(name="World")
        reply = await greeter.say_hello(request)
        print(f"Greeting: {reply.message}")


# Run the client (only when executed as a script, not on import)
if __name__ == "__main__":
    asyncio.run(main())
133
```
134
135
### Service with Default Configuration
136
137
```python
138
class ConfiguredGreeterStub(betterproto.ServiceStub):
    """Greeter stub whose request defaults are fixed at construction time."""

    def __init__(self, channel: Channel):
        # Defaults applied to all requests made through this stub:
        # a 30 second timeout and a user-agent metadata entry.
        default_metadata = [("user-agent", "my-client/1.0")]
        super().__init__(channel, timeout=30.0, metadata=default_metadata)

    async def say_hello(self, request: HelloRequest) -> HelloReply:
        """Call SayHello without per-call overrides.

        The default timeout and metadata passed to ``__init__`` are used.
        """
        reply = await self._unary_unary(
            "/helloworld.Greeter/SayHello", request, HelloReply
        )
        return reply
154
155
async def main():
    """Greet "World" through a ConfiguredGreeterStub and print the reply text."""
    async with Channel("localhost", 50051) as channel:
        stub = ConfiguredGreeterStub(channel)
        hello = HelloRequest(name="World")
        response = await stub.say_hello(hello)
        print(response.message)
160
```
161
162
### Streaming gRPC Service
163
164
```python
165
# typing names used in the annotations below
from typing import AsyncGenerator, Optional


@dataclass
class StreamRequest(betterproto.Message):
    """Request message with one int32 field, ``count`` (field number 1)."""

    count: int = betterproto.int32_field(1)


@dataclass
class StreamResponse(betterproto.Message):
    """Response message: int32 ``value`` (field 1), string ``timestamp`` (field 2)."""

    value: int = betterproto.int32_field(1)
    timestamp: str = betterproto.string_field(2)


class StreamingServiceStub(betterproto.ServiceStub):
    """Client stub for the unary-request / streamed-response GetStream call."""

    async def get_stream(
        self,
        request: StreamRequest,
        *,
        timeout: Optional[float] = None,
    ) -> AsyncGenerator[StreamResponse, None]:
        """
        Stream responses from "/streaming.StreamingService/GetStream".

        Args:
            request: The StreamRequest to send.
            timeout: Forwarded unchanged to ``_unary_stream``.

        Yields:
            Each StreamResponse produced by ``_unary_stream``, in order.
        """
        async for response in self._unary_stream(
            "/streaming.StreamingService/GetStream",
            request,
            StreamResponse,
            timeout=timeout,
        ):
            yield response


# Use streaming service
async def main():
    """Request a stream with ``count=5`` and print every response received."""
    async with Channel("localhost", 50051) as channel:
        service = StreamingServiceStub(channel)

        request = StreamRequest(count=5)
        async for response in service.get_stream(request):
            print(f"Received: {response.value} at {response.timestamp}")
197
```
198
199
### Error Handling and Metadata
200
201
```python
202
from grpclib.exceptions import GRPCError
203
import grpclib.const
204
205
class RobustServiceStub(betterproto.ServiceStub):
    """Stub that repeats SayHello once when the service is unavailable."""

    async def call_with_retry(self, request: HelloRequest) -> HelloReply:
        """Call SayHello, retrying a single time on an UNAVAILABLE status.

        The first attempt uses a 10 second timeout. If it raises a
        ``GRPCError`` whose status is ``Status.UNAVAILABLE``, the call is
        repeated after a one second pause with a 15 second timeout. Any
        other ``GRPCError`` is re-raised unchanged.
        """
        route = "/helloworld.Greeter/SayHello"
        # Custom metadata sent with both attempts
        metadata = [
            ("authorization", "Bearer my-token"),
            ("request-id", "req-12345"),
        ]

        try:
            return await self._unary_unary(
                route, request, HelloReply, timeout=10.0, metadata=metadata
            )
        except GRPCError as err:
            # Only an unavailable service is worth retrying.
            if err.status != grpclib.const.Status.UNAVAILABLE:
                raise
            print("Service unavailable, retrying...")
            # Retry once after a short pause, allowing a longer timeout
            await asyncio.sleep(1)
            return await self._unary_unary(
                route, request, HelloReply, timeout=15.0, metadata=metadata
            )
235
236
async def main():
    """Call ``call_with_retry`` and print the reply, or the gRPC error."""
    async with Channel("localhost", 50051) as channel:
        stub = RobustServiceStub(channel)
        try:
            hello = HelloRequest(name="World")
            response = await stub.call_with_retry(hello)
            print(response.message)
        except GRPCError as err:
            # Report the status and message carried by the error
            print(f"gRPC error: {err.status} - {err.message}")
244
```
245
246
### Multiple Services
247
248
```python
249
class UserServiceStub(betterproto.ServiceStub):
    """Client stub for the GetUser unary call."""

    async def get_user(self, request: GetUserRequest) -> User:
        """Send ``request`` to "/users.UserService/GetUser" and return the User."""
        user = await self._unary_unary(
            "/users.UserService/GetUser", request, User
        )
        return user
256
257
class OrderServiceStub(betterproto.ServiceStub):
    """Client stub for the CreateOrder unary call."""

    async def create_order(self, request: CreateOrderRequest) -> Order:
        """Send ``request`` to "/orders.OrderService/CreateOrder" and return the Order."""
        order = await self._unary_unary(
            "/orders.OrderService/CreateOrder", request, Order
        )
        return order
264
265
# Use multiple services with the same channel
266
async def main():
    """Fetch a user, create an order for that user, and print a summary.

    Both stubs are built on the same Channel.
    """
    async with Channel("localhost", 50051) as channel:
        user_service = UserServiceStub(channel)
        order_service = OrderServiceStub(channel)

        # Get user information
        user_request = GetUserRequest(user_id="123")
        user = await user_service.get_user(user_request)

        # Create an order for the user
        order_request = CreateOrderRequest(
            user_id=user.id, product_id="prod-456"
        )
        order = await order_service.create_order(order_request)

        print(f"Created order {order.id} for user {user.name}")
280
```
281
282
## Types
283
284
```python { .api }
285
# Metadata type aliases used by the gRPC stub signatures
_Value = Union[str, bytes]  # a single metadata value
_MetadataLike = Union[
    Mapping[str, _Value],  # a mapping of key -> value
    Collection[Tuple[str, _Value]],  # or a collection of (key, value) pairs
]
288
289
# External types from grpclib (imported when needed)
290
from grpclib.client import Channel
291
from grpclib.metadata import Deadline
292
from grpclib._protocols import IProtoMessage
293
```