Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations
—
Quality: Pending — this package has not yet been reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
SQL triggers enable asynchronous execution of SQL operations without blocking the Airflow scheduler. They provide efficient handling of long-running database operations through Airflow's triggerer component.
Executes SQL statements asynchronously using Airflow's trigger mechanism.
class SQLExecuteQueryTrigger:
    """
    Execute SQL code asynchronously via Airflow's trigger mechanism.

    Args:
        sql (str or list): SQL statement(s) to execute.
        conn_id (str): Database connection ID.
        hook_params (dict, optional): Additional hook parameters.
        **kwargs: Additional trigger arguments.
    """

    def __init__(self, sql, conn_id, hook_params=None, **kwargs):
        # Store configuration so serialize() can round-trip it.
        # NOTE(review): the real provider class derives from BaseTrigger and
        # forwards **kwargs to it; this skeleton has no base class to call.
        self.sql = sql
        self.conn_id = conn_id
        self.hook_params = hook_params

    def serialize(self):
        """
        Serialize trigger configuration for storage.

        Returns:
            tuple: (class_path, kwargs) for trigger reconstruction.
        """
        return (
            "airflow.providers.common.sql.triggers.sql.SQLExecuteQueryTrigger",
            {
                "sql": self.sql,
                "conn_id": self.conn_id,
                "hook_params": self.hook_params,
            },
        )

    def get_hook(self):
        """
        Get database hook for the configured connection.

        Returns:
            Database hook instance.
        """
        # Stub: the real implementation resolves conn_id through Airflow's
        # connection registry, which requires the Airflow runtime.
        pass

    async def run(self):
        """
        Execute the SQL asynchronously.

        Yields:
            TriggerEvent: Results or status updates.
        """
        # Stub: the real implementation yields TriggerEvent objects.
        pass


# (Next example's preamble; redundant here because the class is defined above.)
# from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger
from airflow.sensors.base import BaseSensorOperator


class AsyncSQLOperator(BaseSensorOperator):
    """Operator that defers SQL execution to the triggerer (async)."""

    def __init__(self, sql, conn_id, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id

    def execute(self, context):
        """Defer immediately; the trigger runs the SQL off the worker."""
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        """Resume here once the trigger fires; surface failure loudly.

        Args:
            context: Airflow task context.
            event (dict): Trigger payload with 'status' and 'results'/'error'.
        """
        if event['status'] == 'success':
            # Lazy %-args: message is only built if this level is enabled.
            self.log.info("SQL executed successfully: %s", event['results'])
            return event['results']
        raise Exception(f"SQL execution failed: {event['error']}")


# Use the async operator
async_sql = AsyncSQLOperator(
    task_id='async_sql_execution',
    sql='SELECT * FROM large_table WHERE date = {{ ds }}',
    conn_id='my_database',
)

from airflow.triggers.base import TriggerEvent
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


class LongRunningQueryTrigger(SQLExecuteQueryTrigger):
    """Custom trigger that reports progress while a long query runs."""

    async def run(self):
        """Run the configured SQL, emitting start/success/error events.

        Yields:
            TriggerEvent: 'started', then 'success' with results, or 'error'.
        """
        hook = self.get_hook()
        try:
            self.log.info("Starting long-running query...")
            yield TriggerEvent({'status': 'started', 'message': 'Query started'})
            # NOTE(review): hook.run() is a blocking call inside an async
            # generator — it stalls the triggerer's event loop for the
            # duration of the query; confirm this is acceptable.
            results = hook.run(self.sql)
            yield TriggerEvent({
                'status': 'success',
                'results': results,
                'message': f'Query completed with {len(results) if results else 0} results'
            })
        except Exception as e:
            # Report the failure instead of letting the trigger die silently.
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Query failed: {str(e)}'
            })


