tessl/pypi-apache-airflow-providers-common-sql

Provider package offering common SQL functionality for Apache Airflow including hooks, operators, sensors, and triggers for SQL database operations


Database Hooks

Database hooks provide the foundation for connecting to and interacting with SQL databases in Airflow. The DbApiHook serves as an abstract base class that provides common database operations, while specific database providers extend this class for their particular database systems.

Capabilities

Core Database Hook

The primary abstract base class for all SQL database hooks, providing connection management, query execution, and data operations.

class DbApiHook:
    """
    Abstract base class for SQL hooks that provides common database operations.
    
    Attributes:
        conn_name_attr (str): Name of the hook attribute that holds the connection ID
        default_conn_name (str): Default connection ID
        supports_autocommit (bool): Whether database supports autocommit
        supports_executemany (bool): Whether database supports executemany
        placeholder (str): SQL placeholder string used for query parameters
        connection: Database connection object
        sqlalchemy_url: SQLAlchemy URL object
        inspector: SQLAlchemy inspector for schema operations
        dialect: Database dialect object for SQL formatting
        reserved_words: Set of database reserved words
    """
    
    def get_conn(self):
        """
        Get database connection.
        
        Returns:
            Database connection object
        """
    
    def get_uri(self):
        """
        Extract URI from connection.
        
        Returns:
            str: Database connection URI
        """
    
    def get_sqlalchemy_engine(self, engine_kwargs=None):
        """
        Get SQLAlchemy engine for the connection.
        
        Args:
            engine_kwargs (dict, optional): Additional engine parameters
            
        Returns:
            sqlalchemy.engine.Engine: SQLAlchemy engine
        """
    
    def get_df(self, sql, parameters=None, **kwargs):
        """
        Execute SQL query and return results as a DataFrame.
        
        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            **kwargs: Additional arguments for DataFrame creation
            
        Returns:
            pandas.DataFrame or polars.DataFrame: Query results as DataFrame
        """
    
    def get_df_by_chunks(self, sql, parameters=None, chunksize=None, **kwargs):
        """
        Execute SQL query and return results as chunked DataFrames.
        
        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            chunksize (int, optional): Number of rows per chunk
            **kwargs: Additional arguments for DataFrame creation
            
        Returns:
            Generator yielding DataFrame chunks
        """
    
    def get_records(self, sql, parameters=None):
        """
        Execute SQL query and return results as list of tuples.
        
        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            
        Returns:
            list: List of result tuples
        """
    
    def get_first(self, sql, parameters=None):
        """
        Execute SQL query and return first row.
        
        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            
        Returns:
            tuple or None: First result row or None if no results
        """
    
    def run(self, sql, autocommit=False, parameters=None, handler=None, split_statements=False, return_last=True):
        """
        Execute SQL command(s) with various options.
        
        Args:
            sql (str or list): SQL statement(s) to execute
            autocommit (bool): Whether to commit automatically
            parameters (dict, optional): Query parameters
            handler (callable, optional): Result handler function
            split_statements (bool): Whether to split multiple statements
            return_last (bool): Whether to return only last result
            
        Returns:
            Any: Query results based on handler and return_last settings
        """
    
    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """
        Insert rows into database table.
        
        Args:
            table (str): Target table name
            rows (list): List of row tuples to insert
            target_fields (list, optional): Target column names
            commit_every (int): Commit after this many rows
            replace (bool): Whether to use REPLACE instead of INSERT
        """
    
    def bulk_dump(self, table, tmp_file):
        """
        Dump table contents to file.
        
        Args:
            table (str): Table name to dump
            tmp_file (str): Target file path
        """
    
    def bulk_load(self, table, tmp_file):
        """
        Load file contents into table.
        
        Args:
            table (str): Target table name
            tmp_file (str): Source file path
        """
    
    def test_connection(self):
        """
        Test database connection.
        
        Returns:
            tuple: (success: bool, message: str)
        """

Database Connection Protocol

Protocol defining the interface a database connector object is expected to implement.

class ConnectorProtocol:
    """
    Protocol defining database connection interface.
    """
    
    def connect(self, host: str, port: int, username: str, schema: str):
        """
        Connect to database.
        
        Args:
            host (str): Database host
            port (int): Database port
            username (str): Username for connection
            schema (str): Database schema/name
            
        Returns:
            Database connection object
        """

Result Handler Functions

Pre-built handler functions for processing query results.

def fetch_all_handler(cursor):
    """
    Handler to fetch all query results.
    
    Args:
        cursor: Database cursor object
        
    Returns:
        list: All query results
    """

def fetch_one_handler(cursor):
    """
    Handler to fetch first query result.
    
    Args:
        cursor: Database cursor object
        
    Returns:
        Any: First query result or None
    """

def return_single_query_results(sql, return_last, split_statements):
    """
    Determine whether the results of a single query should be returned.
    
    Args:
        sql (str or list): Original SQL statement(s)
        return_last (bool): Whether to return only the last result
        split_statements (bool): Whether statements were split
        
    Returns:
        bool: True if a single query result should be returned
    """

Constants

# Set of supported SQL placeholders
SQL_PLACEHOLDERS: frozenset = frozenset({"%s", "?"})
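To illustrate how a hook's placeholder interacts with parameterized SQL, this hypothetical helper assembles an INSERT statement much as insert_rows might internally (a simplified sketch, not the provider's actual implementation):

```python
SQL_PLACEHOLDERS = frozenset({"%s", "?"})

def build_insert(table, target_fields, placeholder="%s"):
    # Hypothetical helper: joins the column names and repeats the hook's
    # placeholder once per column to form a parameterized INSERT.
    if placeholder not in SQL_PLACEHOLDERS:
        raise ValueError(f"Unsupported placeholder: {placeholder}")
    columns = ", ".join(target_fields)
    values = ", ".join([placeholder] * len(target_fields))
    return f"INSERT INTO {table} ({columns}) VALUES ({values})"

sql = build_insert("users", ["name", "email"], placeholder="?")
# "INSERT INTO users (name, email) VALUES (?, ?)"
```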

Usage Examples

Basic Database Operations

from airflow.providers.common.sql.hooks.sql import DbApiHook

# Custom hook extending DbApiHook
class MyDatabaseHook(DbApiHook):
    conn_name_attr = 'my_conn_id'
    default_conn_name = 'my_default_conn'
    supports_autocommit = True
    
    def get_conn(self):
        # Implementation specific to your database
        pass

# Use the hook
hook = MyDatabaseHook(my_conn_id='my_database')

# Execute query and get records
results = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=[True])

# Execute query and get DataFrame
df = hook.get_df('SELECT name, email FROM users LIMIT 10')

# Insert data
rows = [('John', 'john@example.com'), ('Jane', 'jane@example.com')]
hook.insert_rows('users', rows, target_fields=['name', 'email'])

# Test connection
success, message = hook.test_connection()

Using Result Handlers

from airflow.providers.common.sql.hooks.handlers import fetch_all_handler, fetch_one_handler

# Execute with custom handler
result = hook.run(
    'SELECT COUNT(*) FROM orders WHERE date = %s',
    parameters=['2023-01-01'],
    handler=fetch_one_handler
)

# Get only the count value
count = result[0] if result else 0
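The handler pattern can be reproduced with any DB-API cursor. The sketch below uses the standard library's sqlite3 to show what a handler receives; it is illustrative only and does not use the provider's code:

```python
import sqlite3

def fetch_one(cursor):
    # Plays the role of fetch_one_handler: the hook invokes the handler
    # with the live cursor after executing the statement.
    return cursor.fetchone()

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE orders (order_date TEXT)")
cur.execute("INSERT INTO orders VALUES ('2023-01-01')")
cur.execute("SELECT COUNT(*) FROM orders WHERE order_date = ?", ("2023-01-01",))
count = fetch_one(cur)[0]  # 1
```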

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-sql
