CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-airflow

Core operators and hooks for Apache Airflow workflow orchestration including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/hooks.md

System Integration Hooks

Hooks provide standardized interfaces for connecting to external systems, databases, APIs, and services. They abstract connection management, authentication, and common operations while supporting Airflow's connection management system.

Capabilities

Base Hook Interface

Abstract foundation for all hooks providing connection management, environment variable support, and standardized external system interaction patterns.

class BaseHook:
    """
    Abstract base class for hooks, meant as an interface to interact with
    external systems (databases, APIs, services).

    Connections are resolved through Airflow's connection management system;
    an environment variable named AIRFLOW_CONN_{CONN_ID} takes precedence
    over connections stored in the metadata database.
    """

    # Prefix for environment-variable-defined connections.
    CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
    
    def __init__(self, source):
        """
        Initialize the hook.
        
        Parameters:
        - source (Any): Source parameter (implementation dependent)
        """

    @classmethod
    def get_connections(cls, conn_id):
        """
        Get all connections with the given connection ID.
        
        Parameters:
        - conn_id (str): Connection identifier
        
        Returns:
        - list: List of Connection objects
        
        Raises:
        - AirflowException: If connection not found
        """

    @classmethod
    def get_connection(cls, conn_id):
        """
        Get a single connection, preferring environment variables.
        
        Parameters:
        - conn_id (str): Connection identifier
        
        Returns:
        - Connection: Connection object
        
        Note:
        Checks for environment variable AIRFLOW_CONN_{CONN_ID} first
        """

    @classmethod
    def get_hook(cls, conn_id):
        """
        Get hook instance for the given connection ID.
        
        Parameters:
        - conn_id (str): Connection identifier
        
        Returns:
        - BaseHook: Hook instance
        """

    def get_conn(self):
        """
        Returns a connection object (must be implemented by subclasses).
        
        Returns:
        - object: Connection object specific to the hook implementation
        
        Raises:
        - NotImplementedError: Always, unless overridden by a subclass
        """
        # Fail loudly instead of silently returning None from an
        # unimplemented abstract method.
        raise NotImplementedError()

    def get_records(self, sql):
        """
        Execute SQL and return records (must be implemented by subclasses).
        
        Parameters:
        - sql (str): SQL query to execute
        
        Returns:
        - list: Query results
        
        Raises:
        - NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError()

    def get_pandas_df(self, sql):
        """
        Execute SQL and return pandas DataFrame (must be implemented by subclasses).
        
        Parameters:
        - sql (str): SQL query to execute
        
        Returns:
        - pandas.DataFrame: Query results as DataFrame
        
        Raises:
        - NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError()

    def run(self, sql):
        """
        Execute SQL command (must be implemented by subclasses).
        
        Parameters:
        - sql (str): SQL command to execute
        
        Raises:
        - NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError()

Usage Example:

from airflow.hooks.base_hook import BaseHook

# Get connection using connection ID (resolved via Airflow's connection store)
conn = BaseHook.get_connection('my_database_conn')
print(f"Host: {conn.host}, Port: {conn.port}")

# Environment variable connection (AIRFLOW_CONN_MY_API)
# Per get_connection's contract, the environment variable is checked first.
api_conn = BaseHook.get_connection('my_api')

# Custom hook implementation
class CustomHook(BaseHook):
    """Example hook that resolves its Airflow connection at construction time."""

    def __init__(self, conn_id='default_conn'):
        # Resolve the connection eagerly so misconfiguration fails early.
        self.conn_id = conn_id
        self.connection = self.get_connection(conn_id)

    def get_conn(self):
        """Return the connection resolved during initialization."""
        # Implementation specific connection logic
        return self.connection

    def test_connection(self):
        """Exercise get_conn() and report success."""
        active_conn = self.get_conn()
        # Test connection logic
        return True

Database Connectivity

Standardized database operations following Python DB-API 2.0 specification with support for connection pooling, transactions, and bulk operations.

class DbApiHook(BaseHook):
    """
    Abstract base hook for database access following the Python DB-API 2.0
    specification, with record retrieval, DataFrame export, statement
    execution, and bulk-insert helpers.
    """

    # Name of the attribute on concrete hooks holding the connection ID
    # (e.g. 'postgres_conn_id'); set by subclasses.
    conn_name_attr = None
    # Fallback connection ID used when none is supplied.
    default_conn_name = 'default_conn_id'
    # Whether the underlying driver supports toggling autocommit.
    supports_autocommit = False
    # DB-API driver module used to create connections; set by subclasses.
    connector = None

    def get_conn(self):
        """
        Returns a connection object.
        
        Returns:
        - object: Database connection object
        """

    def get_pandas_df(self, sql, parameters=None):
        """
        Execute SQL and return pandas DataFrame.
        
        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters
        
        Returns:
        - pandas.DataFrame: Query results as DataFrame
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL and return a set of records.
        
        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters
        
        Returns:
        - list: List of tuples (query results)
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL and return the first record.
        
        Parameters:
        - sql (str): SQL query to execute
        - parameters (dict, optional): Query parameters
        
        Returns:
        - tuple: First record or None if no results
        """

    def run(self, sql, autocommit=False, parameters=None):
        """
        Execute SQL command(s).
        
        Parameters:
        - sql (str or list): SQL statement(s) to execute
        - autocommit (bool): Whether to use autocommit
        - parameters (dict, optional): Query parameters
        """

    def get_cursor(self):
        """
        Returns a cursor object.
        
        Returns:
        - object: Database cursor object
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
        """
        Insert rows into table.
        
        Parameters:
        - table (str): Target table name
        - rows (list): List of tuples to insert
        - target_fields (list, optional): Target field names
        - commit_every (int): Commit frequency
        """

    def bulk_load(self, table, tmp_file):
        """
        Load tab-delimited file into database table.
        
        Abstract method, must be implemented by subclasses.
        
        Parameters:
        - table (str): Target table name
        - tmp_file (str): Path to tab-delimited file
        
        Raises:
        - NotImplementedError: Always, unless overridden by a subclass
        """
        # Documented as abstract: raise instead of silently returning None.
        raise NotImplementedError()

