or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bidirectional-streaming.md client-config.md datetime.md exceptions.md gapic-framework.md iam-policies.md index.md operations.md page-iteration.md path-templates.md protobuf-helpers.md retry.md timeout.md transport.md universe-domain.md

docs/transport.md

0

# Transport Helpers

1

2

Utilities for gRPC and REST transport protocols, including channel creation, method wrapping, and streaming support. These helpers provide consistent interfaces for different transport protocols used by Google APIs.

3

4

## Capabilities

5

6

### gRPC Channel Management

7

8

Functions for creating and configuring gRPC channels with authentication and transport options.

9

10

```python { .api }

11

def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):

12

"""

13

Create a gRPC channel with authentication and configuration.

14

15

Args:

16

target (str): Target server address (host:port)

17

credentials: Google Auth credentials object

18

scopes (List[str], optional): OAuth 2.0 scopes for authentication

19

ssl_credentials: gRPC SSL credentials

20

credentials_file (str, optional): Path to service account file

21

quota_project_id (str, optional): Project ID for quota attribution

22

default_scopes (List[str], optional): Default scopes if none provided

23

default_host (str, optional): Default host for the service

24

compression: gRPC compression algorithm

25

**kwargs: Additional channel arguments

26

27

Returns:

28

grpc.Channel: Configured gRPC channel

29

"""

30

31

async def create_channel_async(target, credentials=None, scopes=None, ssl_credentials=None, credentials_file=None, quota_project_id=None, default_scopes=None, default_host=None, compression=None, **kwargs):

32

"""

33

Create an async gRPC channel with authentication and configuration.

34

35

Args:

36

target (str): Target server address (host:port)

37

credentials: Google Auth credentials object

38

scopes (List[str], optional): OAuth 2.0 scopes for authentication

39

ssl_credentials: gRPC SSL credentials

40

credentials_file (str, optional): Path to service account file

41

quota_project_id (str, optional): Project ID for quota attribution

42

default_scopes (List[str], optional): Default scopes if none provided

43

default_host (str, optional): Default host for the service

44

compression: gRPC compression algorithm

45

**kwargs: Additional channel arguments

46

47

Returns:

48

grpc.aio.Channel: Configured async gRPC channel

49

"""

50

```

51

52

### Method Wrapping

53

54

Functions for applying retry, timeout, and other decorators to gRPC and REST methods.

55

56

```python { .api }

57

def wrap_method(func, default_retry=None, default_timeout=None, client_info=None):

58

"""

59

Apply retry, timeout, and client info to a gRPC method.

60

61

Args:

62

func (Callable): gRPC method to wrap

63

default_retry (Retry, optional): Default retry configuration

64

default_timeout (Timeout, optional): Default timeout configuration

65

client_info (ClientInfo, optional): Client information for headers

66

67

Returns:

68

Callable: Wrapped method with applied decorators

69

"""

70

71

async def wrap_method_async(func, default_retry=None, default_timeout=None, client_info=None):

72

"""

73

Apply async retry, timeout, and client info to an async gRPC method.

74

75

Args:

76

func (Callable): Async gRPC method to wrap

77

default_retry (AsyncRetry, optional): Default async retry configuration

78

default_timeout (Timeout, optional): Default timeout configuration

79

client_info (ClientInfo, optional): Client information for headers

80

81

Returns:

82

Callable: Wrapped async method with applied decorators

83

"""

84

```

85

86

### Streaming Support

87

88

Classes and utilities for handling gRPC streaming responses.

89

90

```python { .api }

91

class GrpcStream:

92

"""

93

Wrapper for gRPC streaming responses with additional functionality.

94

95

Args:

96

wrapped: Original gRPC stream iterator

97

"""

98

def __init__(self, wrapped): ...

99

100

def __iter__(self):

101

"""Return iterator for stream."""

102

return self

103

104

def __next__(self):

105

"""Get next item from stream."""

106

107

def cancel(self):

108

"""Cancel the streaming operation."""

109

110

@property

111

def cancelled(self):

112

"""Check if stream was cancelled."""

113

114

class _StreamingResponseIterator:

115

"""Iterator wrapper for gRPC streaming responses."""

116

def __init__(self, wrapped): ...

117

def __iter__(self): ...

118

def __next__(self): ...

119

```

120

121

### Transport Constants

122

123

Constants indicating the availability of optional transport dependencies and the installed protobuf version.

124

125

```python { .api }

126

# Boolean indicating if grpc_gcp is available

127

HAS_GRPC_GCP = True # or False based on installation

128

129

# Protobuf version string

130

PROTOBUF_VERSION = "4.25.1"

131

```

132

133

## Usage Examples

134

135

### Creating gRPC Channels

136

137

```python

138

from google.api_core import grpc_helpers

139

from google.auth import default

140

141

# Create channel with default credentials

142

credentials, project = default()

143

channel = grpc_helpers.create_channel(

144

target="googleapis.com:443",

145

credentials=credentials,

146

scopes=["https://www.googleapis.com/auth/cloud-platform"]

147

)

148

149

# Create channel with service account

150

channel = grpc_helpers.create_channel(

151

target="googleapis.com:443",

152

credentials_file="/path/to/service-account.json",

153

quota_project_id="my-project-id"

154

)

155

156

# Create channel with custom options

157

channel = grpc_helpers.create_channel(

158

target="googleapis.com:443",

159

credentials=credentials,

160

compression=grpc.Compression.Gzip,

161

options=[

162

("grpc.keepalive_time_ms", 30000),

163

("grpc.keepalive_timeout_ms", 5000),

164

("grpc.http2.max_pings_without_data", 0)

165

]

166

)

167

```

