Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations
—
Quality: Pending — has not yet been reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
SQL sensors monitor database conditions and states by periodically executing SQL queries until specified criteria are met. They enable data-driven workflow orchestration based on database changes.
Monitors database state by repeatedly executing SQL queries until success criteria are met.
class SqlSensor:
    """
    Runs a SQL statement repeatedly until criteria are met.

    Args:
        conn_id (str): Database connection ID.
        sql (str): SQL statement to execute (templated).
        parameters (Mapping, optional): Query parameters (templated).
        success (callable, optional): Success criteria function.
        failure (callable, optional): Failure criteria function.
        selector (callable, optional): Function to transform the result row
            (default: itemgetter(0)).
        fail_on_empty (bool): Fail if the query returns no rows (default: False).
        hook_params (Mapping, optional): Additional hook parameters.
        **kwargs: Additional sensor arguments (poke_interval, timeout, etc.).
    """

    def __init__(self, *, conn_id, sql, parameters=None, success=None, failure=None,
                 selector=None, fail_on_empty=False, hook_params=None, **kwargs):
        # Stub signature for documentation only; the real implementation lives
        # in airflow.providers.common.sql.sensors.sql.
        pass

    def poke(self, context):
        """
        Check whether the sensor condition is met.

        Args:
            context (dict): Airflow task context.

        Returns:
            bool: True if the condition is met, False otherwise.
        """
        pass


from airflow.providers.common.sql.sensors.sql import SqlSensor
# Wait for new records to appear
file_sensor = SqlSensor(
    task_id='wait_for_new_data',
    conn_id='my_database',
    # NOTE: SQL string literals use single quotes; double quotes are identifier
    # quoting in many databases. The rendered {{ ds }} value must also be
    # quoted to form a valid date literal.
    sql="SELECT COUNT(*) FROM uploads WHERE date = '{{ ds }}' AND status = 'completed'",
    success=lambda x: x[0][0] > 0,  # Success when count > 0
    poke_interval=60,               # Check every minute
    timeout=3600,                   # Timeout after 1 hour
)

# Wait for data quality checks to pass
quality_sensor = SqlSensor(
    task_id='wait_for_quality',
    conn_id='my_database',
    # {{ ds }} is quoted so it renders as a valid SQL date literal.
    sql='''
        SELECT
            SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as null_emails,
            SUM(CASE WHEN age < 0 OR age > 150 THEN 1 ELSE 0 END) as invalid_ages
        FROM users
        WHERE created_date = '{{ ds }}'
    ''',
    success=lambda x: x[0][0] == 0 and x[0][1] == 0,  # No null emails or invalid ages
    poke_interval=300,  # Check every 5 minutes
    timeout=7200,       # Timeout after 2 hours
)

# Wait for processing to complete with custom success criteria
def check_processing_complete(records):
    """Check if all processing stages are complete.

    Args:
        records (list): Query result rows; row 0 is (pending_count, error_count).

    Returns:
        bool: True when there are no pending items and no errors.
    """
    # No rows at all means we cannot confirm completion yet.
    if not records:
        return False
    pending_count, error_count = records[0][0], records[0][1]
    # Success only when nothing is left pending and nothing has errored.
    return pending_count == 0 and error_count == 0
processing_sensor = SqlSensor(
    task_id='wait_for_processing',
    conn_id='my_database',
    # Use a bound query parameter (%(batch_id)s) so the value supplied via
    # `parameters` is actually used and the driver handles quoting/escaping.
    # (The original templated {{ params.batch_id }} into the SQL, which
    # ignores the `parameters` mapping passed below.)
    sql='''
        SELECT
            COUNT(*) FILTER (WHERE status = 'pending') as pending,
            COUNT(*) FILTER (WHERE status = 'error') as errors
        FROM job_queue
        WHERE batch_id = %(batch_id)s
    ''',
    parameters={'batch_id': '{{ dag_run.conf.batch_id }}'},  # templated at runtime
    success=check_processing_complete,
    poke_interval=120,  # Check every 2 minutes
    timeout=10800,      # Timeout after 3 hours
)

# Monitor with both success and failure criteria
def check_success(records):
    """Return True once at least 1000 rows report completed status.

    Coerces the empty-result case to a real bool (the original returned the
    falsy `records` object itself for empty input).
    """
    return bool(records) and records[0][0] >= 1000
def check_failure(records):
    """Return True when more than 10 rows have failed (fails the task).

    Coerces the empty-result case to a real bool (the original returned the
    falsy `records` object itself for empty input).
    """
    return bool(records) and records[0][1] > 10
monitor_sensor = SqlSensor(
    task_id='monitor_batch_job',
    conn_id='my_database',
    # NOTE(review): {{ params.job_id }} must be supplied via the task's
    # `params` argument (not `parameters`) — confirm it is set on the DAG/task.
    sql='''
        SELECT
            SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
        FROM batch_processing
        WHERE job_id = {{ params.job_id }}
    ''',
    success=check_success,
    failure=check_failure,  # Fail the task immediately if too many errors
    poke_interval=180,
    timeout=14400,
)

# Use templated parameters for dynamic monitoring
dynamic_sensor = SqlSensor(
    task_id='wait_for_threshold',
    conn_id='my_database',
    # Bound parameters (%(name)s) keep the SQL injection-safe; the values in
    # `parameters` are themselves Jinja-templated at runtime.
    sql='''
        SELECT COUNT(*)
        FROM events
        WHERE event_type = %(event_type)s
          AND timestamp >= %(start_time)s
          AND timestamp <= %(end_time)s
    ''',
    parameters={
        'event_type': '{{ dag_run.conf.event_type }}',
        'start_time': '{{ ds }} 00:00:00',
        'end_time': '{{ ds }} 23:59:59',
    },
    success=lambda x: x[0][0] >= 100,  # At least 100 events
    poke_interval=300,
)

# Monitor database connectivity and basic health
health_sensor = SqlSensor(
task_id='check_db_health',
conn_id='my_database',
sql='SELECT 1', # Simple connectivity check
success=lambda x: x is not None and len(x) > 0,
failure=lambda x: x is None,
poke_interval=30,
timeout=300
)Success and failure criteria are callable functions that receive the query results and return a boolean:
def success_criteria(records):
    """
    Define the success condition.

    Args:
        records (list): Query result records (list of tuples).

    Returns:
        bool: True if the success condition is met.
    """
    # `threshold` is a placeholder: replace it with your own limit.
    return len(records) > 0 and records[0][0] > threshold
def failure_criteria(records):
    """
    Define the failure condition.

    Args:
        records (list): Query result records (list of tuples).

    Returns:
        bool: True if the failure condition is met (task should fail).
    """
    # `error_threshold` is a placeholder: replace it with your own limit.
    return records and records[0][1] > error_threshold

# Wait for specific record count
success=lambda x: x and x[0][0] >= expected_count# Wait for metric to exceed threshold
success=lambda x: x and x[0][0] > threshold_value# Wait for specific status
success=lambda x: x and x[0][0] == 'COMPLETED'# Handle cases where query might return no results
success=lambda x: x is not None and len(x) > 0 and x[0][0] > 0Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-sql