Usage Examples:

from airflow.hooks.dbapi_hook import DbApiHook

# Custom database hook implementation
class PostgresHook(DbApiHook):
    """Example PostgreSQL hook built on the DB-API base hook."""

    conn_name_attr = 'postgres_conn_id'
    default_conn_name = 'postgres_default'
    supports_autocommit = True

    def get_conn(self):
        """Open a psycopg2 connection from the stored Airflow connection."""
        import psycopg2

        details = self.get_connection(self.postgres_conn_id)
        # Airflow stores the database name in the connection's `schema` field.
        return psycopg2.connect(
            host=details.host,
            port=details.port,
            user=details.login,
            password=details.password,
            database=details.schema,
        )

# Usage in task
def query_database(**context):
    hook = PostgresHook(postgres_conn_id='my_postgres')
    
    # Execute query and get records
    records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=(True,))
    print(f"Found {len(records)} active users")
    
    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT user_id, name, email FROM users")
    print(df.head())
    
    # Execute insert/update
    hook.run(
        "UPDATE users SET last_login = NOW() WHERE user_id = %s",
        parameters=(user_id,)
    )
    
    # Bulk insert
    new_users = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
    hook.insert_rows('users', new_users, target_fields=['id', 'name'])

# Using with PythonOperator
# Wires query_database into a DAG task; provide_context=True passes the
# Airflow context (ds, execution_date, ...) as keyword arguments.
db_task = PythonOperator(
    task_id='database_operations',
    python_callable=query_database,
    provide_context=True,
    dag=dag
)

HTTP API Integration

HTTP client functionality with session management, authentication, error handling, and response processing for REST API interactions.

