CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

triggers.mddocs/

SQL Triggers

SQL triggers enable asynchronous execution of SQL operations without blocking the Airflow scheduler. They provide efficient handling of long-running database operations through Airflow's triggerer component.

Capabilities

SQL Execute Query Trigger

Executes SQL statements asynchronously using Airflow's trigger mechanism.

class SQLExecuteQueryTrigger:
    """
    Executes SQL code asynchronously.

    API reference stub: method bodies are placeholders; the real
    implementation lives in airflow.providers.common.sql.triggers.sql.

    Args:
        sql (str or list): SQL statement(s) to execute
        conn_id (str): Database connection ID
        hook_params (dict, optional): Additional hook parameters
        **kwargs: Additional trigger arguments
    """

    def __init__(self, sql: str | list, conn_id: str, hook_params: dict | None = None, **kwargs):
        pass

    def serialize(self) -> tuple:
        """
        Serialize trigger configuration for storage.

        Returns:
            tuple: (class_path, kwargs) for trigger reconstruction
        """
        pass

    def get_hook(self):
        """
        Get database hook for connection.

        Returns:
            Database hook instance (concrete type depends on the
            connection's provider — determined at runtime).
        """
        pass

    async def run(self):
        """
        Execute the SQL asynchronously.

        Yields:
            TriggerEvent: Results or status updates
        """
        pass

Usage Examples

Basic Asynchronous SQL Execution

from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger
from airflow.sensors.base import BaseSensorOperator

class AsyncSQLOperator(BaseSensorOperator):
    """Operator that defers SQL execution to the Airflow triggerer.

    Instead of running the statement on a worker, ``execute`` immediately
    defers to a ``SQLExecuteQueryTrigger``, releasing the worker slot
    while the query runs asynchronously.

    Args:
        sql: SQL statement(s) to execute.
        conn_id: Airflow connection ID of the target database.
        **kwargs: Forwarded to ``BaseSensorOperator``.
    """

    def __init__(self, sql, conn_id, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id

    def execute(self, context):
        # Defer to trigger for async execution; self.defer() raises, so
        # nothing after this call runs in the worker process.
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id
            ),
            method_name='execute_complete'
        )

    def execute_complete(self, context, event):
        """Handle trigger completion: return results or raise on failure."""
        if event['status'] == 'success':
            # Lazy %-formatting: the message is only built when INFO is enabled.
            self.log.info("SQL executed successfully: %s", event['results'])
            return event['results']
        raise Exception(f"SQL execution failed: {event['error']}")

# Use the async operator
# NOTE(review): `{{ ds }}` is only rendered if `sql` is declared in the
# operator's template_fields — AsyncSQLOperator above does not declare
# any, so this string may be passed through unrendered. TODO confirm.
async_sql = AsyncSQLOperator(
    task_id='async_sql_execution',
    sql='SELECT * FROM large_table WHERE date = {{ ds }}',
    conn_id='my_database'
)

Long-Running Query with Progress Updates

from airflow.triggers.base import TriggerEvent
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger

class LongRunningQueryTrigger(SQLExecuteQueryTrigger):
    """Custom trigger with progress reporting around a long-running query."""

    async def run(self):
        """Yield a start event, run the query, then yield success or error.

        Yields:
            TriggerEvent: ``started`` first, then ``success`` with the
            results, or ``error`` with the failure message.
        """
        import asyncio  # local import: only needed inside the async path

        hook = self.get_hook()

        try:
            # Start the query
            self.log.info("Starting long-running query...")
            yield TriggerEvent({'status': 'started', 'message': 'Query started'})

            # hook.run() is synchronous; run it in a worker thread so the
            # shared triggerer event loop is not blocked for the query's
            # entire duration.
            results = await asyncio.to_thread(hook.run, self.sql)

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'results': results,
                'message': f'Query completed with {len(results) if results else 0} results'
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Query failed: {str(e)}'
            })

