tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow, including hooks, operators, sensors, and triggers for SQL database operations.

docs/operators.md

SQL Operators

SQL operators execute database operations as tasks within Airflow DAGs. This includes query execution, data validation, conditional workflows, and data transfers between databases.

Capabilities

Base SQL Operator

Foundation class providing database hook functionality for SQL operators.

class BaseSQLOperator:
    """
    Base class providing DB hook functionality for SQL operators.
    """
    
    def get_hook(self):
        """
        Get hook for connection.
        
        Returns:
            Database hook instance
        """
    
    def get_db_hook(self):
        """
        Get database hook.
        
        Returns:
            Database hook instance
        """

SQL Query Execution

Execute SQL queries and commands within Airflow tasks.

class SQLExecuteQueryOperator:
    """
    Executes SQL code in a database.
    
    Args:
        sql (str or list): SQL query/queries to execute (templated)
        autocommit (bool): Auto-commit mode for queries (default: False)
        parameters (Mapping or Iterable, optional): Query parameters (templated)
        handler (callable, optional): Result handler function (default: fetch_all_handler)
        output_processor (callable, optional): Function to process results
        conn_id (str, optional): Database connection ID
        database (str, optional): Database name to override connection default
        split_statements (bool, optional): Split multiple statements
        return_last (bool): Return only last query result (default: True)
        show_return_value_in_logs (bool): Log returned values (default: False)
        requires_result_fetch (bool): Ensure results are fetched (default: False)
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, *, sql, autocommit=False, parameters=None, handler=None, 
                 output_processor=None, conn_id=None, database=None, split_statements=None,
                 return_last=True, show_return_value_in_logs=False, requires_result_fetch=False, **kwargs):
        pass

Data Quality Check Operators

Operators for validating data quality and consistency.

class SQLCheckOperator:
    """
    Performs checks using SQL statements.
    
    Args:
        sql (str): SQL query that should return a single row (templated)
        conn_id (str, optional): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, sql, conn_id=None, **kwargs):
        pass
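
The pass/fail rule of SQLCheckOperator can be sketched in plain Python: the operator evaluates the first row of the result, and the task fails if any cell in that row is falsy (0, NULL, empty string, or False). A minimal sketch of that rule, not the provider's actual code:

```python
def row_passes_check(first_row):
    """Mimic SQLCheckOperator's rule: every cell in the first
    result row must be truthy (no 0, None, '', or False)."""
    if not first_row:
        return False  # an empty result also fails the check
    return all(bool(cell) for cell in first_row)

# A COUNT(*) of 0 fails the check; a positive count passes
print(row_passes_check((0,)))        # False
print(row_passes_check((42, 'ok')))  # True
```

This is why a bare COUNT(*) query works as an "is there any data?" check: a zero count is falsy and fails the task.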

class SQLValueCheckOperator:
    """
    Checks that SQL query returns expected value.
    
    Args:
        sql (str): SQL query to execute (templated)
        pass_value: Expected value for comparison
        tolerance (float, optional): Tolerance for numeric comparisons
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, sql, pass_value, tolerance=None, conn_id=None, **kwargs):
        pass
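
The tolerance semantics can be sketched as a symmetric band around pass_value: with a tolerance t, the result must fall within pass_value * (1 ± t); without one, it must match exactly. A sketch of that rule for numeric results, assuming these documented semantics:

```python
def value_check_passes(result, pass_value, tolerance=None):
    """Mimic SQLValueCheckOperator's numeric comparison: with a
    tolerance t, accept results within pass_value * (1 ± t);
    without one, require an exact match."""
    if tolerance is None:
        return result == pass_value
    return pass_value * (1 - tolerance) <= result <= pass_value * (1 + tolerance)

print(value_check_passes(10500, 10000, tolerance=0.1))  # True: within ±10%
print(value_check_passes(12000, 10000, tolerance=0.1))  # False: outside the band
```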

class SQLColumnCheckOperator:
    """
    Performs data quality checks on table columns.
    
    Args:
        table (str): Table name to check
        column_mapping (dict): Column checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, table, column_mapping, partition_clause=None, conn_id=None, **kwargs):
        pass

class SQLTableCheckOperator:
    """
    Performs data quality checks on tables.
    
    Args:
        table (str): Table name to check
        checks (dict): Table-level checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, table, checks, partition_clause=None, conn_id=None, **kwargs):
        pass

class SQLIntervalCheckOperator:
    """
    Checks data over time intervals.
    
    Args:
        table (str): Table name to check
        metrics_thresholds (dict): Metrics and threshold definitions
        date_filter_column (str): Column for date filtering
        days_back (int): Number of days to look back
        ratio_formula (str): Formula for ratio calculations
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, table, metrics_thresholds, date_filter_column='ds', 
                 days_back=-7, ratio_formula='max_over_min', conn_id=None, **kwargs):
        pass
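
The interval check compares today's metric against the same metric days_back ago and fails when their ratio breaches the configured threshold. A sketch of the idea with a max_over_min-style formula (the provider's exact formulas and edge-case handling may differ):

```python
def max_over_min(cur, ref):
    """Ratio of the larger value to the smaller, so the check is
    symmetric: a doubling and a halving score the same."""
    return float(max(cur, ref)) / min(cur, ref)

def interval_check_passes(current, reference, threshold, ratio=max_over_min):
    """Mimic SQLIntervalCheckOperator: fail when the ratio between
    the current metric and the reference metric exceeds threshold."""
    return ratio(current, reference) <= threshold

# Row count doubled versus the reference day: ratio 2.0 breaches a 1.5 threshold
print(interval_check_passes(2000, 1000, threshold=1.5))  # False
print(interval_check_passes(1100, 1000, threshold=1.5))  # True
```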

class SQLThresholdCheckOperator:
    """
    Checks if metrics are within thresholds.
    
    Args:
        sql (str): SQL query returning metrics (templated)
        min_threshold (float): Minimum threshold value
        max_threshold (float): Maximum threshold value
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, sql, min_threshold, max_threshold, conn_id=None, **kwargs):
        pass

Conditional Workflow Operators

Operators for implementing conditional logic based on SQL query results.

class BranchSQLOperator:
    """
    Branches workflow based on SQL query results.
    
    Args:
        sql (str): SQL query to execute (templated)
        follow_task_ids_if_true (list): Task IDs to follow if condition is true
        follow_task_ids_if_false (list): Task IDs to follow if condition is false
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, sql, follow_task_ids_if_true, follow_task_ids_if_false, 
                 conn_id=None, **kwargs):
        pass
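
The branch decision is derived from the first cell of the query result. A sketch of how such a cell could be coerced to a boolean (truthy numbers and booleans, or strings like 'true'/'1'); the provider's exact coercion rules may differ:

```python
def resolve_branch(first_cell, if_true, if_false):
    """Hypothetical helper: map the first cell of a query result to
    one of two downstream task-ID lists."""
    if isinstance(first_cell, str):
        condition = first_cell.strip().lower() in ('true', '1')
    else:
        condition = bool(first_cell)
    return if_true if condition else if_false

print(resolve_branch(1, ['process_orders'], ['send_notification']))   # ['process_orders']
print(resolve_branch(0, ['process_orders'], ['send_notification']))   # ['send_notification']
```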

Data Transfer Operators

Operators for transferring data between different database connections.

class GenericTransfer:
    """
    Transfers data between different database connections.
    
    Args:
        sql (str): SQL query for source data (templated)
        destination_table (str): Target table name (templated)
        source_conn_id (str): Source connection ID (templated)
        destination_conn_id (str): Destination connection ID (templated)
        source_hook_params (dict, optional): Source hook parameters
        destination_hook_params (dict, optional): Destination hook parameters
        preoperator (str or list, optional): SQL to execute before transfer (templated)
        insert_args (dict, optional): Additional arguments for insert operation (templated)
        page_size (int, optional): Number of records for paginated mode
        **kwargs: Additional operator arguments
    """
    
    def __init__(self, *, sql, destination_table, source_conn_id, destination_conn_id,
                 source_hook_params=None, destination_hook_params=None, preoperator=None, 
                 insert_args=None, page_size=None, **kwargs):
        pass
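
The transfer pattern itself is just fetch-from-source / insert-into-destination, optionally in pages. A self-contained sketch with sqlite3 standing in for the two Airflow connections (table names and data are illustrative, not part of the provider's API):

```python
import sqlite3

# Two in-memory databases stand in for source_conn_id and destination_conn_id
source = sqlite3.connect(':memory:')
dest = sqlite3.connect(':memory:')

source.executescript(
    "CREATE TABLE source_table (id INTEGER, amount REAL);"
    "INSERT INTO source_table VALUES (1, 10.0), (2, 20.0), (3, 30.0);"
)
dest.execute("CREATE TABLE target_table (id INTEGER, amount REAL)")

# preoperator step: clear the target before loading
dest.execute("DELETE FROM target_table")

# Paginated fetch/insert loop, as GenericTransfer does when page_size is set
cursor = source.execute("SELECT id, amount FROM source_table")
page_size = 2
while True:
    rows = cursor.fetchmany(page_size)
    if not rows:
        break
    dest.executemany("INSERT INTO target_table VALUES (?, ?)", rows)
dest.commit()

print(dest.execute("SELECT COUNT(*) FROM target_table").fetchone()[0])  # 3
```

Paging keeps memory bounded for large result sets, at the cost of one round trip per page.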

Utility Functions

def default_output_processor(results):
    """
    Default output processor for query results.
    
    Args:
        results: Raw query results
        
    Returns:
        Any: Processed results
    """

Usage Examples

Basic SQL Execution

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

sql_task = SQLExecuteQueryOperator(
    task_id='execute_sql',
    conn_id='my_postgres_conn',
    sql='INSERT INTO logs (message, timestamp) VALUES (%(msg)s, %(ts)s)',
    parameters={'msg': 'Task completed', 'ts': '2023-01-01 12:00:00'},
    autocommit=True
)

Data Quality Checks

from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator, 
    SQLValueCheckOperator,
    SQLColumnCheckOperator
)

# Fail the task if the first row of results contains any falsy value (0, NULL, '')
check_data = SQLCheckOperator(
    task_id='check_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM users WHERE created_date = '{{ ds }}'"
)

# Check that a query returns an expected value (within tolerance)
value_check = SQLValueCheckOperator(
    task_id='check_total',
    conn_id='my_database',
    sql="SELECT SUM(amount) FROM orders WHERE date = '{{ ds }}'",
    pass_value=10000,
    tolerance=0.1  # pass if within ±10% of pass_value
)

# Column quality checks
column_check = SQLColumnCheckOperator(
    task_id='check_columns',
    conn_id='my_database',
    table='users',
    column_mapping={
        'id': {'null_check': {'equal_to': 0}},
        'email': {'unique_check': {'equal_to': 0}},
        'age': {'min': {'greater_than': 0}, 'max': {'less_than': 150}}
    }
)

Conditional Workflows

from airflow.providers.common.sql.operators.sql import BranchSQLOperator

branch_task = BranchSQLOperator(
    task_id='check_condition',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM new_orders WHERE date = '{{ ds }}'",
    follow_task_ids_if_true=['process_orders'],
    follow_task_ids_if_false=['send_notification']
)

Data Transfer

from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer

transfer_task = GenericTransfer(
    task_id='transfer_data',
    sql="SELECT * FROM source_table WHERE date = '{{ ds }}'",
    destination_table='target_table',
    source_conn_id='source_db',
    destination_conn_id='target_db',
    preoperator='TRUNCATE TABLE target_table'
)

Multiple SQL Statements

sql_multi = SQLExecuteQueryOperator(
    task_id='multi_sql',
    conn_id='my_database',
    sql=[
        'UPDATE inventory SET last_updated = NOW()',
        "INSERT INTO audit_log (action, timestamp) VALUES ('inventory_update', NOW())",
        'CALL update_stats_procedure()'
    ],
    split_statements=True,
    autocommit=True
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-sql