class HttpHook(BaseHook):
    """
    Interact with HTTP servers using the requests library, with session
    management, error handling, and response processing.
    """

    def __init__(self, method='POST', http_conn_id='http_default'):
        """
        Interact with HTTP servers using the requests library.
        
        Parameters:
        - method (str): HTTP method to use (default: 'POST')
        - http_conn_id (str): Connection ID for HTTP connection (default: 'http_default')
        """

    def get_conn(self, headers=None):
        """
        Returns HTTP session for use with requests.
        
        Parameters:
        - headers (dict, optional): HTTP headers to include (default: None,
          matching the upstream Airflow signature; callers that passed the
          argument positionally are unaffected)
        
        Returns:
        - requests.Session: HTTP session object
        """

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Perform the HTTP request.
        
        Parameters:
        - endpoint (str): API endpoint to call
        - data (dict, optional): Request data/parameters
        - headers (dict, optional): HTTP headers
        - extra_options (dict, optional): Additional options (stream, verify, proxies, cert, timeout, allow_redirects)
        
        Returns:
        - requests.Response: Response object
        """

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Execute request with options and error checking.
        
        Parameters:
        - session (requests.Session): HTTP session
        - prepped_request (requests.PreparedRequest): Prepared request
        - extra_options (dict): Request options
        
        Returns:
        - requests.Response: Response object
        
        Raises:
        - AirflowException: On HTTP errors
        """

Usage Examples:

from airflow.hooks.http_hook import HttpHook
from airflow.utils import AirflowException
import json

def call_api(**context):
    """Fetch a single user record from the API and print its name."""
    hook = HttpHook(method='GET', http_conn_id='api_default')

    # GET request against the configured base URL
    resp = hook.run(
        endpoint='users/123',
        headers={'Accept': 'application/json'},
    )

    # Guard clause: anything other than 200 is treated as a failure.
    if resp.status_code != 200:
        raise AirflowException(f"API call failed: {resp.status_code}")

    user_data = resp.json()
    print(f"User: {user_data['name']}")

def post_data(**context):
    """Create a user via POST and return the new user's ID."""
    hook = HttpHook(method='POST', http_conn_id='api_default')

    # Build the JSON payload; 'ds' is the task's logical date from context.
    payload = {
        'name': 'New User',
        'email': 'newuser@example.com',
        'date': context['ds'],
    }

    resp = hook.run(
        endpoint='users',
        data=json.dumps(payload),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        },
    )

    # Guard clause: only 201 Created counts as success.
    if resp.status_code != 201:
        raise AirflowException(f"Failed to create user: {resp.text}")

    created_user = resp.json()
    print(f"Created user with ID: {created_user['id']}")
    return created_user['id']

def authenticated_request(**context):
    """Call a protected endpoint with a bearer token and custom options."""
    hook = HttpHook(method='GET', http_conn_id='secure_api')

    auth_headers = {
        'Authorization': 'Bearer your-token-here',
        'Accept': 'application/json',
    }
    # These are forwarded to the requests call via extra_options.
    request_options = {
        'timeout': 30,
        'verify': True,  # SSL verification
        'stream': False,
    }

    response = hook.run(
        endpoint='protected/data',
        headers=auth_headers,
        extra_options=request_options,
    )

    return response.json()

# File upload example
def upload_file(**context):
    """Upload a file to the file API and return the parsed JSON response.

    NOTE(review): HttpHook.run() accepts only endpoint/data/headers/
    extra_options — it has no `files` parameter, so the original example's
    `files=files` keyword would raise a TypeError. Stream the file as the
    raw request body via `data` instead; this assumes the endpoint accepts
    a raw (non-multipart) upload — TODO confirm against the API.
    """
    http_hook = HttpHook(method='POST', http_conn_id='file_api')
    
    with open('/path/to/file.csv', 'rb') as f:
        # requests streams a file object passed as `data` as the request body.
        response = http_hook.run(
            endpoint='upload',
            data=f,
            headers={'X-Description': 'Daily report'},
        )
    
    return response.json()

