A Python-first, scalable data loading library for building ELT pipelines, with automatic schema inference and support for 20+ destinations.
npx @tessl/cli install tessl/pypi-dlt@1.19.0

A Python-first, scalable data loading library that enables developers to build production-ready ELT (Extract, Load, Transform) data pipelines without backend infrastructure. dlt provides automatic schema inference, incremental loading, schema evolution, and seamless integration with 20+ data warehouses and destinations.
pip install dlt

import dlt

Common imports for building pipelines:
from dlt import pipeline, resource, source, transformer, defer
from dlt.sources import incremental

import dlt
# Define a simple data resource
@dlt.resource
def users():
    for i in range(100):
        yield {"id": i, "name": f"user_{i}"}

# Create and run a pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="my_dataset",
)
# Load the data
load_info = pipeline.run(users())
print(load_info)

dlt follows the Extract-Normalize-Load (ENL) pattern: data is extracted from sources, normalized into a relational schema, and then loaded into the destination.
Key architectural components:
- Sources (@dlt.source): group one or more resources together with shared configuration and schema
- Resources (@dlt.resource): yield the data items that become tables in the destination

Create, configure, and execute data pipelines with full control over extract, normalize, and load phases.
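For example, a minimal sketch of creating a pipeline and loading a small batch of records; the pipeline, dataset, and table names are placeholders:

import dlt

# Create (or retrieve) a pipeline; dev_mode starts from a fresh dataset on each run.
pipeline = dlt.pipeline(
    pipeline_name="events_pipeline",
    destination="duckdb",
    dataset_name="events",
    dev_mode=True,
    progress="log",
)

# Run it with plain Python data; write_disposition controls how the table is written.
load_info = pipeline.run(
    [{"id": 1, "type": "click"}, {"id": 2, "type": "view"}],
    table_name="raw_events",
    write_disposition="replace",
)
print(load_info)

# Later, re-attach to the same pipeline by name, e.g. from another script.
same_pipeline = dlt.attach(pipeline_name="events_pipeline")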
def pipeline(
pipeline_name: str = None,
pipelines_dir: str = None,
pipeline_salt: str = None,
destination: Any = None,
staging: Any = None,
dataset_name: str = None,
import_schema_path: str = None,
export_schema_path: str = None,
full_refresh: bool = None,
dev_mode: bool = False,
refresh: str = None,
progress: Any = None,
**kwargs
) -> Pipeline:
"""
Creates or retrieves a pipeline instance.
Args:
pipeline_name: Unique pipeline identifier
pipelines_dir: Directory for pipeline storage
pipeline_salt: Salt for hashing sensitive data
destination: Destination name or instance (e.g., "duckdb", "bigquery")
staging: Staging destination for staging-aware destinations
dataset_name: Target dataset/schema name
import_schema_path: Path to import schema from
export_schema_path: Path to export schema to
full_refresh: Deprecated, use dev_mode instead
dev_mode: Enable development mode (fresh start on each run)
refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")
progress: Progress collector ("log", "tqdm", "enlighten", "alive_progress")
**kwargs: Additional destination-specific parameters
Returns:
Pipeline instance
"""def run(
data: Any,
*,
destination: Any = None,
staging: Any = None,
dataset_name: str = None,
table_name: str = None,
write_disposition: str = None,
columns: Any = None,
schema: Schema = None,
loader_file_format: str = None,
table_format: str = None,
schema_contract: Any = None,
refresh: str = None
) -> LoadInfo:
"""
Shorthand to run a pipeline with data in one call.
Args:
data: Data to load (resource, source, or Python data)
destination: Destination name or instance
staging: Staging destination name or instance
dataset_name: Target dataset/schema name
table_name: Target table name
write_disposition: How to write data ("append", "replace", "merge", "skip")
columns: Column definitions or hints
schema: Custom schema
loader_file_format: File format for staging ("jsonl", "parquet", "csv", "insert_values")
table_format: Table format ("iceberg", "delta", "hive")
schema_contract: Schema evolution rules
refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")
Returns:
LoadInfo with load statistics
"""def attach(
pipeline_name: str = None,
pipelines_dir: str = None,
pipeline_salt: str = None
) -> Pipeline:
"""
Attaches to an existing pipeline.
Args:
pipeline_name: Pipeline name
pipelines_dir: Pipelines directory
pipeline_salt: Pipeline salt
Returns:
Attached Pipeline instance
"""def current() -> Pipeline:
"""
Gets the currently active pipeline.
Returns:
Current Pipeline instance or None
"""Declarative decorators for defining data sources and resources with automatic schema inference.
def source(
func: Callable = None,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
schema_contract: Any = None,
spec: type = None,
_impl_cls: type = None
) -> Callable:
"""
Decorates a function as a data source.
Args:
func: Function that returns resource(s)
name: Source name (defaults to function name)
section: Configuration section name
max_table_nesting: Maximum nesting level for flattening
root_key: Add root key to nested data
schema: Custom schema
schema_contract: Schema evolution rules
spec: Configuration spec class
_impl_cls: Implementation class (internal)
Returns:
Decorated source function
"""def resource(
data: Any = None,
name: str = None,
table_name: str = None,
write_disposition: str = None,
columns: Any = None,
primary_key: str = None,
merge_key: str = None,
schema_contract: Any = None,
table_format: str = None,
file_format: str = None,
references: Any = None,
depends_on: Any = None,
selected: bool = True,
spec: type = None,
standalone: bool = False,
data_from: Any = None,
parallelized: bool = False
) -> Callable:
"""
Decorates a function or data as a resource.
Args:
data: Data to wrap (generator, iterable, or function)
name: Resource name
table_name: Target table name
write_disposition: Write behavior ("append", "replace", "merge", "skip")
columns: Column hints
primary_key: Primary key column(s)
merge_key: Merge key for deduplication
schema_contract: Schema evolution rules
table_format: Table format ("iceberg", "delta", "hive")
file_format: Staging file format
references: Foreign key references
depends_on: Resource dependencies
selected: Whether resource is selected for loading
spec: Configuration spec
standalone: Create standalone resource
data_from: Resource to transform data from
parallelized: Enable parallel execution
Returns:
Decorated resource or DltResource instance
"""def transformer(
f: Callable = None,
data_from: Any = None,
name: str = None,
**resource_kwargs
) -> Callable:
"""
Decorates a function as a data transformer.
Args:
f: Function that transforms data
data_from: Source resource or data
name: Transformer name
**resource_kwargs: Same as resource() decorator
Returns:
Decorated transformer function
"""def destination(
func: Callable = None,
batch_size: int = 10,
loader_file_format: str = None,
name: str = None,
naming_convention: str = None,
skip_dlt_columns_and_tables: bool = True,
max_table_nesting: int = 0,
spec: type = None
) -> Callable:
"""
Decorates a function as a custom destination.
Args:
func: Function that handles data batches
batch_size: Number of items per batch
loader_file_format: File format for staging
name: Destination name
naming_convention: Naming convention to use
skip_dlt_columns_and_tables: Skip dlt system tables
max_table_nesting: Maximum nesting level
spec: Configuration spec
Returns:
Decorated destination function
"""Configure incremental data loading with cursor-based tracking, deduplication, and state management.
class Incremental:
"""
Configures incremental loading for a resource.
Args:
cursor_path: JSON path to cursor field (e.g., "updated_at", "$.id")
initial_value: Starting cursor value
last_value_func: Function to get last value (max, min, or custom)
primary_key: Primary key for deduplication
end_value: End cursor value for bounded loading
row_order: Row order ("asc" or "desc")
allow_external_schedulers: Enable Airflow integration
on_cursor_value_missing: Action when cursor missing ("raise", "include", "exclude")
lag: Lag/attribution window (timedelta or numeric)
range_start: Range start boundary ("open" or "closed")
range_end: Range end boundary ("open" or "closed")
"""
def __init__(
self,
cursor_path: str,
initial_value: Any = None,
last_value_func: Callable = max,
primary_key: str = None,
end_value: Any = None,
row_order: str = "asc",
allow_external_schedulers: bool = False,
on_cursor_value_missing: str = "raise",
lag: Any = None,
range_start: str = "closed",
range_end: str = "closed"
): ...
@property
def last_value(self) -> Any:
"""Current cursor value"""Ready-to-use sources for common data loading scenarios.
Load files from local filesystem or cloud storage (S3, GCS, Azure).
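A sketch of listing files and piping them through a reader transformer; the bucket URL is a placeholder, credentials are assumed to come from config/secrets, and the import path assumes the filesystem source ships under dlt.sources.filesystem:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

# List CSV files in a bucket, then parse them with the CSV reader transformer.
files = filesystem(bucket_url="s3://my-bucket/data", file_glob="**/*.csv")
csv_rows = files | read_csv()

pipeline = dlt.pipeline("files", destination="duckdb", dataset_name="files")
print(pipeline.run(csv_rows, table_name="csv_rows"))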
def filesystem(
bucket_url: str,
credentials: Any = None,
file_glob: str = "*",
files_per_page: int = None,
extract_content: bool = False,
kwargs: dict = None,
client_kwargs: dict = None,
incremental: Incremental = None
) -> DltResource:
"""
Lists and loads files from filesystem or cloud storage.
Args:
bucket_url: Bucket URL (e.g., "s3://bucket", "gs://bucket", "az://container")
credentials: Cloud credentials or fsspec AbstractFileSystem instance
file_glob: Glob pattern for file filtering
files_per_page: Files per page for pagination (default: 100)
extract_content: Whether to extract file content
kwargs: Additional arguments passed to fsspec constructor
client_kwargs: Additional arguments for fsspec native client
incremental: Incremental loading configuration (e.g., by modification_date)
Returns:
DltResource with file items
"""def readers(
bucket_url: str,
credentials: Any = None,
file_glob: str = "*",
kwargs: dict = None,
client_kwargs: dict = None,
incremental: Incremental = None
) -> Tuple[DltResource, ...]:
"""
Source providing chunked file readers for various formats.
Args:
bucket_url: Bucket URL
credentials: Cloud credentials or fsspec AbstractFileSystem instance
file_glob: Glob pattern for file filtering
kwargs: Additional arguments for fsspec constructor
client_kwargs: Additional arguments for fsspec native client
incremental: Incremental loading configuration
Returns:
Tuple of resources: (read_csv, read_jsonl, read_parquet, read_csv_duckdb)
"""# File reader transformers
def read_csv(chunksize: int = None, **pandas_kwargs) -> DltResource: ...
def read_jsonl(chunksize: int = None) -> DltResource: ...
def read_parquet(chunksize: int = None) -> DltResource: ...
def read_csv_duckdb(chunksize: int = None, **duckdb_kwargs) -> DltResource: ...

Load data from SQL databases with automatic schema inference.
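A sketch of loading from a database; the connection string and table names are placeholders, and the import path assumes the source ships under dlt.sources.sql_database:

import dlt
from dlt.sources.sql_database import sql_database, sql_table
from dlt.sources import incremental

# Reflect and load two tables from a schema.
source = sql_database(
    "postgresql://user:password@localhost:5432/shop",
    table_names=["customers", "orders"],
)

# Or load a single table incrementally on its updated_at column.
orders = sql_table(
    credentials="postgresql://user:password@localhost:5432/shop",
    table="orders",
    incremental=incremental("updated_at"),
    write_disposition="merge",
    primary_key="id",
)

pipeline = dlt.pipeline("shop", destination="duckdb", dataset_name="shop")
pipeline.run(source)
pipeline.run(orders)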
def sql_database(
credentials: Any = None,
schema: str = None,
metadata: Any = None,
table_names: List[str] = None,
chunk_size: int = 50000,
backend: str = "sqlalchemy",
detect_precision_hints: bool = False,
reflection_level: str = "full",
defer_table_reflect: bool = None,
table_adapter_callback: Callable = None,
backend_kwargs: dict = None,
include_views: bool = False,
type_adapter_callback: Callable = None,
query_adapter_callback: Callable = None,
resolve_foreign_keys: bool = False,
engine_adapter_callback: Callable = None
) -> Iterable[DltResource]:
"""
Loads data from SQL database tables.
Args:
credentials: Database credentials, connection string, or SQLAlchemy Engine
schema: Database schema name
metadata: Optional SQLAlchemy MetaData instance (ignores schema arg)
table_names: List of tables to load (None = all tables)
chunk_size: Rows per chunk
backend: Backend ("sqlalchemy", "pyarrow", "pandas", "connectorx")
detect_precision_hints: Detect precision hints for numeric columns
reflection_level: Schema reflection ("minimal", "full", "full_with_precision")
defer_table_reflect: Defer reflection until runtime
table_adapter_callback: Callback to modify table config
backend_kwargs: Backend-specific arguments
include_views: Include database views
type_adapter_callback: Callback to adapt data types
query_adapter_callback: Callback to modify queries
resolve_foreign_keys: Resolve foreign key relationships
engine_adapter_callback: Callback to modify SQLAlchemy engine
Returns:
Iterable of DltResource instances (one per table)
"""def sql_table(
credentials: Any = None,
table: str = None,
schema: str = None,
metadata: Any = None,
incremental: Incremental = None,
chunk_size: int = 50000,
backend: str = "sqlalchemy",
detect_precision_hints: bool = None,
reflection_level: str = "full",
defer_table_reflect: bool = None,
table_adapter_callback: Callable = None,
backend_kwargs: dict = None,
type_adapter_callback: Callable = None,
included_columns: List[str] = None,
excluded_columns: List[str] = None,
query_adapter_callback: Callable = None,
resolve_foreign_keys: bool = False,
engine_adapter_callback: Callable = None,
write_disposition: str = "append",
primary_key: Any = None,
merge_key: Any = None
) -> DltResource:
"""
Loads data from a single SQL database table.
Args:
credentials: Database credentials, connection string, or SQLAlchemy Engine
table: Table name to load
schema: Database schema name
metadata: Optional SQLAlchemy MetaData instance
incremental: Incremental loading configuration
chunk_size: Rows per chunk
backend: Backend ("sqlalchemy", "pyarrow", "pandas", "connectorx")
detect_precision_hints: Detect precision hints for numeric columns
reflection_level: Schema reflection level
defer_table_reflect: Defer reflection until runtime
table_adapter_callback: Callback to modify table config
backend_kwargs: Backend-specific arguments
type_adapter_callback: Callback to adapt data types
included_columns: Columns to include
excluded_columns: Columns to exclude
query_adapter_callback: Callback to modify queries
resolve_foreign_keys: Resolve foreign key relationships
engine_adapter_callback: Callback to modify SQLAlchemy engine
write_disposition: Write mode ("append", "replace", "merge")
primary_key: Primary key column(s)
merge_key: Merge key column(s)
Returns:
DltResource for the table
"""Declarative REST API loading with automatic pagination.
def rest_api(
client: dict = None,
resources: List[Union[str, dict, Any]] = None,
resource_defaults: dict = None
) -> List[DltResource]:
"""
Declaratively loads data from REST APIs.
Args:
client: REST client configuration (base_url, auth, etc.)
resources: List of endpoint resources to load
resource_defaults: Default configuration for all resources
Returns:
List of DltResource instances
"""def rest_api_source(
config: RESTAPIConfig,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = None,
schema: Schema = None,
schema_contract: Any = None,
parallelized: bool = False
) -> DltSource:
"""
Creates a REST API source with full source configuration options.
Args:
config: REST API configuration dictionary
name: Source name
section: Configuration section
max_table_nesting: Maximum nesting level
root_key: Add root key to nested tables
schema: Custom schema
schema_contract: Schema evolution rules
parallelized: Enable parallel extraction
Returns:
DltSource instance
"""def rest_api_resources(config: RESTAPIConfig) -> List[DltResource]:
"""
Creates a list of resources from REST API configuration.
Args:
config: REST API configuration dictionary
Returns:
List of DltResource instances
"""20+ supported destinations including data warehouses, databases, data lakes, and vector databases.
# Relational Databases
def postgres(credentials: Any = None, **kwargs) -> Destination: ...
def duckdb(credentials: str = None, **kwargs) -> Destination: ...
def mssql(credentials: Any = None, **kwargs) -> Destination: ...
def clickhouse(credentials: Any = None, **kwargs) -> Destination: ...
def sqlalchemy(credentials: Any = None, **kwargs) -> Destination: ...
# Cloud Data Warehouses
def snowflake(credentials: Any = None, **kwargs) -> Destination: ...
def bigquery(credentials: Any = None, **kwargs) -> Destination: ...
def redshift(credentials: Any = None, **kwargs) -> Destination: ...
def athena(credentials: Any = None, **kwargs) -> Destination: ...
def synapse(credentials: Any = None, **kwargs) -> Destination: ...
def databricks(credentials: Any = None, **kwargs) -> Destination: ...
def dremio(credentials: Any = None, **kwargs) -> Destination: ...
# Data Lakes
def filesystem(bucket_url: str = None, credentials: Any = None, **kwargs) -> Destination: ...
def ducklake(credentials: str = None, **kwargs) -> Destination: ...
def motherduck(credentials: Any = None, **kwargs) -> Destination: ...
# Vector Databases
def weaviate(credentials: Any = None, **kwargs) -> Destination: ...
def qdrant(credentials: Any = None, location: str = None, **kwargs) -> Destination: ...
def lancedb(credentials: Any = None, **kwargs) -> Destination: ...
# Testing
def dummy(mode: str = "normal", **kwargs) -> Destination: ...

Hierarchical configuration system with multiple providers (TOML files, environment variables, secrets).
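A sketch of how values are looked up and injected; the section and key names are placeholders, and environment variables use double underscores as section separators:

import dlt

# Explicit, dictionary-like access.
api_key = dlt.secrets["sources.my_source.api_key"]   # e.g. SOURCES__MY_SOURCE__API_KEY
log_level = dlt.config["runtime.log_level"]          # e.g. RUNTIME__LOG_LEVEL

# Implicit injection: defaults of dlt.secrets.value / dlt.config.value are resolved
# from the configuration providers when the source is instantiated.
@dlt.source
def my_source(api_key: str = dlt.secrets.value, page_size: int = dlt.config.value):
    @dlt.resource
    def items():
        yield {"api_key_set": bool(api_key), "page_size": page_size}
    return items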
# Configuration accessors
config: Any # Dictionary-like access to configuration
secrets: Any # Dictionary-like access to secrets

def configspec(cls: type = None, **kwargs) -> type:
"""
Decorator marking a class as a configuration specification.
Args:
cls: Class to decorate
**kwargs: Configuration options
Returns:
Decorated configuration class
"""def with_config(
func: Callable = None,
sections: tuple = None,
auto_pipeline_section: bool = False,
**kwargs
) -> Callable:
"""
Decorator that injects configuration into function parameters.
Args:
func: Function to decorate
sections: Configuration sections to search
auto_pipeline_section: Automatically use pipeline section
**kwargs: Additional options
Returns:
Decorated function with injected config
"""def configspec(cls: type = None, **kwargs) -> type:
"""
Marks a class as a configuration specification.
Args:
cls: Class to mark as configspec
**kwargs: Configuration options
Returns:
Decorated configuration class
"""Automatic schema inference, evolution, and validation with configurable contracts.
class Schema:
"""
Manages data schemas with tables, columns, and evolution rules.
Attributes:
name: Schema name
tables: Dictionary of table schemas
"""Integration helpers for dbt, Airflow, and data visualization.
# dbt integration
from dlt.helpers.dbt import create_venv, package_runner, DBTPackageRunner
from dlt.helpers.dbt_cloud import run_dbt_cloud_job, get_dbt_cloud_run_status
# Schema visualization
from dlt.helpers import graphviz, dbml, mermaid

Query loaded data using a pandas-like interface with SQL translation.
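A sketch of querying loaded data; it assumes a users table was loaded earlier and obtains the dataset from the pipeline:

import dlt

pipeline = dlt.pipeline("my_pipeline", destination="duckdb", dataset_name="my_dataset")
pipeline.run([{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}], table_name="users")

ds = pipeline.dataset()          # Dataset interface over the loaded data
users = ds.users                 # Relation for the "users" table
print(users.select("id", "name").limit(10).df())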
def dataset(
pipeline: Pipeline = None,
dataset_name: str = None,
dataset_type: str = "auto"
) -> Dataset:
"""
Creates a dataset query interface for loaded data.
Args:
pipeline: Pipeline instance (None = current pipeline)
dataset_name: Dataset name (None = pipeline dataset)
dataset_type: Dataset type ("auto", "ibis", "default")
Returns:
Dataset instance for querying tables
"""class Dataset:
"""
Dataset interface for querying loaded data.
Attributes:
dataset_name: Name of the dataset
schema_name: Schema name
Methods:
Access tables as attributes (e.g., dataset.users)
"""class Relation:
"""
Represents a queryable table with pandas-like operations.
Methods:
select(*columns): Select columns
filter(condition): Filter rows
limit(n): Limit results
head(n): Get first n rows
df(): Convert to pandas DataFrame
"""Integration with dbt (data build tool) for transformations.
# dbt package execution
from dlt.pipeline.dbt import get_venv, run_dbt_package
def get_venv(
pipeline: Pipeline,
venv_path: str = "dbt",
dbt_version: str = None
) -> Any:
"""
Creates or restores virtual environment for dbt execution.
Args:
pipeline: Pipeline instance
venv_path: Path to virtual environment
dbt_version: dbt version to install
Returns:
Virtual environment instance
"""# dbt cloud integration
from dlt.helpers.dbt_cloud import run_dbt_cloud_job, get_dbt_cloud_run_status

Mark data with special processing instructions.
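A sketch of routing records to different tables at runtime with dlt.mark; the record shapes are made up:

import dlt
from dlt import mark

@dlt.resource
def documents():
    for doc in [{"kind": "invoice", "id": 1}, {"kind": "receipt", "id": 2}]:
        # Route each item to a table derived from its "kind" field.
        yield mark.with_table_name(doc, f"{doc['kind']}s")

dlt.pipeline("docs", destination="duckdb", dataset_name="docs").run(documents())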
from dlt import mark

# Functions in the dlt.mark module; call them as mark.with_table_name(...), etc.
def with_table_name(data: Any, table_name: str) -> Any:
"""
Marks data to be loaded to a specific table.
Args:
data: Data to mark
table_name: Target table name
Returns:
Marked data
"""def mark.with_hints(
data: Any,
hints: dict = None,
create_table_variant: bool = False
) -> Any:
"""
Marks data with column or table hints.
Args:
data: Data to mark
hints: Column or table hints dictionary
create_table_variant: Create table variant
Returns:
Marked data
"""def mark.with_file_import(
file_path: str,
table_name: str = None,
**kwargs
) -> Any:
"""
Marks a file for direct import.
Args:
file_path: Path to file
table_name: Target table name
**kwargs: Additional arguments
Returns:
File import marker
"""def mark.make_hints(
columns: dict = None,
primary_key: Any = None,
merge_key: Any = None,
**kwargs
) -> dict:
"""
Creates a hints dictionary.
Args:
columns: Column hints
primary_key: Primary key column(s)
merge_key: Merge key column(s)
**kwargs: Additional hints
Returns:
Hints dictionary
"""Track pipeline execution progress with various collectors.
from dlt import progress
# Progress collectors
progress.tqdm() # tqdm progress bars
progress.log() # Log-based progress
progress.enlighten() # Enlighten progress bars
progress.alive_progress() # Alive progress bars

from dlt import __version__
__version__: str # Package version string

from dlt import hub
# Access to verified sources from dltHub
# Requires dlthub package: pip install dlthub

# Secret values
TSecretValue = Any # Marks values as secrets
TSecretStrValue = str # String secrets
# Credentials
TCredentials = Any # Generic credentials type
# Data items
TDataItem = Any # Single data item (dict, object, etc.)
TDataItems = Union[TDataItem, List[TDataItem]] # Single or multiple items
# Write dispositions
TWriteDisposition = Literal["append", "replace", "merge", "skip"]
# File formats
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "parquet", "csv", "insert_values"]
# Table formats
TTableFormat = Literal["iceberg", "delta", "hive"]
# Data types
TDataType = Literal[
"text", "bigint", "double", "bool", "timestamp", "date", "time",
"decimal", "wei", "json", "binary", "complex"
]
# Column hints
TColumnHint = Literal[
"primary_key", "merge_key", "unique", "partition", "cluster", "sort",
"nullable", "dedup_sort", "root_key", "parent_key", "row_key", "hard_delete"
]

class DltSource:
"""
Represents a data source containing one or more resources.
Attributes:
name: Source name
schema: Source schema
resources: Dictionary of resources
selected_resources: Currently selected resources
Methods:
with_resources(*resource_names): Select specific resources
add_limit(limit): Add row limit to all resources
"""class DltResource:
"""
Represents a single data resource (table).
Attributes:
name: Resource name
write_disposition: Write mode
selected: Whether resource is selected
Methods:
add_filter(filter_func): Add filter function
add_map(map_func): Add transformation function
add_yield_map(map_func): Add yield-based transformation
add_limit(limit): Limit number of items
apply_hints(**hints): Apply column/table hints
"""RESTAPIConfig = TypedDict("RESTAPIConfig", {
"client": dict, # Client configuration
"resources": List[Union[str, dict]], # Resource definitions
"resource_defaults": dict # Default resource settings
}, total=False)

TColumnSchema = TypedDict("TColumnSchema", {
"name": str,
"data_type": TDataType,
"nullable": bool,
# ... additional column properties
}, total=False)
TTableSchema = TypedDict("TTableSchema", {
"name": str,
"columns": Dict[str, TColumnSchema],
"write_disposition": TWriteDisposition,
# ... additional table properties
}, total=False)
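A sketch of declaring column schemas as hints, either on the decorator or after the fact with apply_hints; the column names and types are placeholders:

import dlt

@dlt.resource(
    primary_key="id",
    columns={"amount": {"data_type": "decimal", "nullable": False}},
)
def payments():
    yield {"id": 1, "amount": "10.50"}

# Hints can also be applied (or changed) later on the resource instance.
payments.apply_hints(write_disposition="merge", merge_key="id")

dlt.pipeline("payments", destination="duckdb", dataset_name="finance").run(payments())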