# System Integration Hooks

Hooks provide standardized interfaces for connecting to external systems, databases, APIs, and services. They abstract connection handling, authentication, and common operations, and integrate with Airflow's connection management system.

## Capabilities

### Base Hook Interface

Abstract foundation for all hooks, providing connection management, environment variable support, and standardized patterns for interacting with external systems.

```python { .api }
class BaseHook:
    CONN_ENV_PREFIX = 'AIRFLOW_CONN_'

    def __init__(self, source):
        """
        Abstract base class for hooks, meant as an interface to interact with external systems.

        Parameters:
        - source (Any): Source parameter (implementation dependent)
        """

    @classmethod
    def get_connections(cls, conn_id):
        """
        Get all connections with the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - list: List of Connection objects

        Raises:
        - AirflowException: If connection not found
        """

    @classmethod
    def get_connection(cls, conn_id):
        """
        Get a single connection, preferring environment variables.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - Connection: Connection object

        Note:
        Checks for environment variable AIRFLOW_CONN_{CONN_ID} first
        """

    @classmethod
    def get_hook(cls, conn_id):
        """
        Get hook instance for the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - BaseHook: Hook instance
        """

    def get_conn(self):
        """
        Returns a connection object (must be implemented by subclasses).

        Returns:
        - object: Connection object specific to the hook implementation
        """

    def get_records(self, sql):
        """
        Execute SQL and return records (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - list: Query results
        """

    def get_pandas_df(self, sql):
        """
        Execute SQL and return a pandas DataFrame (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def run(self, sql):
        """
        Execute SQL command (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL command to execute
        """
```

**Usage Example**:

```python
from airflow.hooks.base_hook import BaseHook

# Get connection using connection ID
conn = BaseHook.get_connection('my_database_conn')
print(f"Host: {conn.host}, Port: {conn.port}")

# Environment variable connection (AIRFLOW_CONN_MY_API)
api_conn = BaseHook.get_connection('my_api')

# Custom hook implementation
class CustomHook(BaseHook):
    def __init__(self, conn_id='default_conn'):
        self.conn_id = conn_id
        self.connection = self.get_connection(conn_id)

    def get_conn(self):
        # Implementation-specific connection logic
        return self.connection

    def test_connection(self):
        conn = self.get_conn()
        # Test connection logic
        return True
```

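Because `get_connection` checks the `AIRFLOW_CONN_{CONN_ID}` environment variable before the metadata database, a connection can be supplied entirely through the environment. Below is a minimal sketch, assuming a hypothetical `my_api` connection defined as a URI (the exact parsing of URI fields can vary by Airflow version):

```python
import os

from airflow.hooks.base_hook import BaseHook

# Define the connection as a URI in the environment (hypothetical values);
# in practice this would be exported in the scheduler/worker environment.
os.environ['AIRFLOW_CONN_MY_API'] = 'https://api_user:api_pass@api.example.com'

# get_connection resolves the environment variable before the metadata database
conn = BaseHook.get_connection('my_api')
print(conn.host, conn.login)
```
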
### Database Connectivity

Standardized database operations following the Python DB-API 2.0 specification, with support for connection pooling, transactions, and bulk operations.

```python { .api }
class DbApiHook(BaseHook):
    conn_name_attr = None
    default_conn_name = 'default_conn_id'
    supports_autocommit = False
    connector = None

    def get_conn(self):
        """
        Returns a connection object.

        Returns:
        - object: Database connection object
        """

    def get_pandas_df(self, sql, parameters=None):
        """
        Execute SQL and return pandas DataFrame.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL and return a set of records.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - list: List of tuples (query results)
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL and return the first record.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - tuple: First record or None if no results
        """

    def run(self, sql, autocommit=False, parameters=None):
        """
        Execute SQL command(s).

        Parameters:
        - sql (str or list): SQL statement(s) to execute
        - autocommit (bool): Whether to use autocommit
        - parameters (dict, optional): Query parameters
        """

    def get_cursor(self):
        """
        Returns a cursor object.

        Returns:
        - object: Database cursor object
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
        """
        Insert rows into table.

        Parameters:
        - table (str): Target table name
        - rows (list): List of tuples to insert
        - target_fields (list, optional): Target field names
        - commit_every (int): Commit frequency
        """

    def bulk_load(self, table, tmp_file):
        """
        Load tab-delimited file into database table.

        Note:
        Abstract method, must be implemented by subclasses

        Parameters:
        - table (str): Target table name
        - tmp_file (str): Path to tab-delimited file
        """
```

**Usage Examples**:

```python
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.operators.python_operator import PythonOperator

# Custom database hook implementation
class PostgresHook(DbApiHook):
    conn_name_attr = 'postgres_conn_id'
    default_conn_name = 'postgres_default'
    supports_autocommit = True

    def get_conn(self):
        import psycopg2
        conn = self.get_connection(self.postgres_conn_id)
        return psycopg2.connect(
            host=conn.host,
            port=conn.port,
            user=conn.login,
            password=conn.password,
            database=conn.schema
        )

# Usage in a task
def query_database(**context):
    hook = PostgresHook(postgres_conn_id='my_postgres')

    # Execute query and get records
    records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=(True,))
    print(f"Found {len(records)} active users")

    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT user_id, name, email FROM users")
    print(df.head())

    # Execute insert/update for the first user returned above
    user_id = df['user_id'].iloc[0]
    hook.run(
        "UPDATE users SET last_login = NOW() WHERE user_id = %s",
        parameters=(user_id,)
    )

    # Bulk insert
    new_users = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
    hook.insert_rows('users', new_users, target_fields=['id', 'name'])

# Using with PythonOperator
db_task = PythonOperator(
    task_id='database_operations',
    python_callable=query_database,
    provide_context=True,
    dag=dag
)
```

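The API above marks `bulk_load` as abstract. A minimal sketch of one possible implementation, building on the `PostgresHook` example above and using a hypothetical `StagingPostgresHook` subclass; a real dialect-specific hook would typically use a native bulk command (for example `COPY` or `LOAD DATA`) instead of `insert_rows`:

```python
import csv

class StagingPostgresHook(PostgresHook):
    def bulk_load(self, table, tmp_file):
        # Read the tab-delimited file and reuse the documented insert_rows()
        with open(tmp_file) as f:
            rows = [tuple(line) for line in csv.reader(f, delimiter='\t')]
        self.insert_rows(table, rows, commit_every=1000)
```
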
### HTTP API Integration

HTTP client functionality with session management, authentication, error handling, and response processing for REST API interactions.

```python { .api }
class HttpHook(BaseHook):
    def __init__(self, method='POST', http_conn_id='http_default'):
        """
        Interact with HTTP servers using the requests library.

        Parameters:
        - method (str): HTTP method to use (default: 'POST')
        - http_conn_id (str): Connection ID for HTTP connection (default: 'http_default')
        """

    def get_conn(self, headers):
        """
        Returns HTTP session for use with requests.

        Parameters:
        - headers (dict): HTTP headers to include

        Returns:
        - requests.Session: HTTP session object
        """

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Perform the HTTP request.

        Parameters:
        - endpoint (str): API endpoint to call
        - data (dict, optional): Request data/parameters
        - headers (dict, optional): HTTP headers
        - extra_options (dict, optional): Additional options (stream, verify, proxies, cert, timeout, allow_redirects)

        Returns:
        - requests.Response: Response object
        """

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Execute request with options and error checking.

        Parameters:
        - session (requests.Session): HTTP session
        - prepped_request (requests.PreparedRequest): Prepared request
        - extra_options (dict): Request options

        Returns:
        - requests.Response: Response object

        Raises:
        - AirflowException: On HTTP errors
        """
```

**Usage Examples**:

```python
import json

from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.operators.python_operator import PythonOperator

def call_api(**context):
    # Basic API call
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    # GET request
    response = http_hook.run(
        endpoint='users/123',
        headers={'Accept': 'application/json'}
    )

    if response.status_code == 200:
        user_data = response.json()
        print(f"User: {user_data['name']}")
    else:
        raise AirflowException(f"API call failed: {response.status_code}")

def post_data(**context):
    # POST request with data
    http_hook = HttpHook(method='POST', http_conn_id='api_default')

    payload = {
        'name': 'New User',
        'email': 'newuser@example.com',
        'date': context['ds']
    }

    response = http_hook.run(
        endpoint='users',
        data=json.dumps(payload),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
    )

    if response.status_code == 201:
        created_user = response.json()
        print(f"Created user with ID: {created_user['id']}")
        return created_user['id']
    else:
        raise AirflowException(f"Failed to create user: {response.text}")

def authenticated_request(**context):
    # API call with authentication and custom options
    http_hook = HttpHook(method='GET', http_conn_id='secure_api')

    response = http_hook.run(
        endpoint='protected/data',
        headers={
            'Authorization': 'Bearer your-token-here',
            'Accept': 'application/json'
        },
        extra_options={
            'timeout': 30,
            'verify': True,  # SSL verification
            'stream': False
        }
    )

    return response.json()

# File upload example: the documented run() signature has no `files`
# parameter, so send the file contents directly as the request body
def upload_file(**context):
    http_hook = HttpHook(method='POST', http_conn_id='file_api')

    with open('/path/to/file.csv', 'rb') as f:
        response = http_hook.run(
            endpoint='upload',
            data=f.read(),
            headers={'Content-Type': 'text/csv'}
        )

    return response.json()

# Error handling with retry logic
def robust_api_call(**context):
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = http_hook.run(
                endpoint='health',
                extra_options={'timeout': 10}
            )
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Request failed, retrying... (attempt {attempt + 1}): {e}")
                continue
            raise AirflowException(f"Request failed after {max_retries} attempts: {e}")

        if response.status_code == 200:
            return response.json()
        elif response.status_code >= 500:
            # Server error, retry
            if attempt < max_retries - 1:
                print(f"Server error, retrying... (attempt {attempt + 1})")
                continue
            raise AirflowException(f"Server error after {max_retries} attempts")
        else:
            # Client error, don't retry
            raise AirflowException(f"Client error: {response.status_code}")

# Using hooks in operators
api_call_task = PythonOperator(
    task_id='call_external_api',
    python_callable=call_api,
    provide_context=True,
    dag=dag
)

data_upload_task = PythonOperator(
    task_id='upload_report',
    python_callable=upload_file,
    provide_context=True,
    dag=dag
)
```

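For endpoints that page their results, the documented `run()` method can simply be called in a loop with different query strings. Below is a minimal sketch, assuming a hypothetical `items` endpoint that returns an empty JSON list once there are no more pages:

```python
from airflow.hooks.http_hook import HttpHook

def fetch_all_items(**context):
    # Hypothetical paginated endpoint: /items?page=N
    http_hook = HttpHook(method='GET', http_conn_id='api_default')

    items, page = [], 1
    while True:
        response = http_hook.run(
            endpoint=f'items?page={page}',
            headers={'Accept': 'application/json'},
            extra_options={'timeout': 30}
        )
        batch = response.json()
        if not batch:
            break
        items.extend(batch)
        page += 1
    return items
```
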
## Connection Management

Hooks integrate with Airflow's connection management system:

```python
# Connections via Airflow UI or environment variables
# Environment variable format: AIRFLOW_CONN_{CONN_ID}
# Example: AIRFLOW_CONN_MY_DB=postgresql://user:pass@host:5432/dbname

# Using connections in custom hooks
class CustomApiHook(HttpHook):
    def __init__(self, api_conn_id='custom_api_default'):
        super().__init__(http_conn_id=api_conn_id)
        self.api_conn_id = api_conn_id

    def get_auth_headers(self):
        conn = self.get_connection(self.api_conn_id)
        return {
            'Authorization': f'Bearer {conn.password}',
            'X-API-Key': conn.extra_dejson.get('api_key')
        }

    def call_api(self, endpoint, **kwargs):
        headers = kwargs.get('headers', {})
        headers.update(self.get_auth_headers())
        kwargs['headers'] = headers

        return self.run(endpoint, **kwargs)
```

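Connections can also be created programmatically, which is handy for tests and bootstrap scripts. A minimal sketch, assuming direct access to the metadata database and entirely hypothetical credential values:

```python
from airflow import settings
from airflow.models import Connection

# Build a Connection record (hypothetical values)
conn = Connection(
    conn_id='custom_api_default',
    conn_type='http',
    host='api.example.com',
    password='secret-token',
    extra='{"api_key": "abc123"}'
)

# Persist it to the metadata database so hooks can resolve it by conn_id
session = settings.Session()
session.add(conn)
session.commit()
session.close()
```
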
## Error Handling Best Practices

```python
from airflow.exceptions import AirflowException

def safe_database_operation(**context):
    hook = None
    try:
        hook = PostgresHook('postgres_conn')

        # Perform database operations
        result = hook.get_records("SELECT COUNT(*) FROM important_table")

        if not result or result[0][0] == 0:
            raise AirflowException("No data found in important_table")

        return result[0][0]

    except Exception as e:
        # Log the error and re-raise as AirflowException
        print(f"Database operation failed: {e}")
        raise AirflowException(f"Database operation failed: {e}")

    finally:
        # Cleanup if needed
        if hook:
            # Close connections, cleanup resources
            pass
```