168

169

### Method Wrapping with Retry and Timeout

170

171

```python

172

from google.api_core import grpc_helpers

173

from google.api_core import retry

174

from google.api_core import timeout

175

from google.api_core import client_info

176

import grpc

177

178

# Create retry and timeout configurations

179

retry_config = retry.Retry(

180

predicate=retry.if_exception_type(

181

grpc.StatusCode.UNAVAILABLE,

182

grpc.StatusCode.DEADLINE_EXCEEDED

183

),

184

initial=1.0,

185

maximum=60.0,

186

multiplier=2.0

187

)

188

189

timeout_config = timeout.TimeToDeadlineTimeout(deadline=300.0)

190

191

client_info_obj = client_info.ClientInfo(

192

client_library_name="my-client",

193

client_library_version="1.0.0"

194

)

195

196

# Wrap a gRPC method

197

def create_wrapped_method(stub_method):

198

return grpc_helpers.wrap_method(

199

stub_method,

200

default_retry=retry_config,

201

default_timeout=timeout_config,

202

client_info=client_info_obj

203

)

204

205

# Use with a gRPC client

206

from my_service_pb2_grpc import MyServiceStub

207

208

channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)

209

stub = MyServiceStub(channel)

210

211

# Wrap methods with retry/timeout

212

wrapped_method = create_wrapped_method(stub.GetData)

213

214

# Call wrapped method - retry and timeout applied automatically

215

try:

216

response = wrapped_method(request)

217

print("Success:", response)

218

except grpc.RpcError as e:

219

print("gRPC error:", e)

220

```

221

222

### Async gRPC Usage

223

224

```python

225

import asyncio

226

from google.api_core import grpc_helpers_async

227

from google.api_core import retry

228

from google.auth import default

229

230

async def async_grpc_example():

231

# Load application default credentials

232

credentials, project = default()

233

234

# Create async gRPC channel

235

channel = await grpc_helpers_async.create_channel(

236

target="googleapis.com:443",

237

credentials=credentials,

238

scopes=["https://www.googleapis.com/auth/cloud-platform"]

239

)

240

241

# Create async retry configuration

242

async_retry = retry.AsyncRetry(

243

predicate=retry.if_exception_type(

244

grpc.StatusCode.UNAVAILABLE,

245

grpc.StatusCode.INTERNAL

246

),

247

initial=1.0,

248

maximum=30.0

249

)

250

251

# Use async stub with wrapped methods

252

from my_service_pb2_grpc import MyServiceStub

253

254

stub = MyServiceStub(channel)

255

wrapped_method = grpc_helpers_async.wrap_method(

256

stub.GetDataAsync,

257

default_retry=async_retry

258

)

259

260

try:

261

response = await wrapped_method(request)

262

print("Async response:", response)

263

except grpc.RpcError as e:

264

print("Async gRPC error:", e)

265

finally:

266

await channel.close()

267

268

asyncio.run(async_grpc_example())

269

```

270

271

### Streaming Operations

272

273

```python

274

from google.api_core import grpc_helpers

275

import grpc

276

277

# Create streaming client

278

channel = grpc_helpers.create_channel("api.example.com:443", credentials=credentials)

279

stub = MyServiceStub(channel)

280

281

# Server streaming

282

def handle_server_stream():

283

request = StreamRequest(query="data")

284

stream = stub.StreamData(request)

285

286

# Wrap stream for additional functionality

287

wrapped_stream = grpc_helpers.GrpcStream(stream)

288

289

try:

290

for response in wrapped_stream:

291

print("Streamed data:", response.data)

292

293

# Check for cancellation

294

if some_cancellation_condition():

295

wrapped_stream.cancel()

296

break

297

298

except grpc.RpcError as e:

299

if wrapped_stream.cancelled:

300

print("Stream was cancelled")

301

else:

302

print("Stream error:", e)

303

304

# Bidirectional streaming

305

def handle_bidirectional_stream():

306

def request_generator():

307

for i in range(10):

308

yield BidiRequest(id=i, data=f"request_{i}")

309

310

stream = stub.BidirectionalStream(request_generator())

311

wrapped_stream = grpc_helpers.GrpcStream(stream)

312

313

try:

314

for response in wrapped_stream:

315

print("Bidirectional response:", response.result)

316

except grpc.RpcError as e:

317

print("Bidirectional stream error:", e)

318

319

handle_server_stream()

320

handle_bidirectional_stream()

321

```

322

323

### Custom Transport Configuration

324

325

```python

326

from google.api_core import grpc_helpers

327

import grpc

328

329

def create_custom_channel():

330

"""Create gRPC channel with custom configuration."""

331

332

# Custom gRPC options for performance tuning

333

options = [

334

# Connection keepalive settings

335

("grpc.keepalive_time_ms", 30000),

336

("grpc.keepalive_timeout_ms", 5000),

337

("grpc.keepalive_permit_without_calls", True),

338

339

# Message size limits

340

("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB

341

("grpc.max_receive_message_length", 100 * 1024 * 1024), # 100MB

342

343

# HTTP/2 ping settings

344

("grpc.http2.max_pings_without_data", 0),

345

("grpc.http2.min_time_between_pings_ms", 10000),

346

]

347

348

# Create channel with custom options

349

channel = grpc_helpers.create_channel(

350

target="api.example.com:443",

351

credentials=credentials,

352

compression=grpc.Compression.Gzip,

353

options=options

354

)

355

356

return channel

357

358

# Use custom channel

359

custom_channel = create_custom_channel()

360

stub = MyServiceStub(custom_channel)

361

```