or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md
destinations.md
helpers.md
incremental.md
index.md
pipeline.md
schema.md
source-filesystem.md
source-rest-api.md
source-sql-database.md
sources-resources.md
tile.json

docs/destinations.md

Destinations

dlt supports 20+ destinations including data warehouses, databases, data lakes, and vector databases.

Capabilities

Relational Databases

def postgres(
    credentials: Union[str, ConnectionStringCredentials] = None,
    create_indexes: bool = True,
    **kwargs
) -> Destination:
    """
    PostgreSQL destination.

    Args:
        credentials: Connection string or ConnectionStringCredentials object.
            Defaults to None; the "Credential Configuration" section shows a
            [destination.postgres.credentials] table in secrets.toml as an
            alternative to passing them here.
        create_indexes: Create indexes for hints. Defaults to True.
        **kwargs: Additional destination parameters

    Returns:
        A Destination, as declared by the return annotation.

    Example:
        pipeline = dlt.pipeline(
            destination=dlt.destinations.postgres("postgresql://localhost/db"),
            dataset_name="my_data"
        )
    """
def duckdb(
    credentials: str = None,
    create_indexes: bool = True,
    **kwargs
) -> Destination:
    """
    DuckDB destination (embedded analytical database).

    Args:
        credentials: Database file path (":memory:" for in-memory).
            Defaults to None.
        create_indexes: Create indexes. Defaults to True.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.

    Example:
        pipeline = dlt.pipeline(
            destination=dlt.destinations.duckdb("my_data.db"),
            dataset_name="analytics"
        )
    """
def mssql(credentials: Any = None, create_indexes: bool = True, **kwargs) -> Destination:
    """Microsoft SQL Server destination.

    Args:
        credentials: SQL Server credentials. Defaults to None.
        create_indexes: Defaults to True. NOTE(review): presumably the same
            meaning as for the postgres destination (create indexes for
            hints) — confirm.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def clickhouse(credentials: Any = None, **kwargs) -> Destination:
    """ClickHouse destination (columnar OLAP database).

    Args:
        credentials: ClickHouse credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def sqlalchemy(credentials: Any = None, **kwargs) -> Destination:
    """Generic SQLAlchemy destination for any SQLAlchemy-compatible database.

    Args:
        credentials: Database credentials. Defaults to None.
            NOTE(review): the accepted forms (URL string, engine, ...) are not
            shown here — confirm.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """

Cloud Data Warehouses

def snowflake(
    credentials: Any = None,
    stage_name: str = None,
    **kwargs
) -> Destination:
    """
    Snowflake destination.

    Args:
        credentials: Snowflake credentials. Defaults to None.
        stage_name: Staging area name. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def bigquery(
    credentials: Any = None,
    location: str = "US",
    **kwargs
) -> Destination:
    """
    Google BigQuery destination.

    Args:
        credentials: GCP credentials. Defaults to None; the "Credential
            Configuration" section shows a [destination.bigquery.credentials]
            table in secrets.toml as an alternative to passing them here.
        location: Dataset location. Defaults to "US".
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def redshift(
    credentials: Any = None,
    staging_iam_role: str = None,
    **kwargs
) -> Destination:
    """
    AWS Redshift destination.

    Args:
        credentials: Redshift credentials. Defaults to None.
        staging_iam_role: IAM role for S3 staging. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def athena(
    credentials: Any = None,
    query_result_bucket: str = None,
    **kwargs
) -> Destination:
    """
    AWS Athena destination.

    Args:
        credentials: AWS credentials. Defaults to None.
        query_result_bucket: S3 bucket for query results. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def synapse(credentials: Any = None, **kwargs) -> Destination:
    """Azure Synapse Analytics destination.

    Args:
        credentials: Synapse credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def databricks(credentials: Any = None, **kwargs) -> Destination:
    """Databricks SQL Warehouse destination.

    Args:
        credentials: Databricks credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def dremio(credentials: Any = None, **kwargs) -> Destination:
    """Dremio destination.

    Args:
        credentials: Dremio credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """

Data Lakes

def filesystem(
    bucket_url: str = None,
    credentials: Any = None,
    layout: str = "{table_name}/{load_id}.{file_id}.{ext}",
    **kwargs
) -> Destination:
    """
    Filesystem/data lake destination (S3, GCS, Azure, local).

    Args:
        bucket_url: Storage path (s3://, gs://, az://, file://).
            Defaults to None.
        credentials: Cloud credentials. Defaults to None; the "Credential
            Configuration" section shows a
            [destination.filesystem.credentials] table in secrets.toml as an
            alternative to passing them here.
        layout: File path template built from {placeholder} names.
            Defaults to "{table_name}/{load_id}.{file_id}.{ext}".
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.

    Example:
        pipeline = dlt.pipeline(
            destination=dlt.destinations.filesystem("s3://my-bucket/data"),
            dataset_name="raw"
        )
    """
