Helpers and Integrations

Integration helpers for dbt, Airflow, data visualization, and other tools.

Capabilities

dbt Integration

from typing import Any, Optional

from dlt.helpers.dbt import (
    create_venv,
    restore_venv,
    package_runner,
    DBTPackageRunner,
    DEFAULT_DBT_VERSION
)

def create_venv(destination: str, dbt_version: Optional[str] = None) -> str:
    """
    Creates a virtual environment with dbt and destination adapter.

    Args:
        destination: Destination name (e.g., "postgres", "bigquery")
        dbt_version: dbt version (default: latest compatible)

    Returns:
        Path to created virtual environment

    Example:
        venv_path = create_venv("postgres", dbt_version="1.7.0")
    """
def restore_venv(venv_path: str) -> str:
    """
    Restores an existing dbt virtual environment.

    Args:
        venv_path: Path to virtual environment

    Returns:
        Path to restored virtual environment
    """
def package_runner(
    pipeline: Any,
    package_location: str,
    package_repository_branch: Optional[str] = None,
    package_profiles_dir: Optional[str] = None,
    **dbt_run_kwargs
) -> DBTPackageRunner:
    """
    Creates a dbt package runner.

    Args:
        pipeline: dlt Pipeline instance
        package_location: Path to dbt package or git URL
        package_repository_branch: Git branch (if git URL)
        package_profiles_dir: Path to profiles directory
        **dbt_run_kwargs: Arguments for dbt run command

    Returns:
        DBTPackageRunner instance

    Example:
        runner = package_runner(
            pipeline,
            "https://github.com/dlt-hub/dbt-duckdb-transform",
            package_repository_branch="main"
        )
        runner.run_all()
    """
class DBTPackageRunner:
    """
    Runs dbt packages with dlt integration.

    Methods:
        run_all(): Run all dbt commands (deps, seed, run, test)
        run_deps(): Install dbt dependencies
        run_seed(): Load seed data
        run_run(): Run dbt models
        run_test(): Run dbt tests
    """

    def run_all(self) -> None:
        """Runs complete dbt workflow: deps, seed, run, test"""

    def run_deps(self) -> None:
        """Installs dbt package dependencies"""

    def run_seed(self) -> None:
        """Loads seed CSV files"""

    def run_run(self, **kwargs) -> None:
        """
        Runs dbt models.

        Args:
            **kwargs: dbt run arguments (models, exclude, etc.)
        """

    def run_test(self, **kwargs) -> None:
        """
        Runs dbt tests.

        Args:
            **kwargs: dbt test arguments (models, exclude, etc.)
        """
DEFAULT_DBT_VERSION: str  # Default dbt version for venv creation

dbt Cloud Integration

from typing import Optional

from dlt.helpers.dbt_cloud import (
    run_dbt_cloud_job,
    get_dbt_cloud_run_status
)

def run_dbt_cloud_job(
    account_id: int,
    job_id: int,
    api_token: str,
    cause: str = "Triggered by dlt",
    git_branch: Optional[str] = None,
    schema_override: Optional[str] = None,
    wait_for_completion: bool = True,
    timeout: int = 3600
) -> dict:
    """
    Triggers a dbt Cloud job run.

    Args:
        account_id: dbt Cloud account ID
        job_id: dbt Cloud job ID
        api_token: dbt Cloud API token
        cause: Trigger reason
        git_branch: Override git branch
        schema_override: Override target schema
        wait_for_completion: Wait for job to complete
        timeout: Timeout in seconds

    Returns:
        Job run information dictionary

    Example:
        result = run_dbt_cloud_job(
            account_id=12345,
            job_id=67890,
            api_token=dlt.secrets["dbt_cloud_api_token"],
            wait_for_completion=True
        )
        print(f"Job status: {result['status']}")
    """
def get_dbt_cloud_run_status(
    account_id: int,
    run_id: int,
    api_token: str
) -> dict:
    """
    Gets status of a dbt Cloud job run.

    Args:
        account_id: dbt Cloud account ID
        run_id: Run ID
        api_token: dbt Cloud API token

    Returns:
        Run status dictionary

    Example:
        status = get_dbt_cloud_run_status(12345, 98765, api_token)
        print(f"Status: {status['status']}")
    """

Schema Visualization

from dlt.helpers import graphviz, dbml, mermaid

# These modules provide schema export and visualization functions.
# Usage is typically through pipeline.default_schema; see the
# schema export examples below.

Other Helpers

# Airflow integration
from dlt.helpers import airflow_helper

# Ibis integration for dataframes
from dlt.helpers import ibis

# Marimo notebook integration
from dlt.helpers import marimo

Usage Examples

dbt Integration - Local Package

import dlt
from dlt.helpers.dbt import package_runner

# Load data with dlt
pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="raw_data"
)
pipeline.run(my_source())

# Run dbt transformations
dbt = package_runner(
    pipeline,
    package_location="dbt/my_transforms"
)
dbt.run_all()  # deps -> seed -> run -> test

dbt Integration - Git Package

from dlt.helpers.dbt import package_runner

pipeline = dlt.pipeline(destination="bigquery", dataset_name="analytics")
pipeline.run(my_data())

# Use dbt package from git
dbt = package_runner(
    pipeline,
    package_location="https://github.com/my-org/dbt-transforms",
    package_repository_branch="main"
)
dbt.run_all()

dbt Integration - Selective Runs

from dlt.helpers.dbt import package_runner

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

dbt = package_runner(pipeline, "dbt/transforms")

# Install dependencies
dbt.run_deps()

# Run specific models
dbt.run_run(models=["staging", "marts.sales"])

# Test specific models
dbt.run_test(models=["marts.sales"])

dbt Cloud Integration

import dlt
from dlt.helpers.dbt_cloud import run_dbt_cloud_job

# Load data
pipeline = dlt.pipeline(destination="snowflake", dataset_name="raw")
pipeline.run(my_source())

# Trigger dbt Cloud job
result = run_dbt_cloud_job(
    account_id=12345,
    job_id=67890,
    api_token=dlt.secrets["dbt_cloud_api_token"],
    cause=f"Triggered after {pipeline.pipeline_name} load",
    wait_for_completion=True,
    timeout=1800  # 30 minutes
)

if result["status"] == "success":
    print("dbt Cloud job completed successfully")
else:
    print(f"dbt Cloud job failed: {result['status']}")

dbt Cloud with Schema Override

from dlt.helpers.dbt_cloud import run_dbt_cloud_job

# Load to dev schema
pipeline = dlt.pipeline(
    destination="snowflake",
    dataset_name="dev_data"
)
pipeline.run(my_source())

# Run dbt Cloud job with schema override
run_dbt_cloud_job(
    account_id=12345,
    job_id=67890,
    api_token=dlt.secrets["dbt_cloud_api_token"],
    schema_override="dev_data",  # Use dev schema
    git_branch="develop"  # Use develop branch
)
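
dbt Cloud - Polling Run Status

For long-running jobs you can trigger without blocking and poll the run yourself. A minimal sketch combining the two functions above; it assumes the dict returned by run_dbt_cloud_job exposes the run's "id" and reports status as a string, as in the earlier examples.

import time

import dlt
from dlt.helpers.dbt_cloud import run_dbt_cloud_job, get_dbt_cloud_run_status

# Trigger without waiting for completion
run = run_dbt_cloud_job(
    account_id=12345,
    job_id=67890,
    api_token=dlt.secrets["dbt_cloud_api_token"],
    wait_for_completion=False
)

# Poll until the run leaves the in-progress states
while True:
    status = get_dbt_cloud_run_status(
        account_id=12345,
        run_id=run["id"],  # assumed field name, see note above
        api_token=dlt.secrets["dbt_cloud_api_token"]
    )
    if status["status"] not in ("queued", "running"):
        break
    time.sleep(30)

print(f"Final status: {status['status']}")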

Creating dbt Virtual Environment

from dlt.helpers.dbt import create_venv

# Create venv for PostgreSQL
venv_path = create_venv(
    destination="postgres",
    dbt_version="1.7.4"
)

print(f"dbt venv created at: {venv_path}")

Schema Export to DBML

from dlt.helpers import dbml

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

# Export schema to DBML format
schema = pipeline.default_schema
dbml_str = dbml.to_dbml(schema)

# Save to file
with open("schema.dbml", "w") as f:
    f.write(dbml_str)

Schema Visualization with Graphviz

from dlt.helpers import graphviz

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

# Generate graphviz visualization
schema = pipeline.default_schema
dot = graphviz.to_graphviz(schema)

# Render to file
dot.render("schema", format="png", cleanup=True)

Mermaid Diagram Export

from dlt.helpers import mermaid

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

# Export to mermaid format
schema = pipeline.default_schema
mermaid_str = mermaid.to_mermaid(schema)

print(mermaid_str)
# Can be rendered in markdown or mermaid tools

Airflow Integration

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import dlt

def load_data():
    pipeline = dlt.pipeline(
        destination="bigquery",
        dataset_name="airflow_data"
    )
    pipeline.run(my_source())

def run_dbt():
    # Jinja templates are not rendered inside python_callable;
    # read the Airflow Variable directly instead
    from airflow.models import Variable
    from dlt.helpers.dbt_cloud import run_dbt_cloud_job

    run_dbt_cloud_job(
        account_id=12345,
        job_id=67890,
        api_token=Variable.get("dbt_cloud_token")
    )

with DAG("dlt_pipeline", start_date=datetime(2024, 1, 1)) as dag:
    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data
    )

    transform_task = PythonOperator(
        task_id="run_dbt",
        python_callable=run_dbt
    )

    load_task >> transform_task
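
Airflow Integration with PipelineTasksGroup

Instead of a hand-rolled PythonOperator, the dedicated Airflow helper can build the load tasks for you. A sketch based on dlt's PipelineTasksGroup; constructor and add_run keyword arguments vary between dlt versions, so treat the ones shown here as assumptions.

from datetime import datetime

from airflow import DAG
import dlt
from dlt.helpers.airflow_helper import PipelineTasksGroup

with DAG("dlt_pipeline_helper", start_date=datetime(2024, 1, 1)) as dag:
    # Task group that wires dlt state handling and logging into Airflow
    tasks = PipelineTasksGroup("load_group", wipe_local_data=True)

    pipeline = dlt.pipeline(
        destination="bigquery",
        dataset_name="airflow_data"
    )
    # decompose="serialize" runs each resource as its own sequential task
    tasks.add_run(pipeline, my_source(), decompose="serialize", trigger_rule="all_done")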

Ibis Integration

from dlt.helpers import ibis
import dlt

# Load data
pipeline = dlt.pipeline(destination="duckdb", dataset_name="data")
pipeline.run(my_source())

# Query with Ibis
dataset = pipeline.dataset()
users_table = dataset.users

# Use ibis expressions for complex queries; aggregate over the
# filtered table so the expression tree stays consistent
active_users = users_table.filter(users_table.status == "active")
result = (
    active_users
    .group_by("country")
    .aggregate(count=active_users.count())
)

df = result.to_pandas()
print(df)

Progress Tracking

import dlt

# Use different progress trackers
pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="data",
    progress="tqdm"  # Options: "log", "tqdm", "enlighten", "alive_progress"
)

pipeline.run(my_large_source())
# Shows progress bars during extract/normalize/load
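
The progress string names map to collector objects that can also be configured directly. A sketch assuming dlt's dlt.progress collector factories; the keyword argument shown is forwarded to tqdm and is illustrative.

import dlt

# Configure the collector instead of passing a bare string
pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="data",
    progress=dlt.progress.tqdm(colour="yellow")  # tqdm kwargs pass through
)

pipeline.run(my_large_source())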

Combined dlt + dbt Workflow

import dlt
from dlt.helpers.dbt import package_runner

def etl_pipeline():
    # 1. Extract and Load with dlt
    pipeline = dlt.pipeline(
        pipeline_name="sales_pipeline",
        destination="postgres",
        dataset_name="raw_sales"
    )

    # Load raw data
    load_info = pipeline.run(sales_api_source())
    print(f"Loaded {len(load_info.loads_ids)} packages")

    # 2. Transform with dbt
    dbt = package_runner(
        pipeline,
        package_location="dbt/sales_transforms"
    )

    # Run dbt workflow
    dbt.run_deps()
    dbt.run_run(models=["staging", "marts"])  # run only staging and marts models
    dbt.run_test()

    print("ETL pipeline completed")

etl_pipeline()

Helper Module Locations

  • dbt: dlt.helpers.dbt
  • dbt Cloud: dlt.helpers.dbt_cloud
  • Graphviz: dlt.helpers.graphviz
  • DBML: dlt.helpers.dbml
  • Mermaid: dlt.helpers.mermaid
  • Airflow: dlt.helpers.airflow_helper
  • Ibis: dlt.helpers.ibis
  • Marimo: dlt.helpers.marimo