# Apache Airflow Backport Providers gRPC

This backport provider package lets Apache Airflow 1.10.x installations use the gRPC functionality originally developed for Airflow 2.0. It provides a hook and an operator for establishing gRPC connections and executing gRPC calls within Airflow DAGs, with support for multiple authentication methods and for both unary and streaming calls.

## Package Information

- **Package Name**: apache-airflow-backport-providers-grpc
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-grpc`
- **Python Version**: Python 3.6+

## Core Imports

```python
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
from datetime import datetime, timedelta

# MyStubClass is a placeholder for a stub class generated from your .proto file.

# Using GrpcHook directly
def use_grpc_hook():
    hook = GrpcHook(grpc_conn_id='my_grpc_connection')

    # Execute a gRPC call
    responses = hook.run(
        stub_class=MyStubClass,
        call_func='my_method',
        data={'param1': 'value1', 'param2': 'value2'}
    )

    for response in responses:
        print(response)

# Using GrpcOperator in DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'grpc_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

grpc_task = GrpcOperator(
    task_id='call_grpc_service',
    stub_class=MyStubClass,
    call_func='my_method',
    grpc_conn_id='my_grpc_connection',
    data={'input': 'test_data'},
    dag=dag,
)
```

## Capabilities

### gRPC Hook

Provides low-level gRPC connection management and call execution capabilities. The GrpcHook establishes connections to gRPC servers with various authentication methods and executes remote procedure calls.

```python { .api }
class GrpcHook(BaseHook):
    def __init__(
        self,
        grpc_conn_id: str = "grpc_default",
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
    ) -> None:
        """
        Initialize gRPC hook.

        Args:
            grpc_conn_id: The connection ID to use when fetching connection info
            interceptors: List of gRPC interceptor objects to apply to the channel
            custom_connection_func: Custom function to return gRPC channel object
        """

    def get_conn(self) -> grpc.Channel:
        """
        Establish and return gRPC channel based on connection configuration.

        Returns:
            grpc.Channel: Configured gRPC channel

        Raises:
            AirflowConfigException: If auth_type is not supported or connection fails
        """

    def run(
        self,
        stub_class: Callable,
        call_func: str,
        streaming: bool = False,
        data: Optional[dict] = None
    ) -> Generator:
        """
        Execute gRPC function and yield response.

        Args:
            stub_class: gRPC stub class generated from proto file
            call_func: Function name to call on the stub
            streaming: Whether the call is streaming (default: False)
            data: Data to pass to the RPC call

        Yields:
            Response objects from the gRPC call

        Raises:
            grpc.RpcError: If gRPC service call fails
        """

    @staticmethod
    def get_connection_form_widgets() -> Dict[str, Any]:
        """
        Return connection widgets for Airflow UI form.

        Returns:
            Dict[str, Any]: Form widgets for connection configuration
        """
```

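To make the `run` contract more concrete, here is a minimal sketch of the dispatch it is assumed to perform: the stub is instantiated on the channel, `call_func` is looked up by name, and `data` is unpacked as keyword arguments. `FakeGreeterStub` and `run_sketch` are hypothetical stand-ins written for this illustration and are not part of the provider; the real hook additionally opens the channel itself and logs any `grpc.RpcError`.

```python
from typing import Any, Callable, Dict, Iterator, Optional


class FakeGreeterStub:
    """Hypothetical stand-in for a generated stub; a real one wraps a grpc.Channel."""

    def __init__(self, channel: Any) -> None:
        self.channel = channel

    def say_hello(self, request: str) -> str:
        # Unary call: one request in, one response out.
        return f"hello {request}"

    def count_to(self, request: int) -> Iterator[int]:
        # Server-streaming call: one request in, an iterator of responses out.
        return iter(range(1, request + 1))


def run_sketch(
    channel: Any,
    stub_class: Callable,
    call_func: str,
    streaming: bool = False,
    data: Optional[Dict[str, Any]] = None,
) -> Iterator[Any]:
    """Mimic the assumed dispatch: look up the method by name, pass data as kwargs."""
    stub = stub_class(channel)
    rpc_func = getattr(stub, call_func)
    response = rpc_func(**(data or {}))
    if streaming:
        # Streaming calls return an iterator; re-yield each item.
        yield from response
    else:
        # Unary calls return a single response; yield it once.
        yield response


unary = list(run_sketch(None, FakeGreeterStub, "say_hello", data={"request": "airflow"}))
stream = list(run_sketch(None, FakeGreeterStub, "count_to", streaming=True, data={"request": 3}))
print(unary)   # ['hello airflow']
print(stream)  # [1, 2, 3]
```

Because the result is a generator, nothing is called until it is iterated, which is why the usage examples loop over `responses` even for unary calls. If the keyword-unpacking assumption holds, the keys in `data` have to match the parameter names of the stub method (typically `request`).
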
### gRPC Operator

Airflow operator that executes gRPC calls as part of a DAG workflow. The GrpcOperator wraps the GrpcHook functionality in an operator suitable for use in Airflow task definitions.

The operator supports Airflow templating for `stub_class`, `call_func`, and `data` parameters, allowing dynamic values from task context and Jinja templates.

```python { .api }
class GrpcOperator(BaseOperator):
    template_fields = ('stub_class', 'call_func', 'data')

    def __init__(
        self,
        *,
        stub_class: Callable,
        call_func: str,
        grpc_conn_id: str = "grpc_default",
        data: Optional[dict] = None,
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
        streaming: bool = False,
        response_callback: Optional[Callable] = None,
        log_response: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize gRPC operator.

        Args:
            stub_class: gRPC stub client class generated from proto file
            call_func: Client function name to call the gRPC endpoint
            grpc_conn_id: Connection ID to use (default: "grpc_default")
            data: Data to pass to the RPC call
            interceptors: List of gRPC interceptor objects
            custom_connection_func: Custom function to return gRPC channel
            streaming: Flag for streaming calls (default: False)
            response_callback: Callback function to process responses
            log_response: Flag to log responses (default: False)
            **kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict) -> None:
        """
        Execute the gRPC operation.

        Args:
            context: Airflow task context dictionary
        """
```

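The interaction between `log_response` and `response_callback` can be sketched as follows. This is an illustration of the behaviour implied by the parameter descriptions above, not the operator's actual code: each response yielded by the hook is optionally logged and then handed to the callback together with the task context. `handle_responses` and its `log` parameter are hypothetical names introduced for the example.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def handle_responses(
    responses: Iterable[Any],
    context: Dict[str, Any],
    response_callback: Optional[Callable[[Any, Dict[str, Any]], None]] = None,
    log_response: bool = False,
    log: Callable[[str], None] = print,
) -> None:
    """Apply the optional logging and callback to every response, in order."""
    for response in responses:
        if log_response:
            log(repr(response))
        if response_callback is not None:
            # The callback receives the response and the task context.
            response_callback(response, context)


collected: List[str] = []
logged: List[str] = []

handle_responses(
    responses=iter(["a", "b"]),
    context={"run_id": "manual"},
    response_callback=lambda response, ctx: collected.append(f"{ctx['run_id']}:{response}"),
    log_response=True,
    log=logged.append,
)
print(collected)  # ['manual:a', 'manual:b']
print(logged)     # ["'a'", "'b'"]
```

With a streaming call the callback would run once per streamed message, so a callback that writes to a fixed key (for example an XCom key) keeps only the last value.
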
## Connection Configuration

Configure gRPC connections in Airflow with the following connection parameters:

- **Connection Type**: `grpc`
- **Host**: gRPC server hostname
- **Port**: gRPC server port (optional)
- **Extra**: JSON configuration with authentication settings

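The Extra field holds a JSON object whose keys carry the `extra__grpc__` prefix. As a rough sketch, the JSON can be generated in Python rather than typed by hand. `build_extra` and `DOCUMENTED_AUTH_TYPES` are hypothetical helpers for this illustration; the set contains only the auth type values named in this document.

```python
import json
from typing import Dict, Optional

# Auth type values named in this document's "Authentication Types" section.
DOCUMENTED_AUTH_TYPES = {"NO_AUTH", "SSL", "JWT_GOOGLE", "OAUTH_GOOGLE", "CUSTOM"}


def build_extra(
    auth_type: str,
    credential_pem_file: Optional[str] = None,
    scopes: Optional[str] = None,
) -> str:
    """Return the Extra JSON string for a gRPC connection (hypothetical helper)."""
    if auth_type not in DOCUMENTED_AUTH_TYPES:
        raise ValueError(f"Unknown auth type: {auth_type!r}")
    extra: Dict[str, str] = {"extra__grpc__auth_type": auth_type}
    if credential_pem_file is not None:
        extra["extra__grpc__credential_pem_file"] = credential_pem_file
    if scopes is not None:
        extra["extra__grpc__scopes"] = scopes
    return json.dumps(extra, sort_keys=True)


ssl_extra = build_extra("SSL", credential_pem_file="/path/to/credentials.pem")
print(ssl_extra)
```

The resulting string can be pasted into the Extra field of the connection form.
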
### Authentication Types

The package supports multiple authentication methods through the `extra__grpc__auth_type` extra field:

#### NO_AUTH
```json
{
  "extra__grpc__auth_type": "NO_AUTH"
}
```

#### SSL/TLS
```json
{
  "extra__grpc__auth_type": "SSL",
  "extra__grpc__credential_pem_file": "/path/to/credentials.pem"
}
```

#### Google JWT
```json
{
  "extra__grpc__auth_type": "JWT_GOOGLE"
}
```

#### Google OAuth
```json
{
  "extra__grpc__auth_type": "OAUTH_GOOGLE",
  "extra__grpc__scopes": "grpc,gcs"
}
```

#### Custom Authentication
```json
{
  "extra__grpc__auth_type": "CUSTOM"
}
```

When using custom authentication, you must provide a `custom_connection_func` that takes a connection object and returns a gRPC channel.

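A `custom_connection_func` might look like the sketch below. It assumes the connection object exposes `host` and `port` attributes, as Airflow connections do; `build_target` and `my_connection_func` are hypothetical names, and the insecure channel is only a placeholder for whatever credentials a custom scheme needs.

```python
from types import SimpleNamespace
from typing import Any


def build_target(connection: Any) -> str:
    """Join host and optional port into a gRPC target string."""
    if connection.port:
        return f"{connection.host}:{connection.port}"
    return connection.host


def my_connection_func(connection: Any):
    """Hypothetical custom_connection_func: return a channel for the connection."""
    # Imported lazily so the module can be parsed without grpcio installed.
    import grpc

    # An insecure channel keeps the sketch short; real code would attach
    # whatever credentials the custom scheme requires.
    return grpc.insecure_channel(build_target(connection))


# Stand-in for an Airflow connection with host and port attributes.
example_conn = SimpleNamespace(host="grpc.example.com", port=50051)
print(build_target(example_conn))  # grpc.example.com:50051
```

The function would then be passed as `custom_connection_func=my_connection_func` to `GrpcHook` or `GrpcOperator`, together with a connection whose auth type is `CUSTOM`.
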
## Types

```python { .api }
# Type aliases and imports
from typing import Any, Callable, Dict, Generator, List, Optional
import grpc

# Connection configuration fields
ConnectionConfig = Dict[str, Any]  # Connection extra fields
```

## Error Handling

The package handles gRPC-specific errors:

- **grpc.RpcError**: Raised when gRPC service calls fail. The hook logs error details including status code and error message. Common causes include network connectivity issues, service unavailability, or malformed requests.
- **AirflowConfigException**: Raised when connection configuration is invalid or authentication type is not supported. This includes unsupported auth_type values, missing credential files for SSL/TLS authentication, or missing custom_connection_func for CUSTOM auth type.
- **FileNotFoundError**: Can occur when SSL/TLS authentication is used but the credential_pem_file path is invalid or the file doesn't exist.

Example error handling:

```python
from airflow.exceptions import AirflowConfigException
from airflow.providers.grpc.hooks.grpc import GrpcHook
import grpc

# MyStub and process_response are placeholders for your generated stub class
# and your own response handler.
try:
    hook = GrpcHook('my_connection')
    responses = hook.run(MyStub, 'my_method', data={'input': 'test'})
    for response in responses:
        process_response(response)
except grpc.RpcError as e:
    print(f"gRPC call failed: {e.code()}, {e.details()}")
except AirflowConfigException as e:
    print(f"Configuration error: {e}")
except FileNotFoundError as e:
    print(f"Credential file not found: {e}")
```

## Advanced Usage

### Using Interceptors

```python
import grpc
from airflow.providers.grpc.hooks.grpc import GrpcHook

# Custom interceptor example
class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print(f"Calling {client_call_details.method}")
        return continuation(client_call_details, request)

# Use with hook
hook = GrpcHook(
    grpc_conn_id='my_connection',
    interceptors=[LoggingInterceptor()]
)
```

### Streaming Calls

```python
# Streaming call with operator
streaming_task = GrpcOperator(
    task_id='stream_grpc_data',
    stub_class=MyStreamingStub,
    call_func='stream_method',
    streaming=True,
    grpc_conn_id='my_connection',
    data={'stream_param': 'value'},
    dag=dag,
)
```

### Response Callbacks

```python
def process_response(response, context):
    # Process each response
    result = response.result_field
    # Push to XCom
    context['task_instance'].xcom_push(key='grpc_result', value=result)

callback_task = GrpcOperator(
    task_id='grpc_with_callback',
    stub_class=MyStub,
    call_func='my_method',
    response_callback=process_response,
    dag=dag,
)
```