Core operators and hooks for Apache Airflow workflow orchestration including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity
—
Quality: Pending — a best-practices review ("Does it follow best practices?") has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Hooks provide standardized interfaces for connecting to external systems, databases, APIs, and services. They abstract connection management, authentication, and common operations while supporting Airflow's connection management system.
Abstract foundation for all hooks providing connection management, environment variable support, and standardized external system interaction patterns.
class BaseHook:
    """
    Abstract base class for hooks, meant as an interface to interact with
    external systems (databases, APIs, services).

    Connections are resolved through Airflow's connection management system;
    environment variables named ``AIRFLOW_CONN_{CONN_ID}`` take precedence
    over stored connections.
    """

    # Prefix for environment-variable-defined connections.
    CONN_ENV_PREFIX = 'AIRFLOW_CONN_'

    def __init__(self, source):
        """
        Parameters:
        - source (Any): Source parameter (implementation dependent)
        """

    @classmethod
    def get_connections(cls, conn_id):
        """
        Get all connections with the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - list: List of Connection objects

        Raises:
        - AirflowException: If connection not found
        """

    @classmethod
    def get_connection(cls, conn_id):
        """
        Get a single connection, preferring environment variables.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - Connection: Connection object

        Note:
        Checks for environment variable AIRFLOW_CONN_{CONN_ID} first.
        """

    @classmethod
    def get_hook(cls, conn_id):
        """
        Get hook instance for the given connection ID.

        Parameters:
        - conn_id (str): Connection identifier

        Returns:
        - BaseHook: Hook instance
        """

    def get_conn(self):
        """
        Returns a connection object (must be implemented by subclasses).

        Returns:
        - object: Connection object specific to the hook implementation
        """
        raise NotImplementedError()

    def get_records(self, sql):
        """
        Execute SQL and return records (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - list: Query results
        """
        raise NotImplementedError()

    def get_pandas_df(self, sql):
        """
        Execute SQL and return pandas DataFrame (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL query to execute

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """
        raise NotImplementedError()

    def run(self, sql):
        """
        Execute SQL command (must be implemented by subclasses).

        Parameters:
        - sql (str): SQL command to execute
        """
        raise NotImplementedError()

