A provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations. This package extends Airflow's SQL capabilities with SQLite-specific connection handling, enabling seamless integration of SQLite databases into data workflows.

```bash
npx @tessl/cli install tessl/pypi-apache-airflow-providers-sqlite@4.1.0
```

```bash
pip install apache-airflow-providers-sqlite
```

```python
# Main hook import
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
# For type hints and advanced usage
import sqlite3
from airflow.models import Connection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.engine.url import URL
```

Additional imports for DataFrame operations:

```python
# For pandas DataFrames (optional dependency)
from pandas import DataFrame as PandasDataFrame
# For polars DataFrames (optional dependency)
from polars import DataFrame as PolarsDataFrame
```

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
# Initialize hook with connection ID
hook = SqliteHook(sqlite_conn_id='sqlite_default')
# Execute a query and get all results
results = hook.get_records("SELECT * FROM users WHERE active = ?", parameters=[True])
# Execute a query and get first result only
first_result = hook.get_first("SELECT COUNT(*) FROM users")
# Run SQL commands (INSERT, UPDATE, DELETE)
hook.run("INSERT INTO users (name, email) VALUES (?, ?)", parameters=["John Doe", "john@example.com"])
# Get results as pandas DataFrame (requires pandas)
df = hook.get_df("SELECT * FROM users", df_type="pandas")
# Get results as polars DataFrame (requires polars)
df = hook.get_df("SELECT * FROM users", df_type="polars")
# Bulk insert multiple rows
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
hook.insert_rows(table="users", rows=rows, target_fields=["name", "email"])
# Test connection
status, message = hook.test_connection()
if status:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")
```

SQLite connections support various URI formats:

```
# File-based database (relative path)
sqlite:///path/to/database.db
# File-based database (absolute path)
sqlite:////absolute/path/to/database.db
# In-memory database
sqlite:///:memory:
# With query parameters
sqlite:///path/to/db.sqlite?mode=ro
sqlite:///path/to/db.sqlite?mode=rw
sqlite:///path/to/db.sqlite?cache=shared
```
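Before the hook can use any of these URIs, the connection has to be registered with Airflow. One sketch, using Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention (the connection ID `sqlite_example` and the database path are illustrative):

```python
import os

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Hypothetical connection ID, registered via Airflow's AIRFLOW_CONN_<CONN_ID> convention
os.environ["AIRFLOW_CONN_SQLITE_EXAMPLE"] = "sqlite:///tmp/demo.db"

hook = SqliteHook(sqlite_conn_id="sqlite_example")
# Inspect the SQLAlchemy-compatible URI the hook derives from the connection
print(hook.get_uri())
```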
The SqliteHook class provides SQLite database integration.

```python
class SqliteHook(DbApiHook):
"""
Interact with SQLite databases.
Class Attributes:
conn_name_attr: str = "sqlite_conn_id"
default_conn_name: str = "sqlite_default"
conn_type: str = "sqlite"
hook_name: str = "Sqlite"
"""
def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
"""
Initialize SQLite hook.
Args:
*args: If single positional arg provided, used as connection ID
schema (str, optional): Database schema (typically not used with SQLite)
log_sql (bool): Whether to log SQL statements (default: True)
**kwargs: Additional keyword arguments, including connection ID via conn_name_attr
"""Establish and manage SQLite database connections with proper URI handling.
def get_conn(self) -> sqlite3.dbapi2.Connection:
"""
Return SQLite connection object with proper URI conversion.
Converts SQLAlchemy URI format to sqlite3-compatible file URI format.
Handles file paths, in-memory databases, and query parameters.
Returns:
sqlite3.dbapi2.Connection: SQLite database connection
"""
def get_uri(self) -> str:
"""
Override DbApiHook get_uri method for SQLAlchemy engine compatibility.
Transforms Airflow connection URI to SQLAlchemy-compatible format,
handling SQLite-specific URI requirements.
Returns:
str: SQLAlchemy-compatible URI string
"""
def get_conn_id(self) -> str:
"""
Get the connection ID used by this hook.
Returns:
str: Connection ID
"""
def get_cursor(self):
"""
Get database cursor for executing SQL statements.
Returns:
sqlite3.Cursor: Database cursor object
"""
def test_connection(self):
"""
Test the SQLite database connection.
Returns:
tuple[bool, str]: (connection_success, status_message)
"""Execute SQL statements with parameter binding and transaction control.
Execute SQL statements with parameter binding and transaction control.

```python
def run(self, sql, autocommit: bool = False, parameters=None, handler=None,
split_statements: bool = False, return_last: bool = True):
"""
Execute SQL statement(s) with optional parameter binding.
Args:
sql (str | list[str]): SQL statement(s) to execute
autocommit (bool): Enable autocommit mode (default: False)
parameters (list | dict, optional): Query parameters for binding
handler (callable, optional): Result handler function
split_statements (bool): Split multiple statements (default: False)
return_last (bool): Return result from last statement only (default: True)
Returns:
any: Query results based on handler, or None for non-SELECT statements
"""
def get_records(self, sql: str, parameters=None) -> list[tuple]:
"""
Execute SQL query and return all records.
Args:
sql (str): SQL query to execute
parameters (list | dict, optional): Query parameters for binding
Returns:
list[tuple]: List of result tuples
"""
def get_first(self, sql: str, parameters=None):
"""
Execute SQL query and return first record.
Args:
sql (str): SQL query to execute
parameters (list | dict, optional): Query parameters for binding
Returns:
tuple | None: First result tuple or None if no results
"""Convert query results to pandas or polars DataFrames for data analysis.
Convert query results to pandas or polars DataFrames for data analysis.

```python
def get_df(self, sql: str, parameters=None, *, df_type: str = "pandas", **kwargs):
"""
Execute SQL query and return results as DataFrame.
Args:
sql (str): SQL query to execute
parameters (list | dict, optional): Query parameters for binding
df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
**kwargs: Additional arguments passed to DataFrame constructor
Returns:
PandasDataFrame | PolarsDataFrame: DataFrame with query results
"""
def get_df_by_chunks(self, sql: str, parameters=None, *, chunksize: int,
df_type: str = "pandas", **kwargs):
"""
Execute SQL query and return results as DataFrame chunks.
Args:
sql (str): SQL query to execute
parameters (list | dict, optional): Query parameters for binding
chunksize (int): Number of rows per chunk (required)
df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
**kwargs: Additional arguments passed to DataFrame constructor
Yields:
PandasDataFrame | PolarsDataFrame: Iterator of DataFrame chunks
"""Efficiently insert multiple rows with batching and transaction control.
Efficiently insert multiple rows with batching and transaction control.

```python
def insert_rows(self, table: str, rows, target_fields=None, commit_every: int = 1000,
replace: bool = False, *, executemany: bool = False,
fast_executemany: bool = False, autocommit: bool = False, **kwargs) -> None:
"""
Insert multiple rows into table with batching and optional replacement.
Args:
table (str): Target table name
rows (Iterable): Collection of row tuples to insert
target_fields (list[str], optional): Column names for insertion
commit_every (int): Commit transaction every N rows (default: 1000)
replace (bool): Use REPLACE INTO instead of INSERT INTO (default: False)
executemany (bool): Use cursor.executemany() for batch insertion (default: False)
fast_executemany (bool): Use fast executemany if supported (default: False)
autocommit (bool): Enable autocommit mode (default: False)
**kwargs: Additional arguments for customization
"""Access SQLAlchemy engines and metadata for advanced database operations.
Access SQLAlchemy engines and metadata for advanced database operations.

```python
def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
"""
Get SQLAlchemy engine for advanced database operations.
Args:
engine_kwargs (dict, optional): Additional engine configuration parameters
Returns:
Engine: SQLAlchemy engine instance
"""
@property
def sqlalchemy_url(self) -> URL:
"""
SQLAlchemy URL object for this connection.
Returns:
URL: SQLAlchemy URL object
"""
@property
def inspector(self) -> Inspector:
"""
SQLAlchemy Inspector for database metadata.
Returns:
Inspector: Database inspector instance
"""Manage database transactions and autocommit behavior.
Manage database transactions and autocommit behavior.

```python
def get_autocommit(self, conn) -> bool:
"""
Get autocommit setting for connection.
Args:
conn: Database connection object
Returns:
bool: Current autocommit status
"""
def set_autocommit(self, conn, autocommit: bool) -> None:
"""
Set autocommit flag on connection.
Args:
conn: Database connection object
autocommit (bool): Autocommit setting to apply
"""Helper methods and properties for SQL operations and metadata access.
Helper methods and properties for SQL operations and metadata access.

```python
@property
def placeholder(self) -> str:
"""
SQL parameter placeholder character for SQLite.
Returns:
str: "?" (question mark placeholder)
"""
@property
def connection(self) -> Connection:
"""
Airflow connection object for this hook.
Returns:
Connection: Connection object instance
"""
@property
def connection_extra(self) -> dict:
"""
Connection extra parameters as dictionary.
Returns:
dict: Extra connection parameters from connection configuration
"""
@property
def last_description(self) -> list:
"""
Description from last executed cursor.
Returns:
list: Cursor description with column metadata
"""
@staticmethod
def split_sql_string(sql: str, strip_semicolon: bool = False) -> list[str]:
"""
Split SQL string into individual statements.
Args:
sql (str): SQL string with multiple statements
strip_semicolon (bool): Remove trailing semicolons (default: False)
Returns:
list[str]: List of individual SQL statements
"""
@staticmethod
def strip_sql_string(sql: str) -> str:
"""
Strip whitespace and comments from SQL string.
Args:
sql (str): SQL string to clean
Returns:
str: Cleaned SQL string
"""Access provider configuration and metadata.
Access provider configuration and metadata.

```python
# From airflow.providers.sqlite.get_provider_info
def get_provider_info() -> dict:
"""
Get provider metadata including integrations and connection types.
Returns:
dict: Provider metadata containing:
- package-name: "apache-airflow-providers-sqlite"
- name: "SQLite"
- description: SQLite provider description
- integrations: List of SQLite integration info
- hooks: List of available hook modules
- connection-types: List of supported connection types
"""# Type aliases for clarity
```python
# Type aliases for clarity
PandasDataFrame = "pandas.DataFrame"
PolarsDataFrame = "polars.DataFrame"
Connection = "airflow.models.Connection"
Engine = "sqlalchemy.engine.Engine"
Inspector = "sqlalchemy.engine.Inspector"
URL = "sqlalchemy.engine.URL"The SQLite hook handles common database errors and connection issues:
Common error patterns:

```python
import sqlite3

from airflow.exceptions import AirflowException
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

try:
    hook = SqliteHook(sqlite_conn_id='my_sqlite_conn')
    results = hook.get_records("SELECT * FROM users")
except sqlite3.Error as e:
    # Handle SQLite-specific errors
    print(f"Database error: {e}")
except AirflowException as e:
    # Handle Airflow-specific errors (connection not found, etc.)
    print(f"Airflow error: {e}")
except Exception as e:
    # Handle other errors
    print(f"General error: {e}")
```

```python
# Connection URI: sqlite:///:memory:
hook = SqliteHook(sqlite_conn_id='sqlite_memory')
hook.run("CREATE TABLE temp_data (id INTEGER, value TEXT)")
hook.insert_rows("temp_data", [(1, "test"), (2, "data")])
results = hook.get_records("SELECT * FROM temp_data")
```

```python
# Connection URI: sqlite:///path/to/db.sqlite?mode=rw&cache=shared
hook = SqliteHook(sqlite_conn_id='sqlite_file')
results = hook.get_df("SELECT * FROM large_table", df_type="pandas")
```

```python
# Process large results in chunks to manage memory
for chunk_df in hook.get_df_by_chunks("SELECT * FROM big_table", chunksize=1000):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    # Save or further process results
    print(f"Processed chunk with {len(chunk_df)} rows")
```

```python
# Manual transaction control
conn = hook.get_conn()
cursor = conn.cursor()
try:
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    cursor.execute("UPDATE users SET active = 1 WHERE name = ?", ("Alice",))
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
```

```python
from sqlalchemy import text

# Get SQLAlchemy engine for advanced operations
engine = hook.get_sqlalchemy_engine()
with engine.connect() as conn:
    # SQLAlchemy 2.x requires textual SQL to be wrapped in text()
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)
```

- Connection type: `sqlite`
- Hook class: `airflow.providers.sqlite.hooks.sqlite.SqliteHook`
- Connection URI scheme: `sqlite://`
- Default connection ID: `sqlite_default`
- Dependencies: `apache-airflow>=2.10.0`, `apache-airflow-providers-common-sql>=1.26.0`
- Optional dependencies: `pandas` (for pandas DataFrame support), `polars` (for polars DataFrame support)