SQL Database Source

Load data from SQL databases with automatic schema reflection, incremental loading, and support for multiple backends (SQLAlchemy, PyArrow, pandas, ConnectorX).

Capabilities

SQL Database Source

def sql_database(
    credentials: Union[str, ConnectionStringCredentials, Engine] = None,
    schema: str = None,
    metadata: Any = None,
    table_names: List[str] = None,
    chunk_size: int = 50000,
    backend: Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] = "sqlalchemy",
    detect_precision_hints: bool = False,
    reflection_level: Literal["minimal", "full", "full_with_precision"] = "full",
    defer_table_reflect: bool = None,
    table_adapter_callback: Callable = None,
    backend_kwargs: dict = None,
    include_views: bool = False,
    type_adapter_callback: Callable = None,
    query_adapter_callback: Callable = None,
    resolve_foreign_keys: bool = False,
    engine_adapter_callback: Callable = None
) -> Iterable[DltResource]:
    """
    Loads tables from SQL database with automatic schema reflection.

    Args:
        credentials: Database connection:
            - Connection string: "postgresql://user:pass@host:5432/db"
            - ConnectionStringCredentials instance
            - SQLAlchemy Engine instance
        schema: Database schema name (default: database default schema)
        metadata: Optional SQLAlchemy MetaData instance (overrides schema arg)
        table_names: List of table names to load (None = all tables)
        chunk_size: Number of rows per chunk (default: 50000)
        backend: Data loading backend:
            - "sqlalchemy": Standard SQLAlchemy (default)
            - "pyarrow": Fast loading with PyArrow
            - "pandas": Use pandas for type inference
            - "connectorx": High-performance ConnectorX
        detect_precision_hints: Detect precision hints for numeric columns
        reflection_level: Schema introspection depth:
            - "minimal": Basic schema info
            - "full": Complete schema with indexes (default)
            - "full_with_precision": Full schema with precision details
        defer_table_reflect: Defer schema reflection until runtime
        table_adapter_callback: Callback to modify table configuration
        backend_kwargs: Backend-specific arguments
        include_views: Include database views (default: False)
        type_adapter_callback: Callback to adapt column types
        query_adapter_callback: Callback to modify SELECT queries
        resolve_foreign_keys: Resolve and include foreign key relationships
        engine_adapter_callback: Callback to modify SQLAlchemy engine

    Returns:
        Iterable of DltResource instances (one per table)

    Example:
        pipeline.run(
            sql_database(
                credentials="postgresql://user:pass@localhost/db",
                table_names=["users", "orders"]
            )
        )
    """

SQL Table Resource

def sql_table(
    credentials: Union[str, ConnectionStringCredentials, Engine] = None,
    table: str = None,
    schema: str = None,
    chunk_size: int = 50000,
    backend: Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] = "sqlalchemy",
    reflection_level: Literal["minimal", "full", "full_with_precision"] = "full",
    defer_table_reflect: bool = False,
    table_adapter_callback: Callable = None,
    backend_kwargs: dict = None,
    type_adapter_callback: Callable = None,
    query_adapter_callback: Callable = None,
    incremental: Incremental = None
) -> DltResource:
    """
    Loads a single table from SQL database.

    Args:
        credentials: Database connection
        table: Table name to load
        schema: Database schema name
        chunk_size: Rows per chunk
        backend: Loading backend
        reflection_level: Schema reflection depth
        defer_table_reflect: Defer reflection
        table_adapter_callback: Table config callback
        backend_kwargs: Backend arguments
        type_adapter_callback: Type adapter callback
        query_adapter_callback: Query adapter callback
        incremental: Incremental configuration

    Returns:
        DltResource for the table

    Example:
        pipeline.run(
            sql_table(
                credentials=conn_str,
                table="users",
                incremental=dlt.sources.incremental("updated_at")
            )
        )
    """

Utility Functions

def engine_from_credentials(credentials: Any) -> Engine:
    """
    Creates SQLAlchemy engine from credentials.

    Args:
        credentials: Connection string or credentials object

    Returns:
        SQLAlchemy Engine instance

    Example:
        from sqlalchemy import text

        engine = engine_from_credentials("postgresql://localhost/db")
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
    """
def remove_nullability_adapter(table: Any) -> Any:
    """
    Table adapter that removes nullable constraints.

    Args:
        table: SQLAlchemy Table object

    Returns:
        Modified table with nullable columns

    Example:
        sql_database(
            credentials=conn_str,
            table_adapter_callback=remove_nullability_adapter
        )
    """

Type Definitions

# Reflection levels
ReflectionLevel = Literal["minimal", "full", "full_with_precision"]

# Backend types
TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"]

# Adapter callbacks
TTypeAdapter = Callable[[str, str], str]  # (table_name, column_type) -> dlt_type
TQueryAdapter = Callable[[Any, str], Any]  # (query, table_name) -> modified_query
TTableAdapter = Callable[[Any], Any]  # (table) -> modified_table

Usage Examples

Load All Tables

import dlt
from dlt.sources.sql_database import sql_database

# Load all tables from PostgreSQL
pipeline = dlt.pipeline(destination="duckdb", dataset_name="postgres_replica")

source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb"
)

pipeline.run(source)

Load Specific Tables

source = sql_database(
    credentials="mysql://user:password@localhost:3306/mydb",
    table_names=["customers", "orders", "products"]
)

pipeline.run(source)

Incremental Loading

from dlt.sources import incremental

# Load only new/updated records
source = sql_database(
    credentials=conn_str,
    table_names=["events"]
)

# Add incremental to specific table
events_resource = source.resources["events"]
events_resource.apply_hints(
    incremental=incremental("updated_at"),
    primary_key="id"
)

pipeline.run(source)

Using sql_table for Single Table

from dlt.sources.sql_database import sql_table

# Load single table with incremental
users = sql_table(
    credentials=conn_str,
    table="users",
    chunk_size=10000,
    incremental=dlt.sources.incremental("updated_at")
)

pipeline.run(users, write_disposition="merge", primary_key="id")

With Specific Schema

# PostgreSQL with schema
source = sql_database(
    credentials="postgresql://localhost/db",
    schema="analytics",
    table_names=["sales", "revenue"]
)

pipeline.run(source)

Using PyArrow Backend

# Fast loading with PyArrow
source = sql_database(
    credentials=conn_str,
    backend="pyarrow",
    chunk_size=100000  # Larger chunks with PyArrow
)

pipeline.run(source)

Using ConnectorX Backend

# High-performance loading
source = sql_database(
    credentials=conn_str,
    backend="connectorx",
    backend_kwargs={
        "partition_on": "id",
        "partition_num": 4  # Parallel partitions
    }
)

pipeline.run(source)
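
Using Pandas Backend

The pandas backend from the comparison table below has no dedicated example elsewhere; this is a minimal sketch that reuses conn_str from the examples above and relies on pandas' type inference for loosely typed columns:

# Use pandas when its type inference helps with complex column types
source = sql_database(
    credentials=conn_str,
    backend="pandas",
    chunk_size=20000
)

pipeline.run(source)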

Custom Type Mapping

from typing import Optional

def custom_type_adapter(table_name: str, column_type: str) -> Optional[str]:
    """
    Maps database column types to dlt types.
    Returns None to fall back to the default mapping.
    """
    if "json" in column_type.lower():
        return "json"
    if "uuid" in column_type.lower():
        return "text"
    return None  # Use default mapping

source = sql_database(
    credentials=conn_str,
    type_adapter_callback=custom_type_adapter
)

pipeline.run(source)

Custom Query Modification

from sqlalchemy import text

def query_adapter(query, table_name: str):
    """
    Modifies SELECT queries before they are executed.
    """
    # Filter out soft-deleted records in selected tables
    if table_name in ["users", "orders"]:
        return query.where(text("deleted_at IS NULL"))
    return query

source = sql_database(
    credentials=conn_str,
    query_adapter_callback=query_adapter
)

pipeline.run(source)

Table Configuration Callback

def table_adapter(table):
    """
    Modifies table configuration.
    """
    # Remove nullable constraints
    for column in table.columns:
        column.nullable = True
    return table

source = sql_database(
    credentials=conn_str,
    table_adapter_callback=table_adapter
)

pipeline.run(source)
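
Engine Adapter Callback

The engine_adapter_callback parameter (documented in the signature above) lets you adjust the SQLAlchemy engine before extraction starts. A minimal sketch, assuming your driver supports the chosen execution option; the isolation level here is illustrative:

def engine_adapter(engine):
    """
    Adjusts the SQLAlchemy engine before any queries run.
    """
    # execution_options() returns a copy of the engine with the new
    # options applied; the original engine is not mutated
    return engine.execution_options(isolation_level="REPEATABLE READ")

source = sql_database(
    credentials=conn_str,
    engine_adapter_callback=engine_adapter
)

pipeline.run(source)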

Including Views

# Load both tables and views
source = sql_database(
    credentials=conn_str,
    include_views=True
)

pipeline.run(source)

Connection String Formats

# PostgreSQL
"postgresql://user:password@host:5432/database"
"postgresql+psycopg2://user:password@host/database"

# MySQL
"mysql://user:password@host:3306/database"
"mysql+pymysql://user:password@host/database"

# SQL Server
"mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server"

# SQLite
"sqlite:///path/to/database.db"

# Oracle
"oracle+cx_oracle://user:password@host:1521/?service_name=service"

Using Credentials Object

from dlt.sources.credentials import ConnectionStringCredentials

credentials = ConnectionStringCredentials(
    "postgresql://user:password@localhost:5432/mydb"
)

source = sql_database(credentials=credentials)
pipeline.run(source)

With SQLAlchemy Engine

from sqlalchemy import create_engine

engine = create_engine("postgresql://localhost/db")

source = sql_database(
    credentials=engine,
    table_names=["users"]
)

pipeline.run(source)

Minimal Reflection

# Faster for large schemas
source = sql_database(
    credentials=conn_str,
    reflection_level="minimal"  # Skip indexes and constraints
)

pipeline.run(source)

Deferred Reflection

# Defer schema reflection until runtime (faster initialization)
source = sql_database(
    credentials=conn_str,
    defer_table_reflect=True
)

pipeline.run(source)

Table Selection by Pattern

# Select tables matching pattern
source = sql_database(credentials=conn_str)

# Filter to only fact tables
fact_tables = [name for name in source.resources.keys() if name.startswith("fact_")]
selected_source = source.with_resources(*fact_tables)

pipeline.run(selected_source)

Merge Write Disposition

from dlt.sources.sql_database import sql_table

# Load with merge (upsert)
users = sql_table(
    credentials=conn_str,
    table="users",
    incremental=dlt.sources.incremental("updated_at")
)

pipeline.run(
    users,
    write_disposition="merge",
    primary_key="id"
)

Multiple Tables with Different Settings

source = sql_database(credentials=conn_str)

# Configure each table differently
source.resources["users"].apply_hints(
    write_disposition="merge",
    primary_key="id",
    incremental=dlt.sources.incremental("updated_at")
)

source.resources["logs"].apply_hints(
    write_disposition="append",
    incremental=dlt.sources.incremental("created_at")
)

source.resources["config"].apply_hints(
    write_disposition="replace"
)

pipeline.run(source)

Supported Databases

  • PostgreSQL
  • MySQL / MariaDB
  • Microsoft SQL Server
  • Oracle
  • SQLite
  • Redshift
  • Snowflake (via SQLAlchemy)
  • Any SQLAlchemy-compatible database
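
For a quick local test, SQLite needs no running server. A minimal end-to-end sketch, assuming an existing example.db file (the file name is illustrative):

import dlt
from dlt.sources.sql_database import sql_database

# Any SQLAlchemy URL works, including a local SQLite file
source = sql_database(
    credentials="sqlite:///example.db"
)

pipeline = dlt.pipeline(
    pipeline_name="sqlite_demo",
    destination="duckdb",
    dataset_name="sqlite_replica"
)

pipeline.run(source)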

Backend Comparison

| Backend    | Speed     | Features        | Use Case               |
|------------|-----------|-----------------|------------------------|
| sqlalchemy | Medium    | Full features   | Default, good balance  |
| pyarrow    | Fast      | Efficient types | Large tables           |
| pandas     | Medium    | Type inference  | Complex types          |
| connectorx | Very Fast | Parallel loads  | Very large tables      |