def ducklake(
    credentials: str = None,
    table_format: str = "iceberg",
    **kwargs
) -> Destination:
    """
    DuckDB with data lake table format support (Iceberg, Delta).

    Args:
        credentials: Database path. Defaults to None.
        table_format: "iceberg" or "delta". Defaults to "iceberg".
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def motherduck(
    credentials: Any = None,
    **kwargs
) -> Destination:
    """MotherDuck destination (cloud-hosted DuckDB).

    Args:
        credentials: MotherDuck credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """

Vector Databases

def weaviate(
    credentials: Any = None,
    **kwargs
) -> Destination:
    """
    Weaviate vector database destination.

    Args:
        credentials: Weaviate credentials (URL, API key). Defaults to None.
            The "Vector Database" usage example passes a dict with a "url"
            key.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def qdrant(
    credentials: Any = None,
    location: str = None,
    **kwargs
) -> Destination:
    """
    Qdrant vector database destination.

    Args:
        credentials: Qdrant credentials. Defaults to None.
        location: ":memory:" for in-memory or URL. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """
def lancedb(
    credentials: Any = None,
    **kwargs
) -> Destination:
    """LanceDB vector database destination.

    Args:
        credentials: LanceDB credentials. Defaults to None.
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """

Testing

def dummy(
    mode: str = "normal",
    **kwargs
) -> Destination:
    """
    Dummy destination for testing (no actual loading).

    Args:
        mode: One of "normal", "fail_jobs", "fail_batches".
            Defaults to "normal".
        **kwargs: Additional parameters

    Returns:
        A Destination, as declared by the return annotation.
    """

Custom Destination

@dlt.destination
def custom_destination(items, table):
    """
    Creates a custom destination from a function decorated with
    @dlt.destination.

    Args:
        items: Items to load; the example below iterates over them one by one.
        table: Second argument passed to the function; unused in the example.
            NOTE(review): presumably describes the target table — confirm.

    Example:
        @dlt.destination(batch_size=100)
        def my_destination(items, table):
            # Custom loading logic
            for item in items:
                upload_to_api(item)
    """

Destination Adapters

Adapters apply destination-specific configuration to resources.

# BigQuery adapters
from dlt.destinations.adapters import (
    bigquery_adapter,
    bigquery_partition
)

def bigquery_adapter(
    resource: Any,
    partition: str = None,
    cluster: list = None,
    **kwargs
) -> Any:
    """
    Applies BigQuery-specific hints.

    Args:
        resource: Resource to configure
        partition: Partition column. Defaults to None.
        cluster: Clustering columns. Defaults to None.
        **kwargs: Additional BigQuery options

    Returns:
        Annotated as Any. The usage examples assign the result and pass it to
        pipeline.run() in place of the original resource.

    Example:
        users = bigquery_adapter(
            users_resource,
            partition="created_date",
            cluster=["user_id"]
        )
    """

def bigquery_partition(
    resource: Any,
    partition_by: str,
    partition_type: str = "day",
    **kwargs
) -> Any:
    """
    Configures BigQuery table partitioning.

    Args:
        resource: Resource to partition
        partition_by: Column to partition by (required).
        partition_type: One of "hour", "day", "month", "year".
            Defaults to "day".
        **kwargs: Additional options

    Returns:
        Annotated as Any.
    """
# Athena adapters
from dlt.destinations.adapters import (
    athena_adapter,
    athena_partition
)

def athena_adapter(
    resource: Any,
    partition: list = None,
    **kwargs
) -> Any:
    """Applies Athena-specific hints.

    Args:
        resource: Resource to configure.
        partition: Defaults to None. NOTE(review): presumably the list of
            partition columns or expressions — confirm.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

def athena_partition(
    resource: Any,
    partition_by: list,
    **kwargs
) -> Any:
    """Configures Athena table partitioning.

    Args:
        resource: Resource to partition.
        partition_by: Required list. NOTE(review): presumably the columns to
            partition by — confirm.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """
# Vector database adapters
from dlt.destinations.adapters import (
    weaviate_adapter,
    qdrant_adapter,
    lancedb_adapter
)

def weaviate_adapter(
    resource: Any,
    vectorize: str = None,
    **kwargs
) -> Any:
    """
    Configures Weaviate-specific options.

    Args:
        resource: Resource to configure
        vectorize: Field to vectorize. Defaults to None.
        **kwargs: Weaviate options

    Returns:
        Annotated as Any. The "Vector Database" usage example assigns the
        result and passes it to pipeline.run().
    """

def qdrant_adapter(
    resource: Any,
    embed: str = None,
    **kwargs
) -> Any:
    """Configures Qdrant-specific options.

    Args:
        resource: Resource to configure.
        embed: Defaults to None. NOTE(review): presumably the field to embed,
            by analogy with weaviate_adapter's `vectorize` — confirm.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

def lancedb_adapter(
    resource: Any,
    **kwargs
) -> Any:
    """Configures LanceDB-specific options.

    Args:
        resource: Resource to configure.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """
# Other adapters
from dlt.destinations.adapters import (
    postgres_adapter,
    synapse_adapter,
    clickhouse_adapter,
    databricks_adapter
)

