Backport provider package for gRPC integration with Apache Airflow 1.10.x
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-grpc@2021.3.0

A backport provider package that enables Apache Airflow 1.10.x installations to use gRPC functionality that was originally developed for Airflow 2.0. The package provides hooks and operators for establishing gRPC connections and executing gRPC calls within Airflow DAGs, with support for multiple authentication methods and both unary and streaming calls.
pip install apache-airflow-backport-providers-grpc

from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator

from airflow import DAG
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
from datetime import datetime, timedelta
# Using GrpcHook directly
def use_grpc_hook():
hook = GrpcHook(grpc_conn_id='my_grpc_connection')
# Execute a gRPC call
responses = hook.run(
stub_class=MyStubClass,
call_func='my_method',
data={'param1': 'value1', 'param2': 'value2'}
)
for response in responses:
print(response)
# Using GrpcOperator in DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'grpc_example',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
grpc_task = GrpcOperator(
task_id='call_grpc_service',
stub_class=MyStubClass,
call_func='my_method',
grpc_conn_id='my_grpc_connection',
data={'input': 'test_data'},
dag=dag,
)

Provides low-level gRPC connection management and call execution capabilities. The GrpcHook establishes connections to gRPC servers with various authentication methods and executes remote procedure calls.
class GrpcHook(BaseHook):
def __init__(
self,
grpc_conn_id: str = "grpc_default",
interceptors: Optional[List[Callable]] = None,
custom_connection_func: Optional[Callable] = None,
) -> None:
"""
Initialize gRPC hook.
Args:
grpc_conn_id: The connection ID to use when fetching connection info
interceptors: List of gRPC interceptor objects to apply to the channel
custom_connection_func: Custom function to return gRPC channel object
"""
def get_conn(self) -> grpc.Channel:
"""
Establish and return gRPC channel based on connection configuration.
Returns:
grpc.Channel: Configured gRPC channel
Raises:
AirflowConfigException: If auth_type is not supported or connection fails
"""
def run(
self,
stub_class: Callable,
call_func: str,
streaming: bool = False,
data: Optional[dict] = None
) -> Generator:
"""
Execute gRPC function and yield response.
Args:
stub_class: gRPC stub class generated from proto file
call_func: Function name to call on the stub
streaming: Whether the call is streaming (default: False)
data: Data to pass to the RPC call
Yields:
Response objects from the gRPC call
Raises:
grpc.RpcError: If gRPC service call fails
"""
@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""
Return connection widgets for Airflow UI form.
Returns:
Dict[str, Any]: Form widgets for connection configuration
"""

Airflow operator that executes gRPC calls as part of a DAG workflow. The GrpcOperator wraps the GrpcHook functionality in an operator suitable for use in Airflow task definitions.
The operator supports Airflow templating for stub_class, call_func, and data parameters, allowing dynamic values from task context and Jinja templates.
class GrpcOperator(BaseOperator):
template_fields = ('stub_class', 'call_func', 'data')
def __init__(
self,
*,
stub_class: Callable,
call_func: str,
grpc_conn_id: str = "grpc_default",
data: Optional[dict] = None,
interceptors: Optional[List[Callable]] = None,
custom_connection_func: Optional[Callable] = None,
streaming: bool = False,
response_callback: Optional[Callable] = None,
log_response: bool = False,
**kwargs,
) -> None:
"""
Initialize gRPC operator.
Args:
stub_class: gRPC stub client class generated from proto file
call_func: Client function name to call the gRPC endpoint
grpc_conn_id: Connection ID to use (default: "grpc_default")
data: Data to pass to the RPC call
interceptors: List of gRPC interceptor objects
custom_connection_func: Custom function to return gRPC channel
streaming: Flag for streaming calls (default: False)
response_callback: Callback function to process responses
log_response: Flag to log responses (default: False)
**kwargs: Additional BaseOperator arguments
"""
def execute(self, context: Dict) -> None:
"""
Execute the gRPC operation.
Args:
context: Airflow task context dictionary
"""

Configure gRPC connections in Airflow with the following connection parameters:
grpc

The package supports multiple authentication methods through the extra__grpc__auth_type extra field:
{
"extra__grpc__auth_type": "NO_AUTH"
}
{
"extra__grpc__auth_type": "SSL",
"extra__grpc__credential_pem_file": "/path/to/credentials.pem"
}
{
"extra__grpc__auth_type": "JWT_GOOGLE"
}
{
"extra__grpc__auth_type": "OAUTH_GOOGLE",
"extra__grpc__scopes": "grpc,gcs"
}
{
"extra__grpc__auth_type": "CUSTOM"
}

When using custom authentication, you must provide a custom_connection_func that takes a connection object and returns a gRPC channel.
# Type aliases and imports
from typing import Any, Callable, Dict, Generator, List, Optional
import grpc
# Connection configuration fields
ConnectionConfig = Dict[str, Any] # Connection extra fields

The package handles gRPC-specific errors:
Example error handling:
from airflow.exceptions import AirflowConfigException
import grpc
try:
hook = GrpcHook('my_connection')
responses = hook.run(MyStub, 'my_method', data={'input': 'test'})
for response in responses:
process_response(response)
except grpc.RpcError as e:
print(f"gRPC call failed: {e.code()}, {e.details()}")
except AirflowConfigException as e:
print(f"Configuration error: {e}")
except FileNotFoundError as e:
print(f"Credential file not found: {e}")

# Custom interceptor example
class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print(f"Calling {client_call_details.method}")
return continuation(client_call_details, request)
# Use with hook
hook = GrpcHook(
grpc_conn_id='my_connection',
interceptors=[LoggingInterceptor()]
)

# Streaming call with operator
streaming_task = GrpcOperator(
task_id='stream_grpc_data',
stub_class=MyStreamingStub,
call_func='stream_method',
streaming=True,
grpc_conn_id='my_connection',
data={'stream_param': 'value'},
dag=dag,
)

def process_response(response, context):
# Process each response
result = response.result_field
# Push to XCom
context['task_instance'].xcom_push(key='grpc_result', value=result)
callback_task = GrpcOperator(
task_id='grpc_with_callback',
stub_class=MyStub,
call_func='my_method',
response_callback=process_response,
dag=dag,
)