Integration helpers for dbt, Airflow, data visualization, and other tools.
from dlt.helpers.dbt import (
    create_venv,
    restore_venv,
    package_runner,
    DBTPackageRunner,
    DEFAULT_DBT_VERSION
)
def create_venv(destination: str, dbt_version: str = None) -> str:
    """
    Creates a virtual environment with dbt and destination adapter.

    Args:
        destination: Destination name (e.g., "postgres", "bigquery")
        dbt_version: dbt version (default: latest compatible)

    Returns:
        Path to created virtual environment

    Example:
        venv_path = create_venv("postgres", dbt_version="1.7.0")
    """

def restore_venv(venv_path: str) -> str:
    """
    Restores existing dbt virtual environment.

    Args:
        venv_path: Path to virtual environment

    Returns:
        Path to restored virtual environment
    """
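A minimal sketch of creating the dbt venv once and reusing it later, based only on the signatures above (exact behavior may vary by dlt version):

# Build the environment on first use; both calls return the venv path
venv_path = create_venv("postgres", dbt_version="1.7.0")
# On a later run, reuse the environment instead of rebuilding it
same_venv = restore_venv(venv_path)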
"""def package_runner(
pipeline: Any,
package_location: str,
package_repository_branch: str = None,
package_profiles_dir: str = None,
**dbt_run_kwargs
) -> DBTPackageRunner:
"""
Creates a dbt package runner.
Args:
pipeline: dlt Pipeline instance
package_location: Path to dbt package or git URL
package_repository_branch: Git branch (if git URL)
package_profiles_dir: Path to profiles directory
**dbt_run_kwargs: Arguments for dbt run command
Returns:
DBTPackageRunner instance
Example:
runner = package_runner(
pipeline,
"https://github.com/dlt-hub/dbt-duckdb-transform",
package_repository_branch="main"
)
runner.run_all()
"""class DBTPackageRunner:
"""
Runs dbt packages with dlt integration.
Methods:
run_all(): Run all dbt commands (deps, seed, run, test)
run_deps(): Install dbt dependencies
run_seed(): Load seed data
run_run(): Run dbt models
run_test(): Run dbt tests
"""
    def run_all(self) -> None:
        """Runs complete dbt workflow: deps, seed, run, test"""

    def run_deps(self) -> None:
        """Installs dbt package dependencies"""

    def run_seed(self) -> None:
        """Loads seed CSV files"""

    def run_run(self, **kwargs) -> None:
        """
        Runs dbt models.

        Args:
            **kwargs: dbt run arguments (models, exclude, etc.)
        """

    def run_test(self, **kwargs) -> None:
        """
        Runs dbt tests.

        Args:
            **kwargs: dbt test arguments (models, exclude, etc.)
        """
"""DEFAULT_DBT_VERSION: str # Default dbt version for venv creationfrom dlt.helpers.dbt_cloud import (
from dlt.helpers.dbt_cloud import (
    run_dbt_cloud_job,
    get_dbt_cloud_run_status
)
def run_dbt_cloud_job(
    account_id: int,
    job_id: int,
    api_token: str,
    cause: str = "Triggered by dlt",
    git_branch: str = None,
    schema_override: str = None,
    wait_for_completion: bool = True,
    timeout: int = 3600
) -> dict:
"""
Triggers a dbt Cloud job run.
Args:
account_id: dbt Cloud account ID
job_id: dbt Cloud job ID
api_token: dbt Cloud API token
cause: Trigger reason
git_branch: Override git branch
schema_override: Override target schema
wait_for_completion: Wait for job to complete
timeout: Timeout in seconds
Returns:
Job run information dictionary
Example:
result = run_dbt_cloud_job(
account_id=12345,
job_id=67890,
api_token=dlt.secrets["dbt_cloud_api_token"],
wait_for_completion=True
)
print(f"Job status: {result['status']}")
"""def get_dbt_cloud_run_status(
account_id: int,
run_id: int,
api_token: str
) -> dict:
"""
Gets status of a dbt Cloud job run.
Args:
account_id: dbt Cloud account ID
run_id: Run ID
api_token: dbt Cloud API token
Returns:
Run status dictionary
Example:
status = get_dbt_cloud_run_status(12345, 98765, api_token)
print(f"Status: {status['status']}")
"""from dlt.helpers import graphviz, dbml, mermaid
# These modules provide schema export and visualization functions
# Usage typically through pipeline.default_schema

# Airflow integration (minimal usage sketch after this import list)
from dlt.helpers import airflow_helper
# Ibis integration for dataframes
from dlt.helpers import ibis
# Marimo notebook integration
from dlt.helpers import marimo
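The examples below trigger dbt Cloud from a plain PythonOperator, so the airflow_helper module itself is not demonstrated there. Here is a minimal sketch of running a dlt pipeline through it; it assumes the PipelineTasksGroup API and an example my_source(), and keyword arguments may differ across dlt versions:

from datetime import datetime

import dlt
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def load_with_dlt():
    # Task group that wraps the dlt pipeline run and manages its working directory
    tasks = PipelineTasksGroup("my_pipeline", use_data_folder=False, wipe_local_data=True)
    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        destination="bigquery",
        dataset_name="airflow_data"
    )
    # decompose="serialize" turns each resource into its own sequential Airflow task
    tasks.add_run(pipeline, my_source(), decompose="serialize", trigger_rule="all_done")

load_with_dlt()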
import dlt
from dlt.helpers.dbt import package_runner
# Load data with dlt
pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="raw_data"
)
pipeline.run(my_source())
# Run dbt transformations
dbt = package_runner(
    pipeline,
    package_location="dbt/my_transforms"
)
dbt.run_all()  # deps -> seed -> run -> test

from dlt.helpers.dbt import package_runner
pipeline = dlt.pipeline(destination="bigquery", dataset_name="analytics")
pipeline.run(my_data())
# Use dbt package from git
dbt = package_runner(
    pipeline,
    package_location="https://github.com/my-org/dbt-transforms",
    package_repository_branch="main"
)
dbt.run_all()

from dlt.helpers.dbt import package_runner
pipeline = dlt.pipeline(...)
pipeline.run(my_source())
dbt = package_runner(pipeline, "dbt/transforms")
# Install dependencies
dbt.run_deps()
# Run specific models
dbt.run_run(models=["staging", "marts.sales"])
# Test specific models
dbt.run_test(models=["marts.sales"])

import dlt
from dlt.helpers.dbt_cloud import run_dbt_cloud_job
# Load data
pipeline = dlt.pipeline(destination="snowflake", dataset_name="raw")
pipeline.run(my_source())
# Trigger dbt Cloud job
result = run_dbt_cloud_job(
    account_id=12345,
    job_id=67890,
    api_token=dlt.secrets["dbt_cloud_api_token"],
    cause=f"Triggered after {pipeline.pipeline_name} load",
    wait_for_completion=True,
    timeout=1800  # 30 minutes
)
if result["status"] == "success":
print("dbt Cloud job completed successfully")
else:
print(f"dbt Cloud job failed: {result['status']}")from dlt.helpers.dbt_cloud import run_dbt_cloud_job
# Load to dev schema
pipeline = dlt.pipeline(
    destination="snowflake",
    dataset_name="dev_data"
)
pipeline.run(my_source())
# Run dbt Cloud job with schema override
run_dbt_cloud_job(
    account_id=12345,
    job_id=67890,
    api_token=dlt.secrets["dbt_cloud_api_token"],
    schema_override="dev_data",  # Use dev schema
    git_branch="develop"  # Use develop branch
)

from dlt.helpers.dbt import create_venv
# Create venv for PostgreSQL
venv_path = create_venv(
    destination="postgres",
    dbt_version="1.7.4"
)
print(f"dbt venv created at: {venv_path}")

from dlt.helpers import dbml
pipeline = dlt.pipeline(...)
pipeline.run(my_source())
# Export schema to DBML format
schema = pipeline.default_schema
dbml_str = dbml.to_dbml(schema)
# Save to file
with open("schema.dbml", "w") as f:
f.write(dbml_str)from dlt.helpers import graphviz
pipeline = dlt.pipeline(...)
pipeline.run(my_source())
# Generate graphviz visualization
schema = pipeline.default_schema
dot = graphviz.to_graphviz(schema)
# Render to file
dot.render("schema", format="png", cleanup=True)from dlt.helpers import mermaid
pipeline = dlt.pipeline(...)
pipeline.run(my_source())
# Export to mermaid format
schema = pipeline.default_schema
mermaid_str = mermaid.to_mermaid(schema)
print(mermaid_str)
# Can be rendered in markdown or mermaid tools

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import dlt
def load_data():
    pipeline = dlt.pipeline(
        destination="bigquery",
        dataset_name="airflow_data"
    )
    pipeline.run(my_source())

def run_dbt():
    from airflow.models import Variable
    from dlt.helpers.dbt_cloud import run_dbt_cloud_job
    run_dbt_cloud_job(
        account_id=12345,
        job_id=67890,
        # Jinja templates are not rendered inside a python_callable,
        # so read the Airflow Variable directly
        api_token=Variable.get("dbt_cloud_token")
    )

with DAG("dlt_pipeline", start_date=datetime(2024, 1, 1)) as dag:
    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data
    )
    transform_task = PythonOperator(
        task_id="run_dbt",
        python_callable=run_dbt
    )
    load_task >> transform_task

from dlt.helpers import ibis
import dlt
# Load data
pipeline = dlt.pipeline(destination="duckdb", dataset_name="data")
pipeline.run(my_source())
# Query with Ibis
dataset = pipeline.dataset()
users_table = dataset.users
# Use ibis for complex queries
import ibis
result = (
    users_table
    .filter(users_table.status == "active")
    .group_by("country")
    .aggregate(count=users_table.count())
)
df = result.to_pandas()
print(df)

import dlt
# Use different progress trackers
pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="data",
    progress="tqdm"  # Options: "log", "tqdm", "enlighten", "alive_progress"
)
pipeline.run(my_large_source())
# Shows progress bars during extract/normalize/load
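If the progress bar needs configuration, a collector object can be passed instead of the string name. A minimal sketch, assuming dlt.progress.tqdm forwards keyword arguments to tqdm (verify against your dlt version):

import dlt

pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="data",
    progress=dlt.progress.tqdm(colour="yellow")  # assumed collector API: dlt.progress.tqdm
)
pipeline.run(my_large_source())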
import dlt
from dlt.helpers.dbt import package_runner
def etl_pipeline():
    # 1. Extract and Load with dlt
    pipeline = dlt.pipeline(
        pipeline_name="sales_pipeline",
        destination="postgres",
        dataset_name="raw_sales"
    )
    # Load raw data
    load_info = pipeline.run(sales_api_source())
    print(f"Loaded {len(load_info.loads_ids)} packages")

    # 2. Transform with dbt
    dbt = package_runner(
        pipeline,
        package_location="dbt/sales_transforms",
        models="marts"  # Only run marts models
    )
    # Run dbt workflow
    dbt.run_deps()
    dbt.run_run(models="staging marts")
    dbt.run_test()
    print("ETL pipeline completed")
etl_pipeline()

Modules covered: dlt.helpers.dbt, dlt.helpers.dbt_cloud, dlt.helpers.graphviz, dlt.helpers.dbml, dlt.helpers.mermaid, dlt.helpers.airflow_helper, dlt.helpers.ibis, dlt.helpers.marimo