# Error handling with retry logic
def robust_api_call(**context):
    """Call the health endpoint, retrying transient failures.

    Retries network/request errors and 5xx responses up to max_retries
    times; 4xx client errors fail immediately.

    Returns:
    - dict: Parsed JSON body of the successful response

    Raises:
    - AirflowException: On client errors or once retries are exhausted
    """
    http_hook = HttpHook(method='GET', http_conn_id='api_default')
    
    max_retries = 3
    for attempt in range(max_retries):
        # Keep only the request itself inside the try block. In the original
        # example the status-code raises sat inside the same try, so the
        # broad `except Exception` caught them — client errors were retried
        # despite the "don't retry" intent, and final errors were wrapped twice.
        try:
            response = http_hook.run(
                endpoint='health',
                extra_options={'timeout': 10}
            )
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Request failed, retrying... (attempt {attempt + 1}): {e}")
                continue
            raise AirflowException(f"Request failed after {max_retries} attempts: {e}")

        if response.status_code == 200:
            return response.json()
        if response.status_code >= 500:
            # Server error, retry
            if attempt < max_retries - 1:
                print(f"Server error, retrying... (attempt {attempt + 1})")
                continue
            raise AirflowException(f"Server error after {max_retries} attempts")
        # Client error, don't retry
        raise AirflowException(f"Client error: {response.status_code}")

# Using hooks in operators
# Each PythonOperator wraps one of the callables above; provide_context=True
# passes the Airflow context (ds, execution_date, ...) as keyword arguments.
api_call_task = PythonOperator(
    task_id='call_external_api',
    python_callable=call_api,
    provide_context=True,
    dag=dag
)

data_upload_task = PythonOperator(
    task_id='upload_report',
    python_callable=upload_file,
    provide_context=True,
    dag=dag
)

Connection Management

Hooks integrate with Airflow's connection management system:

# Connection via Airflow UI or environment variables
# Environment variable format: AIRFLOW_CONN_{CONN_ID}
# Example: AIRFLOW_CONN_MY_DB=postgresql://user:pass@host:5432/dbname

# Using connections in custom hooks
class CustomApiHook(HttpHook):
    """HTTP hook that injects auth headers derived from the Airflow connection."""

    def __init__(self, api_conn_id='custom_api_default'):
        super().__init__(http_conn_id=api_conn_id)
        self.api_conn_id = api_conn_id
    
    def get_auth_headers(self):
        """Build Authorization / X-API-Key headers from the stored connection."""
        conn = self.get_connection(self.api_conn_id)
        # Connection password holds the bearer token; the API key lives in
        # the connection's JSON "extra" field.
        return {
            'Authorization': f'Bearer {conn.password}',
            'X-API-Key': conn.extra_dejson.get('api_key')
        }
    
    def call_api(self, endpoint, **kwargs):
        """
        Run a request against `endpoint` with auth headers applied.

        Fixes two issues in the original example: an explicit headers=None
        no longer crashes (`None.update` → AttributeError), and the caller's
        headers dict is copied rather than mutated in place.
        """
        headers = dict(kwargs.get('headers') or {})
        headers.update(self.get_auth_headers())
        kwargs['headers'] = headers
        
        return self.run(endpoint, **kwargs)

Error Handling Best Practices

from airflow.utils import AirflowException

def safe_database_operation(**context):
    """Count rows in important_table, wrapping unexpected failures.

    Returns:
    - int: Row count of important_table

    Raises:
    - AirflowException: If the table is empty or the query fails
    """
    hook = None
    try:
        hook = PostgresHook('postgres_conn')
        
        # Perform database operations
        result = hook.get_records("SELECT COUNT(*) FROM important_table")
        
        if not result or result[0][0] == 0:
            raise AirflowException("No data found in important_table")
            
        return result[0][0]
        
    except AirflowException:
        # Already the right exception type — re-raise as-is. (The original
        # example's broad `except Exception` caught its own "No data found"
        # error and wrapped it a second time.)
        raise
    except Exception as e:
        # Log the error and re-raise as AirflowException, preserving the
        # original traceback via exception chaining.
        print(f"Database operation failed: {e}")
        raise AirflowException(f"Database operation failed: {e}") from e
    
    finally:
        # Cleanup if needed
        if hook:
            # Close connections, cleanup resources
            pass

Install with Tessl CLI

npx tessl i tessl/pypi-airflow

docs

core.md

hooks.md

index.md

operators.md

tile.json