tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow, including hooks, operators, sensors, and triggers for SQL database operations.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-common-sql@1.27.x
To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-common-sql@1.27.0


Apache Airflow Common SQL Provider

A provider package offering common SQL functionality for Apache Airflow: hooks, operators, sensors, and triggers for SQL database operations. It serves as a foundational component for database-related workflows in Airflow, supplying reusable SQL utilities that specific database provider packages can extend.

Package Information

  • Package Name: apache-airflow-providers-common-sql
  • Language: Python
  • Installation: pip install apache-airflow-providers-common-sql
  • Version: 1.27.5

Core Imports

from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger

Basic Usage

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

# Create a simple SQL execution task
with DAG(
    'sql_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # "schedule_interval" was renamed in Airflow 2.4 and removed in 3.0
) as dag:
    
    sql_task = SQLExecuteQueryOperator(
        task_id='run_sql_query',
        conn_id='my_database_conn',
        sql='SELECT COUNT(*) FROM users WHERE active = true;',
        autocommit=True
    )

Architecture

The Common SQL Provider follows Airflow's standard provider architecture:

  • Hooks: Handle database connections and low-level operations (DbApiHook as base class)
  • Operators: Execute SQL tasks within DAGs (query execution, data validation, transfers)
  • Sensors: Monitor database states and conditions (SqlSensor for periodic checks)
  • Triggers: Enable asynchronous database operations (SQLExecuteQueryTrigger)
  • Dialects: Provide database-specific SQL formatting and operations

This design enables database-agnostic workflows while supporting specialized database providers that extend these base components.

Capabilities

Database Hooks

Core hook functionality for establishing database connections, executing queries, and managing database operations. The DbApiHook serves as the foundation for all database interactions.

class DbApiHook:
    def get_conn(self): ...
    def get_df(self, sql, parameters=None, **kwargs): ...
    def get_records(self, sql, parameters=None): ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def insert_rows(self, table, rows, target_fields=None, commit_every=1000): ...
    def bulk_dump(self, table, tmp_file): ...
    def bulk_load(self, table, tmp_file): ...
    def test_connection(self): ...
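The `run`/`handler` contract can be illustrated without an Airflow installation. The sketch below stands in for `DbApiHook.run` using sqlite3 (the `run` function, the `users` table, and its data are all illustrative, not part of the provider): the handler receives the live DB-API cursor, and whatever it returns becomes the call's result.

```python
import sqlite3

def run(sql, parameters=None, handler=None):
    # Stand-in for DbApiHook.run: open a connection, execute the SQL,
    # and let the handler decide what to extract from the cursor.
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE users (id INTEGER, active INTEGER)")
    cur.executemany("INSERT INTO users VALUES (?, ?)", [(1, 1), (2, 0), (3, 1)])
    cur.execute(sql, parameters or ())
    result = handler(cur) if handler is not None else None
    conn.close()
    return result

count = run(
    "SELECT COUNT(*) FROM users WHERE active = ?",
    parameters=(1,),
    handler=lambda cur: cur.fetchone()[0],
)
print(count)  # prints 2
```

With no handler, `run` returns `None` — mirroring the hook's behavior when you execute DML and only care about side effects.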

SQL Operators

Task operators for executing SQL queries, performing data validation, and transferring data between databases, including specialized operators for data quality checks and conditional workflows.

class SQLExecuteQueryOperator:
    def __init__(self, sql, conn_id, autocommit=False, parameters=None, **kwargs): ...

class SQLCheckOperator:
    def __init__(self, sql, conn_id, **kwargs): ...

class GenericTransfer:
    def __init__(self, sql, destination_table, source_conn_id, destination_conn_id, **kwargs): ...
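SQLCheckOperator's pass/fail rule is simple: it fetches the first row of the query result and fails the task if the row is empty or any cell in it evaluates to false. A stand-alone sketch of that rule (the `check_passes` helper is illustrative, not a provider API):

```python
def check_passes(first_row):
    # Mirrors SQLCheckOperator's test: every cell of the first
    # returned row must be truthy for the check to succeed.
    if not first_row:
        return False
    return all(bool(cell) for cell in first_row)

print(check_passes((1, "ok")))  # True  -> task succeeds
print(check_passes((1, 0)))     # False -> task fails (a cell is falsy)
print(check_passes(()))         # False -> task fails (empty row)
```

This is why check queries are usually written to return booleans or counts, e.g. `SELECT COUNT(*) > 0 FROM users`.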

SQL Sensors

Monitoring sensors that periodically check database conditions and states, enabling data-driven workflow orchestration based on SQL query results.

class SqlSensor:
    def __init__(self, conn_id, sql, parameters=None, success=None, failure=None, **kwargs): ...
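SqlSensor applies its `failure` and `success` callables to the first cell of the first returned row. The stand-alone sketch below approximates that poke logic without an Airflow install (the `poke` function and the error type are assumptions for illustration):

```python
def poke(rows, success=None, failure=None):
    # Approximation of SqlSensor.poke: no rows -> keep waiting;
    # failure(first_cell) truthy -> error out; otherwise success()
    # (or plain truthiness) decides whether the sensor fires.
    if not rows:
        return False
    first_cell = rows[0][0]
    if failure is not None and failure(first_cell):
        raise RuntimeError(f"failure criteria met on {first_cell!r}")
    if success is not None:
        return bool(success(first_cell))
    return bool(first_cell)

# Fires once the pending-order count exceeds a threshold.
print(poke([(5,)], success=lambda c: c > 3))  # True
print(poke([]))                               # False (keep poking)
```

The real sensor wraps this in Airflow's poke/reschedule machinery; the criteria semantics are the part worth remembering.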

SQL Triggers

Asynchronous triggers for executing SQL operations without blocking the Airflow scheduler, enabling efficient handling of long-running database operations.

class SQLExecuteQueryTrigger:
    def __init__(self, sql, conn_id, hook_params=None, **kwargs): ...
    def serialize(self): ...
    def get_hook(self): ...
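The `serialize` method follows Airflow's general trigger contract: it returns a `(classpath, kwargs)` pair from which the triggerer process can rebuild the trigger. A minimal stand-alone sketch of that contract (the class name and classpath below are hypothetical, not the provider's):

```python
class SketchTrigger:
    # Illustrative trigger with the same constructor fields as
    # SQLExecuteQueryTrigger; only the serialize contract is shown.
    def __init__(self, sql, conn_id, hook_params=None):
        self.sql = sql
        self.conn_id = conn_id
        self.hook_params = hook_params or {}

    def serialize(self):
        # (importable classpath, kwargs to reconstruct the instance)
        return (
            "example.SketchTrigger",  # hypothetical module path
            {
                "sql": self.sql,
                "conn_id": self.conn_id,
                "hook_params": self.hook_params,
            },
        )

path, kwargs = SketchTrigger("SELECT 1", "my_conn").serialize()
print(path, kwargs)
```

Because the triggerer rebuilds the object purely from this pair, every constructor argument must round-trip through `serialize`.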

SQL Dialects

Database-specific SQL formatting and operations. Provides an abstraction layer for handling differences between SQL databases, including query formatting and data type handling.

class Dialect:
    def escape_word(self, word): ...
    def generate_insert_sql(self, table, values, target_fields, replace=False): ...
    def get_column_names(self, table): ...
    def get_primary_keys(self, table): ...
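As an illustration of what `generate_insert_sql` might produce with placeholder-style binding, here is a hedged stand-alone sketch (the `placeholder` parameter and the exact output format are assumptions, not the provider's actual implementation):

```python
def generate_insert_sql(table, values, target_fields, placeholder="%s"):
    # Build a parameterized INSERT: one placeholder per value,
    # column list taken from target_fields.
    columns = ", ".join(target_fields)
    placeholders = ", ".join([placeholder] * len(values))
    return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

sql = generate_insert_sql("users", (1, "ada"), ["id", "name"])
print(sql)  # INSERT INTO users (id, name) VALUES (%s, %s)
```

Concrete dialects vary the placeholder (`%s` vs `?`, see `SQL_PLACEHOLDERS` below) and the statement form (e.g. `REPLACE INTO` when `replace=True` is supported).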

Types

from typing import Any, Dict, List, Optional, Union, Callable, Protocol, Mapping, Iterable, Sequence
from operator import itemgetter

# Connection protocol for database connectors
class ConnectorProtocol(Protocol):
    def connect(self, host: str, port: int, username: str, schema: str) -> Any: ...

# Common type aliases
SQL = Union[str, List[str]]
Parameters = Optional[Union[Mapping[str, Any], Iterable]]
Handler = Optional[Callable[[Any], Any]]
ResultProcessor = Callable[[Any], Any]
SuccessCriteria = Optional[Callable[[Any], bool]]
FailureCriteria = Optional[Callable[[Any], bool]]
Selector = Callable[[tuple], Any]

# Version compatibility flags
AIRFLOW_V_3_0_PLUS: bool
AIRFLOW_V_3_1_PLUS: bool

# SQL placeholders
SQL_PLACEHOLDERS: frozenset[str]  # {"%s", "?"}
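As an example of the `Handler` alias in practice: `DbApiHook.run` hands the live DB-API cursor to the handler and returns whatever the handler returns. The sketch below defines a handler in the style of the provider's stock `fetch_all_handler` and exercises it against a fake cursor (the `FakeCursor` class is purely illustrative):

```python
from typing import Any, Callable, List, Optional, Tuple

Handler = Optional[Callable[[Any], Any]]

def fetch_all_handler(cursor) -> List[Tuple]:
    # Handler in the style of the provider's fetch_all_handler:
    # drain the cursor and return all rows.
    return cursor.fetchall()

class FakeCursor:
    # Minimal stand-in for a DB-API cursor, for demonstration only.
    def fetchall(self):
        return [(1,), (2,)]

print(fetch_all_handler(FakeCursor()))  # [(1,), (2,)]
```

Typing handlers against the cursor rather than a concrete driver keeps them reusable across every database provider built on `DbApiHook`.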