0
# RPC Patterns and Multi-Callables
1
2
Support for all four RPC patterns (unary-unary, unary-stream, stream-unary, stream-stream) with synchronous and asynchronous invocation methods, comprehensive timeout handling, metadata passing, and credential specification for flexible client-side RPC execution.
3
4
## Capabilities
5
6
### Unary-Unary Pattern
7
8
Single request to single response RPC pattern with synchronous, asynchronous, and callback-based invocation modes.
9
10
```python { .api }
11
class UnaryUnaryMultiCallable(abc.ABC):
12
"""Affords invoking a unary-unary RPC from client-side."""
13
14
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
15
"""
16
Synchronously invokes the underlying RPC.
17
18
Parameters:
19
- request: The request value for the RPC
20
- timeout: Optional duration in seconds to allow for the RPC
21
- metadata: Optional metadata to be transmitted to the service-side
22
- credentials: Optional CallCredentials for the RPC (secure Channel only)
23
- wait_for_ready: Optional flag to enable wait_for_ready mechanism
24
- compression: Optional compression element (e.g., grpc.compression.Gzip)
25
26
Returns:
27
The response value for the RPC
28
29
Raises:
30
RpcError: Indicating that the RPC terminated with non-OK status
31
"""
32
33
def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
34
"""
35
Synchronously invokes the underlying RPC and returns Call object.
36
37
Parameters:
38
Same as __call__()
39
40
Returns:
41
tuple: (response_value, Call) - Response and Call object for RPC metadata
42
43
Raises:
44
RpcError: Indicating that the RPC terminated with non-OK status
45
"""
46
47
def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
48
"""
49
Asynchronously invokes the underlying RPC.
50
51
Parameters:
52
Same as __call__()
53
54
Returns:
55
Future: Call-Future object; result() returns response, exception() returns RpcError
56
"""
57
```
58
59
**Usage Examples:**
60
61
```python
62
# Create channel and stub
63
channel = grpc.insecure_channel('localhost:50051')
64
stub = my_service_pb2_grpc.MyServiceStub(channel)
65
66
# Synchronous call
67
request = my_service_pb2.MyRequest(message="Hello")
68
response = stub.UnaryMethod(request, timeout=5.0)
69
print(response.reply)
70
71
# Synchronous call with metadata and call info
72
metadata = [('user-agent', 'my-client/1.0')]
73
response, call = stub.UnaryMethod.with_call(
74
request,
75
timeout=5.0,
76
metadata=metadata
77
)
78
print(f"Response: {response.reply}")
79
print(f"Status code: {call.code()}")
80
print(f"Status details: {call.details()}")
81
82
# Asynchronous call with Future
83
future = stub.UnaryMethod.future(request, timeout=5.0)
84
try:
85
response = future.result(timeout=10.0)
86
print(response.reply)
87
except grpc.RpcError as e:
88
print(f"RPC failed: {e.code()} - {e.details()}")
89
except grpc.FutureTimeoutError:
90
print("Future timed out")
91
92
# With credentials and compression
93
ssl_creds = grpc.ssl_channel_credentials()
94
call_creds = grpc.access_token_call_credentials("token")
95
channel_creds = grpc.composite_channel_credentials(ssl_creds, call_creds)
96
secure_channel = grpc.secure_channel('secure-server.com:443', channel_creds)
97
secure_stub = my_service_pb2_grpc.MyServiceStub(secure_channel)
98
99
response = secure_stub.UnaryMethod(
100
request,
101
timeout=30.0,
102
compression=grpc.compression.Gzip,
103
metadata=[('request-id', 'req-123')]
104
)
105
```
106
107
### Unary-Stream Pattern
108
109
Single request to multiple response RPC pattern with iterator-based response consumption.
110
111
```python { .api }
112
class UnaryStreamMultiCallable(abc.ABC):
113
"""Affords invoking a unary-stream RPC from client-side."""
114
115
def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
116
"""
117
Invokes the underlying RPC.
118
119
Parameters:
120
- request: The request value for the RPC
121
- timeout: Optional duration in seconds (None for infinite)
122
- metadata: Optional metadata to be transmitted to the service-side
123
- credentials: Optional CallCredentials for the RPC (secure Channel only)
124
- wait_for_ready: Optional flag to enable wait_for_ready mechanism
125
- compression: Optional compression element
126
127
Returns:
128
Iterator: Call-iterator for response values and Future for RPC completion
129
130
Note:
131
Drawing response values may raise RpcError indicating non-OK termination
132
"""
133
```
134
135
**Usage Examples:**
136
137
```python
138
# Unary-stream call
139
request = my_service_pb2.StreamRequest(count=5)
140
response_iterator = stub.UnaryStreamMethod(request, timeout=30.0)
141
142
# Iterate over responses
143
try:
144
for response in response_iterator:
145
print(f"Received: {response.message}")
146
# Can break early if needed
147
if should_stop():
148
break
149
except grpc.RpcError as e:
150
print(f"Stream failed: {e.code()} - {e.details()}")
151
152
# Access call information
153
response_iterator = stub.UnaryStreamMethod(request)
154
print(f"Initial metadata: {response_iterator.initial_metadata()}")
155
156
try:
157
responses = list(response_iterator) # Consume all responses
158
print(f"Trailing metadata: {response_iterator.trailing_metadata()}")
159
print(f"Final status: {response_iterator.code()}")
160
except grpc.RpcError as e:
161
print(f"Stream terminated with error: {e}")
162
```
163
164
### Stream-Unary Pattern
165
166
Multiple request to single response RPC pattern with iterator-based request sending.
167
168
```python { .api }
169
class StreamUnaryMultiCallable(abc.ABC):
170
"""Affords invoking a stream-unary RPC from client-side."""
171
172
def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
173
"""
174
Synchronously invokes the underlying RPC.
175
176
Parameters:
177
- request_iterator: An iterator that yields request values for the RPC
178
- timeout: Optional duration in seconds (None for infinite)
179
- metadata: Optional metadata to be transmitted to the service-side
180
- credentials: Optional CallCredentials for the RPC (secure Channel only)
181
- wait_for_ready: Optional flag to enable wait_for_ready mechanism
182
- compression: Optional compression element
183
184
Returns:
185
The response value for the RPC
186
187
Raises:
188
RpcError: Indicating that the RPC terminated with non-OK status
189
"""
190
191
def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
192
"""
193
Synchronously invokes the underlying RPC and returns Call object.
194
195
Returns:
196
tuple: (response_value, Call) - Response and Call object for RPC metadata
197
"""
198
199
def future(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
200
"""
201
Asynchronously invokes the underlying RPC.
202
203
Returns:
204
Future: Call-Future object for asynchronous result retrieval
205
"""
206
```
207
208
**Usage Examples:**
209
210
```python
211
# Stream-unary call with generator
212
def request_generator():
213
for i in range(10):
214
yield my_service_pb2.StreamRequest(data=f"item-{i}")
215
time.sleep(0.1) # Simulate processing time
216
217
response = stub.StreamUnaryMethod(request_generator(), timeout=30.0)
218
print(f"Final result: {response.summary}")
219
220
# Stream-unary with list of requests
221
requests = [
222
my_service_pb2.StreamRequest(data="first"),
223
my_service_pb2.StreamRequest(data="second"),
224
my_service_pb2.StreamRequest(data="third"),
225
]
226
227
response, call = stub.StreamUnaryMethod.with_call(iter(requests))
228
print(f"Response: {response.summary}")
229
print(f"Metadata: {dict(call.trailing_metadata())}")
230
231
# Asynchronous stream-unary
232
def async_request_generator():
233
for i in range(100):
234
yield my_service_pb2.StreamRequest(data=f"batch-{i}")
235
236
future = stub.StreamUnaryMethod.future(async_request_generator(), timeout=60.0)
237
238
# Do other work while RPC executes
239
do_other_work()
240
241
# Get result when ready
242
try:
243
response = future.result(timeout=10.0)
244
print(f"Async result: {response.summary}")
245
except grpc.FutureTimeoutError:
246
print("Still waiting for result...")
247
response = future.result() # Wait indefinitely
248
```
249
250
### Stream-Stream Pattern
251
252
Bidirectional streaming RPC pattern with full-duplex communication capabilities.
253
254
```python { .api }
255
class StreamStreamMultiCallable(abc.ABC):
256
"""Affords invoking a stream-stream RPC on client-side."""
257
258
def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
259
"""
260
Invokes the underlying RPC on the client.
261
262
Parameters:
263
- request_iterator: An iterator that yields request values for the RPC
264
- timeout: Optional duration in seconds (None for infinite)
265
- metadata: Optional metadata to be transmitted to the service-side
266
- credentials: Optional CallCredentials for the RPC (secure Channel only)
267
- wait_for_ready: Optional flag to enable wait_for_ready mechanism
268
- compression: Optional compression element
269
270
Returns:
271
Iterator: Call-iterator for response values and Future for RPC completion
272
273
Note:
274
Drawing response values may raise RpcError indicating non-OK termination
275
"""
276
```
277
278
**Usage Examples:**
279
280
```python
281
# Bidirectional streaming
282
def request_generator():
283
for i in range(5):
284
yield my_service_pb2.ChatMessage(user="client", message=f"Message {i}")
285
time.sleep(1)
286
287
response_iterator = stub.StreamStreamMethod(request_generator())
288
289
# Process responses as they arrive
290
try:
291
for response in response_iterator:
292
print(f"Server says: {response.message}")
293
except grpc.RpcError as e:
294
print(f"Stream error: {e}")
295
296
# Chat-like bidirectional streaming
297
import threading
298
import queue
299
300
def chat_client():
301
# Queue for sending messages
302
message_queue = queue.Queue()
303
304
def request_generator():
305
while True:
306
try:
307
message = message_queue.get(timeout=1.0)
308
if message is None: # Sentinel to stop
309
break
310
yield my_service_pb2.ChatMessage(user="client", message=message)
311
except queue.Empty:
312
continue
313
314
# Start streaming RPC
315
response_iterator = stub.ChatMethod(request_generator())
316
317
# Thread to handle incoming messages
318
def handle_responses():
319
try:
320
for response in response_iterator:
321
print(f"[{response.user}]: {response.message}")
322
except grpc.RpcError as e:
323
print(f"Chat ended: {e}")
324
325
response_thread = threading.Thread(target=handle_responses)
326
response_thread.start()
327
328
# Send messages from user input
329
try:
330
while True:
331
user_input = input("You: ")
332
if user_input.lower() == '/quit':
333
break
334
message_queue.put(user_input)
335
finally:
336
message_queue.put(None) # Signal to stop request generator
337
response_thread.join()
338
339
# Advanced bidirectional streaming with flow control
340
class FlowControlledStreaming:
341
def __init__(self, stub):
342
self.stub = stub
343
self.request_queue = queue.Queue(maxsize=10) # Limit pending requests
344
self.stop_event = threading.Event()
345
346
def request_generator(self):
347
while not self.stop_event.is_set():
348
try:
349
request = self.request_queue.get(timeout=0.5)
350
yield request
351
except queue.Empty:
352
continue
353
354
def start_streaming(self):
355
response_iterator = self.stub.StreamStreamMethod(
356
self.request_generator(),
357
timeout=300.0
358
)
359
360
for response in response_iterator:
361
self.process_response(response)
362
363
# Flow control: only send new requests after processing response
364
if not self.request_queue.full():
365
self.maybe_send_request()
366
367
def send_request(self, request):
368
if not self.request_queue.full():
369
self.request_queue.put(request)
370
else:
371
print("Request queue full, dropping request")
372
373
def stop(self):
374
self.stop_event.set()
375
```
376
377
### Call Objects and Context
378
379
Access to RPC metadata, status, and control information through Call objects.
380
381
```python { .api }
382
class Call(RpcContext):
383
"""Invocation-side utility object for an RPC."""
384
385
def initial_metadata(self):
386
"""
387
Accesses the initial metadata sent by the server.
388
This method blocks until the value is available.
389
390
Returns:
391
Metadata: The initial metadata key-value pairs
392
"""
393
394
def trailing_metadata(self):
395
"""
396
Accesses the trailing metadata sent by the server.
397
This method blocks until the value is available.
398
399
Returns:
400
Metadata: The trailing metadata key-value pairs
401
"""
402
403
def code(self) -> StatusCode:
404
"""
405
Accesses the status code sent by the server.
406
This method blocks until the value is available.
407
408
Returns:
409
StatusCode: The status code value for the RPC
410
"""
411
412
def details(self) -> str:
413
"""
414
Accesses the details sent by the server.
415
This method blocks until the value is available.
416
417
Returns:
418
str: The details string of the RPC
419
"""
420
421
def is_active(self) -> bool:
422
"""
423
Describes whether the RPC is active or has terminated.
424
425
Returns:
426
bool: True if RPC is active, False otherwise
427
"""
428
429
def time_remaining(self):
430
"""
431
Describes the length of allowed time remaining for the RPC.
432
433
Returns:
434
float or None: Seconds remaining for RPC completion, or None if no deadline
435
"""
436
437
def cancel(self):
438
"""
439
Cancels the RPC.
440
Idempotent and has no effect if the RPC has already terminated.
441
"""
442
443
def add_callback(self, callback) -> bool:
444
"""
445
Registers a callback to be called on RPC termination.
446
447
Parameters:
448
- callback: A no-parameter callable to be called on RPC termination
449
450
Returns:
451
bool: True if callback was added, False if RPC already terminated
452
"""
453
```
454
455
**Usage Examples:**
456
457
```python
458
# Access call metadata and status
459
response, call = stub.UnaryMethod.with_call(request)
460
461
print(f"Initial metadata: {dict(call.initial_metadata())}")
462
print(f"Status code: {call.code()}")
463
print(f"Status details: {call.details()}")
464
print(f"Trailing metadata: {dict(call.trailing_metadata())}")
465
466
# RPC cancellation
467
future = stub.LongRunningMethod.future(request)
468
469
# Cancel after 5 seconds if not done
470
def cancel_if_needed():
471
time.sleep(5)
472
if not future.done():
473
future.cancel()
474
print("RPC cancelled due to timeout")
475
476
threading.Thread(target=cancel_if_needed).start()
477
478
try:
479
response = future.result()
480
except grpc.FutureCancelledError:
481
print("RPC was cancelled")
482
483
# Streaming with call control
484
response_iterator = stub.UnaryStreamMethod(request)
485
486
def handle_cancellation():
487
time.sleep(10)
488
if response_iterator.is_active():
489
response_iterator.cancel()
490
print("Stream cancelled")
491
492
threading.Thread(target=handle_cancellation).start()
493
494
try:
495
for response in response_iterator:
496
print(f"Response: {response}")
497
if not response_iterator.is_active():
498
break
499
except grpc.RpcError as e:
500
if e.code() == grpc.StatusCode.CANCELLED:
501
print("Stream was cancelled")
502
```
503
504
## Types
505
506
```python { .api }
507
class ClientCallDetails(abc.ABC):
508
"""
509
Describes an RPC to be invoked.
510
511
Attributes:
512
- method: The method name of the RPC
513
- timeout: Optional duration of time in seconds to allow for the RPC
514
- metadata: Optional metadata to be transmitted to the service-side
515
- credentials: Optional CallCredentials for the RPC
516
- wait_for_ready: Optional flag to enable wait_for_ready mechanism
517
- compression: Optional compression element
518
"""
519
520
class RpcContext(abc.ABC):
521
"""Provides RPC-related information and action."""
522
523
def is_active(self) -> bool:
524
"""Returns True if RPC is active, False otherwise."""
525
526
def time_remaining(self):
527
"""Returns seconds remaining for RPC or None if no deadline."""
528
529
def cancel(self):
530
"""Cancels the RPC. Idempotent."""
531
532
def add_callback(self, callback) -> bool:
533
"""Registers callback for RPC termination."""
534
```