tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations

SQL Sensors

SQL sensors monitor database conditions and states by periodically executing SQL queries until specified criteria are met. They enable data-driven workflow orchestration based on database changes.

Capabilities

SQL Sensor

Monitors database state by repeatedly executing SQL queries until success criteria are met.

class SqlSensor:
    """
    Runs a SQL statement repeatedly until its criteria are met.
    
    Args:
        conn_id (str): Database connection ID
        sql (str): SQL statement to execute (templated)
        parameters (Mapping, optional): Query parameters (templated)
        success (callable, optional): Success criteria; receives the value the
            selector extracts from the first result row
        failure (callable, optional): Failure criteria; receives the same
            selected value and fails the task when it returns True
        selector (callable, optional): Function applied to the first result row
            to produce the value passed to success/failure
            (default: itemgetter(0), i.e. the first cell)
        fail_on_empty (bool): Fail if query returns no rows (default: False)
        hook_params (Mapping, optional): Additional hook parameters
        **kwargs: Additional sensor arguments (poke_interval, timeout, etc.)
    """
    
    def __init__(self, *, conn_id, sql, parameters=None, success=None, failure=None,
                 selector=None, fail_on_empty=False, hook_params=None, **kwargs):
        pass
    
    def poke(self, context):
        """
        Check if sensor condition is met.
        
        Args:
            context (dict): Airflow task context
            
        Returns:
            bool: True if condition is met, False otherwise
        """
        pass
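The evaluation order inside poke (fetch rows, select a value from the first row, check failure, then success) can be sketched in plain Python. This is a simplified illustration of the behavior described above, not the actual Airflow source:

```python
from operator import itemgetter

def evaluate(records, success=None, failure=None,
             selector=itemgetter(0), fail_on_empty=False):
    # No rows: either fail immediately or keep poking (return False)
    if not records:
        if fail_on_empty:
            raise RuntimeError("query returned no rows")
        return False
    # Apply the selector to the first row (default: the first cell)
    value = selector(records[0])
    # Failure criteria are checked first and fail the task outright
    if failure is not None and failure(value):
        raise RuntimeError(f"failure criteria met: {value!r}")
    # Success criteria decide whether the sensor is done
    if success is not None:
        return bool(success(value))
    # With no criteria, any truthy selected value counts as success
    return bool(value)

print(evaluate([(5,)], success=lambda c: c > 0))  # True
print(evaluate([], fail_on_empty=False))          # False
```

Note that with the default selector the criteria never see the full record list, only the first cell of the first row.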

Usage Examples

Basic Record Monitoring

from airflow.providers.common.sql.sensors.sql import SqlSensor

# Wait for new records to appear
wait_for_data = SqlSensor(
    task_id='wait_for_new_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM uploads WHERE date = '{{ ds }}' AND status = 'completed'",
    success=lambda count: count > 0,  # Default selector passes the first cell
    poke_interval=60,  # Check every minute
    timeout=3600  # Time out after 1 hour
)

Data Quality Monitoring

# Wait for data quality checks to pass
quality_sensor = SqlSensor(
    task_id='wait_for_quality',
    conn_id='my_database',
    sql='''
    SELECT 
        SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as null_emails,
        SUM(CASE WHEN age < 0 OR age > 150 THEN 1 ELSE 0 END) as invalid_ages
    FROM users 
    WHERE created_date = '{{ ds }}'
    ''',
    selector=lambda row: row,  # Pass the whole row to the success callable
    success=lambda row: row[0] == 0 and row[1] == 0,  # No null emails or invalid ages
    poke_interval=300,  # Check every 5 minutes
    timeout=7200  # Time out after 2 hours
)

Complex Condition Monitoring

# Wait for processing to complete with custom success criteria
def check_processing_complete(row):
    """Check if all processing stages are complete."""
    pending_count, error_count = row
    # Success when no pending items and no errors
    return pending_count == 0 and error_count == 0

processing_sensor = SqlSensor(
    task_id='wait_for_processing',
    conn_id='my_database',
    sql='''
    SELECT 
        COUNT(*) FILTER (WHERE status = 'pending') as pending,
        COUNT(*) FILTER (WHERE status = 'error') as errors
    FROM job_queue 
    WHERE batch_id = %(batch_id)s
    ''',
    parameters={'batch_id': '{{ dag_run.conf.batch_id }}'},
    selector=lambda row: row,  # Give the criteria function the whole row
    success=check_processing_complete,
    poke_interval=120,  # Check every 2 minutes
    timeout=10800  # Time out after 3 hours
)

Monitoring with Failure Conditions

# Monitor with both success and failure criteria
def check_success(row):
    return row[0] is not None and row[0] >= 1000  # At least 1000 completed

def check_failure(row):
    return row[1] is not None and row[1] > 10  # More than 10 failures

monitor_sensor = SqlSensor(
    task_id='monitor_batch_job',
    conn_id='my_database',
    sql='''
    SELECT 
        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
        SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
    FROM batch_processing 
    WHERE job_id = %(job_id)s
    ''',
    parameters={'job_id': '{{ params.job_id }}'},
    selector=lambda row: row,  # Criteria receive the whole row
    success=check_success,
    failure=check_failure,  # Fail the task if too many errors
    poke_interval=180,
    timeout=14400
)

Parameterized Monitoring

# Use templated parameters for dynamic monitoring
dynamic_sensor = SqlSensor(
    task_id='wait_for_threshold',
    conn_id='my_database',
    sql='''
    SELECT COUNT(*) 
    FROM events 
    WHERE event_type = %(event_type)s 
      AND timestamp >= %(start_time)s
      AND timestamp <= %(end_time)s
    ''',
    parameters={
        'event_type': '{{ dag_run.conf.event_type }}',
        'start_time': '{{ ds }} 00:00:00',
        'end_time': '{{ ds }} 23:59:59'
    },
    success=lambda count: count >= 100,  # At least 100 events (default selector: first cell)
    poke_interval=300
)

Database Connection Monitoring

# Monitor database connectivity and basic health
health_sensor = SqlSensor(
    task_id='check_db_health',
    conn_id='my_database',
    sql='SELECT 1',  # Simple connectivity check
    success=lambda cell: cell == 1,  # Default selector yields the first cell
    fail_on_empty=True,  # Fail rather than retry if nothing comes back
    poke_interval=30,
    timeout=300
)

Success and Failure Criteria Functions

Success and failure criteria are callables that receive the value the selector extracts from the first result row (by default, the first cell) and return a boolean:

THRESHOLD = 100        # Placeholder thresholds for illustration
ERROR_THRESHOLD = 10

def success_criteria(cell):
    """
    Define success condition.
    
    Args:
        cell: Value selected from the first result row (first cell by default)
        
    Returns:
        bool: True if success condition is met
    """
    return cell is not None and cell > THRESHOLD

def failure_criteria(cell):
    """
    Define failure condition.
    
    Args:
        cell: Value selected from the first result row (first cell by default)
        
    Returns:
        bool: True if failure condition is met (the task fails immediately)
    """
    return cell is not None and cell > ERROR_THRESHOLD
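Because the criteria are plain callables, they can be unit-tested without a database or a running Airflow deployment. A quick sketch, with hypothetical thresholds:

```python
def success_criteria(cell):
    # Succeed once at least 100 rows are reported (hypothetical threshold)
    return cell is not None and cell >= 100

def failure_criteria(cell):
    # Fail hard if the error count exceeds 10 (hypothetical threshold)
    return cell is not None and cell > 10

# Exercise the criteria against representative cell values
print(success_criteria(150))   # True
print(success_criteria(None))  # False -- e.g. SUM() over zero rows
print(failure_criteria(3))     # False
```

Guarding against None is worthwhile because aggregates like SUM() return NULL when no rows match.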

Common Patterns

Count-based Monitoring

# Wait for a specific record count (default selector: first cell)
success=lambda cell: cell is not None and cell >= expected_count

Threshold Monitoring

# Wait for a metric to exceed a threshold
success=lambda cell: cell is not None and cell > threshold_value

Status-based Monitoring

# Wait for a specific status value
success=lambda cell: cell == 'COMPLETED'

Empty Result Handling

# When the query may return no rows, the sensor keeps poking by default;
# set fail_on_empty=True to fail the task instead of retrying
fail_on_empty=True
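When a pattern needs more than the first column, a selector that returns the whole row lets the success callable inspect every value. The interplay can be sketched without a database (the sample row is hypothetical):

```python
from operator import itemgetter

first_row = (0, 2)  # e.g. (pending, errors) from a monitoring query

# Default selector: first cell only
default_selector = itemgetter(0)
print(default_selector(first_row))  # 0

# A whole-row selector lets the criteria inspect every column
row_selector = lambda row: row
success = lambda row: row[0] == 0 and row[1] == 0
print(success(row_selector(first_row)))  # False -- two errors remain
```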

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-sql
