tessl/pypi-apache-airflow-providers-http

Apache Airflow HTTP Provider package enabling HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.

Describes
pypipkg:pypi/apache-airflow-providers-http@2021.4.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-http@2021.4.0


Apache Airflow HTTP Provider

The Apache Airflow HTTP Provider package supplies hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.

Package Information

  • Package Name: apache-airflow-providers-http
  • Language: Python
  • Installation: pip install apache-airflow-providers-http

Core Imports

Basic HTTP functionality:

from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

Utility functions:

from airflow.providers.http import determine_kwargs, make_kwargs_callable

Additional imports for type annotations:

from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException

Basic Usage

Making HTTP Requests with HttpHook

import json

from airflow.providers.http.hooks.http import HttpHook

# Create a hook bound to the Airflow connection 'my_http_conn'
hook = HttpHook(method='GET', http_conn_id='my_http_conn')

# Make a simple GET request; the base URL comes from the connection
response = hook.run(endpoint='api/data')
print(response.text)

# Make a POST request with a JSON body. A dict passed as data would be
# form-encoded, so serialize it to match the Content-Type header.
hook_post = HttpHook(method='POST', http_conn_id='my_http_conn')
response = hook_post.run(
    endpoint='api/submit',
    data=json.dumps({'key': 'value'}),
    headers={'Content-Type': 'application/json'}
)

Using HTTP Operator in DAGs

import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

dag = DAG(
    'http_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Simple HTTP task with response validation
def check_response(response, **context):
    """Custom response validation function"""
    json_data = response.json()
    return json_data.get('status') == 'success'

def filter_response(response, **context):
    """Extract specific data from response"""
    return response.json().get('result_data')

http_task = SimpleHttpOperator(
    task_id='call_api',
    endpoint='api/process/{{ ds }}',  # Templated with execution date
    method='POST',
    # Serialize the payload so it matches the JSON Content-Type header.
    # data is a templated field, so the Jinja expressions inside the
    # serialized string are still rendered at runtime.
    data=json.dumps({
        'action': 'process',
        'date': '{{ ds }}',
        'run_id': '{{ run_id }}'
    }),
    headers={
        'Content-Type': 'application/json',
        'X-Run-ID': '{{ run_id }}'  # Templated field
    },
    response_check=check_response,
    response_filter=filter_response,
    http_conn_id='my_http_conn',
    log_response=True,
    dag=dag
)

Monitoring Endpoints with HttpSensor

from airflow.providers.http.sensors.http import HttpSensor

# Wait for API to be ready
sensor = HttpSensor(
    task_id='wait_for_api',
    endpoint='health',
    http_conn_id='my_http_conn',
    response_check=lambda response: response.json()['status'] == 'healthy',
    poke_interval=30,
    timeout=300,
    dag=dag
)

Architecture

The Apache Airflow HTTP Provider follows Airflow's standard provider pattern:

  • HttpHook: Core component for HTTP connections and request execution. It handles authentication, session management, and error handling.
  • SimpleHttpOperator: Task-level component that wraps HttpHook for use in DAGs. It provides templating and response processing.
  • HttpSensor: Monitoring component that polls an HTTP endpoint until a condition is met. It extends BaseSensorOperator with HTTP-specific logic.
  • Utility Functions: Helper functions for dynamic callable management and parameter filtering

Template Fields

Template fields enable dynamic content using Jinja2 templating with Airflow context variables:

SimpleHttpOperator Template Fields

  • endpoint: API endpoint path (e.g., "api/data/{{ ds }}")
  • data: Request payload or parameters (supports nested templating)
  • headers: HTTP headers dictionary (keys and values can be templated)

HttpSensor Template Fields

  • endpoint: API endpoint path for polling
  • request_params: Request parameters (becomes data in hook.run())
  • headers: HTTP headers dictionary

Common Template Variables

  • {{ ds }}: Execution date (YYYY-MM-DD)
  • {{ run_id }}: Unique run identifier
  • {{ task_instance }}: Access to task instance object
  • {{ macros.datetime }}: Python's datetime.datetime class, for date/time manipulation in templates

HTTP Hook Capabilities

The HttpHook provides the foundational HTTP connectivity layer for Airflow workflows.

class HttpHook(BaseHook):
    # Class attributes
    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = 'http_default',
        auth_type: Any = HTTPBasicAuth
    ) -> None:
        """
        Initialize HTTP hook with connection and method settings.

        Parameters:
        - method: HTTP method to use (GET, POST, PUT, DELETE, HEAD)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - auth_type: Authentication type (HTTPBasicAuth or custom)
        """

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Create HTTP session with connection configuration.

        Parameters:
        - headers: Additional headers to include in session

        Returns:
        Configured requests.Session object
        """

    def run(
        self,
        endpoint: Optional[str],
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any
    ) -> Any:
        """
        Execute HTTP request with specified parameters.
        
        Method-specific behavior:
        - GET: data becomes URL parameters (params)
        - HEAD: data is ignored (no body or params)
        - POST/PUT/PATCH: data becomes request body
        
        URL construction: base_url + '/' + endpoint (with smart slash handling)

        Parameters:
        - endpoint: API endpoint to call (relative path)
        - data: Request payload (POST/PUT) or URL parameters (GET)
        - headers: HTTP headers for the request
        - extra_options: Additional options (timeout, verify, stream, etc.)
        - request_kwargs: Additional arguments passed to requests.Request (json, files, etc.)

        Returns:
        requests.Response object from the HTTP call
        
        Raises:
        AirflowException: On HTTP errors (non-2XX/3XX status codes)
        requests.exceptions.ConnectionError: On connection issues
        """

    def check_response(self, response: requests.Response) -> None:
        """
        Validate response status code, raise exception on HTTP errors.

        Parameters:
        - response: Response object to validate

        Raises:
        AirflowException: For non-2XX/3XX status codes
        """

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any]
    ) -> Any:
        """
        Execute prepared request using session and validate response.
        
        Handles request execution with configurable options like timeout,
        SSL verification, proxies, and response checking.

        Parameters:
        - session: Configured requests session to use
        - prepped_request: Prepared request object from session.prepare_request()
        - extra_options: Request execution options (stream, verify, proxies, cert, timeout, etc.)

        Returns:
        requests.Response object from successful request
        
        Raises:
        requests.exceptions.ConnectionError: On connection issues (will be retried if using run_with_advanced_retry)
        AirflowException: On HTTP errors if check_response is enabled
        """

    def run_with_advanced_retry(
        self, 
        _retry_args: Dict[Any, Any], 
        *args: Any, 
        **kwargs: Any
    ) -> Any:
        """
        Execute run method with Tenacity-based retry logic.

        Parameters:
        - _retry_args: Tenacity retry configuration dict
        - args, kwargs: Arguments passed to run method

        Returns:
        Result from successful HTTP request after retries
        """

HTTP Operator Capabilities

The SimpleHttpOperator enables HTTP requests as Airflow tasks with full templating support.

class SimpleHttpOperator(BaseOperator):
    # Template configuration
    template_fields = ['endpoint', 'data', 'headers']
    template_fields_renderers = {'headers': 'json', 'data': 'py'}
    template_ext = ()
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        endpoint: Optional[str] = None,
        method: str = 'POST',
        data: Any = None,
        headers: Optional[Dict[str, str]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        response_filter: Optional[Callable[..., Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        http_conn_id: str = 'http_default',
        log_response: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP operator with request configuration.

        Parameters:
        - endpoint: API endpoint (templated)
        - method: HTTP method to use
        - data: Request data/parameters (templated)
        - headers: HTTP headers (templated)
        - response_check: Function to validate response (returns bool)
        - response_filter: Function to transform response data
        - extra_options: Additional request options
        - http_conn_id: Airflow connection ID
        - log_response: Whether to log response content
        - kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict[str, Any]) -> Any:
        """
        Execute HTTP request using HttpHook.

        Parameters:
        - context: Airflow execution context

        Returns:
        Response text or filtered response data
        """

HTTP Sensor Capabilities

The HttpSensor monitors HTTP endpoints until specified conditions are met.

class HttpSensor(BaseSensorOperator):
    # Template configuration
    template_fields = ('endpoint', 'request_params', 'headers')

    def __init__(
        self,
        *,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        request_params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP sensor with polling configuration.

        Parameters:
        - endpoint: API endpoint to monitor (required, templated)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - method: HTTP method for polling requests (GET, POST, etc.)
        - request_params: Request parameters/data (templated, becomes data in hook.run())
        - headers: HTTP headers dictionary (templated)
        - response_check: Custom response validation function returning bool
        - extra_options: Additional request options (timeout, verify, etc.)
        - kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

    def poke(self, context: Dict[Any, Any]) -> bool:
        """
        Execute HTTP request and evaluate success condition.
        
        Behavior:
        - Executes HTTP request using configured hook
        - Returns False for 404 errors (continues polling)
        - Raises exception for other HTTP errors (fails sensor)
        - Uses response_check function if provided for custom validation
        - Returns True if no response_check or response_check returns True

        Parameters:
        - context: Airflow execution context with task_instance, ds, etc.

        Returns:
        True if sensor condition met (stop polling), False to continue polling
        
        Raises:
        AirflowException: For HTTP errors other than 404, or connection issues
        """

Utility Functions

Helper functions for dynamic callable management and parameter filtering.

def determine_kwargs(
    func: Callable, 
    args: Union[Tuple, List], 
    kwargs: Dict
) -> Dict:
    """
    Inspect callable signature to determine which kwargs to pass.

    Parameters:
    - func: The callable to inspect
    - args: Positional arguments to skip in signature
    - kwargs: Keyword arguments to filter

    Returns:
    Dictionary with compatible keyword arguments
    """

def make_kwargs_callable(func: Callable) -> Callable:
    """
    Create callable that accepts any arguments but only forwards required ones.

    Parameters:
    - func: Function to wrap

    Returns:
    Wrapper function that filters arguments based on signature
    """

Authentication and Security

The HTTP provider supports multiple authentication methods:

  • HTTPBasicAuth: Username/password authentication via connection credentials
  • Custom Authentication: Pass custom auth objects via auth_type parameter
  • Header-based Authentication: API keys and tokens via headers in connection extras
  • SSL Configuration: Certificate validation and client certificates via extra_options

Error Handling

The provider reports HTTP status errors and network failures as exceptions, and offers several ways to handle them:

Exception Types

  • AirflowException: Raised for HTTP status errors (non-2XX/3XX codes)

    • Format: "{status_code}:{reason}" (e.g., "404:Not Found")
    • Automatically raised by check_response() method
    • Can be disabled via extra_options={'check_response': False}
  • requests.exceptions.ConnectionError: Network connectivity issues

    • Retried by run_with_advanced_retry() according to the supplied Tenacity configuration
    • Includes DNS resolution failures, refused connections, and connection timeouts
  • requests.exceptions.HTTPError: Raised by response.raise_for_status() for 4XX/5XX status codes

    • Caught and converted to AirflowException in check_response()

Error Handling Strategies

  • Connection Errors: Automatic retry capabilities with Tenacity integration via run_with_advanced_retry()
  • HTTP Status Errors: Configurable response validation with custom check functions
  • Timeout Handling: Request timeout configuration via extra_options={'timeout': seconds}
  • Custom Validation: Response check functions for application-specific validation
  • SSL Errors: Certificate validation control via extra_options={'verify': False}, which disables certificate checks and is best limited to trusted test environments

HttpSensor Specific Behavior

  • 404 Errors: Returns False (continue polling) instead of failing
  • Other HTTP Errors: Raises AirflowException (fails sensor)
  • Custom Response Validation: Uses response_check function for conditional success
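
The sensor rules above, together with the "{status_code}:{reason}" error format, can be combined into a short stand-alone sketch. HttpStatusError is a hypothetical stand-in for AirflowException, and check_status and poke are simplified illustrations that take a status code directly instead of performing a request.

```python
from typing import Any, Callable, Optional


class HttpStatusError(Exception):
    """Hypothetical stand-in for AirflowException."""


def check_status(status_code: int, reason: str) -> None:
    # Anything outside 2XX/3XX is an error, formatted as "{status_code}:{reason}"
    if not 200 <= status_code < 400:
        raise HttpStatusError(f"{status_code}:{reason}")


def poke(
    status_code: int,
    reason: str,
    payload: Optional[Any] = None,
    response_check: Optional[Callable[[Any], bool]] = None,
) -> bool:
    try:
        check_status(status_code, reason)
    except HttpStatusError as exc:
        # 404 means "not there yet": keep polling instead of failing
        if str(exc).startswith("404"):
            return False
        raise
    # A custom check decides success; without one, any good status is enough
    if response_check is not None:
        return bool(response_check(payload))
    return True


print(poke(404, "Not Found"))
print(poke(200, "OK", {"status": "healthy"}, lambda p: p["status"] == "healthy"))
```

A 503 from a service that is still starting would fail the sensor under these rules, so an endpoint that can return such codes while warming up may need a different polling strategy.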

Connection Configuration

HTTP connections are configured in Airflow with these components:

Required Fields

  • Host: Base URL for HTTP requests (can include protocol like https://api.example.com)

Optional Fields

  • Login/Password: Credentials for HTTPBasicAuth authentication
  • Schema: Protocol specification (http or https) - defaults to http if not in host
  • Port: Port number for non-standard ports

Extra Configuration (JSON)

The Extra field accepts JSON configuration for advanced options:

Headers

{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider",
    "Accept": "application/json",
    "Authorization": "Bearer token123"
  }
}

SSL and Security Options

{
  "verify": true,
  "cert": "/path/to/client.pem",
  "timeout": 60
}

Proxy Configuration

{
  "proxies": {
    "http": "http://proxy:8080",
    "https": "https://proxy:8080"
  }
}

Complete Example Connection

{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider/2.1.0",
    "Accept": "application/json",
    "Content-Type": "application/json"
  },
  "verify": true,
  "timeout": 30,
  "proxies": {
    "https": "https://corporate-proxy:8080"
  }
}

URL Construction Logic

The final URL is constructed as:

  1. If host contains ://, use as base URL directly
  2. Otherwise: {schema}://{host}:{port} (schema defaults to http, port is optional)
  3. Endpoint is appended to the base URL; a '/' separator is inserted only when the base URL does not end with '/' and the endpoint does not start with '/'
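
The three steps can be sketched as a small pure function. build_url is a hypothetical helper that follows the steps as listed, not the hook's source, and it assumes the slash handling means inserting a separator only when neither side already supplies one.

```python
from typing import Optional


def build_url(
    host: str,
    endpoint: Optional[str] = None,
    schema: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Assemble a request URL from connection fields and an endpoint."""
    if "://" in host:
        # Step 1: the host already carries a protocol, so use it as the base URL
        base_url = host
    else:
        # Step 2: schema defaults to http; port is appended only when set
        base_url = f"{schema or 'http'}://{host}"
        if port:
            base_url = f"{base_url}:{port}"
    # Step 3: add a slash only if neither side already provides one
    if endpoint and not base_url.endswith("/") and not endpoint.startswith("/"):
        return f"{base_url}/{endpoint}"
    return base_url + (endpoint or "")


print(build_url("https://api.example.com", "v1/data"))
print(build_url("api.example.com", "/health", port=8080))
```

Keeping the protocol in either the host or the schema field, but not both, avoids surprises about which value ends up in the final URL.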