
# Apache Airflow Backport Providers gRPC

A backport provider package that enables Apache Airflow 1.10.x installations to use gRPC functionality that was originally developed for Airflow 2.0. The package provides hooks and operators for establishing gRPC connections and executing gRPC calls within Airflow DAGs, with support for multiple authentication methods and both unary and streaming calls.

## Package Information

- **Package Name**: apache-airflow-backport-providers-grpc
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-grpc`
- **Python Version**: Python 3.6+

## Core Imports

```python
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator

# Placeholders: replace with the modules generated from your .proto file
from my_service_pb2 import MyRequest
from my_service_pb2_grpc import MyStubClass


# Using GrpcHook directly
def use_grpc_hook():
    hook = GrpcHook(grpc_conn_id='my_grpc_connection')

    # Execute a gRPC call; `data` is unpacked as keyword arguments to the stub method
    responses = hook.run(
        stub_class=MyStubClass,
        call_func='my_method',
        data={'request': MyRequest(param1='value1', param2='value2')},
    )

    for response in responses:
        print(response)


# Using GrpcOperator in a DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'grpc_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

grpc_task = GrpcOperator(
    task_id='call_grpc_service',
    stub_class=MyStubClass,
    call_func='my_method',
    grpc_conn_id='my_grpc_connection',
    data={'request': MyRequest(input='test_data')},
    dag=dag,
)
```

## Capabilities

### gRPC Hook

Provides low-level gRPC connection management and call execution capabilities. The GrpcHook establishes connections to gRPC servers with various authentication methods and executes remote procedure calls.

```python { .api }
class GrpcHook(BaseHook):
    def __init__(
        self,
        grpc_conn_id: str = "grpc_default",
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
    ) -> None:
        """
        Initialize gRPC hook.

        Args:
            grpc_conn_id: The connection ID to use when fetching connection info
            interceptors: List of gRPC interceptor objects to apply to the channel
            custom_connection_func: Custom function to return gRPC channel object
        """

    def get_conn(self) -> grpc.Channel:
        """
        Create and return a gRPC channel based on the connection configuration.

        Returns:
            grpc.Channel: Configured gRPC channel, wrapped with any interceptors

        Raises:
            AirflowConfigException: If auth_type is not supported, or if auth_type
                is CUSTOM and no custom_connection_func was provided
        """

    def run(
        self,
        stub_class: Callable,
        call_func: str,
        streaming: bool = False,
        data: Optional[dict] = None,
    ) -> Generator:
        """
        Execute gRPC function and yield response.

        Args:
            stub_class: gRPC stub class generated from proto file
            call_func: Function name to call on the stub
            streaming: Whether the call returns a response stream (default: False)
            data: Keyword arguments unpacked into the stub method call,
                e.g. {'request': MyRequest(...)}

        Yields:
            The single response of a unary call, or each streamed response
            when streaming=True

        Raises:
            grpc.RpcError: If gRPC service call fails
        """

    @staticmethod
    def get_connection_form_widgets() -> Dict[str, Any]:
        """
        Return connection widgets for Airflow UI form.

        Returns:
            Dict[str, Any]: Form widgets for connection configuration
        """
```
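The way `run` dispatches a call can be modeled in plain Python. This is a simplified sketch, not the provider's source: it assumes the hook instantiates `stub_class` with the channel, looks up `call_func` with `getattr`, unpacks `data` as keyword arguments, and yields either the single unary response or each item of a response stream. `run_sketch`, `FakeChannel`, and `FakeStub` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def run_sketch(
    channel: Any,
    stub_class: Callable[[Any], Any],
    call_func: str,
    streaming: bool = False,
    data: Optional[Dict[str, Any]] = None,
) -> Iterator[Any]:
    """Simplified model of GrpcHook.run (hypothetical helper, not the real hook)."""
    data = data or {}
    stub = stub_class(channel)           # generated stubs are constructed with a channel
    rpc_func = getattr(stub, call_func)  # look up the RPC method by name
    response = rpc_func(**data)          # data is unpacked as keyword arguments
    if streaming:
        yield from response              # response stream: yield each message
    else:
        yield response                   # unary call: yield the single response


# Hypothetical stand-ins for a channel and a generated stub
class FakeChannel:
    pass


class FakeStub:
    def __init__(self, channel: Any) -> None:
        self.channel = channel

    def Echo(self, request: str) -> str:
        return request.upper()

    def Count(self, request: int) -> Iterator[int]:
        return iter(range(request))


unary = list(run_sketch(FakeChannel(), FakeStub, "Echo", data={"request": "hi"}))
stream = list(run_sketch(FakeChannel(), FakeStub, "Count", streaming=True, data={"request": 3}))
print(unary, stream)  # → ['HI'] [0, 1, 2]
```

Because `data` is unpacked into the call, its keys must match the stub method's parameters; for generated gRPC stubs that is typically `request`, plus optional arguments such as `timeout` or `metadata`.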

### gRPC Operator

Airflow operator that executes gRPC calls as part of a DAG workflow. The GrpcOperator wraps the GrpcHook functionality in an operator suitable for use in Airflow task definitions.

The operator supports Airflow templating for the `stub_class`, `call_func`, and `data` parameters, so their values can be rendered with Jinja templates from the task context.

```python { .api }
class GrpcOperator(BaseOperator):
    template_fields = ('stub_class', 'call_func', 'data')

    def __init__(
        self,
        *,
        stub_class: Callable,
        call_func: str,
        grpc_conn_id: str = "grpc_default",
        data: Optional[dict] = None,
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
        streaming: bool = False,
        response_callback: Optional[Callable] = None,
        log_response: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize gRPC operator.

        Args:
            stub_class: gRPC stub client class generated from proto file
            call_func: Client function name to call the gRPC endpoint
            grpc_conn_id: Connection ID to use (default: "grpc_default")
            data: Keyword arguments unpacked into the stub method call
            interceptors: List of gRPC interceptor objects
            custom_connection_func: Custom function to return gRPC channel
            streaming: Flag for streaming calls (default: False)
            response_callback: Called as response_callback(response, context)
                for each response
            log_response: Flag to log responses (default: False)
            **kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict) -> None:
        """
        Execute the gRPC operation.

        Args:
            context: Airflow task context dictionary
        """
```
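The per-response handling controlled by `log_response` and `response_callback` can be sketched as below. This is a hypothetical model (`handle_responses` is not part of the package): it assumes `execute` iterates over the hook's responses, logs each one with `repr` when `log_response` is set, and then calls `response_callback(response, context)`.

```python
import logging
from typing import Any, Callable, Dict, Iterable, Optional

log = logging.getLogger("grpc_operator_sketch")


def handle_responses(
    responses: Iterable[Any],
    context: Dict[str, Any],
    response_callback: Optional[Callable[[Any, Dict[str, Any]], None]] = None,
    log_response: bool = False,
) -> None:
    """Hypothetical model of the response loop in GrpcOperator.execute."""
    for response in responses:
        if log_response:
            log.info(repr(response))  # log each response when log_response=True
        if response_callback is not None:
            # The callback receives the response and the Airflow task context
            response_callback(response, context)


seen = []
handle_responses(
    ["a", "b"],
    {"ds": "2021-01-01"},
    response_callback=lambda resp, ctx: seen.append((resp, ctx["ds"])),
    log_response=True,
)
print(seen)  # → [('a', '2021-01-01'), ('b', '2021-01-01')]
```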

## Connection Configuration

Configure gRPC connections in Airflow with the following connection parameters:

- **Connection Type**: `grpc`
- **Host**: gRPC server hostname
- **Port**: gRPC server port (optional)
- **Extra**: JSON configuration with authentication settings
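Connections can also be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable holding a connection URI: Airflow takes the URI scheme as the connection type and parses query parameters into the Extra field. The sketch below builds such a URI with the standard library; `grpc_conn_uri`, the host, the port, and the connection ID are illustrative assumptions.

```python
import os
from urllib.parse import parse_qsl, urlencode, urlsplit


def grpc_conn_uri(host: str, port: int, extras: dict) -> str:
    """Build an Airflow connection URI for a gRPC connection (illustrative helper)."""
    # The URI scheme becomes the connection type; query parameters become Extra fields
    return f"grpc://{host}:{port}?{urlencode(extras)}"


uri = grpc_conn_uri(
    "grpc.example.com",  # illustrative host
    50051,               # illustrative port
    {
        "extra__grpc__auth_type": "SSL",
        "extra__grpc__credential_pem_file": "/path/to/credentials.pem",
    },
)

# Airflow resolves connection ID "my_grpc_connection" from AIRFLOW_CONN_MY_GRPC_CONNECTION
os.environ["AIRFLOW_CONN_MY_GRPC_CONNECTION"] = uri

# Round-trip: the query string decodes back into the Extra fields
extras = dict(parse_qsl(urlsplit(uri).query))
print(extras["extra__grpc__auth_type"])  # → SSL
```

In a real deployment you would usually export this variable in the scheduler and worker environments rather than set it from Python.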

### Authentication Types

The package supports multiple authentication methods, selected with the `extra__grpc__auth_type` extra field:

#### NO_AUTH

```json
{
  "extra__grpc__auth_type": "NO_AUTH"
}
```

#### SSL/TLS

```json
{
  "extra__grpc__auth_type": "SSL",
  "extra__grpc__credential_pem_file": "/path/to/credentials.pem"
}
```

#### Google JWT

```json
{
  "extra__grpc__auth_type": "JWT_GOOGLE"
}
```

#### Google OAuth

```json
{
  "extra__grpc__auth_type": "OAUTH_GOOGLE",
  "extra__grpc__scopes": "grpc,gcs"
}
```

`extra__grpc__scopes` is a comma-separated list of OAuth scopes. Both Google methods obtain credentials from Google Application Default Credentials.

#### Custom Authentication

```json
{
  "extra__grpc__auth_type": "CUSTOM"
}
```

When using custom authentication, you must pass a `custom_connection_func` to `GrpcHook` (or `GrpcOperator`); it receives the Airflow connection object and must return a `grpc.Channel`.
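A `custom_connection_func` might look like the sketch below. It assumes you want an insecure channel with client keepalive pings; `keepalive_channel`, `channel_target`, and the option values are illustrative choices, not package defaults. It relies only on the connection's `host` and `port` attributes, and imports `grpc` inside the function so the target-building helper can be used on its own.

```python
from types import SimpleNamespace
from typing import Any


def channel_target(conn: Any) -> str:
    """Build 'host:port' (or just 'host') from an Airflow-style connection object."""
    return f"{conn.host}:{conn.port}" if conn.port else conn.host


def keepalive_channel(conn: Any):
    """Hypothetical custom_connection_func: insecure channel with keepalive pings."""
    import grpc  # imported here so channel_target stays usable without grpcio

    options = [
        ("grpc.keepalive_time_ms", 30_000),     # ping every 30 s (illustrative value)
        ("grpc.keepalive_timeout_ms", 10_000),  # wait up to 10 s for a ping ack
    ]
    # Channel creation is lazy; no network I/O happens until the first RPC
    return grpc.insecure_channel(channel_target(conn), options=options)


# Usage with the hook (assumes the connection's extra sets auth_type to CUSTOM):
# hook = GrpcHook("my_custom_conn", custom_connection_func=keepalive_channel)

demo_conn = SimpleNamespace(host="localhost", port=50051)  # stand-in for an Airflow Connection
print(channel_target(demo_conn))  # → localhost:50051
```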

## Types

```python { .api }
# Type aliases and imports
from typing import Any, Callable, Dict, Generator, List, Optional

import grpc

# Connection configuration fields
ConnectionConfig = Dict[str, Any]  # Connection extra fields
```

## Error Handling

The package surfaces the following errors:

- **grpc.RpcError**: Raised when a gRPC service call fails. The hook logs the stub class, method name, status code, and error details before re-raising. Common causes include network connectivity issues, service unavailability, or malformed requests.
- **AirflowConfigException**: Raised when the connection configuration is invalid: an unsupported `auth_type` value, or the `CUSTOM` auth type without a `custom_connection_func`.
- **FileNotFoundError**: Can occur when SSL/TLS authentication is used but the `credential_pem_file` path is invalid or the file doesn't exist.

Because `GrpcHook.run` is a generator, these errors are raised while you iterate over its responses, not when `run` is called.

Example error handling:

```python
import grpc
from airflow.exceptions import AirflowConfigException
from airflow.providers.grpc.hooks.grpc import GrpcHook

# Placeholders: replace with the modules generated from your .proto file
from my_service_pb2 import MyRequest
from my_service_pb2_grpc import MyStub


def handle_response(response):
    # Placeholder: replace with your own processing
    print(response)


try:
    hook = GrpcHook('my_connection')
    responses = hook.run(MyStub, 'my_method', data={'request': MyRequest(input='test')})
    for response in responses:
        handle_response(response)
except grpc.RpcError as e:
    print(f"gRPC call failed: {e.code()}, {e.details()}")
except AirflowConfigException as e:
    print(f"Configuration error: {e}")
except FileNotFoundError as e:
    print(f"Credential file not found: {e}")
```

## Advanced Usage

### Using Interceptors

```python
import grpc
from airflow.providers.grpc.hooks.grpc import GrpcHook


# Custom interceptor that logs the method name of each unary-unary call
class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print(f"Calling {client_call_details.method}")
        return continuation(client_call_details, request)


# The hook applies each interceptor to the channel it creates
hook = GrpcHook(
    grpc_conn_id='my_connection',
    interceptors=[LoggingInterceptor()],
)
```

### Streaming Calls

```python
# Server-streaming call with the operator. `dag` is the DAG from Basic Usage;
# MyStreamingStub and MyStreamRequest are placeholders for your generated classes.
streaming_task = GrpcOperator(
    task_id='stream_grpc_data',
    stub_class=MyStreamingStub,
    call_func='stream_method',
    streaming=True,  # iterate over the response stream
    grpc_conn_id='my_connection',
    data={'request': MyStreamRequest(stream_param='value')},
    dag=dag,
)
```

### Response Callbacks

```python
def process_response(response, context):
    # Called once per response with the response message and the Airflow task context
    result = response.result_field  # placeholder field on your response message
    # Push to XCom; repeated pushes to the same key overwrite the previous value
    context['task_instance'].xcom_push(key='grpc_result', value=result)


callback_task = GrpcOperator(
    task_id='grpc_with_callback',
    stub_class=MyStub,
    call_func='my_method',
    response_callback=process_response,
    dag=dag,
)
```
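Because each push to the same XCom key replaces the earlier value, a per-response callback on a streaming call keeps only the last result. One workaround, sketched below, is a callback factory that accumulates results and re-pushes the growing list on every response, so the final XCom holds all of them. `make_collecting_callback` is a hypothetical helper, it assumes each extracted value is XCom-serializable, and `FakeTaskInstance` exists only to exercise the code outside Airflow.

```python
from typing import Any, Callable, Dict, List


def make_collecting_callback(
    key: str, extract: Callable[[Any], Any] = lambda r: r
) -> Callable[[Any, Dict[str, Any]], None]:
    """Return a response_callback that pushes the list of all results seen so far."""
    results: List[Any] = []

    def callback(response: Any, context: Dict[str, Any]) -> None:
        results.append(extract(response))
        # Re-push the whole list; each push overwrites the previous value for `key`
        context["task_instance"].xcom_push(key=key, value=list(results))

    return callback


class FakeTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance that stores XComs in a dict."""

    def __init__(self) -> None:
        self.xcoms: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcoms[key] = value


ti = FakeTaskInstance()
cb = make_collecting_callback("grpc_results")
for resp in ["r1", "r2", "r3"]:
    cb(resp, {"task_instance": ti})
print(ti.xcoms["grpc_results"])  # → ['r1', 'r2', 'r3']
```

In a DAG you would pass something like `response_callback=make_collecting_callback('grpc_results', lambda r: r.result_field)` together with `streaming=True`, where `result_field` is the same placeholder as above. Create a separate callback per task, since the accumulated list lives in the closure.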