# Use custom trigger
# NOTE(review): AsyncSQLOperator.execute() hardcodes SQLExecuteQueryTrigger,
# so LongRunningQueryTrigger is never actually used by this task as written.
# The operator would need to construct (or accept) the custom trigger class.
custom_trigger_task = AsyncSQLOperator(
    task_id='long_query',
    sql='SELECT * FROM very_large_table ORDER BY created_date',
    conn_id='my_database'
)

Batch Processing with Triggers

class BatchSQLTrigger(SQLExecuteQueryTrigger):
    """Trigger for batch SQL processing with per-batch progress events.

    Args:
        sql_statements: List of SQL statements to execute.
        conn_id: Airflow connection ID of the target database.
        batch_size: Number of statements between progress reports.
        **kwargs: Forwarded to ``SQLExecuteQueryTrigger``.
    """

    def __init__(self, sql_statements, conn_id, batch_size=100, **kwargs):
        super().__init__(sql_statements, conn_id, **kwargs)
        self.batch_size = batch_size

    async def run(self):
        """Execute statements in batches, yielding progress and final status."""
        import asyncio  # local import for thread offloading

        hook = self.get_hook()
        # Bind before the try-block so the except-handler can always report
        # how far we got (previously a NameError if len(self.sql) raised).
        processed = 0

        try:
            total_statements = len(self.sql)

            # Process in batches of self.batch_size
            for i in range(0, total_statements, self.batch_size):
                batch = self.sql[i:i + self.batch_size]

                # hook.run() is synchronous; offload each call to a thread
                # so the triggerer event loop stays responsive.
                for stmt in batch:
                    await asyncio.to_thread(hook.run, stmt, autocommit=True)
                    processed += 1

                # Report progress after each batch
                yield TriggerEvent({
                    'status': 'progress',
                    'processed': processed,
                    'total': total_statements,
                    'message': f'Processed {processed}/{total_statements} statements'
                })

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'processed': processed,
                'message': f'All {processed} statements processed successfully'
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'processed': processed,
                'error': str(e),
                'message': f'Batch processing failed at statement {processed}: {str(e)}'
            })

# Use batch trigger
batch_statements = [
    "INSERT INTO logs (message) VALUES ('Batch item 1')",
    "INSERT INTO logs (message) VALUES ('Batch item 2')",
    # ... many more statements
]

# NOTE(review): AsyncSQLOperator defers to SQLExecuteQueryTrigger, not
# BatchSQLTrigger — to actually use the batch trigger, the operator must
# defer with trigger=BatchSQLTrigger(...) instead.
batch_task = AsyncSQLOperator(
    task_id='batch_processing',
    sql=batch_statements,
    conn_id='my_database'
)

Conditional Async Execution

class ConditionalSQLTrigger(SQLExecuteQueryTrigger):
    """Conditionally execute SQL based on the result of a check query.

    Args:
        check_sql: Query whose records decide whether to proceed.
        execute_sql: SQL to run when the condition holds.
        conn_id: Airflow connection ID of the target database.
        condition_func: Predicate applied to the check query's records;
            defaults to plain truthiness (non-empty result set).
        **kwargs: Forwarded to ``SQLExecuteQueryTrigger``.
    """

    def __init__(self, check_sql, execute_sql, conn_id, condition_func=None, **kwargs):
        super().__init__(execute_sql, conn_id, **kwargs)
        self.check_sql = check_sql
        self.condition_func = condition_func or (lambda x: bool(x))

    async def run(self):
        """Check the condition, then execute, skip, or report an error."""
        import asyncio  # local import for thread offloading

        hook = self.get_hook()

        try:
            # Both hook calls are synchronous; run them in worker threads so
            # the shared triggerer event loop is not blocked.
            check_results = await asyncio.to_thread(hook.get_records, self.check_sql)

            if self.condition_func(check_results):
                # Condition met, execute main SQL
                yield TriggerEvent({
                    'status': 'executing',
                    'message': 'Condition met, executing SQL'
                })

                results = await asyncio.to_thread(hook.run, self.sql)

                yield TriggerEvent({
                    'status': 'success',
                    'results': results,
                    'message': 'SQL executed successfully'
                })
            else:
                # Condition not met, skip execution
                yield TriggerEvent({
                    'status': 'skipped',
                    'message': 'Condition not met, skipping SQL execution'
                })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Conditional execution failed: {str(e)}'
            })

