
# System Integration Hooks


Hooks provide standardized interfaces for connecting to external systems such as databases, APIs, and other services. They abstract connection handling, authentication, and common operations, and integrate with Airflow's connection management system.


## Capabilities


### Base Hook Interface


Abstract foundation for all hooks, providing connection management, environment-variable-based connections, and standardized patterns for interacting with external systems.


```python { .api }
class BaseHook:
    CONN_ENV_PREFIX = 'AIRFLOW_CONN_'

    def __init__(self, source):
        """
        Abstract base class for hooks, meant as an interface to interact with external systems.

        Parameters:
        - source (Any): Source parameter (implementation dependent)
        """

    @classmethod
    def get_connections(cls, conn_id):
        """
        Get all connections with the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - list: List of Connection objects

        Raises:
        - AirflowException: If connection not found
        """

    @classmethod
    def get_connection(cls, conn_id):
        """
        Get a single connection, preferring environment variables.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - Connection: Connection object

        Note:
        Checks for environment variable AIRFLOW_CONN_{CONN_ID} first
        """

    @classmethod
    def get_hook(cls, conn_id):
        """
        Get hook instance for the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - BaseHook: Hook instance
        """

    def get_conn(self):
        """
        Returns a connection object (must be implemented by subclasses).

        Returns:
        - object: Connection object specific to the hook implementation
        """

    def get_records(self, sql):
        """
        Execute SQL and return records (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - list: Query results
        """

    def get_pandas_df(self, sql):
        """
        Execute SQL and return pandas DataFrame (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def run(self, sql):
        """
        Execute SQL command (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL command to execute
        """
```


**Usage Example**:


```python
from airflow.hooks.base_hook import BaseHook

# Get connection using connection ID
conn = BaseHook.get_connection('my_database_conn')
print(f"Host: {conn.host}, Port: {conn.port}")

# Environment variable connection (AIRFLOW_CONN_MY_API)
api_conn = BaseHook.get_connection('my_api')

# Custom hook implementation
class CustomHook(BaseHook):
    def __init__(self, conn_id='default_conn'):
        self.conn_id = conn_id
        self.connection = self.get_connection(conn_id)

    def get_conn(self):
        # Implementation-specific connection logic
        return self.connection

    def test_connection(self):
        conn = self.get_conn()
        # Test connection logic
        return True
```
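
Where a connection's type maps onto a concrete hook class, `get_hook` can dispatch to it without hard-coding the hook. A minimal sketch, assuming the connection `my_database_conn` exists and its `conn_type` resolves to a SQL-capable hook:

```python
from airflow.hooks.base_hook import BaseHook

# Dispatch by connection type; the connection ID is illustrative
hook = BaseHook.get_hook('my_database_conn')
rows = hook.get_records("SELECT 1")
print(rows)
```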


### Database Connectivity


Standardized database operations following the Python DB-API 2.0 specification, with support for connection pooling, transactions, and bulk operations.


```python { .api }
class DbApiHook(BaseHook):
    conn_name_attr = None
    default_conn_name = 'default_conn_id'
    supports_autocommit = False
    connector = None

    def get_conn(self):
        """
        Returns a connection object.

        Returns:
        - object: Database connection object
        """

    def get_pandas_df(self, sql, parameters=None):
        """
        Execute SQL and return pandas DataFrame.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL and return a set of records.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - list: List of tuples (query results)
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL and return the first record.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - tuple: First record or None if no results
        """

    def run(self, sql, autocommit=False, parameters=None):
        """
        Execute SQL command(s).

        Parameters:
        - sql (str or list): SQL statement(s) to execute
        - autocommit (bool): Whether to use autocommit
        - parameters (dict, optional): Query parameters
        """

    def get_cursor(self):
        """
        Returns a cursor object.

        Returns:
        - object: Database cursor object
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
        """
        Insert rows into table.

        Parameters:
        - table (str): Target table name
        - rows (list): List of tuples to insert
        - target_fields (list, optional): Target field names
        - commit_every (int): Commit frequency
        """

    def bulk_load(self, table, tmp_file):
        """
        Load tab-delimited file into database table.

        Note:
        Abstract method, must be implemented by subclasses

        Parameters:
        - table (str): Target table name
        - tmp_file (str): Path to tab-delimited file
        """
```


**Usage Examples**:


```python
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.operators.python_operator import PythonOperator

# Custom database hook implementation
class PostgresHook(DbApiHook):
    conn_name_attr = 'postgres_conn_id'
    default_conn_name = 'postgres_default'
    supports_autocommit = True

    def get_conn(self):
        import psycopg2
        conn = self.get_connection(self.postgres_conn_id)
        return psycopg2.connect(
            host=conn.host,
            port=conn.port,
            user=conn.login,
            password=conn.password,
            database=conn.schema
        )

# Usage in task
def query_database(**context):
    hook = PostgresHook(postgres_conn_id='my_postgres')

    # Execute query and get records
    records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=(True,))
    print(f"Found {len(records)} active users")

    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT user_id, name, email FROM users")
    print(df.head())

    # Execute insert/update
    user_id = 123  # example value; in practice this might come from context or XCom
    hook.run(
        "UPDATE users SET last_login = NOW() WHERE user_id = %s",
        parameters=(user_id,)
    )

    # Bulk insert
    new_users = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
    hook.insert_rows('users', new_users, target_fields=['id', 'name'])

# Using with PythonOperator (assumes a DAG object named `dag` is defined in this file)
db_task = PythonOperator(
    task_id='database_operations',
    python_callable=query_database,
    provide_context=True,
    dag=dag
)
```
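
The remaining `DbApiHook` methods follow the same pattern. A minimal sketch using the `PostgresHook` subclass above; the table names are illustrative, and `bulk_load` is only usable where the subclass actually implements it:

```python
def summarize_and_load(**context):
    hook = PostgresHook(postgres_conn_id='my_postgres')

    # get_first: fetch a single row (or None if there are no results)
    latest = hook.get_first("SELECT MAX(created_at) FROM users")
    print(f"Most recent signup: {latest[0] if latest else 'n/a'}")

    # Explicit connection/cursor control for a multi-statement transaction
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "UPDATE users SET active = FALSE "
            "WHERE last_login < NOW() - INTERVAL '1 year'"
        )
        cursor.execute("INSERT INTO audit_log (event) VALUES ('deactivated stale users')")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()

    # bulk_load is declared abstract on DbApiHook; the PostgresHook sketched
    # above does not implement it, so this call is shown only for illustration.
    # hook.bulk_load('users_staging', '/tmp/users.tsv')
```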


### HTTP API Integration


HTTP client functionality with session management, authentication, error handling, and response processing for REST API interactions.


```python { .api }
class HttpHook(BaseHook):
    def __init__(self, method='POST', http_conn_id='http_default'):
        """
        Interact with HTTP servers using the requests library.

        Parameters:
        - method (str): HTTP method to use (default: 'POST')
        - http_conn_id (str): Connection ID for HTTP connection (default: 'http_default')
        """

    def get_conn(self, headers):
        """
        Returns HTTP session for use with requests.

        Parameters:
        - headers (dict): HTTP headers to include

        Returns:
        - requests.Session: HTTP session object
        """

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Perform the HTTP request.

        Parameters:
        - endpoint (str): API endpoint to call
        - data (dict, optional): Request data/parameters
        - headers (dict, optional): HTTP headers
        - extra_options (dict, optional): Additional options (stream, verify, proxies, cert, timeout, allow_redirects)

        Returns:
        - requests.Response: Response object
        """

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Execute request with options and error checking.

        Parameters:
        - session (requests.Session): HTTP session
        - prepped_request (requests.PreparedRequest): Prepared request
        - extra_options (dict): Request options

        Returns:
        - requests.Response: Response object

        Raises:
        - AirflowException: On HTTP errors
        """
```


**Usage Examples**:


```python
from airflow.hooks.http_hook import HttpHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils import AirflowException
import json

def call_api(**context):
    # Basic API call
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    # GET request
    response = http_hook.run(
        endpoint='users/123',
        headers={'Accept': 'application/json'}
    )

    if response.status_code == 200:
        user_data = response.json()
        print(f"User: {user_data['name']}")
    else:
        raise AirflowException(f"API call failed: {response.status_code}")

def post_data(**context):
    # POST request with data
    http_hook = HttpHook(method='POST', http_conn_id='api_default')

    payload = {
        'name': 'New User',
        'email': 'newuser@example.com',
        'date': context['ds']
    }

    response = http_hook.run(
        endpoint='users',
        data=json.dumps(payload),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
    )

    if response.status_code == 201:
        created_user = response.json()
        print(f"Created user with ID: {created_user['id']}")
        return created_user['id']
    else:
        raise AirflowException(f"Failed to create user: {response.text}")

def authenticated_request(**context):
    # API call with authentication and custom options
    http_hook = HttpHook(method='GET', http_conn_id='secure_api')

    response = http_hook.run(
        endpoint='protected/data',
        headers={
            'Authorization': 'Bearer your-token-here',
            'Accept': 'application/json'
        },
        extra_options={
            'timeout': 30,
            'verify': True,  # SSL verification
            'stream': False
        }
    )

    return response.json()

# File upload example
def upload_file(**context):
    http_hook = HttpHook(method='POST', http_conn_id='file_api')

    with open('/path/to/file.csv', 'rb') as f:
        files = {'file': f}
        # Note: run() as documented above does not accept a files argument;
        # multipart uploads assume a hook variant that forwards extra keyword
        # arguments through to requests.
        response = http_hook.run(
            endpoint='upload',
            data={'description': 'Daily report'},
            files=files
        )

    return response.json()

# Error handling with retry logic
def robust_api_call(**context):
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = http_hook.run(
                endpoint='health',
                extra_options={'timeout': 10}
            )

            if response.status_code == 200:
                return response.json()
            elif response.status_code >= 500:
                # Server error, retry
                if attempt < max_retries - 1:
                    print(f"Server error, retrying... (attempt {attempt + 1})")
                    continue
                else:
                    raise AirflowException(f"Server error after {max_retries} attempts")
            else:
                # Client error, don't retry
                raise AirflowException(f"Client error: {response.status_code}")

        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Request failed, retrying... (attempt {attempt + 1}): {e}")
                continue
            else:
                raise AirflowException(f"Request failed after {max_retries} attempts: {e}")

# Using hooks in operators (assumes a DAG object named `dag` is defined in this file)
api_call_task = PythonOperator(
    task_id='call_external_api',
    python_callable=call_api,
    provide_context=True,
    dag=dag
)

data_upload_task = PythonOperator(
    task_id='upload_report',
    python_callable=upload_file,
    provide_context=True,
    dag=dag
)
```
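
Hooks also work well for simple polling loops. A short sketch building on the same `HttpHook` and `AirflowException` imports above; the endpoint path, poll interval, and response field are illustrative:

```python
import time

def wait_for_job(**context):
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    # Poll a (hypothetical) job-status endpoint until it reports completion
    for _ in range(10):
        response = http_hook.run(
            endpoint='jobs/123/status',
            headers={'Accept': 'application/json'}
        )
        if response.json().get('state') == 'done':
            return True
        time.sleep(30)

    raise AirflowException("Job did not finish within the polling window")
```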


## Connection Management


Hooks integrate with Airflow's connection management system:


```python
# Connections are defined via the Airflow UI or environment variables
# Environment variable format: AIRFLOW_CONN_{CONN_ID}
# Example: AIRFLOW_CONN_MY_DB=postgresql://user:pass@host:5432/dbname

# Using connections in custom hooks
class CustomApiHook(HttpHook):
    def __init__(self, api_conn_id='custom_api_default'):
        super().__init__(http_conn_id=api_conn_id)
        self.api_conn_id = api_conn_id

    def get_auth_headers(self):
        conn = self.get_connection(self.api_conn_id)
        return {
            'Authorization': f'Bearer {conn.password}',
            'X-API-Key': conn.extra_dejson.get('api_key')
        }

    def call_api(self, endpoint, **kwargs):
        headers = kwargs.get('headers', {})
        headers.update(self.get_auth_headers())
        kwargs['headers'] = headers

        return self.run(endpoint, **kwargs)
```
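
A short usage sketch for the `CustomApiHook` above; the connection ID and endpoint are illustrative, and the class assumes the `HttpHook` import shown earlier:

```python
def sync_partner_records(**context):
    # Credentials come from the 'partner_api' connection's password and extras
    hook = CustomApiHook(api_conn_id='partner_api')

    # call_api() merges the auth headers and delegates to HttpHook.run()
    response = hook.call_api('records', data=None)
    return response.json()
```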


## Error Handling Best Practices


```python
from airflow.utils import AirflowException

def safe_database_operation(**context):
    hook = None
    try:
        hook = PostgresHook('postgres_conn')

        # Perform database operations
        result = hook.get_records("SELECT COUNT(*) FROM important_table")

        if not result or result[0][0] == 0:
            raise AirflowException("No data found in important_table")

        return result[0][0]

    except Exception as e:
        # Log the error and re-raise as AirflowException
        print(f"Database operation failed: {e}")
        raise AirflowException(f"Database operation failed: {e}")

    finally:
        # Cleanup if needed
        if hook:
            # Close connections, cleanup resources
            pass
```
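
Hook-level error handling pairs naturally with task-level retries. A minimal sketch wiring the callable above into a `PythonOperator`; the retry values are illustrative and a `dag` object is assumed to be defined elsewhere in the file:

```python
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

safe_db_task = PythonOperator(
    task_id='safe_database_operation',
    python_callable=safe_database_operation,
    provide_context=True,
    retries=3,                         # re-run the task on failure
    retry_delay=timedelta(minutes=5),  # wait between attempts
    dag=dag
)
```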