
tessl/pypi-apache-airflow-providers-http

Apache Airflow HTTP Provider package enabling HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.

Workspace: tessl

Visibility: Public

Describes: pypipkg:pypi/apache-airflow-providers-http@2021.4.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-http@2021.4.0

# Apache Airflow HTTP Provider

Apache Airflow HTTP Provider package enables HTTP interactions through hooks, operators, and sensors for making HTTP requests, checking endpoints, and handling responses in Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-providers-http
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-http`

## Core Imports

Basic HTTP functionality:

```python
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
```

Utility functions:

```python
from airflow.providers.http import determine_kwargs, make_kwargs_callable
```

Additional imports for the type annotations used in the API signatures:

```python
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from airflow.exceptions import AirflowException
```

## Basic Usage

### Making HTTP Requests with HttpHook

```python
import json

from airflow.providers.http.hooks.http import HttpHook

# Create HTTP hook
hook = HttpHook(method='GET', http_conn_id='my_http_conn')

# Make a simple GET request
response = hook.run(endpoint='api/data')
print(response.text)

# Make a POST request with a JSON body
hook_post = HttpHook(method='POST', http_conn_id='my_http_conn')
response = hook_post.run(
    endpoint='api/submit',
    # Serialize explicitly: a plain dict would be form-encoded, not sent as JSON
    data=json.dumps({'key': 'value'}),
    headers={'Content-Type': 'application/json'}
)
```

### Using HTTP Operator in DAGs

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

dag = DAG(
    'http_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Simple HTTP task with response validation
def check_response(response, **context):
    """Custom response validation function"""
    json_data = response.json()
    return json_data.get('status') == 'success'

def filter_response(response, **context):
    """Extract specific data from response"""
    return response.json().get('result_data')

http_task = SimpleHttpOperator(
    task_id='call_api',
    endpoint='api/process/{{ ds }}',  # Templated with execution date
    method='POST',
    # Serialized to a JSON string to match the Content-Type header;
    # the placeholders inside the string are still rendered
    data=json.dumps({
        'action': 'process',
        'date': '{{ ds }}',  # Templated field
        'run_id': '{{ run_id }}'  # Templated field
    }),
    headers={
        'Content-Type': 'application/json',
        'X-Run-ID': '{{ run_id }}'  # Templated field
    },
    response_check=check_response,
    response_filter=filter_response,
    http_conn_id='my_http_conn',
    log_response=True,
    dag=dag
)
```

### Monitoring Endpoints with HttpSensor

```python
from airflow.providers.http.sensors.http import HttpSensor

# Wait for API to be ready
# (`dag` is the DAG object defined in the operator example above)
sensor = HttpSensor(
    task_id='wait_for_api',
    endpoint='health',
    http_conn_id='my_http_conn',
    response_check=lambda response: response.json()['status'] == 'healthy',
    poke_interval=30,
    timeout=300,
    dag=dag
)
```

## Architecture

The Apache Airflow HTTP Provider follows Airflow's standard provider pattern:

- **HttpHook**: Core component for HTTP connections and request execution; handles authentication, session management, and error handling
- **SimpleHttpOperator**: Task-level component that wraps HttpHook for use in DAGs; provides templating and response processing
- **HttpSensor**: Monitoring component that polls HTTP endpoints until a condition is met; extends BaseSensorOperator with HTTP-specific logic
- **Utility Functions**: Helper functions for dynamic callable management and parameter filtering

## Template Fields

Template fields enable dynamic content using Jinja2 templating with Airflow context variables:

### SimpleHttpOperator Template Fields

- `endpoint`: API endpoint path (e.g., `"api/data/{{ ds }}"`)
- `data`: Request payload or parameters (supports nested templating)
- `headers`: HTTP headers dictionary (values can be templated)

### HttpSensor Template Fields

- `endpoint`: API endpoint path for polling
- `request_params`: Request parameters (passed as `data` to `hook.run()`)
- `headers`: HTTP headers dictionary

### Common Template Variables

- `{{ ds }}`: Execution date (YYYY-MM-DD)
- `{{ run_id }}`: Unique run identifier
- `{{ task_instance }}`: Access to task instance object
- `{{ macros.datetime }}`: Standard-library `datetime.datetime`, for date/time manipulation
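
To illustrate how templating reaches values nested inside `data` or `headers`, the sketch below walks a dictionary and substitutes `{{ name }}` placeholders. It is only an approximation: a small regular expression stands in for Jinja2, and `render_templates` and the context values are hypothetical names, not part of the provider.

```python
import re
from typing import Any, Dict

# Matches simple placeholders such as "{{ ds }}"; Airflow itself renders with Jinja2.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_templates(value: Any, context: Dict[str, str]) -> Any:
    """Recursively substitute placeholders in strings, dict values, and list items."""
    if isinstance(value, str):
        # Unknown names are left untouched in this sketch rather than raising.
        return _PLACEHOLDER.sub(lambda m: context.get(m.group(1), m.group(0)), value)
    if isinstance(value, dict):
        # Only values are rendered; keys are passed through unchanged.
        return {key: render_templates(item, context) for key, item in value.items()}
    if isinstance(value, list):
        return [render_templates(item, context) for item in value]
    return value


# Hypothetical context values for illustration
context = {"ds": "2023-01-01", "run_id": "manual__2023-01-01"}
rendered = render_templates(
    {"endpoint": "api/data/{{ ds }}", "headers": {"X-Run-ID": "{{ run_id }}"}},
    context,
)
print(rendered)
```

Non-string leaves (numbers, `None`) pass through unchanged, which is why nested payloads can mix templated and literal values.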

## HTTP Hook Capabilities

The HttpHook provides the foundational HTTP connectivity layer for Airflow workflows.

```python { .api }
class HttpHook(BaseHook):
    # Class attributes
    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = 'http_default',
        auth_type: Any = HTTPBasicAuth
    ) -> None:
        """
        Initialize HTTP hook with connection and method settings.

        Parameters:
        - method: HTTP method to use (GET, POST, PUT, DELETE, HEAD)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - auth_type: Authentication type (HTTPBasicAuth or custom)
        """

    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Create HTTP session with connection configuration.

        Parameters:
        - headers: Additional headers to include in session

        Returns:
            Configured requests.Session object
        """

    def run(
        self,
        endpoint: Optional[str],
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any
    ) -> Any:
        """
        Execute HTTP request with specified parameters.

        Method-specific behavior:
        - GET: data becomes URL parameters (params)
        - HEAD: data is ignored (no body or params)
        - POST/PUT/PATCH: data becomes request body

        URL construction: base_url + '/' + endpoint (with smart slash handling)

        Parameters:
        - endpoint: API endpoint to call (relative path)
        - data: Request payload (POST/PUT) or URL parameters (GET)
        - headers: HTTP headers for the request
        - extra_options: Additional options (timeout, verify, stream, etc.)
        - request_kwargs: Additional arguments passed to requests.Request (json, files, etc.)

        Returns:
            requests.Response object from the HTTP call

        Raises:
            AirflowException: On HTTP errors (non-2XX/3XX status codes)
            requests.exceptions.ConnectionError: On connection issues
        """

    def check_response(self, response: requests.Response) -> None:
        """
        Validate response status code, raise exception on HTTP errors.

        Parameters:
        - response: Response object to validate

        Raises:
            AirflowException: For non-2XX/3XX status codes
        """

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any]
    ) -> Any:
        """
        Execute prepared request using session and validate response.

        Handles request execution with configurable options like timeout,
        SSL verification, proxies, and response checking.

        Parameters:
        - session: Configured requests session to use
        - prepped_request: Prepared request object from session.prepare_request()
        - extra_options: Request execution options (stream, verify, proxies, cert, timeout, etc.)

        Returns:
            requests.Response object from successful request

        Raises:
            requests.exceptions.ConnectionError: On connection issues (will be retried if using run_with_advanced_retry)
            AirflowException: On HTTP errors if check_response is enabled
        """

    def run_with_advanced_retry(
        self,
        _retry_args: Dict[Any, Any],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute run method with Tenacity-based retry logic.

        Parameters:
        - _retry_args: Tenacity retry configuration dict
        - args, kwargs: Arguments passed to run method

        Returns:
            Result from successful HTTP request after retries
        """
```
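
The method-specific handling of `data` described in the `run()` docstring can be sketched as a small dispatch function. This is an illustration only, not the hook's actual code; `request_arguments` is a hypothetical helper that returns the keyword arguments a request would carry.

```python
from typing import Any, Dict, Optional


def request_arguments(method: str, data: Optional[Any]) -> Dict[str, Any]:
    """Return where `data` ends up for a given HTTP method (hypothetical helper)."""
    method = method.upper()
    if method == "GET":
        return {"params": data}  # data is sent as URL query parameters
    if method == "HEAD":
        return {}                # data is ignored: no body and no parameters
    return {"data": data}        # POST, PUT, PATCH and others: data is the request body


get_args = request_arguments("GET", {"page": 1})
head_args = request_arguments("HEAD", {"page": 1})
post_args = request_arguments("post", '{"key": "value"}')
print(get_args, head_args, post_args)
```

The practical consequence is that the same `data` value changes meaning when only the hook's `method` changes, so it is worth checking the method whenever a request arrives without the expected body or query string.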

## HTTP Operator Capabilities

The SimpleHttpOperator enables HTTP requests as Airflow tasks with full templating support.

```python { .api }
class SimpleHttpOperator(BaseOperator):
    # Template configuration
    template_fields = ['endpoint', 'data', 'headers']
    template_fields_renderers = {'headers': 'json', 'data': 'py'}
    template_ext = ()
    ui_color = '#f4a460'

    def __init__(
        self,
        *,
        endpoint: Optional[str] = None,
        method: str = 'POST',
        data: Any = None,
        headers: Optional[Dict[str, str]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        response_filter: Optional[Callable[..., Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        http_conn_id: str = 'http_default',
        log_response: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP operator with request configuration.

        Parameters:
        - endpoint: API endpoint (templated)
        - method: HTTP method to use
        - data: Request data/parameters (templated)
        - headers: HTTP headers (templated)
        - response_check: Function to validate response (returns bool)
        - response_filter: Function to transform response data
        - extra_options: Additional request options
        - http_conn_id: Airflow connection ID
        - log_response: Whether to log response content
        - kwargs: Additional BaseOperator arguments
        """

    def execute(self, context: Dict[str, Any]) -> Any:
        """
        Execute HTTP request using HttpHook.

        Parameters:
        - context: Airflow execution context

        Returns:
            Response text or filtered response data
        """
```
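
As a rough sketch of how `response_check` and `response_filter` combine to produce the value `execute()` returns, the example below validates first and then either transforms the response or falls back to its text. `FakeResponse` and `process_response` are hypothetical stand-ins, and raising on a failed check is an assumption about the operator's behavior rather than something this document states.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeResponse:
    """Minimal stand-in for requests.Response, for illustration only."""
    text: str

    def json(self) -> Any:
        return json.loads(self.text)


def process_response(
    response: FakeResponse,
    response_check: Optional[Callable[[FakeResponse], bool]] = None,
    response_filter: Optional[Callable[[FakeResponse], Any]] = None,
) -> Any:
    """Validate first, then either transform the response or return its text."""
    if response_check is not None and not response_check(response):
        # Assumption: a failed check aborts the task (ValueError stands in for AirflowException)
        raise ValueError("Response check returned False.")
    if response_filter is not None:
        return response_filter(response)
    return response.text


response = FakeResponse('{"status": "success", "result_data": [1, 2, 3]}')
raw = process_response(response)
filtered = process_response(
    response,
    response_check=lambda r: r.json().get("status") == "success",
    response_filter=lambda r: r.json().get("result_data"),
)
print(raw, filtered)
```

Returning only the filtered data keeps the value handed to downstream tasks small, which matters when the full response body is large.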

## HTTP Sensor Capabilities

The HttpSensor monitors HTTP endpoints until specified conditions are met.

```python { .api }
class HttpSensor(BaseSensorOperator):
    # Template configuration
    template_fields = ('endpoint', 'request_params', 'headers')

    def __init__(
        self,
        *,
        endpoint: str,
        http_conn_id: str = 'http_default',
        method: str = 'GET',
        request_params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, Any]] = None,
        response_check: Optional[Callable[..., bool]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize HTTP sensor with polling configuration.

        Parameters:
        - endpoint: API endpoint to monitor (required, templated)
        - http_conn_id: Airflow connection ID for HTTP configuration
        - method: HTTP method for polling requests (GET, POST, etc.)
        - request_params: Request parameters/data (templated, becomes data in hook.run())
        - headers: HTTP headers dictionary (templated)
        - response_check: Custom response validation function returning bool
        - extra_options: Additional request options (timeout, verify, etc.)
        - kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

    def poke(self, context: Dict[Any, Any]) -> bool:
        """
        Execute HTTP request and evaluate success condition.

        Behavior:
        - Executes HTTP request using configured hook
        - Returns False for 404 errors (continues polling)
        - Raises exception for other HTTP errors (fails sensor)
        - Uses response_check function if provided for custom validation
        - Returns True if no response_check or response_check returns True

        Parameters:
        - context: Airflow execution context with task_instance, ds, etc.

        Returns:
            True if sensor condition met (stop polling), False to continue polling

        Raises:
            AirflowException: For HTTP errors other than 404, or connection issues
        """
```
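
The polling rules listed in the `poke()` docstring can be sketched without Airflow as follows. This is a simplified illustration under stated assumptions: `StatusError` is a hypothetical stand-in for `AirflowException`, `fetch` stands in for the hook call, and detecting a 404 by message prefix is an assumption based on the `"{status_code}:{reason}"` format.

```python
from typing import Callable, Optional


class StatusError(Exception):
    """Stand-in for AirflowException; the message uses the '{status_code}:{reason}' format."""


def poke(fetch: Callable[[], dict],
         response_check: Optional[Callable[[dict], bool]] = None) -> bool:
    """Return True to stop polling, False to keep polling; re-raise other HTTP errors."""
    try:
        response = fetch()
    except StatusError as exc:
        if str(exc).startswith("404"):
            return False  # resource not there yet: keep polling
        raise             # any other HTTP error fails the sensor
    if response_check is not None:
        return bool(response_check(response))
    return True


def not_found() -> dict:
    raise StatusError("404:Not Found")


def server_error() -> dict:
    raise StatusError("500:Internal Server Error")


def healthy() -> dict:
    return {"status": "healthy"}


print(poke(not_found), poke(healthy))
```

Treating 404 as "not yet" makes the sensor suitable for waiting on resources that appear later, such as an export file served over HTTP.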

## Utility Functions

Helper functions for dynamic callable management and parameter filtering.

```python { .api }
def determine_kwargs(
    func: Callable,
    args: Union[Tuple, List],
    kwargs: Dict
) -> Dict:
    """
    Inspect callable signature to determine which kwargs to pass.

    Parameters:
    - func: The callable to inspect
    - args: Positional arguments to skip in signature
    - kwargs: Keyword arguments to filter

    Returns:
        Dictionary with compatible keyword arguments
    """

def make_kwargs_callable(func: Callable) -> Callable:
    """
    Create callable that accepts any arguments but only forwards required ones.

    Parameters:
    - func: Function to wrap

    Returns:
        Wrapper function that filters arguments based on signature
    """
```
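
The idea behind both helpers, filtering keyword arguments against a callable's signature, can be sketched with the standard `inspect` module. The names `filter_kwargs` and `kwargs_tolerant` are hypothetical, and this is an approximation of the technique rather than the library's implementation; in particular it does not guard against a keyword that duplicates a positional argument.

```python
import inspect
from typing import Any, Callable, Dict


def filter_kwargs(func: Callable, kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the keyword arguments that `func` can accept."""
    params = inspect.signature(func).parameters
    # A **kwargs parameter means the callable accepts everything.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return dict(kwargs)
    return {name: kwargs[name] for name in params if name in kwargs}


def kwargs_tolerant(func: Callable) -> Callable:
    """Wrap `func` so callers may pass extra keyword arguments safely."""
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return func(*args, **filter_kwargs(func, kwargs))
    return wrapper


def check(response: str, ds: str) -> str:
    return f"{response}@{ds}"


# The full context is offered, but only `ds` is forwarded to `check`.
result = kwargs_tolerant(check)("ok", ds="2023-01-01", run_id="manual", ti=None)
print(result)
```

This is what lets a `response_check` function declare only the context variables it needs instead of accepting the entire Airflow context.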

## Authentication and Security

The HTTP provider supports multiple authentication methods:

- **HTTPBasicAuth**: Username/password authentication via connection credentials
- **Custom Authentication**: Pass custom auth objects via `auth_type` parameter
- **Header-based Authentication**: API keys and tokens via headers in connection extras
- **SSL Configuration**: Certificate validation and client certificates via `extra_options`
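
For orientation, HTTP Basic authentication amounts to sending an `Authorization` header that carries the Base64-encoded `login:password` pair. The sketch below builds that header with the standard library only; `basic_auth_header` is a hypothetical helper and the credentials are placeholders.

```python
import base64
from typing import Dict


def basic_auth_header(username: str, password: str) -> Dict[str, str]:
    """Build the Authorization header used by HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("latin-1")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials for illustration only
header = basic_auth_header("user", "pass")
print(header)
```

Because Base64 is an encoding and not encryption, Basic authentication should only be used over HTTPS.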

## Error Handling

The provider reports HTTP and network failures through the following exceptions and options.

### Exception Types

- **`AirflowException`**: Raised for HTTP status errors (non-2XX/3XX codes)
  - Format: "{status_code}:{reason}" (e.g., "404:Not Found")
  - Automatically raised by `check_response()` method
  - Can be disabled via `extra_options={'check_response': False}`

- **`requests.exceptions.ConnectionError`**: Network connectivity issues
  - Retried when using `run_with_advanced_retry()`, subject to the supplied Tenacity configuration
  - Includes DNS resolution failures, connection timeouts, connection refused

- **`requests.exceptions.HTTPError`**: Raised by `requests` for error status codes
  - Caught and converted to `AirflowException` in `check_response()`

### Error Handling Strategies

- **Connection Errors**: Automatic retry capabilities with Tenacity integration via `run_with_advanced_retry()`
- **HTTP Status Errors**: Configurable response validation with custom check functions
- **Timeout Handling**: Request timeout configuration via `extra_options={'timeout': seconds}`
- **Custom Validation**: Response check functions for application-specific validation
- **SSL Errors**: Certificate validation control via `extra_options={'verify': False}`

### HttpSensor Specific Behavior

- **404 Errors**: Returns False (continue polling) instead of failing
- **Other HTTP Errors**: Raises `AirflowException` (fails sensor)
- **Custom Response Validation**: Uses `response_check` function for conditional success
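
The retry strategy for connection errors can be sketched as a loop with exponential backoff. This is a standard-library stand-in for the Tenacity-based behavior, not the provider's code: `run_with_retry` is a hypothetical helper, and Python's built-in `ConnectionError` is used in place of `requests.exceptions.ConnectionError`.

```python
import time
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def run_with_retry(call: Callable[[], T], attempts: int = 3, base_delay: float = 0.0) -> T:
    """Retry `call` on connection errors, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    raise ValueError("attempts must be at least 1")


calls: Dict[str, int] = {"count": 0}


def flaky() -> str:
    """Fail twice with a connection error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("connection refused")
    return "200:OK"


result = run_with_retry(flaky)
print(result, calls["count"])
```

Only connection-level failures are retried here; an HTTP status error would propagate immediately, which mirrors the split between the two exception types described above.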

## Connection Configuration

HTTP connections are configured in Airflow with these components:

### Required Fields

- **Host**: Base URL for HTTP requests (can include protocol like `https://api.example.com`)

### Optional Fields

- **Login/Password**: Credentials for HTTPBasicAuth authentication
- **Schema**: Protocol specification (`http` or `https`) - defaults to `http` if not in host
- **Port**: Port number for non-standard ports

### Extra Configuration (JSON)

The Extra field accepts JSON configuration for advanced options:

#### Headers

```json
{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider",
    "Accept": "application/json",
    "Authorization": "Bearer token123"
  }
}
```

#### SSL and Security Options

```json
{
  "verify": true,
  "cert": "/path/to/client.pem",
  "timeout": 60
}
```

#### Proxy Configuration

```json
{
  "proxies": {
    "http": "http://proxy:8080",
    "https": "https://proxy:8080"
  }
}
```

### Complete Example Connection

```json
{
  "headers": {
    "User-Agent": "Airflow-HTTP-Provider/2.1.0",
    "Accept": "application/json",
    "Content-Type": "application/json"
  },
  "verify": true,
  "timeout": 30,
  "proxies": {
    "https": "https://corporate-proxy:8080"
  }
}
```

### URL Construction Logic

The final URL is constructed as follows:

1. If `host` contains `://`, it is used as the base URL directly
2. Otherwise, the base URL is `{schema}://{host}:{port}` (schema defaults to `http`, port is optional)
3. The endpoint is appended with smart slash handling: `base_url + '/' + endpoint`
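
The three steps above can be sketched as a small function. This follows the steps as written rather than the hook's source code; `build_url` is a hypothetical name, and the slash handling shown (insert a `/` only when neither side supplies one) is an assumption about what "smart slash handling" means.

```python
from typing import Optional


def build_url(host: str, endpoint: Optional[str] = None,
              schema: Optional[str] = None, port: Optional[int] = None) -> str:
    """Combine connection fields and an endpoint into a request URL."""
    if "://" in host:
        base_url = host  # step 1: host already carries the protocol
    else:
        base_url = f"{schema or 'http'}://{host}"  # step 2: schema defaults to http
        if port:
            base_url += f":{port}"
    if not endpoint:
        return base_url
    # Step 3: add a separating slash only when neither part provides one.
    if not base_url.endswith("/") and not endpoint.startswith("/"):
        return f"{base_url}/{endpoint}"
    return base_url + endpoint


print(build_url("https://api.example.com", "api/data"))
print(build_url("api.example.com", "/health", port=8080))
```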