# Use custom trigger
custom_trigger_task = AsyncSQLOperator(
    task_id='long_query',
    sql='SELECT * FROM very_large_table ORDER BY created_date',
    conn_id='my_database',
)
"""Trigger for batch SQL processing."""
def __init__(self, sql_statements, conn_id, batch_size=100, **kwargs):
super().__init__(sql_statements, conn_id, **kwargs)
self.batch_size = batch_size
async def run(self):
hook = self.get_hook()
try:
total_statements = len(self.sql)
processed = 0
# Process in batches
for i in range(0, total_statements, self.batch_size):
batch = self.sql[i:i + self.batch_size]
# Execute batch
for stmt in batch:
hook.run(stmt, autocommit=True)
processed += 1
# Report progress
yield TriggerEvent({
'status': 'progress',
'processed': processed,
'total': total_statements,
'message': f'Processed {processed}/{total_statements} statements'
})
# Report completion
yield TriggerEvent({
'status': 'success',
'processed': processed,
'message': f'All {processed} statements processed successfully'
})
except Exception as e:
yield TriggerEvent({
'status': 'error',
'processed': processed,
'error': str(e),
'message': f'Batch processing failed at statement {processed}: {str(e)}'
})
# Use batch trigger
batch_statements = [
"INSERT INTO logs (message) VALUES ('Batch item 1')",
"INSERT INTO logs (message) VALUES ('Batch item 2')",
# ... many more statements
]
batch_task = AsyncSQLOperator(
task_id='batch_processing',
sql=batch_statements,
conn_id='my_database'
)class ConditionalSQLTrigger(SQLExecuteQueryTrigger):
"""Conditionally execute SQL based on query results."""
def __init__(self, check_sql, execute_sql, conn_id, condition_func=None, **kwargs):
super().__init__(execute_sql, conn_id, **kwargs)
self.check_sql = check_sql
self.condition_func = condition_func or (lambda x: bool(x))
async def run(self):
hook = self.get_hook()
try:
# First, check condition
check_results = hook.get_records(self.check_sql)
if self.condition_func(check_results):
# Condition met, execute main SQL
yield TriggerEvent({
'status': 'executing',
'message': 'Condition met, executing SQL'
})
results = hook.run(self.sql)
yield TriggerEvent({
'status': 'success',
'results': results,
'message': 'SQL executed successfully'
})
else:
# Condition not met, skip execution
yield TriggerEvent({
'status': 'skipped',
'message': 'Condition not met, skipping SQL execution'
})
except Exception as e:
yield TriggerEvent({
'status': 'error',
'error': str(e),
'message': f'Conditional execution failed: {str(e)}'
})
# Use conditional trigger
conditional_task = AsyncSQLOperator(
task_id='conditional_sql',
sql='UPDATE inventory SET processed = true WHERE date = {{ ds }}',
conn_id='my_database'
)from airflow.providers.common.sql.sensors.sql import SqlSensor
class AsyncSQLSensor(SqlSensor):
    """SQL sensor that monitors via deferrable triggers instead of poking."""

    def execute(self, context):
        """Defer to the trigger for async monitoring."""
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id,
                hook_params=self.hook_params,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        """Apply success criteria; re-defer when not yet satisfied.

        Args:
            context: Airflow task context.
            event (dict): Trigger payload with 'status' and 'results'/'error'.
        """
        if event['status'] != 'success':
            raise Exception(f"Sensor failed: {event['error']}")
        results = event['results']
        if self.success and callable(self.success):
            if self.success(results):
                self.log.info("Sensor condition met")
                return True
            # Condition not met — defer again.
            # NOTE(review): this re-defers immediately; poke_interval is not
            # honored here — confirm whether a delay should be inserted.
            self.defer(
                trigger=SQLExecuteQueryTrigger(
                    sql=self.sql,
                    conn_id=self.conn_id,
                    hook_params=self.hook_params,
                ),
                method_name='execute_complete',
            )
        return results


# Use async sensor
async_sensor = AsyncSQLSensor(
    task_id='async_wait_for_data',
    conn_id='my_database',
    # Single quotes for the SQL string literal: "pending" in double quotes is
    # identifier quoting in standard SQL/Postgres and would fail there.
    sql="SELECT COUNT(*) FROM processing_queue WHERE status = 'pending'",
    success=lambda x: x[0][0] == 0,  # Wait for queue to be empty
    poke_interval=60,
)
trigger = SQLExecuteQueryTrigger(
sql='SELECT * FROM data',
conn_id='my_conn',
hook_params={
'schema': 'custom_schema',
'autocommit': True,
'isolation_level': 'READ_COMMITTED'
}
)Triggers must be serializable to be stored and reconstructed by the triggerer:
# The serialize method returns class path and arguments
class_path, kwargs = trigger.serialize()
# This allows the triggerer to reconstruct the trigger:
# trigger_class = import_from_path(class_path)
# reconstructed_trigger = trigger_class(**kwargs)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-sql