or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-apache-airflow-providers-sqlite

Provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/apache-airflow-providers-sqlite@4.1.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-sqlite@4.1.0

index.md · docs/

Apache Airflow SQLite Provider

A provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations. This package extends Airflow's SQL capabilities with SQLite-specific connection handling, enabling seamless integration of SQLite databases into data workflows.

Package Information

  • Package Name: apache-airflow-providers-sqlite
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-sqlite
  • Minimum Requirements: Python >=3.10, Apache Airflow >=2.10.0, apache-airflow-providers-common-sql >=1.26.0

Core Imports

# Main hook import
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# For type hints and advanced usage
import sqlite3
from airflow.models import Connection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.engine.url import URL

Additional imports for DataFrame operations:

# For pandas DataFrames (optional dependency)
from pandas import DataFrame as PandasDataFrame

# For polars DataFrames (optional dependency) 
from polars import DataFrame as PolarsDataFrame

Basic Usage

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Initialize hook with connection ID
hook = SqliteHook(sqlite_conn_id='sqlite_default')

# Execute a query and get all results
results = hook.get_records("SELECT * FROM users WHERE active = ?", parameters=[True])

# Execute a query and get first result only  
first_result = hook.get_first("SELECT COUNT(*) FROM users")

# Run SQL commands (INSERT, UPDATE, DELETE)
hook.run("INSERT INTO users (name, email) VALUES (?, ?)", parameters=["John Doe", "john@example.com"])

# Get results as pandas DataFrame (requires pandas)
df = hook.get_df("SELECT * FROM users", df_type="pandas")

# Get results as polars DataFrame (requires polars)
df = hook.get_df("SELECT * FROM users", df_type="polars")

# Bulk insert multiple rows
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
hook.insert_rows(table="users", rows=rows, target_fields=["name", "email"])

# Test connection
status, message = hook.test_connection()
if status:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")

Connection Configuration

SQLite connections support various URI formats:

# File-based database (relative path)
sqlite:///path/to/database.db

# File-based database (absolute path)
sqlite:////absolute/path/to/database.db

# In-memory database
sqlite:///:memory:

# With query parameters
sqlite:///path/to/db.sqlite?mode=ro
sqlite:///path/to/db.sqlite?mode=rw
sqlite:///path/to/db.sqlite?cache=shared

Capabilities

Hook Class Definition

The SqliteHook class provides SQLite database integration.

class SqliteHook(DbApiHook):
    """
    Interact with SQLite databases.
    
    Class Attributes:
        conn_name_attr: str = "sqlite_conn_id"
        default_conn_name: str = "sqlite_default"
        conn_type: str = "sqlite" 
        hook_name: str = "Sqlite"
    """
    
    def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
        """
        Initialize SQLite hook.
        
        Args:
            *args: If single positional arg provided, used as connection ID
            schema (str, optional): Database schema (typically not used with SQLite)
            log_sql (bool): Whether to log SQL statements (default: True)
            **kwargs: Additional keyword arguments, including connection ID via conn_name_attr
        """

Connection Management

Establish and manage SQLite database connections with proper URI handling.

def get_conn(self) -> sqlite3.dbapi2.Connection:
    """
    Return SQLite connection object with proper URI conversion.
    
    Converts SQLAlchemy URI format to sqlite3-compatible file URI format.
    Handles file paths, in-memory databases, and query parameters.
    
    Returns:
        sqlite3.dbapi2.Connection: SQLite database connection
    """

def get_uri(self) -> str:
    """
    Override DbApiHook get_uri method for SQLAlchemy engine compatibility.
    
    Transforms Airflow connection URI to SQLAlchemy-compatible format,
    handling SQLite-specific URI requirements.
    
    Returns:
        str: SQLAlchemy-compatible URI string  
    """

def get_conn_id(self) -> str:
    """
    Get the connection ID used by this hook.
    
    Returns:
        str: Connection ID
    """

def get_cursor(self):
    """
    Get database cursor for executing SQL statements.
    
    Returns:
        sqlite3.Cursor: Database cursor object
    """

def test_connection(self):
    """
    Test the SQLite database connection.
    
    Returns:
        tuple[bool, str]: (connection_success, status_message)
    """

SQL Execution

Execute SQL statements with parameter binding and transaction control.

