0
# Bidirectional Streaming
1
2
Support for consuming bidirectional streaming gRPC operations in Google API client libraries. This module transforms gRPC's built-in request/response iterator interface into a more socket-like send/recv pattern, making it easier to handle long-running or asymmetric streams with automatic recovery capabilities.
3
4
## Capabilities
5
6
### Core Bidirectional RPC
7
8
Socket-like interface for bidirectional streaming RPCs with explicit control over stream lifecycle, request queuing, and response consumption.
9
10
```python { .api }
11
class BidiRpc:
12
def __init__(self, start_rpc, initial_request=None, metadata=None): ...
13
14
def add_done_callback(self, callback): ...
15
def open(self): ...
16
def close(self): ...
17
def send(self, request): ...
18
def recv(self): ...
19
20
@property
21
def is_active(self): ...
22
23
@property
24
def pending_requests(self): ...
25
```
26
27
### Usage Examples
28
29
```python
30
from google.api_core import bidi
31
import grpc
32
33
# Basic bidirectional streaming
34
def start_stream():
35
return client.bidirectional_method()
36
37
# Create and use bidirectional RPC
38
rpc = bidi.BidiRpc(start_stream)
39
rpc.open()
40
41
# Send requests
42
rpc.send(my_request)
43
44
# Receive responses
45
response = rpc.recv()
46
47
# Clean up
48
rpc.close()
49
```
50
51
### Resumable Bidirectional RPC
52
53
Enhanced bidirectional RPC with automatic recovery from transient errors and configurable retry logic.
54
55
```python { .api }
56
class ResumableBidiRpc(BidiRpc):
57
def __init__(self, start_rpc, should_recover, should_terminate=None, initial_request=None, metadata=None, throttle_reopen=False): ...
58
59
# Inherits all BidiRpc methods with enhanced error handling
60
```
61
62
### Error Recovery Configuration
63
64
```python
65
from google.api_core import exceptions
66
from google.api_core import bidi
67
68
# Define recovery predicate
69
def should_recover(exception):
70
return isinstance(exception, (
71
exceptions.InternalServerError,
72
exceptions.ServiceUnavailable,
73
exceptions.DeadlineExceeded
74
))
75
76
# Create resumable RPC with recovery
77
rpc = bidi.ResumableBidiRpc(
78
start_stream,
79
should_recover=should_recover,
80
throttle_reopen=True # Rate limit reconnections
81
)
82
```
83
84
### Background Stream Consumer
85
86
Runs bidirectional stream consumption in a separate background thread with callback-based response handling.
87
88
```python { .api }
89
class BackgroundConsumer:
90
def __init__(self, bidi_rpc, on_response, on_fatal_exception=None): ...
91
92
def start(self): ...
93
def stop(self): ...
94
def pause(self): ...
95
def resume(self): ...
96
97
@property
98
def is_active(self): ...
99
100
@property
101
def is_paused(self): ...
102
```
103
104
### Background Processing Example
105
106
```python
107
# Response handler
108
def handle_response(response):
109
print(f"Received: {response}")
110
111
# Error handler
112
def handle_error(exception):
113
print(f"Fatal error: {exception}")
114
115
# Setup background consumer
116
consumer = bidi.BackgroundConsumer(
117
rpc,
118
on_response=handle_response,
119
on_fatal_exception=handle_error
120
)
121
122
# Start background processing
123
consumer.start()
124
125
# Send requests while responses are processed in background
126
rpc.send(request1)
127
rpc.send(request2)
128
129
# Control flow
130
consumer.pause() # Pause response processing
131
consumer.resume() # Resume response processing
132
consumer.stop() # Stop and cleanup
133
```
134
135
### Request Queue Management
136
137
Internal queue-based request management with RPC lifecycle coordination and graceful shutdown handling.
138
139
```python { .api }
140
# Internal helper for request generation
141
class _RequestQueueGenerator:
142
def __init__(self, queue, period=1, initial_request=None): ...
143
```
144
145
### Rate Limiting
146
147
Thread-safe rate limiting for stream operations using sliding time windows.
148
149
```python { .api }
150
class _Throttle:
151
def __init__(self, access_limit, time_window): ...
152
153
def __enter__(self): ...
154
def __exit__(self, *_): ...
155
```
156
157
## Import Patterns
158
159
```python
160
from google.api_core import bidi
161
162
# For basic bidirectional streaming
163
rpc = bidi.BidiRpc(start_rpc_func)
164
165
# For resumable streaming with recovery
166
rpc = bidi.ResumableBidiRpc(start_rpc_func, should_recover_func)
167
168
# For background consumption
169
consumer = bidi.BackgroundConsumer(rpc, response_handler)
170
```
171
172
## Types
173
174
```python { .api }
175
from typing import Callable, Optional, Sequence, Tuple, Union
176
import datetime
177
import queue as queue_module
178
import grpc
179
180
# Type aliases
181
StartRpcCallable = grpc.StreamStreamMultiCallable
182
ResponseCallback = Callable[[Any], None]
183
ErrorCallback = Callable[[Exception], None]
184
RecoveryPredicate = Callable[[Exception], bool]
185
TerminationPredicate = Callable[[Exception], bool]
186
DoneCallback = Callable[[grpc.Future], None]
187
188
# Common parameters
189
InitialRequest = Union[Any, Callable[[], Any]] # protobuf.Message or callable
190
Metadata = Sequence[Tuple[str, str]]
191
TimeWindow = datetime.timedelta
192
```
193
194
## Error Handling
195
196
The module provides comprehensive error handling for streaming operations:
197
198
- **Transient Error Recovery**: Automatic retry on recoverable errors in `ResumableBidiRpc`
199
- **User-Defined Recovery Logic**: Custom `should_recover` and `should_terminate` functions
200
- **Rate Limiting**: Throttling of reconnection attempts to prevent overwhelming services
201
- **Thread Safety**: All operations are thread-safe with proper locking mechanisms
202
- **Graceful Shutdown**: Proper cleanup and resource management on errors and normal termination
203
204
## Stream Lifecycle
205
206
1. **Initialization**: Create `BidiRpc` or `ResumableBidiRpc` with gRPC method
207
2. **Opening**: Call `open()` to establish the stream
208
3. **Communication**: Use `send()` and `recv()` for bidirectional communication
209
4. **Error Handling**: Automatic recovery (if using `ResumableBidiRpc`) or manual error handling
210
5. **Cleanup**: Call `close()` to properly terminate the stream
211
212
For background processing, the lifecycle is managed by `BackgroundConsumer` with `start()`, pause/resume controls, and `stop()` for cleanup.