Apache Airflow SQLite Provider

A provider package that integrates SQLite databases with Apache Airflow for workflow orchestration and data pipeline operations. This package extends Airflow's SQL capabilities with SQLite-specific connection handling, enabling seamless integration of SQLite databases into data workflows.

Package Information

  • Package Name: apache-airflow-providers-sqlite
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-sqlite
  • Minimum Requirements: Python >=3.10, Apache Airflow >=2.10.0, apache-airflow-providers-common-sql >=1.26.0

Core Imports

# Main hook import
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# For type hints and advanced usage
import sqlite3
from airflow.models import Connection
from sqlalchemy.engine import Engine, Inspector
from sqlalchemy.engine.url import URL

Additional imports for DataFrame operations:

# For pandas DataFrames (optional dependency)
from pandas import DataFrame as PandasDataFrame

# For polars DataFrames (optional dependency) 
from polars import DataFrame as PolarsDataFrame

Basic Usage

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Initialize hook with connection ID
hook = SqliteHook(sqlite_conn_id='sqlite_default')

# Execute a query and get all results
results = hook.get_records("SELECT * FROM users WHERE active = ?", parameters=[True])

# Execute a query and get first result only  
first_result = hook.get_first("SELECT COUNT(*) FROM users")

# Run SQL commands (INSERT, UPDATE, DELETE)
hook.run("INSERT INTO users (name, email) VALUES (?, ?)", parameters=["John Doe", "john@example.com"])

# Get results as pandas DataFrame (requires pandas)
df = hook.get_df("SELECT * FROM users", df_type="pandas")

# Get results as polars DataFrame (requires polars)
df = hook.get_df("SELECT * FROM users", df_type="polars")

# Bulk insert multiple rows
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
hook.insert_rows(table="users", rows=rows, target_fields=["name", "email"])

# Test connection
status, message = hook.test_connection()
if status:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")

Connection Configuration

SQLite connections support various URI formats:

# File-based database (relative path)
sqlite:///path/to/database.db

# File-based database (absolute path)
sqlite:////absolute/path/to/database.db

# In-memory database
sqlite:///:memory:

# With query parameters
sqlite:///path/to/db.sqlite?mode=ro
sqlite:///path/to/db.sqlite?mode=rw
sqlite:///path/to/db.sqlite?cache=shared
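
A minimal sketch of registering such a connection programmatically: for SQLite the Connection object's host field carries the database file path, and the AIRFLOW_CONN_<CONN_ID> environment variable is one standard way to expose it to Airflow. The connection ID sqlite_reporting and the file path are illustrative.

import os

from airflow.models import Connection
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Hypothetical connection; for SQLite the host field holds the database file path
conn = Connection(conn_id="sqlite_reporting", conn_type="sqlite", host="/data/reporting.db")

# Expose it via the AIRFLOW_CONN_<CONN_ID> environment variable
os.environ["AIRFLOW_CONN_SQLITE_REPORTING"] = conn.get_uri()

hook = SqliteHook(sqlite_conn_id="sqlite_reporting")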

Capabilities

Hook Class Definition

The SqliteHook class provides SQLite database integration.

class SqliteHook(DbApiHook):
    """
    Interact with SQLite databases.
    
    Class Attributes:
        conn_name_attr: str = "sqlite_conn_id"
        default_conn_name: str = "sqlite_default"
        conn_type: str = "sqlite" 
        hook_name: str = "Sqlite"
    """
    
    def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
        """
        Initialize SQLite hook.
        
        Args:
            *args: If single positional arg provided, used as connection ID
            schema (str, optional): Database schema (typically not used with SQLite)
            log_sql (bool): Whether to log SQL statements (default: True)
            **kwargs: Additional keyword arguments, including connection ID via conn_name_attr
        """

Connection Management

Establish and manage SQLite database connections with proper URI handling.

def get_conn(self) -> sqlite3.dbapi2.Connection:
    """
    Return SQLite connection object with proper URI conversion.
    
    Converts SQLAlchemy URI format to sqlite3-compatible file URI format.
    Handles file paths, in-memory databases, and query parameters.
    
    Returns:
        sqlite3.dbapi2.Connection: SQLite database connection
    """

def get_uri(self) -> str:
    """
    Override DbApiHook get_uri method for SQLAlchemy engine compatibility.
    
    Transforms Airflow connection URI to SQLAlchemy-compatible format,
    handling SQLite-specific URI requirements.
    
    Returns:
        str: SQLAlchemy-compatible URI string  
    """

def get_conn_id(self) -> str:
    """
    Get the connection ID used by this hook.
    
    Returns:
        str: Connection ID
    """

def get_cursor(self):
    """
    Get database cursor for executing SQL statements.
    
    Returns:
        sqlite3.Cursor: Database cursor object
    """

def test_connection(self):
    """
    Test the SQLite database connection.
    
    Returns:
        tuple[bool, str]: (connection_success, status_message)
    """

SQL Execution

Execute SQL statements with parameter binding and transaction control.

def run(self, sql, autocommit: bool = False, parameters=None, handler=None, 
        split_statements: bool = False, return_last: bool = True):
    """
    Execute SQL statement(s) with optional parameter binding.
    
    Args:
        sql (str | list[str]): SQL statement(s) to execute
        autocommit (bool): Enable autocommit mode (default: False)
        parameters (list | dict, optional): Query parameters for binding
        handler (callable, optional): Result handler function
        split_statements (bool): Split multiple statements (default: False)
        return_last (bool): Return result from last statement only (default: True)
        
    Returns:
        any: Results from the handler if one is provided, otherwise None
    """

def get_records(self, sql: str, parameters=None) -> list[tuple]:
    """
    Execute SQL query and return all records.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        
    Returns:
        list[tuple]: List of result tuples
    """

def get_first(self, sql: str, parameters=None):
    """
    Execute SQL query and return first record.
    
    Args:
        sql (str): SQL query to execute  
        parameters (list | dict, optional): Query parameters for binding
        
    Returns:
        tuple | None: First result tuple or None if no results
    """

DataFrame Operations

Convert query results to pandas or polars DataFrames for data analysis.

def get_df(self, sql: str, parameters=None, *, df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor
        
    Returns:
        PandasDataFrame | PolarsDataFrame: DataFrame with query results
    """

def get_df_by_chunks(self, sql: str, parameters=None, *, chunksize: int, 
                     df_type: str = "pandas", **kwargs):
    """
    Execute SQL query and return results as DataFrame chunks.
    
    Args:
        sql (str): SQL query to execute
        parameters (list | dict, optional): Query parameters for binding
        chunksize (int): Number of rows per chunk (required)
        df_type (str): DataFrame type - "pandas" or "polars" (default: "pandas")
        **kwargs: Additional arguments passed to DataFrame constructor
        
    Yields:
        PandasDataFrame | PolarsDataFrame: Iterator of DataFrame chunks
    """

Bulk Operations

Efficiently insert multiple rows with batching and transaction control.

def insert_rows(self, table: str, rows, target_fields=None, commit_every: int = 1000, 
                replace: bool = False, *, executemany: bool = False, 
                fast_executemany: bool = False, autocommit: bool = False, **kwargs) -> None:
    """
    Insert multiple rows into table with batching and optional replacement.
    
    Args:
        table (str): Target table name
        rows (Iterable): Collection of row tuples to insert
        target_fields (list[str], optional): Column names for insertion
        commit_every (int): Commit transaction every N rows (default: 1000)
        replace (bool): Use REPLACE INTO instead of INSERT INTO (default: False)
        executemany (bool): Use cursor.executemany() for batch insertion (default: False)
        fast_executemany (bool): Use fast executemany if supported (default: False)
        autocommit (bool): Enable autocommit mode (default: False) 
        **kwargs: Additional arguments for customization
    """

SQLAlchemy Integration

Access SQLAlchemy engines and metadata for advanced database operations.

def get_sqlalchemy_engine(self, engine_kwargs=None) -> Engine:
    """
    Get SQLAlchemy engine for advanced database operations.
    
    Args:
        engine_kwargs (dict, optional): Additional engine configuration parameters
        
    Returns:
        Engine: SQLAlchemy engine instance
    """

@property
def sqlalchemy_url(self) -> URL:
    """
    SQLAlchemy URL object for this connection.
    
    Returns:
        URL: SQLAlchemy URL object
    """

@property  
def inspector(self) -> Inspector:
    """
    SQLAlchemy Inspector for database metadata.
    
    Returns:
        Inspector: Database inspector instance
    """

Transaction Control

Manage database transactions and autocommit behavior.

def get_autocommit(self, conn) -> bool:
    """
    Get autocommit setting for connection.
    
    Args:
        conn: Database connection object
        
    Returns:
        bool: Current autocommit status
    """

def set_autocommit(self, conn, autocommit: bool) -> None:
    """
    Set autocommit flag on connection.
    
    Args:
        conn: Database connection object
        autocommit (bool): Autocommit setting to apply
    """

Properties and Utilities

Helper methods and properties for SQL operations and metadata access.

@property
def placeholder(self) -> str:
    """
    SQL parameter placeholder character for SQLite.
    
    Returns:
        str: "?" (question mark placeholder)
    """

@property
def connection(self) -> Connection:
    """
    Airflow connection object for this hook.
    
    Returns:
        Connection: Connection object instance
    """

@property
def connection_extra(self) -> dict:
    """
    Connection extra parameters as dictionary.
    
    Returns:
        dict: Extra connection parameters from connection configuration
    """

@property
def last_description(self) -> list:
    """
    Description from last executed cursor.
    
    Returns:
        list: Cursor description with column metadata
    """

@staticmethod
def split_sql_string(sql: str, strip_semicolon: bool = False) -> list[str]:
    """
    Split SQL string into individual statements.
    
    Args:
        sql (str): SQL string with multiple statements
        strip_semicolon (bool): Remove trailing semicolons (default: False)
        
    Returns:
        list[str]: List of individual SQL statements
    """

@staticmethod
def strip_sql_string(sql: str) -> str:
    """
    Strip whitespace and comments from SQL string.
    
    Args:
        sql (str): SQL string to clean
        
    Returns:
        str: Cleaned SQL string
    """

Provider Metadata

Access provider configuration and metadata.

# From airflow.providers.sqlite.get_provider_info
def get_provider_info() -> dict:
    """
    Get provider metadata including integrations and connection types.
    
    Returns:
        dict: Provider metadata containing:
            - package-name: "apache-airflow-providers-sqlite"
            - name: "SQLite" 
            - description: SQLite provider description
            - integrations: List of SQLite integration info
            - hooks: List of available hook modules
            - connection-types: List of supported connection types
    """

Types

# Type aliases for clarity
PandasDataFrame = "pandas.DataFrame"
PolarsDataFrame = "polars.DataFrame" 
Connection = "airflow.models.Connection"
Engine = "sqlalchemy.engine.Engine"
Inspector = "sqlalchemy.engine.Inspector"
URL = "sqlalchemy.engine.URL"

Error Handling

Working with the SQLite hook can surface several classes of database and connection errors:

  • Connection errors: Invalid file paths, permission issues, database corruption
  • SQL errors: Syntax errors, constraint violations, table/column not found
  • Transaction errors: Deadlocks, lock timeouts, rollback scenarios
  • URI format errors: Invalid connection string formats, parameter parsing

Common error patterns:

import sqlite3

from airflow.exceptions import AirflowException
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

try:
    hook = SqliteHook(sqlite_conn_id='my_sqlite_conn')
    results = hook.get_records("SELECT * FROM users")
except sqlite3.Error as e:
    # Handle SQLite-specific errors
    print(f"Database error: {e}")
except AirflowException as e:
    # Handle Airflow-specific errors (connection not found, etc.)
    print(f"Airflow error: {e}")
except Exception as e:
    # Handle other errors
    print(f"General error: {e}")

Usage Examples

Working with In-Memory Databases

# Connection URI: sqlite:///:memory:
# Each hook call opens a fresh connection, and a plain in-memory database only
# lives as long as that connection, so do all the work on a single connection.
hook = SqliteHook(sqlite_conn_id='sqlite_memory')
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("CREATE TABLE temp_data (id INTEGER, value TEXT)")
cursor.executemany("INSERT INTO temp_data VALUES (?, ?)", [(1, "test"), (2, "data")])
results = cursor.execute("SELECT * FROM temp_data").fetchall()
conn.close()

File Database with Custom Parameters

# Connection URI: sqlite:///path/to/db.sqlite?mode=rw&cache=shared
hook = SqliteHook(sqlite_conn_id='sqlite_file')
results = hook.get_df("SELECT * FROM large_table", df_type="pandas")

Batch Processing with Chunked DataFrames

# Process large results in chunks to manage memory
for chunk_df in hook.get_df_by_chunks("SELECT * FROM big_table", chunksize=1000):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    # Save or further process results
    print(f"Processed chunk with {len(chunk_df)} rows")

Transaction Management

# Manual transaction control on a connection obtained from the hook
conn = hook.get_conn()
try:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    cursor.execute("UPDATE users SET active = 1 WHERE name = ?", ("Alice",))
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()

Using SQLAlchemy Engine

from sqlalchemy import text

# Get SQLAlchemy engine for advanced operations
engine = hook.get_sqlalchemy_engine()
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)

Connection Configuration

  • Connection Type: sqlite
  • Hook Class: airflow.providers.sqlite.hooks.sqlite.SqliteHook
  • Supported URI Schemes: sqlite://
  • Default Connection ID: sqlite_default

Dependencies

  • Required: apache-airflow>=2.10.0, apache-airflow-providers-common-sql>=1.26.0
  • Python: >=3.10
  • Optional: pandas (for pandas DataFrame support), polars (for polars DataFrame support)

Provider Information

  • Package Name: apache-airflow-providers-sqlite
  • Provider Name: SQLite
  • Integration: SQLite database integration
  • External Documentation: https://www.sqlite.org/index.html

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sqlite