def run(self, sql, autocommit: bool = False, parameters=None, handler=None, 
        split_statements: bool = False, return_last: bool = True):
    """
    Execute SQL statement(s) with optional parameter binding.
    
    Args:
        sql (str | list[str]): SQL statement(s) to execute
        autocommit (bool): Enable autocommit mode (default: False)
        parameters (list | dict, optional): Query parameters for binding
        handler (callable, optional): Result handler function
        split_statements (bool): Split multiple statements (default: False)
        return_last (bool): Return result from last statement only (default: True)
        
    Returns:
        any: Query results based on handler, or None for non-SELECT statements
    """

def get_records(self, sql: str, parameters=None) -> list[tuple]:
    """
    Execute SQL query and return all records.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        
    Returns:
        list[tuple]: List of result tuples
    """

def get_first(self, sql: str, parameters=None):
    """
    Execute SQL query and return first record.
    
    Args:
        sql (str): SQL query to execute  
        parameters (list | dict, optional): Query parameters for binding
        
    Returns:
        tuple | None: First result tuple or None if no results
    """

DataFrame Operations

Convert query results to pandas or polars DataFrames for data analysis.

def get_df(self, sql: str, parameters=None, *, df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor
        
    Returns:
        PandasDataFrame | PolarsDataFrame: DataFrame with query results
    """

def get_df_by_chunks(self, sql: str, parameters=None, *, chunksize: int, 
                     df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame chunks.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        chunksize (int): Number of rows per chunk (required)
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor
        
    Yields:
        PandasDataFrame | PolarsDataFrame: Iterator of DataFrame chunks
    """

Bulk Operations

Efficiently insert multiple rows with batching and transaction control.

def insert_rows(self, table: str, rows, target_fields=None, commit_every: int = 1000, 
                replace: bool = False, *, executemany: bool = False, 
                fast_executemany: bool = False, autocommit: bool = False, **kwargs) -> None:
    """
    Insert multiple rows into table with batching and optional replacement.
    
    Args:
        table (str): Target table name
        rows (Iterable): Collection of row tuples to insert
        target_fields (list[str], optional): Column names for insertion
        commit_every (int): Commit transaction every N rows (default: 1000)
        replace (bool): Use REPLACE INTO instead of INSERT INTO (default: False)
        executemany (bool): Use cursor.executemany() for batch insertion (default: False)
        fast_executemany (bool): Use fast executemany if supported (default: False)
        autocommit (bool): Enable autocommit mode (default: False) 
        **kwargs: Additional arguments for customization
    """

SQLAlchemy Integration

Access SQLAlchemy engines and metadata for advanced database operations.

def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
    """
    Get SQLAlchemy engine for advanced database operations.
    
    Args:
        engine_kwargs (dict, optional): Additional engine configuration parameters
        
    Returns:
        Engine: SQLAlchemy engine instance
    """

@property
def sqlalchemy_url(self) -> URL:
    """
    SQLAlchemy URL object for this connection.
    
    Returns:
        URL: SQLAlchemy URL object
    """

@property  
def inspector(self) -> Inspector:
    """
    SQLAlchemy Inspector for database metadata.
    
    Returns:
        Inspector: Database inspector instance
    """

Transaction Control

Manage database transactions and autocommit behavior.

def get_autocommit(self, conn) -> bool:
    """
    Get autocommit setting for connection.
    
    Args:
        conn: Database connection object
        
    Returns:
        bool: Current autocommit status
    """

def set_autocommit(self, conn, autocommit: bool) -> None:
    """
    Set autocommit flag on connection.
    
    Args:
        conn: Database connection object
        autocommit (bool): Autocommit setting to apply
    """

Properties and Utilities

Helper methods and properties for SQL operations and metadata access.

@property
def placeholder(self) -> str:
    """
    SQL parameter placeholder character for SQLite.
    
    Returns:
        str: "?" (question mark placeholder)
    """

@property
def connection(self) -> Connection:
    """
    Airflow connection object for this hook.
    
    Returns:
        Connection: Connection object instance
    """

@property
def connection_extra(self) -> dict:
    """
    Connection extra parameters as dictionary.
    
    Returns:
        dict: Extra connection parameters from connection configuration
    """

@property
def last_description(self) -> list:
    """
    Description from last executed cursor.
    
    Returns:
        list: Cursor description with column metadata
    """

@staticmethod
def split_sql_string(sql: str, strip_semicolon: bool = False) -> list[str]:
    """
    Split SQL string into individual statements.
    
    Args:
        sql (str): SQL string with multiple statements
        strip_semicolon (bool): Remove trailing semicolons (default: False)
        
    Returns:
        list[str]: List of individual SQL statements
    """

@staticmethod
def strip_sql_string(sql: str) -> str:
    """
    Strip whitespace and comments from SQL string.
    
    Args:
        sql (str): SQL string to clean
        
    Returns:
        str: Cleaned SQL string
    """

Provider Metadata

Access provider configuration and metadata.

# From airflow.providers.sqlite.get_provider_info
def get_provider_info() -> dict:
    """
    Get provider metadata including integrations and connection types.
    
    Returns:
        dict: Provider metadata containing:
            - package-name: "apache-airflow-providers-sqlite"
            - name: "SQLite" 
            - description: SQLite provider description
            - integrations: List of SQLite integration info
            - hooks: List of available hook modules
            - connection-types: List of supported connection types
    """

Types

# Type aliases for clarity
PandasDataFrame = "pandas.DataFrame"
PolarsDataFrame = "polars.DataFrame" 
Connection = "airflow.models.Connection"
Engine = "sqlalchemy.engine.Engine"
Inspector = "sqlalchemy.engine.Inspector"
URL = "sqlalchemy.engine.URL"

Error Handling

The SQLite hook handles common database errors and connection issues:

  • Connection errors: Invalid file paths, permission issues, database corruption
  • SQL errors: Syntax errors, constraint violations, table/column not found
  • Transaction errors: Deadlocks, lock timeouts, rollback scenarios
  • URI format errors: Invalid connection string formats, parameter parsing

Common error patterns:

import sqlite3
from airflow.exceptions import AirflowException

try:
    hook = SqliteHook(sqlite_conn_id='my_sqlite_conn')
    results = hook.get_records("SELECT * FROM users")
except sqlite3.Error as e:
    # Handle SQLite-specific errors
    print(f"Database error: {e}")
except AirflowException as e:
    # Handle Airflow-specific errors (connection not found, etc.)
    print(f"Airflow error: {e}")
except Exception as e:
    # Handle other errors
    print(f"General error: {e}")

Usage Examples

Working with In-Memory Databases

# Connection URI: sqlite:///:memory:
hook = SqliteHook(sqlite_conn_id='sqlite_memory')
hook.run("CREATE TABLE temp_data (id INTEGER, value TEXT)")
hook.insert_rows("temp_data", [(1, "test"), (2, "data")])
results = hook.get_records("SELECT * FROM temp_data")

File Database with Custom Parameters

# Connection URI: sqlite:///path/to/db.sqlite?mode=rw&cache=shared
hook = SqliteHook(sqlite_conn_id='sqlite_file')
results = hook.get_df("SELECT * FROM large_table", df_type="pandas")

Batch Processing with Chunked DataFrames

# Process large results in chunks to manage memory
for chunk_df in hook.get_df_by_chunks("SELECT * FROM big_table", chunksize=1000):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    # Save or further process results
    print(f"Processed chunk with {len(chunk_df)} rows")

Transaction Management

# Manual transaction control
# NOTE: _create_autocommit_connection is a private DbApiHook helper and may
# change between provider releases — prefer get_conn() for stable code.
with hook._create_autocommit_connection(autocommit=False) as conn:
    cursor = conn.cursor()
    try:
        cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
        cursor.execute("UPDATE users SET active = 1 WHERE name = ?", ("Alice",))
        conn.commit()
    except Exception:
        conn.rollback()
        raise

Using SQLAlchemy Engine

# Get SQLAlchemy engine for advanced operations
from sqlalchemy import text

engine = hook.get_sqlalchemy_engine()
with engine.connect() as conn:
    # Wrap raw SQL in text(); passing a plain string to Connection.execute()
    # is deprecated in SQLAlchemy 1.4 and removed in 2.0.
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)

Connection Configuration

  • Connection Type: sqlite
  • Hook Class: airflow.providers.sqlite.hooks.sqlite.SqliteHook
  • Supported URI Schemes: sqlite://
  • Default Connection ID: sqlite_default

Dependencies

  • Required: apache-airflow>=2.10.0, apache-airflow-providers-common-sql>=1.26.0
  • Python: >=3.10
  • Optional: pandas (for pandas DataFrame support), polars (for polars DataFrame support)

Provider Information

  • Package Name: apache-airflow-providers-sqlite
  • Provider Name: SQLite
  • Integration: SQLite database integration
  • External Documentation: https://www.sqlite.org/index.html