Apache Airflow HTTP Provider package enabling HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-http@2021.4.00
# Apache Airflow HTTP Provider

The Apache Airflow HTTP Provider package enables HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-providers-http
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-http`

## Core Imports

Basic HTTP functionality:

```python
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
```

Utility functions:

```python
from airflow.providers.http import determine_kwargs, make_kwargs_callable
```

Additional imports for the type annotations used in the API listings below:

```python
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException
```

## Basic Usage

### Making HTTP Requests with HttpHook

```python
import json

from airflow.providers.http.hooks.http import HttpHook

# Create HTTP hook
hook = HttpHook(method='GET', http_conn_id='my_http_conn')

# Make a simple GET request
response = hook.run(endpoint='api/data')
print(response.text)

# Make a POST request with a JSON body.
# The payload is serialized explicitly so that it matches the
# Content-Type header; a plain dict would be sent form-encoded.
hook_post = HttpHook(method='POST', http_conn_id='my_http_conn')
response = hook_post.run(
    endpoint='api/submit',
    data=json.dumps({'key': 'value'}),
    headers={'Content-Type': 'application/json'}
)
```

### Using HTTP Operator in DAGs

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

dag = DAG(
    'http_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Simple HTTP task with response validation
def check_response(response, **context):
    """Custom response validation function"""
    json_data = response.json()
    return json_data.get('status') == 'success'

def filter_response(response, **context):
    """Extract specific data from response"""
    return response.json().get('result_data')

http_task = SimpleHttpOperator(
    task_id='call_api',
    endpoint='api/process/{{ ds }}',  # Templated with execution date
    method='POST',
    # Serialized to a JSON string to match the Content-Type header;
    # the Jinja expressions inside the string are still rendered.
    data=json.dumps({
        'action': 'process',
        'date': '{{ ds }}',  # Templated field
        'run_id': '{{ run_id }}'  # Templated field
    }),
    headers={
        'Content-Type': 'application/json',
        'X-Run-ID': '{{ run_id }}'  # Templated field
    },
    response_check=check_response,
    response_filter=filter_response,
    http_conn_id='my_http_conn',
    log_response=True,
    dag=dag
)
```

### Monitoring Endpoints with HttpSensor

```python
from airflow.providers.http.sensors.http import HttpSensor

# Wait for API to be ready.
# `dag` is the DAG object defined in the operator example above.
sensor = HttpSensor(
    task_id='wait_for_api',
    endpoint='health',
    http_conn_id='my_http_conn',
    response_check=lambda response: response.json()['status'] == 'healthy',
    poke_interval=30,  # Seconds between polls
    timeout=300,  # Seconds before the sensor gives up
    dag=dag
)
```

## Architecture

The Apache Airflow HTTP Provider follows Airflow's standard provider pattern:

- **HttpHook**: Core component for HTTP connections and request execution; handles authentication, session management, and error handling
- **SimpleHttpOperator**: Task-level component that wraps HttpHook for use in DAGs; provides templating and response processing capabilities
- **HttpSensor**: Monitoring component that polls HTTP endpoints until conditions are met; extends BaseSensorOperator with HTTP-specific logic
- **Utility Functions**: Helper functions for dynamic callable management and parameter filtering

## Template Fields

Template fields enable dynamic content using Jinja2 templating with Airflow context variables:

### SimpleHttpOperator Template Fields
- `endpoint`: API endpoint path (e.g., `"api/data/{{ ds }}"`)
- `data`: Request payload or parameters (supports nested templating)
- `headers`: HTTP headers dictionary (keys and values can be templated)

### HttpSensor Template Fields
- `endpoint`: API endpoint path for polling
- `request_params`: Request parameters (passed as `data` to `hook.run()`)
- `headers`: HTTP headers dictionary

### Common Template Variables
- `{{ ds }}`: Execution date (YYYY-MM-DD)
- `{{ run_id }}`: Unique run identifier
- `{{ task_instance }}`: Access to task instance object
- `{{ macros.datetime }}`: The standard library `datetime.datetime` class, for date/time manipulation

## HTTP Hook Capabilities

The HttpHook provides the foundational HTTP connectivity layer for Airflow workflows.

```python { .api }
class HttpHook(BaseHook):
    # Class attributes
    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = 'http_default',
        auth_type: Any = HTTPBasicAuth
    ) -> None:
        """
        Initialize HTTP hook with connection and method settings.

        Parameters:
        - method: HTTP method to use (GET, POST, PUT, DELETE, HEAD)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - auth_type: Authentication type (HTTPBasicAuth or custom)
        """

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Create HTTP session with connection configuration.

        Parameters:
        - headers: Additional headers to include in session

        Returns:
        Configured requests.Session object
        """

    def run(
        self,
        endpoint: Optional[str],
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any
    ) -> Any:
        """
        Execute HTTP request with specified parameters.

        Method-specific behavior:
        - GET: data becomes URL parameters (params)
        - HEAD: data is ignored (no body or params)
        - POST/PUT/PATCH: data becomes request body

        URL construction: base_url + '/' + endpoint (with smart slash handling)

        Parameters:
        - endpoint: API endpoint to call (relative path)
        - data: Request payload (POST/PUT) or URL parameters (GET)
        - headers: HTTP headers for the request
        - extra_options: Additional options (timeout, verify, stream, etc.)
        - request_kwargs: Additional arguments passed to requests.Request (json, files, etc.)

        Returns:
        requests.Response object from the HTTP call

        Raises:
        AirflowException: On HTTP errors (non-2XX/3XX status codes)
        requests.exceptions.ConnectionError: On connection issues
        """

    def check_response(self, response: requests.Response) -> None:
        """
        Validate response status code, raise exception on HTTP errors.

        Parameters:
        - response: Response object to validate

        Raises:
        AirflowException: For non-2XX/3XX status codes
        """

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any]
    ) -> Any:
        """
        Execute prepared request using session and validate response.

        Handles request execution with configurable options like timeout,
        SSL verification, proxies, and response checking.

        Parameters:
        - session: Configured requests session to use
        - prepped_request: Prepared request object from session.prepare_request()
        - extra_options: Request execution options (stream, verify, proxies, cert, timeout, etc.)

        Returns:
        requests.Response object from successful request

        Raises:
        requests.exceptions.ConnectionError: On connection issues (will be retried if using run_with_advanced_retry)
        AirflowException: On HTTP errors if check_response is enabled
        """

    def run_with_advanced_retry(
        self,
        _retry_args: Dict[Any, Any],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute run method with Tenacity-based retry logic.

        Parameters:
        - _retry_args: Tenacity retry configuration dict
        - args, kwargs: Arguments passed to run method

        Returns:
        Result from successful HTTP request after retries
        """
```

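The method-specific handling of `data` listed in the `run()` docstring can be sketched in plain Python. This is a simplified illustration of the documented rules, not the provider's source code; `request_kwargs_for` is a hypothetical helper name, and the keys of the returned dict are assumed to correspond to keyword arguments of `requests.Request`.

```python
from typing import Any, Dict, Optional


def request_kwargs_for(method: str, data: Optional[Any]) -> Dict[str, Any]:
    """Hypothetical helper: decide where `data` goes for a given HTTP method."""
    method = method.upper()
    if method == "GET":
        # GET: data is sent as URL query parameters
        return {"params": data}
    if method == "HEAD":
        # HEAD: data is ignored, so neither a body nor parameters are sent
        return {}
    # POST, PUT, PATCH and other methods: data is sent as the request body
    return {"data": data}


print(request_kwargs_for("get", {"page": 1}))
print(request_kwargs_for("HEAD", {"page": 1}))
print(request_kwargs_for("POST", '{"key": "value"}'))
```

The practical consequence is that the same `data` argument means different things depending on the hook's `method`, so a hook created with the default `POST` method should not be reused for query-style requests.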
## HTTP Operator Capabilities

The SimpleHttpOperator enables HTTP requests as Airflow tasks with full templating support.

```python { .api }
class SimpleHttpOperator(BaseOperator):
    # Template configuration
    template_fields = ['endpoint', 'data', 'headers']
    template_fields_renderers = {'headers': 'json', 'data': 'py'}
    template_ext = ()
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        endpoint: Optional[str] = None,
        method: str = 'POST',
        data: Any = None,
        headers: Optional[Dict[str, str]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        response_filter: Optional[Callable[..., Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        http_conn_id: str = 'http_default',
        log_response: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP operator with request configuration.

        Parameters:
        - endpoint: API endpoint (templated)
        - method: HTTP method to use
        - data: Request data/parameters (templated)
        - headers: HTTP headers (templated)
        - response_check: Function to validate response (returns bool)
        - response_filter: Function to transform response data
        - extra_options: Additional request options
        - http_conn_id: Airflow connection ID
        - log_response: Whether to log response content
        - kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict[str, Any]) -> Any:
        """
        Execute HTTP request using HttpHook.

        Parameters:
        - context: Airflow execution context

        Returns:
        Response text or filtered response data
        """
```

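How `response_check` and `response_filter` shape the value returned by `execute()` can be illustrated without Airflow. The sketch below is an assumption-laden stand-in: `FakeResponse` and `process_response` are hypothetical names, and `ValueError` takes the place of the exception the operator would raise when validation fails.

```python
from typing import Any, Callable, Optional


class FakeResponse:
    """Stand-in for requests.Response, used only for this illustration."""

    def __init__(self, text: str, payload: dict) -> None:
        self.text = text
        self._payload = payload

    def json(self) -> dict:
        return self._payload


def process_response(
    response: FakeResponse,
    response_check: Optional[Callable[..., bool]] = None,
    response_filter: Optional[Callable[..., Any]] = None,
) -> Any:
    """Hypothetical helper mirroring the documented check-then-filter order."""
    # Validation comes first: a falsy result fails the task
    if response_check is not None and not response_check(response):
        raise ValueError("Response check returned False")
    # With a filter, the filtered value is returned instead of the raw text
    if response_filter is not None:
        return response_filter(response)
    return response.text


resp = FakeResponse('{"status": "success", "result_data": [1, 2]}',
                    {"status": "success", "result_data": [1, 2]})
print(process_response(resp))
print(process_response(resp, response_filter=lambda r: r.json()["result_data"]))
```

Keeping validation separate from filtering means a filter can assume the response already passed the check.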
## HTTP Sensor Capabilities

The HttpSensor monitors HTTP endpoints until specified conditions are met.

```python { .api }
class HttpSensor(BaseSensorOperator):
    # Template configuration
    template_fields = ('endpoint', 'request_params', 'headers')

    def __init__(
        self,
        *,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        request_params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP sensor with polling configuration.

        Parameters:
        - endpoint: API endpoint to monitor (required, templated)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - method: HTTP method for polling requests (GET, POST, etc.)
        - request_params: Request parameters/data (templated, becomes data in hook.run())
        - headers: HTTP headers dictionary (templated)
        - response_check: Custom response validation function returning bool
        - extra_options: Additional request options (timeout, verify, etc.)
        - kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

    def poke(self, context: Dict[Any, Any]) -> bool:
        """
        Execute HTTP request and evaluate success condition.

        Behavior:
        - Executes HTTP request using configured hook
        - Returns False for 404 errors (continues polling)
        - Raises exception for other HTTP errors (fails sensor)
        - Uses response_check function if provided for custom validation
        - Returns True if no response_check or response_check returns True

        Parameters:
        - context: Airflow execution context with task_instance, ds, etc.

        Returns:
        True if sensor condition met (stop polling), False to continue polling

        Raises:
        AirflowException: For HTTP errors other than 404, or connection issues
        """
```

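The polling decision described in the `poke()` docstring can be sketched as a small standalone function. This is an illustration under stated assumptions, not the sensor's implementation: `HttpStatusError` is a hypothetical stand-in for `AirflowException`, `poke_once` and `send_request` are made-up names, and the error message is assumed to start with the status code.

```python
from typing import Any, Callable, Optional


class HttpStatusError(Exception):
    """Hypothetical stand-in for the AirflowException raised on HTTP errors."""


def poke_once(
    send_request: Callable[[], Any],
    response_check: Optional[Callable[[Any], bool]] = None,
) -> bool:
    """Return True to stop polling, False to poll again."""
    try:
        response = send_request()
    except HttpStatusError as exc:
        # A 404 means "not there yet": keep polling instead of failing
        if str(exc).startswith("404"):
            return False
        # Any other HTTP error fails the sensor
        raise
    if response_check is not None:
        return bool(response_check(response))
    # Without a custom check, any successful response satisfies the sensor
    return True


def not_found() -> Any:
    raise HttpStatusError("404:Not Found")


print(poke_once(not_found))
print(poke_once(lambda: {"status": "healthy"}))
```

Treating only 404 as "keep waiting" suits endpoints that appear once a resource is created, while server errors still surface immediately.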
## Utility Functions

Helper functions for dynamic callable management and parameter filtering.

```python { .api }
def determine_kwargs(
    func: Callable,
    args: Union[Tuple, List],
    kwargs: Dict
) -> Dict:
    """
    Inspect callable signature to determine which kwargs to pass.

    Parameters:
    - func: The callable to inspect
    - args: Positional arguments to skip in signature
    - kwargs: Keyword arguments to filter

    Returns:
    Dictionary with compatible keyword arguments
    """

def make_kwargs_callable(func: Callable) -> Callable:
    """
    Create callable that accepts any arguments but only forwards required ones.

    Parameters:
    - func: Function to wrap

    Returns:
    Wrapper function that filters arguments based on signature
    """
```

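The idea behind signature-based argument filtering can be shown with the standard library's `inspect` module. The sketch below is a simplified illustration, not the provider's `determine_kwargs`; `filter_kwargs` is a hypothetical name and it ignores the positional-argument handling the real helper describes.

```python
import inspect
from typing import Any, Callable, Dict


def filter_kwargs(func: Callable[..., Any], kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: keep only the keyword arguments `func` can accept."""
    params = list(inspect.signature(func).parameters.values())
    # A **kwargs parameter means the callable accepts everything
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(kwargs)
    accepted = {
        p.name
        for p in params
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD,
                      inspect.Parameter.KEYWORD_ONLY)
    }
    return {key: value for key, value in kwargs.items() if key in accepted}


def check(response, ds):
    return response == "ok" and ds == "2023-01-01"


context = {"ds": "2023-01-01", "run_id": "manual__1"}
selected = filter_kwargs(check, context)
print(selected)
print(check("ok", **selected))
```

This is why a `response_check` callable may declare only the context keys it needs, or accept `**context` to receive all of them.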
## Authentication and Security

The HTTP provider supports multiple authentication methods:

- **HTTPBasicAuth**: Username/password authentication via connection credentials
- **Custom Authentication**: Pass custom auth objects via the `auth_type` parameter
- **Header-based Authentication**: API keys and tokens via headers in connection extras
- **SSL Configuration**: Certificate validation and client certificates via `extra_options`

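For orientation, HTTP Basic authentication amounts to sending the login and password, joined by a colon and Base64-encoded, in an `Authorization` header. The sketch below shows that encoding with the standard library only; `basic_auth_header` is a hypothetical helper, and in practice `HTTPBasicAuth` builds this header for you.

```python
import base64


def basic_auth_header(login: str, password: str) -> str:
    """Hypothetical helper: build the value of a Basic Authorization header."""
    credentials = f"{login}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return f"Basic {token}"


print(basic_auth_header("user", "pass"))
```

Because Base64 is an encoding rather than encryption, Basic credentials should only travel over HTTPS connections.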
## Error Handling

The provider surfaces HTTP and network failures through the following exceptions and configuration options:

### Exception Types

- **`AirflowException`**: Raised for HTTP status errors (non-2XX/3XX codes)
  - Format: "{status_code}:{reason}" (e.g., "404:Not Found")
  - Automatically raised by the `check_response()` method
  - Can be disabled via `extra_options={'check_response': False}`

- **`requests.exceptions.ConnectionError`**: Network connectivity issues
  - Retried according to the supplied Tenacity configuration when using `run_with_advanced_retry()`
  - Includes DNS resolution failures, connection timeouts, and refused connections

- **`requests.exceptions.HTTPError`**: Raised by `response.raise_for_status()` for 4XX/5XX responses
  - Caught and converted to `AirflowException` in `check_response()`

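The status-error format and the `check_response` switch described under Exception Types can be sketched as follows. This is a minimal illustration, not the hook's code: `HttpStatusError` stands in for `AirflowException`, and `validate_status` is a hypothetical function that takes a status code and reason instead of a real response object.

```python
from typing import Any, Dict, Optional


class HttpStatusError(Exception):
    """Hypothetical stand-in for AirflowException."""


def validate_status(
    status_code: int,
    reason: str,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Raise on 4XX/5XX unless validation is switched off."""
    options = extra_options or {}
    # Validation is on by default and can be disabled per request
    if not options.get("check_response", True):
        return
    if status_code >= 400:
        # Message format: "{status_code}:{reason}"
        raise HttpStatusError(f"{status_code}:{reason}")


validate_status(200, "OK")
validate_status(404, "Not Found", {"check_response": False})
try:
    validate_status(404, "Not Found")
except HttpStatusError as exc:
    print(exc)
```

Disabling the check is useful when the caller wants to inspect error responses itself, for example to read an error body before deciding whether to fail.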
### Error Handling Strategies

- **Connection Errors**: Automatic retry capabilities with Tenacity integration via `run_with_advanced_retry()`
- **HTTP Status Errors**: Configurable response validation with custom check functions
- **Timeout Handling**: Request timeout configuration via `extra_options={'timeout': seconds}`
- **Custom Validation**: Response check functions for application-specific validation
- **SSL Errors**: Certificate validation control via `extra_options={'verify': False}`

### HttpSensor Specific Behavior

- **404 Errors**: Returns False (continue polling) instead of failing
- **Other HTTP Errors**: Raises AirflowException (fails sensor)
- **Custom Response Validation**: Uses the `response_check` function for conditional success

## Connection Configuration

HTTP connections are configured in Airflow with these components:

### Required Fields
- **Host**: Base URL for HTTP requests (can include protocol like `https://api.example.com`)

### Optional Fields
- **Login/Password**: Credentials for HTTPBasicAuth authentication
- **Schema**: Protocol specification (`http` or `https`); defaults to `http` if not in host
- **Port**: Port number for non-standard ports

### Extra Configuration (JSON)
The Extra field accepts JSON configuration for advanced options:

#### Headers
```json
{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider",
    "Accept": "application/json",
    "Authorization": "Bearer token123"
  }
}
```

#### SSL and Security Options
```json
{
  "verify": true,
  "cert": "/path/to/client.pem",
  "timeout": 60
}
```

#### Proxy Configuration
```json
{
  "proxies": {
    "http": "http://proxy:8080",
    "https": "https://proxy:8080"
  }
}
```

### Complete Example Connection
```json
{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider/2.1.0",
    "Accept": "application/json",
    "Content-Type": "application/json"
  },
  "verify": true,
  "timeout": 30,
  "proxies": {
    "https": "https://corporate-proxy:8080"
  }
}
```

### URL Construction Logic
The final URL is constructed as:
1. If `host` contains `://`, use as base URL directly
2. Otherwise: `{schema}://{host}:{port}` (schema defaults to `http`, port is optional)
3. Endpoint is appended with smart slash handling: `base_url + '/' + endpoint`
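
The three rules above can be sketched as a small pure function. This is an illustration of the documented steps only, not the hook's actual code; `build_url` and its parameter names are hypothetical, and "smart slash handling" is assumed to mean that a `/` is inserted only when neither the base URL nor the endpoint already supplies one.

```python
from typing import Optional


def build_url(
    host: str,
    endpoint: Optional[str] = None,
    schema: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Hypothetical helper following the documented URL construction rules."""
    if "://" in host:
        # Rule 1: a host that already carries a protocol is used as-is
        base_url = host
    else:
        # Rule 2: assemble schema, host and optional port
        base_url = f"{schema or 'http'}://{host}"
        if port:
            base_url = f"{base_url}:{port}"
    if not endpoint:
        return base_url
    # Rule 3: add a separating slash only when it is missing on both sides
    if not base_url.endswith("/") and not endpoint.startswith("/"):
        return f"{base_url}/{endpoint}"
    return base_url + endpoint


print(build_url("https://api.example.com", "api/data"))
print(build_url("api.example.com", "/health", port=8080))
print(build_url("example.com", schema="https"))
```

Keeping the protocol in either the Host field or the Schema field, but not both, avoids surprises in how the base URL is assembled.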