tessl/pypi-apache-airflow-backport-providers-grpc

Backport provider package for gRPC integration with Apache Airflow 1.10.x

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-backport-providers-grpc@2021.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-grpc@2021.3.0


Apache Airflow Backport Providers gRPC

A backport provider package that enables Apache Airflow 1.10.x installations to use gRPC functionality that was originally developed for Airflow 2.0. The package provides hooks and operators for establishing gRPC connections and executing gRPC calls within Airflow DAGs, with support for multiple authentication methods and both unary and streaming calls.

Package Information

  • Package Name: apache-airflow-backport-providers-grpc
  • Language: Python
  • Installation: pip install apache-airflow-backport-providers-grpc
  • Python Version: Python 3.6+

Core Imports

from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator

Basic Usage

from airflow import DAG
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
from datetime import datetime, timedelta

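# MyStubClass stands in for a stub class generated from your .proto file,
# e.g. imported from a (hypothetical) my_service_pb2_grpc module.

# Using GrpcHook directly
def use_grpc_hook():
    hook = GrpcHook(grpc_conn_id='my_grpc_connection')

    # Execute a gRPC call. The keys of `data` are passed to the stub
    # method as keyword arguments.
    responses = hook.run(
        stub_class=MyStubClass,
        call_func='my_method',
        data={'param1': 'value1', 'param2': 'value2'}
    )

    # run() is a generator, so the call is made while iterating
    for response in responses:
        print(response)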

# Using GrpcOperator in DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'grpc_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

grpc_task = GrpcOperator(
    task_id='call_grpc_service',
    stub_class=MyStubClass,
    call_func='my_method',
    grpc_conn_id='my_grpc_connection',
    data={'input': 'test_data'},
    dag=dag,
)

Capabilities

gRPC Hook

Provides low-level gRPC connection management and call execution capabilities. The GrpcHook establishes connections to gRPC servers with various authentication methods and executes remote procedure calls.

class GrpcHook(BaseHook):
    def __init__(
        self,
        grpc_conn_id: str = "grpc_default",
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
    ) -> None:
        """
        Initialize gRPC hook.
        
        Args:
            grpc_conn_id: The connection ID to use when fetching connection info
            interceptors: List of gRPC interceptor objects to apply to the channel
            custom_connection_func: Custom function to return gRPC channel object
        """

    def get_conn(self) -> grpc.Channel:
        """
        Establish and return gRPC channel based on connection configuration.
        
        Returns:
            grpc.Channel: Configured gRPC channel
            
        Raises:
            AirflowConfigException: If auth_type is not supported, or if auth_type is CUSTOM and no custom_connection_func was provided
        """

    def run(
        self,
        stub_class: Callable,
        call_func: str,
        streaming: bool = False,
        data: Optional[dict] = None
    ) -> Generator:
        """
        Execute gRPC function and yield response.
        
        Args:
            stub_class: gRPC stub class generated from proto file
            call_func: Function name to call on the stub
            streaming: Whether the call is streaming (default: False)
            data: Data to pass to the RPC call
            
        Yields:
            Response objects from the gRPC call
            
        Raises:
            grpc.RpcError: If gRPC service call fails
        """

    @staticmethod
    def get_connection_form_widgets() -> Dict[str, Any]:
        """
        Return connection widgets for Airflow UI form.
        
        Returns:
            Dict[str, Any]: Form widgets for connection configuration
        """
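
The contract of run() described above can be modelled without Airflow or grpc installed. The following is only a sketch, assuming the hook looks up call_func on the stub by name and passes data as keyword arguments; FakeChannel, FakeGreeterStub and run_sketch are hypothetical names invented for illustration and are not part of the package.

```python
from typing import Any, Callable, Generator, Optional


class FakeChannel:
    """Hypothetical stand-in for grpc.Channel; only supports the with-statement."""

    def __enter__(self) -> "FakeChannel":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        return None


class FakeGreeterStub:
    """Hypothetical stub; a real one is generated from a .proto file."""

    def __init__(self, channel: FakeChannel) -> None:
        self.channel = channel

    def say_hello(self, name: str) -> str:
        return f"hello {name}"  # unary: a single response

    def count_to(self, limit: int):
        return iter(range(1, limit + 1))  # streaming: an iterator of responses


def run_sketch(
    stub_class: Callable,
    call_func: str,
    streaming: bool = False,
    data: Optional[dict] = None,
) -> Generator:
    """Mimic the documented run() contract (assumed behaviour, not the real hook)."""
    with FakeChannel() as channel:
        stub = stub_class(channel)
        rpc_func = getattr(stub, call_func)  # look the method up by name
        response = rpc_func(**(data or {}))  # data keys become keyword arguments
        if streaming:
            yield from response  # one item per streamed message
        else:
            yield response  # exactly one item


unary = list(run_sketch(FakeGreeterStub, "say_hello", data={"name": "airflow"}))
streamed = list(run_sketch(FakeGreeterStub, "count_to", streaming=True, data={"limit": 3}))
print(unary, streamed)
```

Because the result is a generator in both modes, callers can iterate over it the same way whether the call is unary or streaming, and nothing runs until iteration starts.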

gRPC Operator

Airflow operator that executes gRPC calls as part of a DAG workflow. The GrpcOperator wraps the GrpcHook functionality in an operator suitable for use in Airflow task definitions.

The operator supports Airflow templating for stub_class, call_func, and data parameters, allowing dynamic values from task context and Jinja templates.

class GrpcOperator(BaseOperator):
    template_fields = ('stub_class', 'call_func', 'data')
    
    def __init__(
        self,
        *,
        stub_class: Callable,
        call_func: str,
        grpc_conn_id: str = "grpc_default",
        data: Optional[dict] = None,
        interceptors: Optional[List[Callable]] = None,
        custom_connection_func: Optional[Callable] = None,
        streaming: bool = False,
        response_callback: Optional[Callable] = None,
        log_response: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize gRPC operator.
        
        Args:
            stub_class: gRPC stub client class generated from proto file
            call_func: Client function name to call the gRPC endpoint
            grpc_conn_id: Connection ID to use (default: "grpc_default")
            data: Data to pass to the RPC call
            interceptors: List of gRPC interceptor objects
            custom_connection_func: Custom function to return gRPC channel
            streaming: Flag for streaming calls (default: False)
            response_callback: Callback function to process responses
            log_response: Flag to log responses (default: False)
            **kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict) -> None:
        """
        Execute the gRPC operation.
        
        Args:
            context: Airflow task context dictionary
        """
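
How log_response and response_callback interact with the responses can be sketched in plain Python. This is an illustration only: it assumes that each response is optionally logged and then handed to the callback together with the task context. handle_responses is a hypothetical helper, not a function of the operator.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def handle_responses(
    responses: Iterable[Any],
    context: Dict[str, Any],
    response_callback: Optional[Callable[[Any, Dict[str, Any]], None]] = None,
    log_response: bool = False,
    log: Callable[[str], None] = print,
) -> None:
    """Hypothetical helper: apply logging and the callback to each response."""
    for response in responses:
        if log_response:
            log(repr(response))
        if response_callback is not None:
            response_callback(response, context)


collected: List[str] = []
logged: List[str] = []

handle_responses(
    responses=["first", "second"],
    context={"ds": "2021-01-01"},
    response_callback=lambda response, context: collected.append(
        f"{context['ds']}:{response}"
    ),
    log_response=True,
    log=logged.append,
)
print(collected, logged)
```

Since execute is documented as returning None, a callback is the natural place to forward results, for example by pushing them to XCom through the task instance in the context.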

Connection Configuration

Configure gRPC connections in Airflow with the following connection parameters:

  • Connection Type: grpc
  • Host: gRPC server hostname
  • Port: gRPC server port (optional)
  • Extra: JSON configuration with authentication settings
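
The Extra field holds a JSON object whose keys carry the extra__grpc__ prefix. A minimal sketch of assembling such a value and reading a field back is shown here; build_grpc_extra and get_field are hypothetical helpers written for this example, not functions provided by the package.

```python
import json
from typing import Dict, Optional

PREFIX = "extra__grpc__"  # key prefix used by the gRPC connection extras


def build_grpc_extra(auth_type: str, credential_pem_file: Optional[str] = None) -> str:
    """Hypothetical helper: serialise the Extra field for a gRPC connection."""
    extra: Dict[str, str] = {f"{PREFIX}auth_type": auth_type}
    if credential_pem_file is not None:
        extra[f"{PREFIX}credential_pem_file"] = credential_pem_file
    return json.dumps(extra, sort_keys=True)


def get_field(extra_json: str, field_name: str) -> str:
    """Hypothetical helper: read one prefixed field back out of the Extra JSON."""
    return json.loads(extra_json)[f"{PREFIX}{field_name}"]


no_auth_extra = build_grpc_extra("NO_AUTH")
ssl_extra = build_grpc_extra("SSL", credential_pem_file="/path/to/credentials.pem")
print(no_auth_extra)
print(get_field(ssl_extra, "credential_pem_file"))
```

The resulting string can be pasted into the Extra field of the connection form or stored wherever your deployment keeps connection definitions.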

Authentication Types

The package supports multiple authentication methods through the extra__grpc__auth_type extra field:

NO_AUTH

{
  "extra__grpc__auth_type": "NO_AUTH"
}

SSL/TLS

{
  "extra__grpc__auth_type": "SSL",
  "extra__grpc__credential_pem_file": "/path/to/credentials.pem"
}

Google JWT

{
  "extra__grpc__auth_type": "JWT_GOOGLE"
}

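Google OAuth

{
  "extra__grpc__auth_type": "OATH_GOOGLE",
  "extra__grpc__scopes": "grpc,gcs"
}

The value is spelled OATH_GOOGLE (without the U) in the hook, so OAUTH_GOOGLE is rejected as an unsupported auth type. extra__grpc__scopes is a comma-separated list of OAuth scopes.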

Custom Authentication

{
  "extra__grpc__auth_type": "CUSTOM"
}

When using custom authentication, you must provide a custom_connection_func that takes a connection object and returns a gRPC channel.
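
A custom_connection_func might look like the sketch below. It assumes the connection object exposes host and port attributes, as Airflow connections do; channel_target and my_connection_func are hypothetical names, and the insecure channel with a keepalive option is only a placeholder for whatever credentials your service actually requires.

```python
from types import SimpleNamespace
from typing import Any


def channel_target(conn: Any) -> str:
    """Build "host:port" (or just "host") from a connection-like object."""
    return f"{conn.host}:{conn.port}" if conn.port else conn.host


def my_connection_func(conn: Any):
    """Hypothetical custom_connection_func: takes the connection, returns a channel."""
    import grpc  # imported lazily so channel_target can be used without grpc installed

    # Placeholder channel setup; swap in secure credentials as needed.
    return grpc.insecure_channel(
        channel_target(conn),
        options=[("grpc.keepalive_time_ms", 30000)],
    )


# Stand-in for an Airflow Connection, used only to exercise channel_target
example_conn = SimpleNamespace(host="grpc.example.com", port=50051)
print(channel_target(example_conn))
```

The function is then passed as custom_connection_func=my_connection_func to GrpcHook or GrpcOperator, together with a connection whose auth type is CUSTOM.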

Types

# Type aliases and imports
from typing import Any, Callable, Dict, Generator, List, Optional
import grpc

# Connection configuration fields
ConnectionConfig = Dict[str, Any]  # Connection extra fields

Error Handling

The package handles gRPC-specific errors:

  • grpc.RpcError: Raised when gRPC service calls fail. The hook logs error details including status code and error message. Common causes include network connectivity issues, service unavailability, or malformed requests.
  • AirflowConfigException: Raised when the connection configuration cannot produce a channel: the auth_type value is not one of the supported types, or auth_type is CUSTOM and no custom_connection_func was provided.
  • FileNotFoundError: Can occur when SSL/TLS authentication is used but the credential_pem_file path is invalid or the file doesn't exist.

Example error handling:

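from airflow.exceptions import AirflowConfigException
from airflow.providers.grpc.hooks.grpc import GrpcHook
import grpc

# MyStub and process_response are placeholders for your generated stub
# class and your own response handler.
try:
    hook = GrpcHook('my_connection')
    responses = hook.run(MyStub, 'my_method', data={'input': 'test'})
    # run() is a generator: the call is made during iteration, so the
    # loop must stay inside the try block for the handlers to apply
    for response in responses:
        process_response(response)
except grpc.RpcError as e:
    print(f"gRPC call failed: {e.code()}, {e.details()}")
except AirflowConfigException as e:
    print(f"Configuration error: {e}")
except FileNotFoundError as e:
    print(f"Credential file not found: {e}")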

Advanced Usage

Using Interceptors

import grpc
from airflow.providers.grpc.hooks.grpc import GrpcHook

# Custom interceptor example: logs the method name of each unary-unary call
class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        print(f"Calling {client_call_details.method}")
        return continuation(client_call_details, request)

# Use with hook
hook = GrpcHook(
    grpc_conn_id='my_connection',
    interceptors=[LoggingInterceptor()]
)

Streaming Calls

# Streaming call with operator
streaming_task = GrpcOperator(
    task_id='stream_grpc_data',
    stub_class=MyStreamingStub,
    call_func='stream_method',
    streaming=True,
    grpc_conn_id='my_connection',
    data={'stream_param': 'value'},
    dag=dag,
)

Response Callbacks

def process_response(response, context):
    # Process each response
    result = response.result_field
    # Push to XCom
    context['task_instance'].xcom_push(key='grpc_result', value=result)

callback_task = GrpcOperator(
    task_id='grpc_with_callback',
    stub_class=MyStub,
    call_func='my_method',
    response_callback=process_response,
    dag=dag,
)