The Apache Airflow HTTP Provider package enables HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.
pip install apache-airflow-providers-http

Basic HTTP functionality:
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

Common utility functions:

from airflow.providers.http import determine_kwargs, make_kwargs_callable

Additional imports for type annotations:
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException

from airflow.providers.http.hooks.http import HttpHook
# Create HTTP hook
hook = HttpHook(method='GET', http_conn_id='my_http_conn')
# Make a simple GET request
response = hook.run(endpoint='api/data')
print(response.text)
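Under the hood, hook.run builds the full URL from the connection's base URL and sends a prepared requests request. A rough standalone sketch of that flow using requests directly (the base URL here is an example stand-in for the Airflow connection; the network send is commented out):

```python
import requests

# Roughly what hook.run(endpoint='api/data') does internally (simplified):
session = requests.Session()
base_url = "https://api.example.com"  # in Airflow this comes from the connection
request = requests.Request("GET", base_url + "/" + "api/data")
prepped = session.prepare_request(request)
print(prepped.method, prepped.url)
# response = session.send(prepped)  # the actual network call, omitted here
```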
# Make a POST request with data
hook_post = HttpHook(method='POST', http_conn_id='my_http_conn')
response = hook_post.run(
    endpoint='api/submit',
    data={'key': 'value'},
    headers={'Content-Type': 'application/json'}
)

from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
dag = DAG(
    'http_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)
# Simple HTTP task with response validation
def check_response(response, **context):
    """Custom response validation function"""
    json_data = response.json()
    return json_data.get('status') == 'success'

def filter_response(response, **context):
    """Extract specific data from response"""
    return response.json().get('result_data')
http_task = SimpleHttpOperator(
    task_id='call_api',
    endpoint='api/process/{{ ds }}',  # Templated with execution date
    method='POST',
    data={
        'action': 'process',
        'date': '{{ ds }}',  # Templated field
        'run_id': '{{ run_id }}'  # Templated field
    },
    headers={
        'Content-Type': 'application/json',
        'X-Run-ID': '{{ run_id }}'  # Templated field
    },
    response_check=check_response,
    response_filter=filter_response,
    http_conn_id='my_http_conn',
    log_response=True,
    dag=dag
)

from airflow.providers.http.sensors.http import HttpSensor
# Wait for API to be ready
sensor = HttpSensor(
    task_id='wait_for_api',
    endpoint='health',
    http_conn_id='my_http_conn',
    response_check=lambda response: response.json()['status'] == 'healthy',
    poke_interval=30,
    timeout=300,
    dag=dag
)

The Apache Airflow HTTP Provider follows Airflow's standard provider pattern:
Template fields enable dynamic content using Jinja2 templating with Airflow context variables:
SimpleHttpOperator template fields:
- endpoint: API endpoint path (e.g., "api/data/{{ ds }}")
- data: Request payload or parameters (supports nested templating)
- headers: HTTP headers dictionary (keys and values can be templated)

HttpSensor template fields:
- endpoint: API endpoint path for polling
- request_params: Request parameters (becomes data in hook.run())
- headers: HTTP headers dictionary

Common context variables:
- {{ ds }}: Execution date (YYYY-MM-DD)
- {{ run_id }}: Unique run identifier
- {{ task_instance }}: Access to the task instance object
- {{ macros.datetime }}: Date/time manipulation functions

The HttpHook provides the foundational HTTP connectivity layer for Airflow workflows.
class HttpHook(BaseHook):
    # Class attributes
    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = 'http_default',
        auth_type: Any = HTTPBasicAuth
    ) -> None:
        """
        Initialize HTTP hook with connection and method settings.

        Parameters:
        - method: HTTP method to use (GET, POST, PUT, DELETE, HEAD)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - auth_type: Authentication type (HTTPBasicAuth or custom)
        """

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Create HTTP session with connection configuration.

        Parameters:
        - headers: Additional headers to include in session

        Returns:
        Configured requests.Session object
        """

    def run(
        self,
        endpoint: Optional[str],
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any
    ) -> Any:
        """
        Execute HTTP request with specified parameters.

        Method-specific behavior:
        - GET: data becomes URL parameters (params)
        - HEAD: data is ignored (no body or params)
        - POST/PUT/PATCH: data becomes request body

        URL construction: base_url + '/' + endpoint (with smart slash handling)

        Parameters:
        - endpoint: API endpoint to call (relative path)
        - data: Request payload (POST/PUT) or URL parameters (GET)
        - headers: HTTP headers for the request
        - extra_options: Additional options (timeout, verify, stream, etc.)
        - request_kwargs: Additional arguments passed to requests.Request (json, files, etc.)

        Returns:
        requests.Response object from the HTTP call

        Raises:
        AirflowException: On HTTP errors (non-2XX/3XX status codes)
        requests.exceptions.ConnectionError: On connection issues
        """

    def check_response(self, response: requests.Response) -> None:
        """
        Validate response status code, raise exception on HTTP errors.

        Parameters:
        - response: Response object to validate

        Raises:
        AirflowException: For non-2XX/3XX status codes
        """

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any]
    ) -> Any:
        """
        Execute prepared request using session and validate response.

        Handles request execution with configurable options like timeout,
        SSL verification, proxies, and response checking.

        Parameters:
        - session: Configured requests session to use
        - prepped_request: Prepared request object from session.prepare_request()
        - extra_options: Request execution options (stream, verify, proxies, cert, timeout, etc.)

        Returns:
        requests.Response object from successful request

        Raises:
        requests.exceptions.ConnectionError: On connection issues (will be retried if using run_with_advanced_retry)
        AirflowException: On HTTP errors if check_response is enabled
        """

    def run_with_advanced_retry(
        self,
        _retry_args: Dict[Any, Any],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute run method with Tenacity-based retry logic.

        Parameters:
        - _retry_args: Tenacity retry configuration dict
        - args, kwargs: Arguments passed to run method

        Returns:
        Result from successful HTTP request after retries
        """

The SimpleHttpOperator enables HTTP requests as Airflow tasks with full templating support.
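The retry behavior of run_with_advanced_retry can be illustrated with a hand-rolled loop. This is only a conceptual sketch (the real method delegates to Tenacity; run_with_retry and flaky below are hypothetical names):

```python
def run_with_retry(func, attempts=3, retry_on=(ConnectionError,)):
    # Retry func() on retryable exceptions, up to `attempts` tries,
    # mimicking Tenacity's stop_after_attempt / retry_if_exception_type.
    last_exc = None
    for _ in range(attempts):
        try:
            return func()
        except retry_on as exc:
            last_exc = exc
    raise last_exc

# A flaky callable that fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")
    return "ok"

print(run_with_retry(flaky))  # succeeds on the third attempt
```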
class SimpleHttpOperator(BaseOperator):
    # Template configuration
    template_fields = ['endpoint', 'data', 'headers']
    template_fields_renderers = {'headers': 'json', 'data': 'py'}
    template_ext = ()
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        endpoint: Optional[str] = None,
        method: str = 'POST',
        data: Any = None,
        headers: Optional[Dict[str, str]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        response_filter: Optional[Callable[..., Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        http_conn_id: str = 'http_default',
        log_response: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP operator with request configuration.

        Parameters:
        - endpoint: API endpoint (templated)
        - method: HTTP method to use
        - data: Request data/parameters (templated)
        - headers: HTTP headers (templated)
        - response_check: Function to validate response (returns bool)
        - response_filter: Function to transform response data
        - extra_options: Additional request options
        - http_conn_id: Airflow connection ID
        - log_response: Whether to log response content
        - kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict[str, Any]) -> Any:
        """
        Execute HTTP request using HttpHook.

        Parameters:
        - context: Airflow execution context

        Returns:
        Response text or filtered response data
        """

The HttpSensor monitors HTTP endpoints until specified conditions are met.
class HttpSensor(BaseSensorOperator):
    # Template configuration
    template_fields = ('endpoint', 'request_params', 'headers')

    def __init__(
        self,
        *,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        request_params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP sensor with polling configuration.

        Parameters:
        - endpoint: API endpoint to monitor (required, templated)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - method: HTTP method for polling requests (GET, POST, etc.)
        - request_params: Request parameters/data (templated, becomes data in hook.run())
        - headers: HTTP headers dictionary (templated)
        - response_check: Custom response validation function returning bool
        - extra_options: Additional request options (timeout, verify, etc.)
        - kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

    def poke(self, context: Dict[Any, Any]) -> bool:
        """
        Execute HTTP request and evaluate success condition.

        Behavior:
        - Executes HTTP request using configured hook
        - Returns False for 404 errors (continues polling)
        - Raises exception for other HTTP errors (fails sensor)
        - Uses response_check function if provided for custom validation
        - Returns True if no response_check or response_check returns True

        Parameters:
        - context: Airflow execution context with task_instance, ds, etc.

        Returns:
        True if sensor condition met (stop polling), False to continue polling

        Raises:
        AirflowException: For HTTP errors other than 404, or connection issues
        """

Helper functions for dynamic callable management and parameter filtering.
def determine_kwargs(
    func: Callable,
    args: Union[Tuple, List],
    kwargs: Dict
) -> Dict:
    """
    Inspect callable signature to determine which kwargs to pass.

    Parameters:
    - func: The callable to inspect
    - args: Positional arguments to skip in signature
    - kwargs: Keyword arguments to filter

    Returns:
    Dictionary with compatible keyword arguments
    """

def make_kwargs_callable(func: Callable) -> Callable:
    """
    Create callable that accepts any arguments but only forwards required ones.

    Parameters:
    - func: Function to wrap

    Returns:
    Wrapper function that filters arguments based on signature
    """

The HTTP provider supports multiple authentication methods, such as HTTP Basic Auth via the default auth_type (HTTPBasicAuth) or a custom requests-compatible auth class passed as auth_type.
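The determine_kwargs/make_kwargs_callable helpers above can be sketched as a simplified inspect-based reimplementation (illustrative only, not the provider's exact code):

```python
import inspect

def determine_kwargs(func, args, kwargs):
    # Keep only the keyword arguments that func's signature can accept,
    # skipping parameters already consumed by the positional args.
    params = list(inspect.signature(func).parameters.values())
    # If func accepts **kwargs, everything can be forwarded as-is
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(kwargs)
    accepted = {p.name for p in params[len(args):]}
    return {k: v for k, v in kwargs.items() if k in accepted}

def make_kwargs_callable(func):
    # Wrap func so that extra keyword arguments are silently dropped
    def wrapper(*args, **kwargs):
        return func(*args, **determine_kwargs(func, args, kwargs))
    return wrapper

def check(response, ds=None):
    return (response, ds)

wrapped = make_kwargs_callable(check)
# Extra context keys like run_id and ti are filtered out before the call
print(wrapped("resp", ds="2023-01-01", run_id="abc", ti=None))  # → ('resp', '2023-01-01')
```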
Comprehensive error handling for robust HTTP operations:
- AirflowException: Raised for HTTP status errors (non-2XX/3XX codes) by the check_response() method; disable with extra_options={'check_response': False}
- requests.exceptions.ConnectionError: Network connectivity issues; can be retried with run_with_advanced_retry()
- requests.exceptions.HTTPError: Base class for HTTP-related errors; can be retried with run_with_advanced_retry()

Related options: extra_options={'timeout': seconds} sets a request timeout, and extra_options={'verify': False} disables SSL certificate verification.

HTTP connections are configured in Airflow with these components:
- Host: Base URL or hostname (e.g., https://api.example.com)
- Schema: Protocol (http or https); defaults to http if not already part of the host

The Extra field accepts JSON configuration for advanced options:
{
"headers": {
"User-Agent": "Airflow-HTTP-Provider",
"Accept": "application/json",
"Authorization": "Bearer token123"
}
}

{
"verify": true,
"cert": "/path/to/client.pem",
"timeout": 60
}

{
"proxies": {
"http": "http://proxy:8080",
"https": "https://proxy:8080"
}
}

{
"headers": {
"User-Agent": "Airflow-HTTP-Provider/2.1.0",
"Accept": "application/json",
"Content-Type": "application/json"
},
"verify": true,
"timeout": 30,
"proxies": {
"https": "https://corporate-proxy:8080"
}
}

The final URL is constructed as:
- If the connection host already contains ://, it is used as the base URL directly
- Otherwise the base URL is built as {schema}://{host}:{port} (schema defaults to http, port is optional)
- The request URL is then base_url + '/' + endpoint
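A minimal sketch of these URL-construction rules (hypothetical helper names; slash handling simplified):

```python
def build_base_url(host, schema="http", port=None):
    # Rule 1: a host that already contains '://' is used as the base URL directly
    if "://" in host:
        return host
    # Rule 2: otherwise build {schema}://{host}:{port}
    base = f"{schema}://{host}"
    if port:
        base = f"{base}:{port}"
    return base

def build_url(base_url, endpoint):
    # Rule 3: base_url + '/' + endpoint, avoiding doubled slashes
    return base_url.rstrip("/") + "/" + (endpoint or "").lstrip("/")

print(build_url(build_base_url("api.example.com", port=8080), "api/data"))
# → http://api.example.com:8080/api/data
```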