# Use conditional trigger
# NOTE(review): as with the other examples, AsyncSQLOperator hardcodes
# SQLExecuteQueryTrigger, so ConditionalSQLTrigger is not actually wired
# into this task as written.
conditional_task = AsyncSQLOperator(
    task_id='conditional_sql',
    sql='UPDATE inventory SET processed = true WHERE date = {{ ds }}',
    conn_id='my_database'
)

Monitoring Long-Running Operations

from airflow.providers.common.sql.sensors.sql import SqlSensor

class AsyncSQLSensor(SqlSensor):
    """SQL sensor that waits via the triggerer instead of holding a worker."""

    def execute(self, context):
        # Hand the poll query to the triggerer; self.defer() raises, so the
        # worker slot is released immediately.
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id,
                hook_params=self.hook_params
            ),
            method_name='execute_complete'
        )

    def execute_complete(self, context, event):
        """Evaluate the trigger result against the sensor's success criteria.

        Returns:
            True when the ``success`` callable accepts the results, or the
            raw results when no callable criteria are configured.

        Raises:
            Exception: if the trigger reported a failure.
        """
        if event['status'] == 'success':
            results = event['results']

            # Apply success criteria, if provided as a callable
            if self.success and callable(self.success):
                if self.success(results):
                    self.log.info("Sensor condition met")
                    return True
                # Condition not met: defer again for another check.
                # NOTE(review): this re-defers immediately — poke_interval is
                # not honored here, so the query re-runs back-to-back. A
                # delay-aware trigger would be needed to throttle polling.
                self.defer(
                    trigger=SQLExecuteQueryTrigger(
                        sql=self.sql,
                        conn_id=self.conn_id,
                        hook_params=self.hook_params
                    ),
                    method_name='execute_complete'
                )

            return results
        raise Exception(f"Sensor failed: {event['error']}")

# Use async sensor
# Standard SQL uses single quotes for string literals; double quotes denote
# identifiers and fail on most databases, so quote 'pending' correctly.
async_sensor = AsyncSQLSensor(
    task_id='async_wait_for_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM processing_queue WHERE status = 'pending'",
    success=lambda x: x[0][0] == 0,  # Wait for queue to be empty
    poke_interval=60
)

Trigger Configuration

Hook Parameters

# Customize hook behavior through hook_params
# NOTE(review): hook_params are forwarded to the underlying database hook;
# which keys are supported depends on the hook class resolved for the
# connection type — verify against that hook's documentation.
trigger = SQLExecuteQueryTrigger(
    sql='SELECT * FROM data',
    conn_id='my_conn',
    hook_params={
        'schema': 'custom_schema',
        'autocommit': True,
        'isolation_level': 'READ_COMMITTED'
    }
)

Serialization

Triggers must be serializable to be stored and reconstructed by the triggerer:

# The serialize method returns class path and arguments
# (the kwargs presumably must be JSON-serializable so the triggerer can
# persist them — confirm against the trigger base-class docs)
class_path, kwargs = trigger.serialize()

# This allows the triggerer to reconstruct the trigger:
# trigger_class = import_from_path(class_path)
# reconstructed_trigger = trigger_class(**kwargs)

Benefits of Using Triggers

  1. Non-blocking: Triggers don't block worker slots while waiting
  2. Scalable: The triggerer can handle many concurrent triggers
  3. Resource efficient: Reduces worker resource consumption
  4. Progress reporting: Can yield intermediate status updates
  5. Error handling: Structured error reporting and recovery

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-sql

docs

dialects.md

hooks.md

index.md

operators.md

sensors.md

triggers.md

tile.json