Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations
npx @tessl/cli install tessl/pypi-apache-airflow-providers-common-sql@1.27.0

A provider package of common SQL functionality for Apache Airflow: hooks, operators, sensors, and triggers for SQL database operations. It serves as a foundation for database-related workflows in Airflow, and its reusable SQL utilities can be extended by database-specific provider packages.

pip install apache-airflow-providers-common-sql

from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger

from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Create a simple SQL execution task
with DAG(
    'sql_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # no schedule: the DAG runs only when triggered manually
) as dag:
    sql_task = SQLExecuteQueryOperator(
        task_id='run_sql_query',
        conn_id='my_database_conn',
        sql='SELECT COUNT(*) FROM users WHERE active = true;',
        autocommit=True,
    )

The Common SQL Provider follows Airflow's standard provider architecture, organized into hooks, operators, sensors, triggers, and dialects.
This design enables database-agnostic workflows while supporting specialized database providers that extend these base components.
Core hook functionality for establishing database connections, executing queries, and managing database operations. The DbApiHook serves as the foundation for all database interactions.
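To make the hook pattern concrete without needing Airflow or a configured connection, here is a minimal sketch. `SqliteSketchHook` is a hypothetical stand-in built on the standard-library `sqlite3` module; it only borrows a few method names from `DbApiHook` and is not the Airflow class, which resolves its connection from a `conn_id` and supports more options.

```python
import sqlite3


class SqliteSketchHook:
    """Hypothetical stand-in that mimics a few DbApiHook method names.

    Assumption: an in-memory SQLite database replaces the Airflow connection.
    """

    def __init__(self):
        self._conn = sqlite3.connect(":memory:")

    def get_conn(self):
        return self._conn

    def run(self, sql, autocommit=False, parameters=None, handler=None):
        # `autocommit` is accepted for signature parity only; this sketch
        # always commits after the statements have run.
        statements = [sql] if isinstance(sql, str) else list(sql)
        cur = self.get_conn().cursor()
        result = None
        try:
            for statement in statements:
                cur.execute(statement, parameters or ())
                if handler is not None:
                    # The handler receives the cursor and decides what to return.
                    result = handler(cur)
            self.get_conn().commit()
        finally:
            cur.close()
        return result

    def get_records(self, sql, parameters=None):
        return self.run(sql, parameters=parameters, handler=lambda cur: cur.fetchall())

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
        # Table and column names are interpolated as-is, so they must be trusted.
        conn = self.get_conn()
        columns = f" ({', '.join(target_fields)})" if target_fields else ""
        for i, row in enumerate(rows, start=1):
            placeholders = ", ".join("?" for _ in row)
            conn.execute(f"INSERT INTO {table}{columns} VALUES ({placeholders})", tuple(row))
            if i % commit_every == 0:
                conn.commit()
        conn.commit()


hook = SqliteSketchHook()
hook.run("CREATE TABLE users (id INTEGER, active INTEGER)")
hook.insert_rows("users", [(1, 1), (2, 0), (3, 1)], target_fields=["id", "active"])
active_count = hook.get_records(
    "SELECT COUNT(*) FROM users WHERE active = ?", parameters=(1,)
)
```

The point of the sketch is the division of labour: `run` owns the cursor lifecycle, and a `handler` callable turns the cursor into whatever result the caller wants.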
class DbApiHook:
    def get_conn(self): ...
    def get_df(self, sql, parameters=None, **kwargs): ...
    def get_records(self, sql, parameters=None): ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def insert_rows(self, table, rows, target_fields=None, commit_every=1000): ...
    def bulk_dump(self, table, tmp_file): ...
    def bulk_load(self, table, tmp_file): ...
    def test_connection(self): ...

Task operators for executing SQL queries, performing data validation, and transferring data between databases. Includes specialized operators for data quality checks and conditional workflows.
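As an illustration of the data quality checks mentioned above, the sketch below assumes the rule that a check passes only when every value in the first returned row is truthy, which is how `SQLCheckOperator` is generally understood to behave. `check_first_row` is a hypothetical helper that uses `sqlite3` directly rather than an Airflow operator.

```python
import sqlite3


def check_first_row(conn, sql):
    """Hypothetical helper: pass only if every value in the first row is truthy."""
    row = conn.execute(sql).fetchone()
    if row is None:
        raise ValueError("The query returned no rows")
    if not all(bool(value) for value in row):
        raise ValueError(f"Check failed for query {sql!r}: first row was {row!r}")
    return row


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 12.0)])

# Both expressions evaluate to 1 in SQLite, so the check passes.
passing_row = check_first_row(conn, "SELECT COUNT(*) > 0, MIN(amount) > 0 FROM orders")

# COUNT(*) is 0 here, which is falsy, so the check raises.
try:
    check_first_row(conn, "SELECT COUNT(*) FROM orders WHERE amount < 0")
    negative_check_failed = False
except ValueError:
    negative_check_failed = True
```

Writing checks as boolean SQL expressions keeps the pass/fail rule in the query itself, so the Python side only has to inspect one row.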
class SQLExecuteQueryOperator:
    def __init__(self, sql, conn_id, autocommit=False, parameters=None, **kwargs): ...

class SQLCheckOperator:
    def __init__(self, sql, conn_id, **kwargs): ...

class GenericTransfer:
    def __init__(self, sql, destination_table, source_conn_id, destination_conn_id, **kwargs): ...

Monitoring sensors that periodically check database conditions and states. Enables data-driven workflow orchestration based on SQL query results.
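The sketch below approximates how a single sensor check might be evaluated, assuming the value of interest is the first cell of the first row and that optional `success` and `failure` callables are applied to it. `evaluate_poke` is a hypothetical helper, not part of the provider, and the real `SqlSensor` has additional options that are omitted here.

```python
import sqlite3
from operator import itemgetter


def evaluate_poke(records, success=None, failure=None, selector=itemgetter(0)):
    """Hypothetical helper approximating one sensor check.

    Returns True when the condition is met, False to keep waiting,
    and raises when the failure criteria match.
    """
    if not records:
        return False  # nothing returned yet, keep waiting
    value = selector(records[0])
    if failure is not None and failure(value):
        raise RuntimeError(f"Failure criteria met for value {value!r}")
    if success is not None:
        return bool(success(value))
    return bool(value)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staged_files (name TEXT)")
sql = "SELECT COUNT(*) FROM staged_files"

# No files staged yet: the success criteria is not met.
first_poke = evaluate_poke(conn.execute(sql).fetchall(), success=lambda n: n >= 2)

conn.executemany("INSERT INTO staged_files VALUES (?)", [("a.csv",), ("b.csv",)])

# Two files staged: the success criteria is now met.
second_poke = evaluate_poke(conn.execute(sql).fetchall(), success=lambda n: n >= 2)
```

In a real DAG the scheduler would repeat this check at the sensor's poke interval until it returns a truthy result or fails.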
class SqlSensor:
    def __init__(self, conn_id, sql, parameters=None, success=None, failure=None, **kwargs): ...

Asynchronous triggers that execute SQL operations in Airflow's triggerer process, so a deferred task does not occupy a worker slot while it waits. Enables efficient handling of long-running database operations.
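Two ideas carry most of the trigger design: a trigger must be able to describe itself as a class path plus keyword arguments so it can be rebuilt in another process, and its work happens in an async generator that yields an event. The sketch below shows both with a hypothetical `SketchQueryTrigger` built on `asyncio` and `sqlite3`; it does not subclass Airflow's trigger base class, and the event payload shape is an assumption made for illustration.

```python
import asyncio
import sqlite3


class SketchQueryTrigger:
    """Hypothetical stand-in, not Airflow's SQLExecuteQueryTrigger."""

    def __init__(self, sql, database=":memory:"):
        self.sql = sql
        self.database = database

    def serialize(self):
        # Class path plus the kwargs needed to rebuild an equivalent instance.
        classpath = f"{type(self).__module__}.{type(self).__qualname__}"
        return classpath, {"sql": self.sql, "database": self.database}

    def _execute(self):
        # Blocking database work, kept out of the event loop.
        conn = sqlite3.connect(self.database)
        try:
            return conn.execute(self.sql).fetchall()
        finally:
            conn.close()

    async def run(self):
        # Run the blocking query in a worker thread, then emit one event.
        rows = await asyncio.to_thread(self._execute)
        yield {"status": "success", "results": rows}


async def first_event(trigger):
    async for event in trigger.run():
        return event


trigger = SketchQueryTrigger("SELECT 1 + 1")
classpath, kwargs = trigger.serialize()

# Rebuild the trigger from its serialized form, as another process would.
rebuilt = SketchQueryTrigger(**kwargs)
event = asyncio.run(first_event(rebuilt))
```

Pushing the blocking call into a thread matters because many triggers share one event loop, and a synchronous query would stall all of them.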
class SQLExecuteQueryTrigger:
    def __init__(self, sql, conn_id, hook_params=None, **kwargs): ...
    def serialize(self): ...
    def get_hook(self): ...

Database-specific SQL formatting and operations. Provides abstraction layer for handling differences between SQL databases including query formatting and data type handling.
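A small sketch of what dialect-aware statement generation can look like follows. `escape_word` and `build_insert_sql` here are hypothetical free functions written for illustration, not the provider's `Dialect` methods; the sketch assumes double-quote identifier quoting and quotes every column name, whereas a real dialect decides when escaping is needed.

```python
def escape_word(word, quote='"'):
    """Quote an identifier unless it is already quoted (assumed rule)."""
    if word.startswith(quote) and word.endswith(quote):
        return word
    return f"{quote}{word}{quote}"


def build_insert_sql(table, values, target_fields=None, placeholder="%s", replace=False):
    """Build a parameterized INSERT (or REPLACE) statement.

    Only the placeholder style and quoting vary; the values themselves are
    never interpolated and must be passed separately at execution time.
    """
    columns = ""
    if target_fields:
        columns = " (" + ", ".join(escape_word(field) for field in target_fields) + ")"
    placeholders = ", ".join(placeholder for _ in values)
    verb = "REPLACE" if replace else "INSERT"
    return f"{verb} INTO {table}{columns} VALUES ({placeholders})"


# Same row, two placeholder styles.
insert_sql = build_insert_sql("users", (1, "ada"), target_fields=["id", "name"])
sqlite_sql = build_insert_sql("users", (1, "ada"), placeholder="?")
```

Keeping the placeholder style as a parameter is what lets one code path serve drivers that expect `%s` and drivers that expect `?`.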
class Dialect:
    def escape_word(self, word): ...
    def generate_insert_sql(self, table, values, target_fields, replace=False): ...
    def get_column_names(self, table): ...
    def get_primary_keys(self, table): ...

from typing import Any, Dict, List, Optional, Union, Callable, Protocol, Mapping, Iterable, Sequence
from operator import itemgetter
# Connection protocol for database connectors
class ConnectorProtocol(Protocol):
    def connect(self, host: str, port: int, username: str, schema: str) -> Any: ...
# Common type aliases
SQL = Union[str, List[str]]
Parameters = Optional[Union[Mapping[str, Any], Iterable]]
Handler = Optional[Callable[[Any], Any]]
ResultProcessor = Callable[[Any], Any]
SuccessCriteria = Optional[Callable[[Any], bool]]
FailureCriteria = Optional[Callable[[Any], bool]]
Selector = Callable[[tuple], Any]
# Version compatibility flags
AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool
# SQL placeholders
SQL_PLACEHOLDERS: frozenset[str] # {"%s", "?"}
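Because the `SQL` alias accepts either one statement or a list of statements, code that consumes it usually normalizes the value first. The sketch below shows one way to do that; `as_statement_list` is a hypothetical helper, and dropping blank entries is an assumption of this sketch rather than documented provider behavior.

```python
from typing import List, Union

SQL = Union[str, List[str]]


def as_statement_list(sql: SQL) -> List[str]:
    """Hypothetical helper: normalize the SQL alias to a list of non-empty statements."""
    statements = [sql] if isinstance(sql, str) else list(sql)
    return [statement.strip() for statement in statements if statement.strip()]


single = as_statement_list("SELECT 1")
several = as_statement_list(
    ["DELETE FROM staging", "   ", "SELECT COUNT(*) FROM staging"]
)
```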