or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bidirectional-streaming.mdclient-config.mddatetime.mdexceptions.mdgapic-framework.mdiam-policies.mdindex.mdoperations.mdpage-iteration.mdpath-templates.mdprotobuf-helpers.mdretry.mdtimeout.mdtransport.mduniverse-domain.md

bidirectional-streaming.mddocs/

0

# Bidirectional Streaming

1

2

Support for consuming bidirectional streaming gRPC operations in Google API client libraries. This module transforms gRPC's built-in request/response iterator interface into a more socket-like send/recv pattern, making it easier to handle long-running or asymmetric streams with automatic recovery capabilities.

3

4

## Capabilities

5

6

### Core Bidirectional RPC

7

8

Socket-like interface for bidirectional streaming RPCs with explicit control over stream lifecycle, request queuing, and response consumption.

9

10

```python { .api }

11

class BidiRpc:

12

def __init__(self, start_rpc, initial_request=None, metadata=None): ...

13

14

def add_done_callback(self, callback): ...

15

def open(self): ...

16

def close(self): ...

17

def send(self, request): ...

18

def recv(self): ...

19

20

@property

21

def is_active(self): ...

22

23

@property

24

def pending_requests(self): ...

25

```

26

27

### Usage Examples

28

29

```python

30

from google.api_core import bidi

31

import grpc

32

33

# Basic bidirectional streaming

34

def start_stream():

35

return client.bidirectional_method()

36

37

# Create and use bidirectional RPC

38

rpc = bidi.BidiRpc(start_stream)

39

rpc.open()

40

41

# Send requests

42

rpc.send(my_request)

43

44

# Receive responses

45

response = rpc.recv()

46

47

# Clean up

48

rpc.close()

49

```

50

51

### Resumable Bidirectional RPC

52

53

Enhanced bidirectional RPC with automatic recovery from transient errors and configurable retry logic.

54

55

```python { .api }

56

class ResumableBidiRpc(BidiRpc):

57

def __init__(self, start_rpc, should_recover, should_terminate=None, initial_request=None, metadata=None, throttle_reopen=False): ...

58

59

# Inherits all BidiRpc methods with enhanced error handling

60

```

61

62

### Error Recovery Configuration

63

64

```python

65

from google.api_core import exceptions

66

from google.api_core import bidi

67

68

# Define recovery predicate

69

def should_recover(exception):

70

return isinstance(exception, (

71

exceptions.InternalServerError,

72

exceptions.ServiceUnavailable,

73

exceptions.DeadlineExceeded

74

))

75

76

# Create resumable RPC with recovery

77

rpc = bidi.ResumableBidiRpc(

78

start_stream,

79

should_recover=should_recover,

80

throttle_reopen=True # Rate limit reconnections

81

)

82

```

83

84

### Background Stream Consumer

85

86

Runs bidirectional stream consumption in a separate background thread with callback-based response handling.

87

88

```python { .api }

89

class BackgroundConsumer:

90

def __init__(self, bidi_rpc, on_response, on_fatal_exception=None): ...

91

92

def start(self): ...

93

def stop(self): ...

94

def pause(self): ...

95

def resume(self): ...

96

97

@property

98

def is_active(self): ...

99

100

@property

101

def is_paused(self): ...

102

```

103

104

### Background Processing Example

105

106

```python

107

# Response handler

108

def handle_response(response):

109

print(f"Received: {response}")

110

111

# Error handler

112

def handle_error(exception):

113

print(f"Fatal error: {exception}")

114

115

# Setup background consumer

116

consumer = bidi.BackgroundConsumer(

117

rpc,

118

on_response=handle_response,

119

on_fatal_exception=handle_error

120

)

121

122

# Start background processing

123

consumer.start()

124

125

# Send requests while responses are processed in background

126

rpc.send(request1)

127

rpc.send(request2)

128

129

# Control flow

130

consumer.pause() # Pause response processing

131

consumer.resume() # Resume response processing

132

consumer.stop() # Stop and cleanup

133

```

134

135

### Request Queue Management

136

137

Internal queue-based request management with RPC lifecycle coordination and graceful shutdown handling.

138

139

```python { .api }

140

# Internal helper for request generation

141

class _RequestQueueGenerator:

142

def __init__(self, queue, period=1, initial_request=None): ...

143

```

144

145

### Rate Limiting

146

147

Thread-safe rate limiting for stream operations using sliding time windows.

148

149

```python { .api }

150

class _Throttle:

151

def __init__(self, access_limit, time_window): ...

152

153

def __enter__(self): ...

154

def __exit__(self, *_): ...

155

```

156

157

## Import Patterns

158

159

```python

160

from google.api_core import bidi

161

162

# For basic bidirectional streaming

163

rpc = bidi.BidiRpc(start_rpc_func)

164

165

# For resumable streaming with recovery

166

rpc = bidi.ResumableBidiRpc(start_rpc_func, should_recover_func)

167

168

# For background consumption

169

consumer = bidi.BackgroundConsumer(rpc, response_handler)

170

```

171

172

## Types

173

174

```python { .api }

175

from typing import Callable, Optional, Sequence, Tuple, Union

176

import datetime

177

import queue as queue_module

178

import grpc

179

180

# Type aliases

181

StartRpcCallable = grpc.StreamStreamMultiCallable

182

ResponseCallback = Callable[[Any], None]

183

ErrorCallback = Callable[[Exception], None]

184

RecoveryPredicate = Callable[[Exception], bool]

185

TerminationPredicate = Callable[[Exception], bool]

186

DoneCallback = Callable[[grpc.Future], None]

187

188

# Common parameters

189

InitialRequest = Union[Any, Callable[[], Any]] # protobuf.Message or callable

190

Metadata = Sequence[Tuple[str, str]]

191

TimeWindow = datetime.timedelta

192

```

193

194

## Error Handling

195

196

The module provides comprehensive error handling for streaming operations:

197

198

- **Transient Error Recovery**: Automatic retry on recoverable errors in `ResumableBidiRpc`

199

- **User-Defined Recovery Logic**: Custom `should_recover` and `should_terminate` functions

200

- **Rate Limiting**: Throttling of reconnection attempts to prevent overwhelming services

201

- **Thread Safety**: All operations are thread-safe with proper locking mechanisms

202

- **Graceful Shutdown**: Proper cleanup and resource management on errors and normal termination

203

204

## Stream Lifecycle

205

206

1. **Initialization**: Create `BidiRpc` or `ResumableBidiRpc` with gRPC method

207

2. **Opening**: Call `open()` to establish the stream

208

3. **Communication**: Use `send()` and `recv()` for bidirectional communication

209

4. **Error Handling**: Automatic recovery (if using `ResumableBidiRpc`) or manual error handling

210

5. **Cleanup**: Call `close()` to properly terminate the stream

211

212

For background processing, the lifecycle is managed by `BackgroundConsumer` with `start()`, pause/resume controls, and `stop()` for cleanup.