dlt supports 20+ destinations including data warehouses, databases, data lakes, and vector databases.
def postgres(
credentials: Union[str, ConnectionStringCredentials] = None,
create_indexes: bool = True,
**kwargs
) -> Destination:
"""
PostgreSQL destination.
Args:
credentials: Connection string or credentials
create_indexes: Create indexes for hints
**kwargs: Additional destination parameters
Example:
pipeline = dlt.pipeline(
destination=dlt.destinations.postgres("postgresql://localhost/db"),
dataset_name="my_data"
)
"""def duckdb(
credentials: str = None,
create_indexes: bool = True,
**kwargs
) -> Destination:
"""
DuckDB destination (embedded analytical database).
Args:
credentials: Database file path (":memory:" for in-memory)
create_indexes: Create indexes
**kwargs: Additional parameters
Example:
pipeline = dlt.pipeline(
destination=dlt.destinations.duckdb("my_data.db"),
dataset_name="analytics"
)
"""def mssql(credentials: Any = None, create_indexes: bool = True, **kwargs) -> Destination:
"""Microsoft SQL Server destination."""def clickhouse(credentials: Any = None, **kwargs) -> Destination:
"""ClickHouse destination (columnar OLAP database)."""def sqlalchemy(credentials: Any = None, **kwargs) -> Destination:
"""Generic SQLAlchemy destination for any SQLAlchemy-compatible database."""def snowflake(
def snowflake(
credentials: Any = None,
stage_name: str = None,
**kwargs
) -> Destination:
"""
Snowflake destination.
Args:
credentials: Snowflake credentials
stage_name: Staging area name
**kwargs: Additional parameters
"""def bigquery(
def bigquery(
credentials: Any = None,
location: str = "US",
**kwargs
) -> Destination:
"""
Google BigQuery destination.
Args:
credentials: GCP credentials
location: Dataset location
**kwargs: Additional parameters
"""def redshift(
credentials: Any = None,
staging_iam_role: str = None,
**kwargs
) -> Destination:
"""
AWS Redshift destination.
Args:
credentials: Redshift credentials
staging_iam_role: IAM role for S3 staging
**kwargs: Additional parameters
"""def athena(
credentials: Any = None,
query_result_bucket: str = None,
**kwargs
) -> Destination:
"""
AWS Athena destination.
Args:
credentials: AWS credentials
query_result_bucket: S3 bucket for query results
**kwargs: Additional parameters
"""def synapse(credentials: Any = None, **kwargs) -> Destination:
"""Azure Synapse Analytics destination."""def databricks(credentials: Any = None, **kwargs) -> Destination:
"""Databricks SQL Warehouse destination."""def dremio(credentials: Any = None, **kwargs) -> Destination:
"""Dremio destination."""def filesystem(
bucket_url: str = None,
credentials: Any = None,
layout: str = "{table_name}/{load_id}.{file_id}.{ext}",
**kwargs
) -> Destination:
"""
Filesystem/data lake destination (S3, GCS, Azure, local).
Args:
bucket_url: Storage path (s3://, gs://, az://, file://)
credentials: Cloud credentials
layout: File path template
**kwargs: Additional parameters
Example:
pipeline = dlt.pipeline(
destination=dlt.destinations.filesystem("s3://my-bucket/data"),
dataset_name="raw"
)
"""def ducklake(
credentials: str = None,
table_format: str = "iceberg",
**kwargs
) -> Destination:
"""
DuckDB with data lake table format support (Iceberg, Delta).
Args:
credentials: Database path
table_format: "iceberg" or "delta"
**kwargs: Additional parameters
"""def motherduck(
credentials: Any = None,
**kwargs
) -> Destination:
"""MotherDuck destination (cloud-hosted DuckDB)."""def weaviate(
def weaviate(
credentials: Any = None,
**kwargs
) -> Destination:
"""
Weaviate vector database destination.
Args:
credentials: Weaviate credentials (URL, API key)
**kwargs: Additional parameters
"""def qdrant(
credentials: Any = None,
location: str = None,
**kwargs
) -> Destination:
"""
Qdrant vector database destination.
Args:
credentials: Qdrant credentials
location: ":memory:" for in-memory or URL
**kwargs: Additional parameters
"""def lancedb(
def lancedb(
credentials: Any = None,
**kwargs
) -> Destination:
"""LanceDB vector database destination."""def dummy(
mode: str = "normal",
**kwargs
) -> Destination:
"""
Dummy destination for testing (no actual loading).
Args:
mode: "normal", "fail_jobs", "fail_batches"
**kwargs: Additional parameters
"""@dlt.destination
@dlt.destination
def custom_destination(items, table):
"""
Creates a custom destination.
Example:
@dlt.destination(batch_size=100)
def my_destination(items, table):
# Custom loading logic
for item in items:
upload_to_api(item)
"""Destination-specific configuration applied to resources.
Destination-specific configuration is applied to resources through adapter functions.

# BigQuery adapters
from dlt.destinations.adapters import (
bigquery_adapter,
bigquery_partition
)
def bigquery_adapter(
resource: Any,
partition: str = None,
cluster: list = None,
**kwargs
) -> Any:
"""
Applies BigQuery-specific hints.
Args:
resource: Resource to configure
partition: Partition column
cluster: Clustering columns
**kwargs: Additional BigQuery options
Example:
users = bigquery_adapter(
users_resource,
partition="created_date",
cluster=["user_id"]
)
"""
def bigquery_partition(
resource: Any,
partition_by: str,
partition_type: str = "day",
**kwargs
) -> Any:
"""
Configures BigQuery table partitioning.
Args:
resource: Resource to partition
partition_by: Column to partition by
partition_type: "hour", "day", "month", "year"
**kwargs: Additional options
"""# Athena adapters
from dlt.destinations.adapters import (
athena_adapter,
athena_partition
)
def athena_adapter(
resource: Any,
partition: list = None,
**kwargs
) -> Any:
"""Applies Athena-specific hints."""
def athena_partition(
resource: Any,
partition_by: list,
**kwargs
) -> Any:
"""Configures Athena table partitioning."""# Vector database adapters
# Vector database adapters
from dlt.destinations.adapters import (
weaviate_adapter,
qdrant_adapter,
lancedb_adapter
)
def weaviate_adapter(
resource: Any,
vectorize: str = None,
**kwargs
) -> Any:
"""
Configures Weaviate-specific options.
Args:
resource: Resource to configure
vectorize: Field to vectorize
**kwargs: Weaviate options
"""
def qdrant_adapter(
resource: Any,
embed: str = None,
**kwargs
) -> Any:
"""Configures Qdrant-specific options."""
def lancedb_adapter(
resource: Any,
**kwargs
) -> Any:
"""Configures LanceDB-specific options."""# Other adapters
# Other adapters
from dlt.destinations.adapters import (
postgres_adapter,
synapse_adapter,
clickhouse_adapter,
databricks_adapter
)
def postgres_adapter(resource: Any, **kwargs) -> Any:
"""Applies PostgreSQL-specific hints."""
def synapse_adapter(resource: Any, **kwargs) -> Any:
"""Applies Synapse-specific hints."""
def clickhouse_adapter(resource: Any, **kwargs) -> Any:
"""Applies ClickHouse-specific hints."""
def databricks_adapter(resource: Any, **kwargs) -> Any:
"""Applies Databricks-specific hints."""import dlt
# PostgreSQL
pipeline = dlt.pipeline(
pipeline_name="pg_pipeline",
destination=dlt.destinations.postgres("postgresql://localhost/db"),
dataset_name="analytics"
)
# DuckDB (in-memory)
pipeline = dlt.pipeline(
destination=dlt.destinations.duckdb(),
dataset_name="data"
)
# BigQuery
pipeline = dlt.pipeline(
destination=dlt.destinations.bigquery(
credentials=dlt.secrets["gcp_credentials"]
),
dataset_name="raw_data"
)

# Redshift with S3 staging
pipeline = dlt.pipeline(
destination=dlt.destinations.redshift(
credentials=dlt.secrets["redshift"]
),
staging=dlt.destinations.filesystem("s3://my-bucket/staging"),
dataset_name="warehouse"
)

# S3 data lake
pipeline = dlt.pipeline(
destination=dlt.destinations.filesystem(
bucket_url="s3://data-lake/raw",
credentials=dlt.secrets["aws_credentials"]
),
dataset_name="events"
)
# Custom layout
pipeline = dlt.pipeline(
destination=dlt.destinations.filesystem(
bucket_url="s3://bucket",
layout="{schema_name}/{table_name}/year={load_id[:4]}/{load_id}.{file_id}.parquet"
),
dataset_name="partitioned"
)

from dlt.destinations.adapters import bigquery_adapter
@dlt.resource
def events():
yield from get_events()
# Apply partitioning
events_partitioned = bigquery_adapter(
events,
partition="event_date",
cluster=["user_id", "event_type"]
)
pipeline = dlt.pipeline(
destination=dlt.destinations.bigquery(),
dataset_name="analytics"
)
pipeline.run(events_partitioned)

from dlt.destinations.adapters import weaviate_adapter
@dlt.resource
def documents():
yield from get_documents()
# Configure vectorization
docs_vectorized = weaviate_adapter(
documents,
vectorize="content"
)
pipeline = dlt.pipeline(
destination=dlt.destinations.weaviate(
credentials={"url": "http://localhost:8080"}
),
dataset_name="docs"
)
pipeline.run(docs_vectorized)

# Load to multiple destinations
pipeline1 = dlt.pipeline(
destination=dlt.destinations.duckdb("local.db"),
dataset_name="local"
)
pipeline1.run(my_source())
pipeline2 = dlt.pipeline(
destination=dlt.destinations.bigquery(),
dataset_name="cloud"
)
pipeline2.run(my_source())

Credentials can be provided via the secrets.toml file, environment variables, or passed explicitly in code. Example secrets.toml:
[destination.postgres.credentials]
database = "mydb"
username = "user"
password = "secret"
host = "localhost"
port = 5432
[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "sa@project.iam.gserviceaccount.com"
[destination.filesystem.credentials]
aws_access_key_id = "AKIA..."
aws_secret_access_key = "secret..."
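The same values can also be supplied as environment variables using dlt's uppercase, double-underscore naming convention; setting them from Python below is only for illustration:

import os

# Equivalent to the [destination.postgres.credentials] entries above.
os.environ["DESTINATION__POSTGRES__CREDENTIALS__HOST"] = "localhost"
os.environ["DESTINATION__POSTGRES__CREDENTIALS__PASSWORD"] = "secret"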