0
# Transport Helpers
1
2
Utilities for gRPC and REST transport protocols, including channel creation, method wrapping, and streaming support. These helpers provide consistent interfaces for different transport protocols used by Google APIs.
3
4
## Capabilities
5
6
### gRPC Channel Management
7
8
Functions for creating and configuring gRPC channels with authentication and transport options.
9
10
```python { .api }
11
def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
12
"""
13
Create a gRPC channel with authentication and configuration.
14
15
Args:
16
target (str): Target server address (host:port)
17
credentials: Google Auth credentials object
18
scopes (List[str], optional): OAuth 2.0 scopes for authentication
19
ssl_credentials: gRPC SSL credentials
20
credentials_file (str, optional): Path to service account file
21
quota_project_id (str, optional): Project ID for quota attribution
22
default_scopes (List[str], optional): Default scopes if none provided
23
default_host (str, optional): Default host for the service
24
compression: gRPC compression algorithm
25
**kwargs: Additional channel arguments
26
27
Returns:
28
grpc.Channel: Configured gRPC channel
29
"""
30
31
async def create_channel_async(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):
32
"""
33
Create an async gRPC channel with authentication and configuration.
34
35
Args:
36
target (str): Target server address (host:port)
37
credentials: Google Auth credentials object
38
scopes (List[str], optional): OAuth 2.0 scopes for authentication
39
ssl_credentials: gRPC SSL credentials
40
credentials_file (str, optional): Path to service account file
41
quota_project_id (str, optional): Project ID for quota attribution
42
default_scopes (List[str], optional): Default scopes if none provided
43
default_host (str, optional): Default host for the service
44
compression: gRPC compression algorithm
45
**kwargs: Additional channel arguments
46
47
Returns:
48
grpc.aio.Channel: Configured async gRPC channel
49
"""
50
```
51
52
### Method Wrapping
53
54
Functions for applying retry, timeout, and other decorators to gRPC and REST methods.
55
56
```python { .api }
57
def wrap_method(func, default_retry=None, default_timeout=None, client_info=None):
58
"""
59
Apply retry, timeout, and client info to a gRPC method.
60
61
Args:
62
func (Callable): gRPC method to wrap
63
default_retry (Retry, optional): Default retry configuration
64
default_timeout (Timeout, optional): Default timeout configuration
65
client_info (ClientInfo, optional): Client information for headers
66
67
Returns:
68
Callable: Wrapped method with applied decorators
69
"""
70
71
async def wrap_method_async(func, default_retry=None, default_timeout=None, client_info=None):
72
"""
73
Apply async retry, timeout, and client info to an async gRPC method.
74
75
Args:
76
func (Callable): Async gRPC method to wrap
77
default_retry (AsyncRetry, optional): Default async retry configuration
78
default_timeout (Timeout, optional): Default timeout configuration
79
client_info (ClientInfo, optional): Client information for headers
80
81
Returns:
82
Callable: Wrapped async method with applied decorators
83
"""
84
```
85
86
### Streaming Support
87
88
Classes and utilities for handling gRPC streaming responses.
89
90
```python { .api }
91
class GrpcStream:
92
"""
93
Wrapper for gRPC streaming responses with additional functionality.
94
95
Args:
96
wrapped: Original gRPC stream iterator
97
"""
98
def __init__(self, wrapped): ...
99
100
def __iter__(self):
101
"""Return iterator for stream."""
102
return self
103
104
def __next__(self):
105
"""Get next item from stream."""
106
107
def cancel(self):
108
"""Cancel the streaming operation."""
109
110
@property
111
def cancelled(self):
112
"""Check if stream was cancelled."""
113
114
class _StreamingResponseIterator:
115
"""Iterator wrapper for gRPC streaming responses."""
116
def __init__(self, wrapped): ...
117
def __iter__(self): ...
118
def __next__(self): ...
119
```
120
121
### Transport Constants
122
123
Constants indicating availability of optional transport dependencies.
124
125
```python { .api }
126
# Boolean indicating if grpc_gcp is available
127
HAS_GRPC_GCP = True # or False based on installation
128
129
# Version string of the installed protobuf package (the value below is an example)
130
PROTOBUF_VERSION = "4.25.1"
131
```
132
133
## Usage Examples
134
135
### Creating gRPC Channels
136
137
```python
138
import grpc
from google.api_core import grpc_helpers
139
from google.auth import default
140
141
# Create channel with default credentials
142
credentials, project = default()
143
channel = grpc_helpers.create_channel(
144
target="googleapis.com:443",
145
credentials=credentials,
146
scopes=["https://www.googleapis.com/auth/cloud-platform"]
147
)
148
149
# Create channel with service account
150
channel = grpc_helpers.create_channel(
151
target="googleapis.com:443",
152
credentials_file="/path/to/service-account.json",
153
quota_project_id="my-project-id"
154
)
155
156
# Create channel with custom options
157
channel = grpc_helpers.create_channel(
158
target="googleapis.com:443",
159
credentials=credentials,
160
compression=grpc.Compression.Gzip,
161
options=[
162
("grpc.keepalive_time_ms", 30000),
163
("grpc.keepalive_timeout_ms", 5000),
164
("grpc.http2.max_pings_without_data", 0)
165
]
166
)
167
```
168
169
### Method Wrapping with Retry and Timeout
170
171
```python
172
from google.api_core import grpc_helpers
173
from google.api_core import retry
174
from google.api_core import timeout
175
from google.api_core import client_info
176
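from google.api_core import exceptions
import grpc
177
178
# Create retry and timeout configurations
# (retry predicates match exception classes, not grpc.StatusCode values)
179
retry_config = retry.Retry(
180
predicate=retry.if_exception_type(
181
exceptions.ServiceUnavailable,
182
exceptions.DeadlineExceeded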
183
),
184
initial=1.0,
185
maximum=60.0,
186
multiplier=2.0
187
)
188
189
timeout_config = timeout.TimeToDeadlineTimeout(deadline=300.0)
190
191
client_info_obj = client_info.ClientInfo(
192
client_library_name="my-client",
193
client_library_version="1.0.0"
194
)
195
196
# Wrap a gRPC method
197
def create_wrapped_method(stub_method):
198
return grpc_helpers.wrap_method(
199
stub_method,
200
default_retry=retry_config,
201
default_timeout=timeout_config,
202
client_info=client_info_obj
203
)
204
205
# Use with a gRPC client
206
from my_service_pb2_grpc import MyServiceStub
207
208
channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
209
stub = MyServiceStub(channel)
210
211
# Wrap methods with retry/timeout
212
wrapped_method = create_wrapped_method(stub.GetData)
213
214
# Call wrapped method - retry and timeout applied automatically
215
try:
216
response = wrapped_method(request)
217
print("Success:", response)
218
except grpc.RpcError as e:
219
print("gRPC error:", e)
220
```
221
222
### Async gRPC Usage
223
224
```python
225
import asyncio
226
from google.api_core import grpc_helpers_async
227
from google.api_core import retry
228
from google.auth import default
229
230
async def async_grpc_example():
231
# Obtain default credentials (google.auth.default() is a regular, synchronous call)
232
credentials, project = default()
233
234
# Create async gRPC channel
235
channel = grpc_helpers_async.create_channel(
236
target="googleapis.com:443",
237
credentials=credentials,
238
scopes=["https://www.googleapis.com/auth/cloud-platform"]
239
)
240
241
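# Create async retry configuration
# (retry predicates match exception classes, not grpc.StatusCode values)
from google.api_core import exceptions
242
async_retry = retry.AsyncRetry(
243
predicate=retry.if_exception_type(
244
exceptions.ServiceUnavailable,
245
exceptions.InternalServerError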
246
),
247
initial=1.0,
248
maximum=30.0
249
)
250
251
# Use async stub with wrapped methods
252
from my_service_pb2_grpc import MyServiceStub
253
254
stub = MyServiceStub(channel)
255
wrapped_method = grpc_helpers_async.wrap_method(
256
stub.GetDataAsync,
257
default_retry=async_retry
258
)
259
260
try:
261
response = await wrapped_method(request)
262
print("Async response:", response)
263
except grpc.RpcError as e:
264
print("Async gRPC error:", e)
265
finally:
266
await channel.close()
267
268
asyncio.run(async_grpc_example())
269
```
270
271
### Streaming Operations
272
273
```python
274
from google.api_core import grpc_helpers
275
import grpc
276
277
# Create streaming client
278
channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)
279
stub = MyServiceStub(channel)
280
281
# Server streaming
282
def handle_server_stream():
283
request = StreamRequest(query="data")
284
stream = stub.StreamData(request)
285
286
# Wrap stream for additional functionality
287
wrapped_stream = grpc_helpers.GrpcStream(stream)
288
289
try:
290
for response in wrapped_stream:
291
print("Streamed data:", response.data)
292
293
# Check for cancellation
294
if some_cancellation_condition():
295
wrapped_stream.cancel()
296
break
297
298
except grpc.RpcError as e:
299
if wrapped_stream.cancelled:
300
print("Stream was cancelled")
301
else:
302
print("Stream error:", e)
303
304
# Bidirectional streaming
305
def handle_bidirectional_stream():
306
def request_generator():
307
for i in range(10):
308
yield BidiRequest(id=i, data=f"request_{i}")
309
310
stream = stub.BidirectionalStream(request_generator())
311
wrapped_stream = grpc_helpers.GrpcStream(stream)
312
313
try:
314
for response in wrapped_stream:
315
print("Bidirectional response:", response.result)
316
except grpc.RpcError as e:
317
print("Bidirectional stream error:", e)
318
319
handle_server_stream()
320
handle_bidirectional_stream()
321
```
322
323
### Custom Transport Configuration
324
325
```python
326
from google.api_core import grpc_helpers
327
import grpc
328
329
def create_custom_channel():
330
"""Create gRPC channel with custom configuration."""
331
332
# Custom gRPC options for performance tuning
333
options = [
334
# Connection keepalive settings
335
("grpc.keepalive_time_ms", 30000),
336
("grpc.keepalive_timeout_ms", 5000),
337
("grpc.keepalive_permit_without_calls", True),
338
339
# Message size limits
340
("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB
341
("grpc.max_receive_message_length", 100 * 1024 * 1024), # 100MB
342
343
# HTTP/2 ping settings
344
("grpc.http2.max_pings_without_data", 0),
345
("grpc.http2.min_time_between_pings_ms", 10000),
346
]
347
348
# Create channel with custom options
349
channel = grpc_helpers.create_channel(
350
target="api.example.com:443",
351
credentials=credentials,
352
compression=grpc.Compression.Gzip,
353
options=options
354
)
355
356
return channel
357
358
# Use custom channel
359
custom_channel = create_custom_channel()
360
stub = MyServiceStub(custom_channel)
361
```