Provider package offering common SQL functionality for Apache Airflow, including hooks, operators, sensors, and triggers for SQL database operations.
Database hooks provide the foundation for connecting to and interacting with SQL databases in Airflow. The DbApiHook serves as an abstract base class that provides common database operations, while specific database providers extend this class for their particular database systems.
`DbApiHook` is the primary abstract base class for all SQL database hooks, providing connection management, query execution, and data operations.
```python
class DbApiHook:
    """
    Abstract base class for SQL hooks that provides common database operations.

    Attributes:
        conn_name_attr (str): Name of the attribute holding the connection ID
        default_conn_name (str): Default connection ID
        supports_autocommit (bool): Whether the database supports autocommit
        supports_executemany (bool): Whether the database supports executemany
        placeholder (str): SQL placeholder character for parameters
        connection: Database connection object
        sqlalchemy_url: SQLAlchemy URL object
        inspector: SQLAlchemy inspector for schema operations
        dialect: Database dialect object for SQL formatting
        reserved_words: Set of database reserved words
    """

    def get_conn(self):
        """
        Get database connection.

        Returns:
            Database connection object
        """

    def get_uri(self):
        """
        Extract URI from connection.

        Returns:
            str: Database connection URI
        """

    def get_sqlalchemy_engine(self, engine_kwargs=None):
        """
        Get SQLAlchemy engine for the connection.

        Args:
            engine_kwargs (dict, optional): Additional engine parameters

        Returns:
            sqlalchemy.engine.Engine: SQLAlchemy engine
        """

    def get_df(self, sql, parameters=None, **kwargs):
        """
        Execute SQL query and return results as a DataFrame.

        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            pandas.DataFrame or polars.DataFrame: Query results as a DataFrame
        """

    def get_df_by_chunks(self, sql, parameters=None, chunksize=None, **kwargs):
        """
        Execute SQL query and return results as chunked DataFrames.

        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters
            chunksize (int, optional): Number of rows per chunk
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            Generator yielding DataFrame chunks
        """

    def get_records(self, sql, parameters=None):
        """
        Execute SQL query and return results as a list of tuples.

        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters

        Returns:
            list: List of result tuples
        """

    def get_first(self, sql, parameters=None):
        """
        Execute SQL query and return the first row.

        Args:
            sql (str): SQL query to execute
            parameters (dict, optional): Query parameters

        Returns:
            tuple or None: First result row, or None if there are no results
        """

    def run(self, sql, autocommit=False, parameters=None, handler=None, split_statements=False, return_last=True):
        """
        Execute SQL command(s) with various options.

        Args:
            sql (str or list): SQL statement(s) to execute
            autocommit (bool): Whether to commit automatically
            parameters (dict, optional): Query parameters
            handler (callable, optional): Result handler function
            split_statements (bool): Whether to split multiple statements
            return_last (bool): Whether to return only the last result

        Returns:
            Any: Query results based on handler and return_last settings
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """
        Insert rows into a database table.

        Args:
            table (str): Target table name
            rows (list): List of row tuples to insert
            target_fields (list, optional): Target column names
            commit_every (int): Commit after this many rows
            replace (bool): Whether to use REPLACE instead of INSERT
        """

    def bulk_dump(self, table, tmp_file):
        """
        Dump table contents to a file.

        Args:
            table (str): Table name to dump
            tmp_file (str): Target file path
        """

    def bulk_load(self, table, tmp_file):
        """
        Load file contents into a table.

        Args:
            table (str): Target table name
            tmp_file (str): Source file path
        """

    def test_connection(self):
        """
        Test the database connection.

        Returns:
            tuple: (success: bool, message: str)
        """
```
`ConnectorProtocol` is a protocol interface defining the expected database connection interface.

```python
class ConnectorProtocol:
    """
    Protocol defining the database connection interface.
    """

    def connect(self, host: str, port: int, username: str, schema: str):
        """
        Connect to the database.

        Args:
            host (str): Database host
            port (int): Database port
            username (str): Username for the connection
            schema (str): Database schema/name

        Returns:
            Database connection object
        """
```
The `handlers` module provides pre-built handler functions for processing query results.

```python
def fetch_all_handler(cursor):
    """
    Handler to fetch all query results.

    Args:
        cursor: Database cursor object

    Returns:
        list: All query results
    """

def fetch_one_handler(cursor):
    """
    Handler to fetch the first query result.

    Args:
        cursor: Database cursor object

    Returns:
        Any: First query result, or None
    """

def return_single_query_results(sql, return_last, split_statements, result):
    """
    Determine when to return single vs. multiple query results.

    Args:
        sql (str or list): Original SQL statement(s)
        return_last (bool): Whether to return only the last result
        split_statements (bool): Whether statements were split
        result: Query results

    Returns:
        Any: Processed results based on the parameters
    """

# Set of supported SQL placeholders
SQL_PLACEHOLDERS: frozenset = frozenset({"%s", "?"})
```
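These handlers pair with `run`'s `split_statements` and `return_last` flags; `return_single_query_results` decides whether the caller gets one result or a list of them. A sketch, reusing the assumed `postgres_default` connection from the earlier example:

```python
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# With split_statements=True and return_last=False, run() executes each
# statement separately and returns one handler result per statement.
results = hook.run(
    "SELECT 1; SELECT 2;",
    handler=fetch_all_handler,
    split_statements=True,
    return_last=False,
)
# e.g. [[(1,)], [(2,)]] -- one list of rows per statement
```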
A typical custom hook subclasses `DbApiHook`, sets the connection attributes, and implements `get_conn`:

```python
from airflow.providers.common.sql.hooks.sql import DbApiHook

# Custom hook extending DbApiHook
class MyDatabaseHook(DbApiHook):
    conn_name_attr = 'my_conn_id'
    default_conn_name = 'my_default_conn'
    supports_autocommit = True

    def get_conn(self):
        # Implementation specific to your database
        pass

# Instantiate the hook; the keyword must match conn_name_attr
hook = MyDatabaseHook(my_conn_id='my_database')

# Execute a query and get records
results = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=[True])

# Execute a query and get a DataFrame
df = hook.get_df('SELECT name, email FROM users LIMIT 10')

# Insert data
rows = [('John', 'john@example.com'), ('Jane', 'jane@example.com')]
hook.insert_rows('users', rows, target_fields=['name', 'email'])

# Test the connection
success, message = hook.test_connection()
```
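The reference above also lists `get_sqlalchemy_engine`; a short sketch of how the `hook` from the previous example might hand an engine to pandas (illustrative only, and it relies on a working `get_uri` for the connection):

```python
import pandas as pd

# Reuses `hook` from the example above; get_sqlalchemy_engine() builds a
# SQLAlchemy engine from the Airflow connection URI.
engine = hook.get_sqlalchemy_engine()
df = pd.read_sql("SELECT name, email FROM users", con=engine)
```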
Handlers shape what `run` returns; `fetch_one_handler` yields just the first row of the result:

```python
from airflow.providers.common.sql.hooks.handlers import fetch_one_handler

# Execute with a custom handler
result = hook.run(
    'SELECT COUNT(*) FROM orders WHERE date = %s',
    parameters=['2023-01-01'],
    handler=fetch_one_handler,
)

# Get only the count value
count = result[0] if result else 0
```

Install with the Tessl CLI:

```
npx tessl i tessl/pypi-apache-airflow-providers-common-sql
```