Apache Airflow HTTP Provider package enabling HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-http@2021.4.0
pip install apache-airflow-providers-http
Basic HTTP functionality:
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
Imports for the utility functions:
from airflow.providers.http import determine_kwargs, make_kwargs_callable
Additional imports for type annotations:
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
# Create HTTP hook
hook = HttpHook(method='GET', http_conn_id='my_http_conn')
# Make a simple GET request
response = hook.run(endpoint='api/data')
print(response.text)
# Make a POST request with a JSON body
import json

hook_post = HttpHook(method='POST', http_conn_id='my_http_conn')
response = hook_post.run(
    endpoint='api/submit',
    data=json.dumps({'key': 'value'}),
    headers={'Content-Type': 'application/json'}
)
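The same hook call accepts per-request options through extra_options. A minimal sketch, assuming the my_http_conn connection from above; the keys shown mirror the extra_options documented for run() and run_and_check() later in this document:

```python
from airflow.providers.http.hooks.http import HttpHook

hook = HttpHook(method='GET', http_conn_id='my_http_conn')
response = hook.run(
    endpoint='api/data',
    data={'limit': 10},            # for GET, sent as URL query parameters
    extra_options={
        'timeout': 10,             # give up after 10 seconds
        'verify': True,            # verify TLS certificates
        'check_response': True,    # raise AirflowException on non-2XX/3XX
    },
)
print(response.status_code, response.url)
```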
from datetime import datetime
import json

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

dag = DAG(
    'http_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)
# Simple HTTP task with response validation
def check_response(response, **context):
    """Custom response validation function"""
    json_data = response.json()
    return json_data.get('status') == 'success'

def filter_response(response, **context):
    """Extract specific data from response"""
    return response.json().get('result_data')

http_task = SimpleHttpOperator(
    task_id='call_api',
    endpoint='api/process/{{ ds }}',  # Templated with execution date
    method='POST',
    data=json.dumps({
        'action': 'process',
        'date': '{{ ds }}',       # Templated field
        'run_id': '{{ run_id }}'  # Templated field
    }),
    headers={
        'Content-Type': 'application/json',
        'X-Run-ID': '{{ run_id }}'  # Templated field
    },
    response_check=check_response,
    response_filter=filter_response,
    http_conn_id='my_http_conn',
    log_response=True,
    dag=dag
)
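Because SimpleHttpOperator returns the (optionally filtered) response and Airflow pushes operator return values to XCom by default, a downstream task can consume the result. A sketch using core Airflow's PythonOperator (not part of this provider); the task IDs and dag reuse the example above:

```python
from airflow.operators.python import PythonOperator

def handle_result(ti, **context):
    # Pull the value returned (and XCom-pushed) by the 'call_api' task
    result = ti.xcom_pull(task_ids='call_api')
    print(f"Filtered API result: {result}")

process_result = PythonOperator(
    task_id='process_result',
    python_callable=handle_result,
    dag=dag,
)

http_task >> process_result
```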
from airflow.providers.http.sensors.http import HttpSensor

# Wait for API to be ready
sensor = HttpSensor(
    task_id='wait_for_api',
    endpoint='health',
    http_conn_id='my_http_conn',
    response_check=lambda response: response.json()['status'] == 'healthy',
    poke_interval=30,
    timeout=300,
    dag=dag
)
The Apache Airflow HTTP Provider follows Airflow's standard provider pattern: the HttpHook supplies the connection layer, the SimpleHttpOperator runs requests as tasks, and the HttpSensor polls endpoints until a condition is met.
Template fields enable dynamic content using Jinja2 templating with Airflow context variables:
SimpleHttpOperator template fields:
- endpoint: API endpoint path (e.g., "api/data/{{ ds }}")
- data: Request payload or parameters (supports nested templating)
- headers: HTTP headers dictionary (keys and values can be templated)
HttpSensor template fields:
- endpoint: API endpoint path for polling
- request_params: Request parameters (becomes data in hook.run())
- headers: HTTP headers dictionary
Common context variables:
- {{ ds }}: Execution date (YYYY-MM-DD)
- {{ run_id }}: Unique run identifier
- {{ task_instance }}: Access to task instance object
- {{ macros.datetime }}: Date/time manipulation functions
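For instance, a sensor can template its endpoint, request_params, and headers with these context variables. A sketch, reusing the my_http_conn connection and dag from the earlier examples:

```python
from airflow.providers.http.sensors.http import HttpSensor

wait_for_report = HttpSensor(
    task_id='wait_for_report',
    http_conn_id='my_http_conn',
    endpoint='reports/{{ ds }}/status',            # templated endpoint
    request_params={'run_id': '{{ run_id }}'},     # templated request parameters
    headers={'X-Run-ID': '{{ run_id }}'},          # templated headers
    response_check=lambda response: response.json().get('ready', False),
    poke_interval=60,
    timeout=600,
    dag=dag,
)
```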
The HttpHook provides the foundational HTTP connectivity layer for Airflow workflows.
class HttpHook(BaseHook):
    # Class attributes
    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = 'http_default',
        auth_type: Any = HTTPBasicAuth
    ) -> None:
        """
        Initialize HTTP hook with connection and method settings.
        Parameters:
        - method: HTTP method to use (GET, POST, PUT, DELETE, HEAD)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - auth_type: Authentication type (HTTPBasicAuth or custom)
        """

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Create HTTP session with connection configuration.
        Parameters:
        - headers: Additional headers to include in session
        Returns:
        Configured requests.Session object
        """

    def run(
        self,
        endpoint: Optional[str],
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any
    ) -> Any:
        """
        Execute HTTP request with specified parameters.
        Method-specific behavior:
        - GET: data becomes URL parameters (params)
        - HEAD: data is ignored (no body or params)
        - POST/PUT/PATCH: data becomes request body
        URL construction: base_url + '/' + endpoint (with smart slash handling)
        Parameters:
        - endpoint: API endpoint to call (relative path)
        - data: Request payload (POST/PUT) or URL parameters (GET)
        - headers: HTTP headers for the request
        - extra_options: Additional options (timeout, verify, stream, etc.)
        - request_kwargs: Additional arguments passed to requests.Request (json, files, etc.)
        Returns:
        requests.Response object from the HTTP call
        Raises:
        AirflowException: On HTTP errors (non-2XX/3XX status codes)
        requests.exceptions.ConnectionError: On connection issues
        """

    def check_response(self, response: requests.Response) -> None:
        """
        Validate response status code, raise exception on HTTP errors.
        Parameters:
        - response: Response object to validate
        Raises:
        AirflowException: For non-2XX/3XX status codes
        """

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any]
    ) -> Any:
        """
        Execute prepared request using session and validate response.
        Handles request execution with configurable options like timeout,
        SSL verification, proxies, and response checking.
        Parameters:
        - session: Configured requests session to use
        - prepped_request: Prepared request object from session.prepare_request()
        - extra_options: Request execution options (stream, verify, proxies, cert, timeout, etc.)
        Returns:
        requests.Response object from successful request
        Raises:
        requests.exceptions.ConnectionError: On connection issues (will be retried if using run_with_advanced_retry)
        AirflowException: On HTTP errors if check_response is enabled
        """

    def run_with_advanced_retry(
        self,
        _retry_args: Dict[Any, Any],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute run method with Tenacity-based retry logic.
        Parameters:
        - _retry_args: Tenacity retry configuration dict
        - args, kwargs: Arguments passed to run method
        Returns:
        Result from successful HTTP request after retries
        """
The SimpleHttpOperator enables HTTP requests as Airflow tasks with full templating support.
class SimpleHttpOperator(BaseOperator):
    # Template configuration
    template_fields = ['endpoint', 'data', 'headers']
    template_fields_renderers = {'headers': 'json', 'data': 'py'}
    template_ext = ()
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        endpoint: Optional[str] = None,
        method: str = 'POST',
        data: Any = None,
        headers: Optional[Dict[str, str]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        response_filter: Optional[Callable[..., Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        http_conn_id: str = 'http_default',
        log_response: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP operator with request configuration.
        Parameters:
        - endpoint: API endpoint (templated)
        - method: HTTP method to use
        - data: Request data/parameters (templated)
        - headers: HTTP headers (templated)
        - response_check: Function to validate response (returns bool)
        - response_filter: Function to transform response data
        - extra_options: Additional request options
        - http_conn_id: Airflow connection ID
        - log_response: Whether to log response content
        - kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict[str, Any]) -> Any:
        """
        Execute HTTP request using HttpHook.
        Parameters:
        - context: Airflow execution context
        Returns:
        Response text or filtered response data
        """
The HttpSensor monitors HTTP endpoints until specified conditions are met.
class HttpSensor(BaseSensorOperator):
    # Template configuration
    template_fields = ('endpoint', 'request_params', 'headers')

    def __init__(
        self,
        *,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        request_params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP sensor with polling configuration.
        Parameters:
        - endpoint: API endpoint to monitor (required, templated)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - method: HTTP method for polling requests (GET, POST, etc.)
        - request_params: Request parameters/data (templated, becomes data in hook.run())
        - headers: HTTP headers dictionary (templated)
        - response_check: Custom response validation function returning bool
        - extra_options: Additional request options (timeout, verify, etc.)
        - kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

    def poke(self, context: Dict[Any, Any]) -> bool:
        """
        Execute HTTP request and evaluate success condition.
        Behavior:
        - Executes HTTP request using configured hook
        - Returns False for 404 errors (continues polling)
        - Raises exception for other HTTP errors (fails sensor)
        - Uses response_check function if provided for custom validation
        - Returns True if no response_check or response_check returns True
        Parameters:
        - context: Airflow execution context with task_instance, ds, etc.
        Returns:
        True if sensor condition met (stop polling), False to continue polling
        Raises:
        AirflowException: For HTTP errors other than 404, or connection issues
        """
Helper functions for dynamic callable management and parameter filtering.
def determine_kwargs(
    func: Callable,
    args: Union[Tuple, List],
    kwargs: Dict
) -> Dict:
    """
    Inspect callable signature to determine which kwargs to pass.
    Parameters:
    - func: The callable to inspect
    - args: Positional arguments to skip in signature
    - kwargs: Keyword arguments to filter
    Returns:
    Dictionary with compatible keyword arguments
    """

def make_kwargs_callable(func: Callable) -> Callable:
    """
    Create callable that accepts any arguments but only forwards required ones.
    Parameters:
    - func: Function to wrap
    Returns:
    Wrapper function that filters arguments based on signature
    """
The HTTP provider supports multiple authentication methods:
- HTTP Basic Authentication (the default): the connection's login and password are passed to HTTPBasicAuth
- Custom authentication: any requests-compatible auth class can be supplied via the hook's auth_type parameter, as sketched below
Comprehensive error handling for robust HTTP operations:
- AirflowException: Raised for HTTP status errors (non-2XX/3XX codes)
  - Raised by the check_response() method
  - Can be suppressed per request with extra_options={'check_response': False}
- requests.exceptions.ConnectionError: Network connectivity issues
  - Can be retried with run_with_advanced_retry()
- requests.exceptions.HTTPError: Base class for HTTP-related errors
  - Can be retried with run_with_advanced_retry()
- Request timeouts: configured with extra_options={'timeout': seconds}
- SSL verification: disabled with extra_options={'verify': False}
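A sketch tying these pieces together: catch the exceptions above, and suppress the built-in status check for a single request when a non-2XX response is expected (connection ID and endpoints are placeholders):

```python
import requests
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook

hook = HttpHook(method='GET', http_conn_id='my_http_conn')

try:
    response = hook.run(endpoint='api/data', extra_options={'timeout': 15})
except requests.exceptions.ConnectionError:
    print('Network problem - consider run_with_advanced_retry() here')
except AirflowException as exc:
    print(f'HTTP status error raised by check_response(): {exc}')

# Suppress the built-in status check for one request and inspect it manually
raw = hook.run(
    endpoint='api/maybe-missing',
    extra_options={'check_response': False},
)
if raw.status_code == 404:
    print('Resource not available yet')
```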
HTTP connections are configured in Airflow with these components:
- Host: base URL or hostname (e.g., https://api.example.com)
- Schema: protocol (http or https) - defaults to http if not included in the host
- Port: optional port number appended to the host
- Login / Password: credentials passed to the hook's auth_type
The Extra field accepts JSON configuration for advanced options:
{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider",
    "Accept": "application/json",
    "Authorization": "Bearer token123"
  }
}
SSL and timeout options:
{
  "verify": true,
  "cert": "/path/to/client.pem",
  "timeout": 60
}
Proxy configuration:
{
  "proxies": {
    "http": "http://proxy:8080",
    "https": "https://proxy:8080"
  }
}
A combined example:
{
"headers": {
"User-Agent": "Airflow-HTTP-Provider/2.1.0",
"Accept": "application/json",
"Content-Type": "application/json"
},
"verify": true,
"timeout": 30,
"proxies": {
"https": "https://corporate-proxy:8080"
}
}The final URL is constructed as:
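A sketch of defining such a connection programmatically; the connection ID, credentials, and extras are placeholders, and the same values can equally be supplied through the Airflow UI, the airflow connections add CLI, or an AIRFLOW_CONN_<CONN_ID> environment variable:

```python
import json
from airflow.models.connection import Connection

my_http_conn = Connection(
    conn_id='my_http_conn',
    conn_type='http',
    host='https://api.example.com',
    login='api_user',
    password='api_secret',
    extra=json.dumps({
        'headers': {'Accept': 'application/json'},
        'timeout': 30,
    }),
)
# Inspect the URI form, which is what an AIRFLOW_CONN_MY_HTTP_CONN
# environment variable would contain
print(my_http_conn.get_uri())
```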
The final URL is constructed as:
- If the connection host already contains ://, it is used as the base URL directly
- Otherwise the base URL is built as {schema}://{host}:{port} (schema defaults to http, port is optional)
- The request URL is then base_url + '/' + endpoint
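An illustrative sketch of these rules (not the provider's actual implementation):

```python
from typing import Optional

def build_url(host: str, endpoint: str, schema: str = 'http', port: Optional[int] = None) -> str:
    if '://' in host:
        base_url = host                           # host already includes the scheme
    else:
        base_url = f"{schema or 'http'}://{host}"
        if port:
            base_url = f"{base_url}:{port}"
    # join with exactly one slash between base URL and endpoint
    if not base_url.endswith('/') and endpoint and not endpoint.startswith('/'):
        return f"{base_url}/{endpoint}"
    return base_url + (endpoint or '')

assert build_url('https://api.example.com', 'api/data') == 'https://api.example.com/api/data'
assert build_url('api.example.com', 'health', schema='https', port=8443) == 'https://api.example.com:8443/health'
```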