Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations
---

Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
SQL operators execute database operations as tasks within Airflow DAGs. This includes query execution, data validation, conditional workflows, and data transfers between databases.
Foundation class providing database hook functionality for SQL operators.
class BaseSQLOperator:
    """
    Base class providing DB hook functionality for SQL operators.
    """

    def get_hook(self):
        """
        Get hook for connection.

        Returns:
            Database hook instance
        """

    def get_db_hook(self):
        """
        Get database hook.

        Returns:
            Database hook instance
        """

# Execute SQL queries and commands within Airflow tasks.
class SQLExecuteQueryOperator:
    """
    Executes SQL code in a database.

    Args:
        sql (str or list): SQL query/queries to execute (templated)
        autocommit (bool): Auto-commit mode for queries (default: False)
        parameters (Mapping or Iterable, optional): Query parameters (templated)
        handler (callable, optional): Result handler function (default: fetch_all_handler)
        output_processor (callable, optional): Function to process results
        conn_id (str, optional): Database connection ID
        database (str, optional): Database name to override connection default
        split_statements (bool, optional): Split multiple statements
        return_last (bool): Return only last query result (default: True)
        show_return_value_in_logs (bool): Log returned values (default: False)
        requires_result_fetch (bool): Ensure results are fetched (default: False)
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, autocommit=False, parameters=None, handler=None,
                 output_processor=None, conn_id=None, database=None, split_statements=None,
                 return_last=True, show_return_value_in_logs=False,
                 requires_result_fetch=False, **kwargs):
        pass

# Operators for validating data quality and consistency.
class SQLCheckOperator:
    """
    Performs checks using SQL statements.

    Args:
        sql (str): SQL query that should return a single row (templated)
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, conn_id, **kwargs):
        pass
class SQLValueCheckOperator:
    """
    Checks that SQL query returns expected value.

    Args:
        sql (str): SQL query to execute (templated)
        pass_value: Expected value for comparison
        tolerance (float, optional): Tolerance for numeric comparisons
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, pass_value, tolerance=None, conn_id=None, **kwargs):
        pass
class SQLColumnCheckOperator:
    """
    Performs data quality checks on table columns.

    Args:
        table (str): Table name to check
        column_mapping (dict): Column checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, column_mapping, partition_clause=None, conn_id=None, **kwargs):
        pass
class SQLTableCheckOperator:
    """
    Performs data quality checks on tables.

    Args:
        table (str): Table name to check
        checks (dict): Table-level checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, checks, partition_clause=None, conn_id=None, **kwargs):
        pass
class SQLIntervalCheckOperator:
    """
    Checks data over time intervals.

    Args:
        table (str): Table name to check
        metrics_thresholds (dict): Metrics and threshold definitions
        date_filter_column (str): Column for date filtering
        days_back (int): Number of days to look back
        ratio_formula (str): Formula for ratio calculations
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, metrics_thresholds, date_filter_column='ds',
                 days_back=-7, ratio_formula='max_over_min', conn_id=None, **kwargs):
        pass
class SQLThresholdCheckOperator:
    """
    Checks if metrics are within thresholds.

    Args:
        sql (str): SQL query returning metrics (templated)
        min_threshold (float): Minimum threshold value
        max_threshold (float): Maximum threshold value
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, min_threshold, max_threshold, conn_id=None, **kwargs):
        pass

# Operators for implementing conditional logic based on SQL query results.
class BranchSQLOperator:
    """
    Branches workflow based on SQL query results.

    Args:
        sql (str): SQL query to execute (templated)
        follow_task_ids_if_true (list): Task IDs to follow if condition is true
        follow_task_ids_if_false (list): Task IDs to follow if condition is false
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, follow_task_ids_if_true, follow_task_ids_if_false,
                 conn_id=None, **kwargs):
        pass

# Operators for transferring data between different database connections.
class GenericTransfer:
    """
    Transfers data between different database connections.

    Args:
        sql (str): SQL query for source data (templated)
        destination_table (str): Target table name (templated)
        source_conn_id (str): Source connection ID (templated)
        destination_conn_id (str): Destination connection ID (templated)
        source_hook_params (dict, optional): Source hook parameters
        destination_hook_params (dict, optional): Destination hook parameters
        preoperator (str or list, optional): SQL to execute before transfer (templated)
        insert_args (dict, optional): Additional arguments for insert operation (templated)
        page_size (int, optional): Number of records for paginated mode
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, destination_table, source_conn_id, destination_conn_id,
                 source_hook_params=None, destination_hook_params=None, preoperator=None,
                 insert_args=None, page_size=None, **kwargs):
        pass


def default_output_processor(results):
    """
    Default output processor for query results.

    Args:
        results: Raw query results

    Returns:
        Any: Processed results
    """

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
sql_task = SQLExecuteQueryOperator(
    task_id='execute_sql',
    conn_id='my_postgres_conn',
    sql='INSERT INTO logs (message, timestamp) VALUES (%(msg)s, %(ts)s)',
    parameters={'msg': 'Task completed', 'ts': '2023-01-01 12:00:00'},
    autocommit=True
)

from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLValueCheckOperator,
    SQLColumnCheckOperator
)

# Check that query returns non-empty result
check_data = SQLCheckOperator(
    task_id='check_data',
    conn_id='my_database',
    sql='SELECT COUNT(*) FROM users WHERE created_date = {{ ds }}'
)

# Check specific value
value_check = SQLValueCheckOperator(
    task_id='check_total',
    conn_id='my_database',
    sql='SELECT SUM(amount) FROM orders WHERE date = {{ ds }}',
    pass_value=10000,
    tolerance=0.1  # 10% tolerance
)

# Column quality checks
column_check = SQLColumnCheckOperator(
    task_id='check_columns',
    conn_id='my_database',
    table='users',
    column_mapping={
        'id': {'null_check': {'equal_to': 0}},
        'email': {'unique_check': {'equal_to': 0}},
        'age': {'min': {'greater_than': 0}, 'max': {'less_than': 150}}
    }
)

from airflow.providers.common.sql.operators.sql import BranchSQLOperator
branch_task = BranchSQLOperator(
    task_id='check_condition',
    conn_id='my_database',
    sql='SELECT COUNT(*) FROM new_orders WHERE date = {{ ds }}',
    follow_task_ids_if_true=['process_orders'],
    follow_task_ids_if_false=['send_notification']
)

from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer

transfer_task = GenericTransfer(
    task_id='transfer_data',
    sql='SELECT * FROM source_table WHERE date = {{ ds }}',
    destination_table='target_table',
    source_conn_id='source_db',
    destination_conn_id='target_db',
    preoperator='TRUNCATE TABLE target_table'
)

sql_multi = SQLExecuteQueryOperator(
    task_id='multi_sql',
    conn_id='my_database',
    sql=[
        'UPDATE inventory SET last_updated = NOW()',
        'INSERT INTO audit_log (action, timestamp) VALUES ("inventory_update", NOW())',
        'CALL update_stats_procedure()'
    ],
    split_statements=True,
    autocommit=True
)

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-sql