# Usage Example:
from airflow.hooks.base_hook import BaseHook
# Get connection using connection ID
conn = BaseHook.get_connection('my_database_conn')
print(f"Host: {conn.host}, Port: {conn.port}")
# Environment variable connection (AIRFLOW_CONN_MY_API)
api_conn = BaseHook.get_connection('my_api')
# Custom hook implementation
class CustomHook(BaseHook):
def __init__(self, conn_id='default_conn'):
self.conn_id = conn_id
self.connection = self.get_connection(conn_id)
def get_conn(self):
# Implementation specific connection logic
return self.connection
def test_connection(self):
conn = self.get_conn()
# Test connection logic
return TrueStandardized database operations following Python DB-API 2.0 specification with support for connection pooling, transactions, and bulk operations.
class DbApiHook(BaseHook):
    """
    Abstract base hook for database access following the Python DB-API 2.0
    specification (PEP 249), with support for transactions and bulk
    operations. Subclasses supply the driver via ``connector`` and name
    their connection-ID attribute via ``conn_name_attr``.
    """

    # Attribute name that stores the connection ID (set by subclasses,
    # e.g. 'postgres_conn_id').
    conn_name_attr = None
    # Connection ID used when the caller does not supply one.
    default_conn_name = 'default_conn_id'
    # Whether the underlying driver supports autocommit mode.
    supports_autocommit = False
    # DB-API compliant module used to open connections (set by subclasses).
    connector = None

    def get_conn(self):
        """
        Returns a connection object.

        Returns:
        - object: Database connection object
        """

    def get_pandas_df(self, sql, parameters=None):
        """
        Execute SQL and return pandas DataFrame.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL and return a set of records.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - list: List of tuples (query results)
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL and return the first record.

        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters

        Returns:
        - tuple: First record or None if no results
        """

    def run(self, sql, autocommit=False, parameters=None):
        """
        Execute SQL command(s).

        Parameters:
        - sql (str or list): SQL statement(s) to execute
        - autocommit (bool): Whether to use autocommit
        - parameters (dict, optional): Query parameters
        """

    def get_cursor(self):
        """
        Returns a cursor object.

        Returns:
        - object: Database cursor object
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
        """
        Insert rows into table.

        Parameters:
        - table (str): Target table name
        - rows (list): List of tuples to insert
        - target_fields (list, optional): Target field names
        - commit_every (int): Commit frequency
        """

    def bulk_load(self, table, tmp_file):
        """
        Load tab-delimited file into database table.

        Note:
        Abstract method, must be implemented by subclasses.

        Parameters:
        - table (str): Target table name
        - tmp_file (str): Path to tab-delimited file
        """
        raise NotImplementedError()

# Usage Examples:
from airflow.hooks.dbapi_hook import DbApiHook
# Custom database hook implementation
class PostgresHook(DbApiHook):
conn_name_attr = 'postgres_conn_id'
default_conn_name = 'postgres_default'
supports_autocommit = True
def get_conn(self):
import psycopg2
conn = self.get_connection(self.postgres_conn_id)
return psycopg2.connect(
host=conn.host,
port=conn.port,
user=conn.login,
password=conn.password,
database=conn.schema
)
# Usage in task
def query_database(**context):
hook = PostgresHook(postgres_conn_id='my_postgres')
# Execute query and get records
records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=(True,))
print(f"Found {len(records)} active users")
# Get pandas DataFrame
df = hook.get_pandas_df("SELECT user_id, name, email FROM users")
print(df.head())
# Execute insert/update
hook.run(
"UPDATE users SET last_login = NOW() WHERE user_id = %s",
parameters=(user_id,)
)
# Bulk insert
new_users = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
hook.insert_rows('users', new_users, target_fields=['id', 'name'])
# Using with PythonOperator
db_task = PythonOperator(
task_id='database_operations',
python_callable=query_database,
provide_context=True,
dag=dag
)HTTP client functionality with session management, authentication, error handling, and response processing for REST API interactions.
class HttpHook(BaseHook):
    """
    Interact with HTTP servers using the ``requests`` library, with session
    management, error handling, and response processing.
    """

    def __init__(self, method='POST', http_conn_id='http_default'):
        """
        Parameters:
        - method (str): HTTP method to use (default: 'POST')
        - http_conn_id (str): Connection ID for HTTP connection
          (default: 'http_default')
        """

    def get_conn(self, headers):
        """
        Returns HTTP session for use with requests.

        Parameters:
        - headers (dict): HTTP headers to include

        Returns:
        - requests.Session: HTTP session object
        """

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Perform the HTTP request.

        Parameters:
        - endpoint (str): API endpoint to call
        - data (dict, optional): Request data/parameters
        - headers (dict, optional): HTTP headers
        - extra_options (dict, optional): Additional options
          (stream, verify, proxies, cert, timeout, allow_redirects)

        Returns:
        - requests.Response: Response object
        """

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Execute request with options and error checking.

        Parameters:
        - session (requests.Session): HTTP session
        - prepped_request (requests.PreparedRequest): Prepared request
        - extra_options (dict): Request options

        Returns:
        - requests.Response: Response object

        Raises:
        - AirflowException: On HTTP errors
        """

# Usage Examples:
from airflow.hooks.http_hook import HttpHook
from airflow.utils import AirflowException
import json
def call_api(**context):
# Basic API call
http_hook = HttpHook(method='GET', http_conn_id='api_default')
# GET request
response = http_hook.run(
endpoint='users/123',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
user_data = response.json()
print(f"User: {user_data['name']}")
else:
raise AirflowException(f"API call failed: {response.status_code}")
def post_data(**context):
# POST request with data
http_hook = HttpHook(method='POST', http_conn_id='api_default')
payload = {
'name': 'New User',
'email': 'newuser@example.com',
'date': context['ds']
}
response = http_hook.run(
endpoint='users',
data=json.dumps(payload),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
}
)
if response.status_code == 201:
created_user = response.json()
print(f"Created user with ID: {created_user['id']}")
return created_user['id']
else:
raise AirflowException(f"Failed to create user: {response.text}")
def authenticated_request(**context):
# API call with authentication and custom options
http_hook = HttpHook(method='GET', http_conn_id='secure_api')
response = http_hook.run(
endpoint='protected/data',
headers={
'Authorization': 'Bearer your-token-here',
'Accept': 'application/json'
},
extra_options={
'timeout': 30,
'verify': True, # SSL verification
'stream': False
}
)
return response.json()
# File upload example
def upload_file(**context):
http_hook = HttpHook(method='POST', http_conn_id='file_api')
with open('/path/to/file.csv', 'rb') as f:
files = {'file': f}
response = http_hook.run(
endpoint='upload',
data={'description': 'Daily report'},
files=files
)
return response.json()
# Error handling with retry logic
def robust_api_call(**context):
http_hook = HttpHook(method='GET', http_conn_id='api_default')
max_retries = 3
for attempt in range(max_retries):
try:
response = http_hook.run(
endpoint='health',
extra_options={'timeout': 10}
)
if response.status_code == 200:
return response.json()
elif response.status_code >= 500:
# Server error, retry
if attempt < max_retries - 1:
print(f"Server error, retrying... (attempt {attempt + 1})")
continue
else:
raise AirflowException(f"Server error after {max_retries} attempts")
else:
# Client error, don't retry
raise AirflowException(f"Client error: {response.status_code}")
except Exception as e:
if attempt < max_retries - 1:
print(f"Request failed, retrying... (attempt {attempt + 1}): {e}")
continue
else:
raise AirflowException(f"Request failed after {max_retries} attempts: {e}")
# Using hooks in operators
api_call_task = PythonOperator(
task_id='call_external_api',
python_callable=call_api,
provide_context=True,
dag=dag
)
data_upload_task = PythonOperator(
task_id='upload_report',
python_callable=upload_file,
provide_context=True,
dag=dag
)Hooks integrate with Airflow's connection management system:
# Connection via Airflow UI or environment variables
# Environment variable format: AIRFLOW_CONN_{CONN_ID}
# Example: AIRFLOW_CONN_MY_DB=postgresql://user:pass@host:5432/dbname

# Using connections in custom hooks
class CustomApiHook(HttpHook):
    """Example HttpHook subclass that injects auth headers from the connection."""

    def __init__(self, api_conn_id='custom_api_default'):
        super().__init__(http_conn_id=api_conn_id)
        self.api_conn_id = api_conn_id

    def get_auth_headers(self):
        # The connection's password field holds the bearer token; the
        # extras JSON carries the API key.
        conn = self.get_connection(self.api_conn_id)
        return {
            'Authorization': f'Bearer {conn.password}',
            'X-API-Key': conn.extra_dejson.get('api_key')
        }

    def call_api(self, endpoint, **kwargs):
        # Merge auth headers over any caller-supplied headers.
        headers = kwargs.get('headers', {})
        headers.update(self.get_auth_headers())
        kwargs['headers'] = headers
        return self.run(endpoint, **kwargs)

from airflow.utils import AirflowException
def safe_database_operation(**context):
hook = None
try:
hook = PostgresHook('postgres_conn')
# Perform database operations
result = hook.get_records("SELECT COUNT(*) FROM important_table")
if not result or result[0][0] == 0:
raise AirflowException("No data found in important_table")
return result[0][0]
except Exception as e:
# Log the error and re-raise as AirflowException
print(f"Database operation failed: {e}")
raise AirflowException(f"Database operation failed: {e}")
finally:
# Cleanup if needed
if hook:
# Close connections, cleanup resources
passInstall with Tessl CLI
npx tessl i tessl/pypi-airflow