def postgres_adapter(resource: Any, **kwargs) -> Any:
    """Applies PostgreSQL-specific hints.

    Args:
        resource: Resource to configure.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

def synapse_adapter(resource: Any, **kwargs) -> Any:
    """Applies Synapse-specific hints.

    Args:
        resource: Resource to configure.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

def clickhouse_adapter(resource: Any, **kwargs) -> Any:
    """Applies ClickHouse-specific hints.

    Args:
        resource: Resource to configure.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

def databricks_adapter(resource: Any, **kwargs) -> Any:
    """Applies Databricks-specific hints.

    Args:
        resource: Resource to configure.
        **kwargs: Additional options.

    Returns:
        Annotated as Any.
    """

Usage Examples

Basic Destination Configuration

import dlt

# PostgreSQL: credentials passed inline as a connection string
pipeline = dlt.pipeline(
    pipeline_name="pg_pipeline",
    destination=dlt.destinations.postgres("postgresql://localhost/db"),
    dataset_name="analytics"
)

# DuckDB with no credentials argument.
# NOTE(review): this was labelled "in-memory", but the duckdb() reference says
# in-memory needs ":memory:" as credentials, which is not passed here —
# confirm where the database lives when credentials are omitted.
pipeline = dlt.pipeline(
    destination=dlt.destinations.duckdb(),
    dataset_name="data"
)

# BigQuery: credentials read from dlt.secrets under the "gcp_credentials" key
pipeline = dlt.pipeline(
    destination=dlt.destinations.bigquery(
        credentials=dlt.secrets["gcp_credentials"]
    ),
    dataset_name="raw_data"
)

With Staging

# Redshift with S3 staging: the filesystem destination pointing at the S3
# bucket is passed as `staging`, separately from the final `destination`.
pipeline = dlt.pipeline(
    destination=dlt.destinations.redshift(
        credentials=dlt.secrets["redshift"]
    ),
    staging=dlt.destinations.filesystem("s3://my-bucket/staging"),
    dataset_name="warehouse"
)

Filesystem Destination

# S3 data lake: bucket URL plus credentials read from dlt.secrets
pipeline = dlt.pipeline(
    destination=dlt.destinations.filesystem(
        bucket_url="s3://data-lake/raw",
        credentials=dlt.secrets["aws_credentials"]
    ),
    dataset_name="events"
)

# Custom layout: partition files by year.
# Layout placeholders are plain names, so an expression such as
# {load_id[:4]} is not a valid placeholder; the {YYYY} date placeholder is
# used instead. {ext} (as in the default layout) keeps the file extension in
# sync with the format actually written rather than hard-coding ".parquet".
pipeline = dlt.pipeline(
    destination=dlt.destinations.filesystem(
        bucket_url="s3://bucket",
        layout="{schema_name}/{table_name}/year={YYYY}/{load_id}.{file_id}.{ext}"
    ),
    dataset_name="partitioned"
)

BigQuery with Partitioning

from dlt.destinations.adapters import bigquery_adapter

# get_events() is a placeholder for the caller's data source; it is not
# defined in this snippet.
@dlt.resource
def events():
    yield from get_events()

# Apply partitioning (by event_date) and clustering (by user_id, event_type).
# The adapter's return value, not the bare resource, is what gets run below.
events_partitioned = bigquery_adapter(
    events,
    partition="event_date",
    cluster=["user_id", "event_type"]
)

pipeline = dlt.pipeline(
    destination=dlt.destinations.bigquery(),
    dataset_name="analytics"
)
pipeline.run(events_partitioned)

Vector Database

from dlt.destinations.adapters import weaviate_adapter

# get_documents() is a placeholder for the caller's data source; it is not
# defined in this snippet.
@dlt.resource
def documents():
    yield from get_documents()

# Configure vectorization of the "content" field.
# The adapter's return value, not the bare resource, is what gets run below.
docs_vectorized = weaviate_adapter(
    documents,
    vectorize="content"
)

# Weaviate credentials are passed inline as a dict with the instance URL.
pipeline = dlt.pipeline(
    destination=dlt.destinations.weaviate(
        credentials={"url": "http://localhost:8080"}
    ),
    dataset_name="docs"
)
pipeline.run(docs_vectorized)

Multiple Destinations

# Load to multiple destinations: one pipeline per destination.
# my_source() is a placeholder not defined in this snippet; it is called
# again for the second run rather than reusing the first instance.
pipeline1 = dlt.pipeline(
    destination=dlt.destinations.duckdb("local.db"),
    dataset_name="local"
)
pipeline1.run(my_source())

pipeline2 = dlt.pipeline(
    destination=dlt.destinations.bigquery(),
    dataset_name="cloud"
)
pipeline2.run(my_source())

Credential Configuration

Credentials can be provided via:

  • Function arguments
  • secrets.toml file
  • Environment variables
  • dlt configuration system

Example secrets.toml:

[destination.postgres.credentials]
database = "mydb"
username = "user"
password = "secret"
host = "localhost"
port = 5432

[destination.bigquery.credentials]
project_id = "my-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "sa@project.iam.gserviceaccount.com"

[destination.filesystem.credentials]
aws_access_key_id = "AKIA..."
aws_secret_access_key = "secret..."