Load data from SQL databases with automatic schema reflection, incremental loading, and support for multiple backends (SQLAlchemy, PyArrow, pandas, ConnectorX).
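A minimal end-to-end sketch tying these pieces together (the connection string, table name, cursor column, and primary key below are illustrative placeholders, not part of the API):
import dlt
from dlt.sources.sql_database import sql_database

# Sketch: reflect the schema, then load one table incrementally by a cursor column.
# "orders", "updated_at", and "id" are placeholders for your own table and columns.
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/mydb",
    table_names=["orders"],
    backend="pyarrow",
)
source.resources["orders"].apply_hints(
    incremental=dlt.sources.incremental("updated_at"),
    primary_key="id",
)
pipeline = dlt.pipeline(destination="duckdb", dataset_name="sql_data")
pipeline.run(source)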
def sql_database(
credentials: Union[str, ConnectionStringCredentials, Engine] = None,
schema: str = None,
metadata: Any = None,
table_names: List[str] = None,
chunk_size: int = 50000,
backend: Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] = "sqlalchemy",
detect_precision_hints: bool = False,
reflection_level: Literal["minimal", "full", "full_with_precision"] = "full",
defer_table_reflect: bool = None,
table_adapter_callback: Callable = None,
backend_kwargs: dict = None,
include_views: bool = False,
type_adapter_callback: Callable = None,
query_adapter_callback: Callable = None,
resolve_foreign_keys: bool = False,
engine_adapter_callback: Callable = None
) -> Iterable[DltResource]:
"""
Loads tables from a SQL database with automatic schema reflection.
Args:
credentials: Database connection:
- Connection string: "postgresql://user:pass@host:5432/db"
- ConnectionStringCredentials instance
- SQLAlchemy Engine instance
schema: Database schema name (default: database default schema)
metadata: Optional SQLAlchemy MetaData instance (overrides schema arg)
table_names: List of table names to load (None = all tables)
chunk_size: Number of rows per chunk (default: 50000)
backend: Data loading backend:
- "sqlalchemy": Standard SQLAlchemy (default)
- "pyarrow": Fast loading with PyArrow
- "pandas": Use pandas for type inference
- "connectorx": High-performance ConnectorX
detect_precision_hints: Deprecated; use reflection_level="full_with_precision" instead
reflection_level: Schema introspection depth:
- "minimal": Column names and nullability only
- "full": Column names, nullability, and data types (default)
- "full_with_precision": Also includes precision and scale details
defer_table_reflect: Defer schema reflection until runtime
table_adapter_callback: Callback to modify table configuration
backend_kwargs: Backend-specific arguments
include_views: Include database views (default: False)
type_adapter_callback: Callback to adapt column types
query_adapter_callback: Callback to modify SELECT queries
resolve_foreign_keys: Resolve and include foreign key relationships
engine_adapter_callback: Callback to modify SQLAlchemy engine
Returns:
Iterable of DltResource instances (one per table)
Example:
pipeline.run(
sql_database(
credentials="postgresql://user:pass@localhost/db",
table_names=["users", "orders"]
)
)
"""def sql_table(
credentials: Union[str, ConnectionStringCredentials, Engine] = None,
table: str = None,
schema: str = None,
chunk_size: int = 50000,
backend: Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] = "sqlalchemy",
reflection_level: Literal["minimal", "full", "full_with_precision"] = "full",
defer_table_reflect: bool = False,
table_adapter_callback: Callable = None,
backend_kwargs: dict = None,
type_adapter_callback: Callable = None,
query_adapter_callback: Callable = None,
incremental: Incremental = None
) -> DltResource:
"""
Loads a single table from a SQL database.
Args:
credentials: Database connection
table: Table name to load
schema: Database schema name
chunk_size: Rows per chunk
backend: Loading backend
reflection_level: Schema reflection depth
defer_table_reflect: Defer reflection
table_adapter_callback: Table config callback
backend_kwargs: Backend arguments
type_adapter_callback: Type adapter callback
query_adapter_callback: Query adapter callback
incremental: Incremental configuration
Returns:
DltResource for the table
Example:
pipeline.run(
sql_table(
credentials=conn_str,
table="users",
incremental=dlt.sources.incremental("updated_at")
)
)
"""def engine_from_credentials(credentials: Any) -> Engine:
"""
Creates a SQLAlchemy engine from credentials.
Args:
credentials: Connection string or credentials object
Returns:
SQLAlchemy Engine instance
Example:
engine = engine_from_credentials("postgresql://localhost/db")
with engine.connect() as conn:
result = conn.execute("SELECT 1")
"""def remove_nullability_adapter(table: Any) -> Any:
"""
Table adapter that removes NOT NULL constraints so all columns are treated as nullable.
Args:
table: SQLAlchemy Table object
Returns:
Modified table with nullable columns
Example:
sql_database(
credentials=conn_str,
table_adapter_callback=remove_nullability_adapter
)
"""# Reflection levels
ReflectionLevel = Literal["minimal", "full", "full_with_precision"]
# Backend types
TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"]
# Adapter callbacks
TTypeAdapter = Callable[[str, str], str] # (table_name, column_type) -> dlt_type
TQueryAdapter = Callable[[Any, str], Any] # (query, table_name) -> modified_query
TTableAdapter = Callable[[Any], Any] # (table) -> modified_table

import dlt
from dlt.sources.sql_database import sql_database
# Load all tables from PostgreSQL
pipeline = dlt.pipeline(destination="duckdb", dataset_name="postgres_replica")
source = sql_database(
credentials="postgresql://user:password@localhost:5432/mydb"
)
pipeline.run(source)

# Load specific tables from MySQL
source = sql_database(
credentials="mysql://user:password@localhost:3306/mydb",
table_names=["customers", "orders", "products"]
)
pipeline.run(source)

from dlt.sources import incremental
# Load only new/updated records
source = sql_database(
credentials=conn_str,
table_names=["events"]
)
# Add incremental to specific table
events_resource = source.resources["events"]
events_resource.apply_hints(
incremental=incremental("updated_at"),
primary_key="id"
)
pipeline.run(source)

from dlt.sources.sql_database import sql_table
# Load single table with incremental
users = sql_table(
credentials=conn_str,
table="users",
chunk_size=10000,
incremental=dlt.sources.incremental("updated_at")
)
pipeline.run(users, write_disposition="merge", primary_key="id")

# PostgreSQL with schema
source = sql_database(
credentials="postgresql://localhost/db",
schema="analytics",
table_names=["sales", "revenue"]
)
pipeline.run(source)

# Fast loading with PyArrow
source = sql_database(
credentials=conn_str,
backend="pyarrow",
chunk_size=100000 # Larger chunks with PyArrow
)
pipeline.run(source)

# High-performance loading
source = sql_database(
credentials=conn_str,
backend="connectorx",
backend_kwargs={
"partition_on": "id",
"partition_num": 4 # Parallel partitions
}
)
pipeline.run(source)

from typing import Optional

def custom_type_adapter(table_name: str, column_type: str) -> Optional[str]:
"""
Maps database types to dlt types.
"""
if "json" in column_type.lower():
return "json"
if "uuid" in column_type.lower():
return "text"
return None # Use default mapping
source = sql_database(
credentials=conn_str,
type_adapter_callback=custom_type_adapter
)
pipeline.run(source)

def query_adapter(query, table_name: str):
"""
Modifies SELECT queries.
"""
# Add WHERE clause to exclude soft-deleted records
if table_name in ["users", "orders"]:
from sqlalchemy import text
return query.where(text("deleted_at IS NULL"))
return query
source = sql_database(
credentials=conn_str,
query_adapter_callback=query_adapter
)
pipeline.run(source)

def table_adapter(table):
"""
Modifies table configuration.
"""
# Mark all columns nullable (drop NOT NULL constraints)
for column in table.columns:
column.nullable = True
return table
source = sql_database(
credentials=conn_str,
table_adapter_callback=table_adapter
)
pipeline.run(source)

# Load both tables and views
source = sql_database(
credentials=conn_str,
include_views=True
)
pipeline.run(source)

# PostgreSQL
"postgresql://user:password@host:5432/database"
"postgresql+psycopg2://user:password@host/database"
# MySQL
"mysql://user:password@host:3306/database"
"mysql+pymysql://user:password@host/database"
# SQL Server
"mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server"
# SQLite
"sqlite:///path/to/database.db"
# Oracle
"oracle+cx_oracle://user:password@host:1521/?service_name=service"from dlt.sources.credentials import ConnectionStringCredentials
credentials = ConnectionStringCredentials(
"postgresql://user:password@localhost:5432/mydb"
)
source = sql_database(credentials=credentials)
pipeline.run(source)

# Pass an existing SQLAlchemy engine
from sqlalchemy import create_engine
engine = create_engine("postgresql://localhost/db")
source = sql_database(
credentials=engine,
table_names=["users"]
)
pipeline.run(source)

# Faster for large schemas
source = sql_database(
credentials=conn_str,
reflection_level="minimal" # Skip indexes and constraints
)
pipeline.run(source)

# Defer schema reflection until runtime (faster initialization)
source = sql_database(
credentials=conn_str,
defer_table_reflect=True
)
pipeline.run(source)

# Select tables matching pattern
source = sql_database(credentials=conn_str)
# Filter to only fact tables
fact_tables = [name for name in source.resources.keys() if name.startswith("fact_")]
selected_source = source.with_resources(*fact_tables)
pipeline.run(selected_source)

from dlt.sources.sql_database import sql_table
# Load with merge (upsert)
users = sql_table(
credentials=conn_str,
table="users",
incremental=dlt.sources.incremental("updated_at")
)
pipeline.run(
users,
write_disposition="merge",
primary_key="id"
)

source = sql_database(credentials=conn_str)
# Configure each table differently
source.resources["users"].apply_hints(
write_disposition="merge",
primary_key="id",
incremental=dlt.sources.incremental("updated_at")
)
source.resources["logs"].apply_hints(
write_disposition="append",
incremental=dlt.sources.incremental("created_at")
)
source.resources["config"].apply_hints(
write_disposition="replace"
)
pipeline.run(source)

| Backend | Speed | Features | Use Case |
|---|---|---|---|
| sqlalchemy | Medium | Full features | Default, good balance |
| pyarrow | Fast | Efficient types | Large tables |
| pandas | Medium | Type inference | Complex types |
| connectorx | Very Fast | Parallel loads | Very large tables |
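The pandas backend in the table above has no dedicated example earlier; a minimal sketch, reusing the conn_str and pipeline from the previous examples:
# Type inference via pandas (sketch; conn_str and pipeline as defined above)
source = sql_database(
    credentials=conn_str,
    backend="pandas",
    chunk_size=20000,  # each chunk is held in memory, so keep chunks moderate
)
pipeline.run(source)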