A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Integration with Dagster's component system for declarative dbt project configuration. The component system provides a way to define dbt projects as reusable components within Dagster code locations.
Dagster component for declarative dbt project configuration and management.
class DbtProjectComponent(Component):
    """
    Declarative Dagster component wrapping a dbt project.

    Describes a dbt project as configuration inside a Dagster code
    location, enabling automatic asset discovery and resource
    management.

    This listing is signature-only: each method body below consists of
    nothing but its docstring.

    Attributes:
    - dbt_project_path: Path to the dbt project directory
    - dbt_profiles_path: Path to the dbt profiles directory (optional)
    - dbt_target: dbt target name (optional)
    - dbt_parse_on_load: Whether to parse project on component load
    """

    # Required: the only field declared without a default.
    dbt_project_path: str
    # Optional settings; both fall back to None when omitted.
    dbt_profiles_path: Optional[str] = None
    dbt_target: Optional[str] = None
    # Parse-on-load is switched on unless the caller disables it.
    dbt_parse_on_load: bool = True

    def build_defs(self) -> Definitions:
        """
        Produce the Dagster definitions for this dbt project.

        Returns:
        A Definitions object holding the dbt assets and resources
        """

    def build_assets(self) -> Sequence[AssetsDefinition]:
        """
        Produce the asset definitions for this dbt project.

        Returns:
        A sequence of AssetsDefinition objects, one set per dbt models
        """

    def build_resources(self) -> Mapping[str, ResourceDefinition]:
        """
        Produce the resource definitions used by the dbt integration.

        Returns:
        A mapping from resource name to ResourceDefinition
        """


from dagster import Definitions
from dagster_dbt.components import DbtProjectComponent
# Configure the component: project directory plus the "dev" dbt target.
dbt_component = DbtProjectComponent(
    dbt_project_path="./my_dbt_project",
    dbt_target="dev",
)

# Ask the configured component for its Dagster definitions.
defs = dbt_component.build_defs()

import os
from dagster_dbt.components import DbtProjectComponent
class EnvironmentDbtComponent(DbtProjectComponent):
    """dbt component whose target and profiles directory follow DAGSTER_ENV."""

    def __init__(self, project_path: str):
        """
        Configure the component for the current environment.

        The environment name is read from the DAGSTER_ENV variable and
        falls back to "dev" when the variable is not set. The same name
        is used as the dbt target and as the sub-directory of
        ./profiles.

        Args:
            project_path: Location of the dbt project; forwarded to the
                parent as dbt_project_path.
        """
        env_name = os.environ.get("DAGSTER_ENV", "dev")
        profiles_dir = f"./profiles/{env_name}"
        super().__init__(
            dbt_project_path=project_path,
            dbt_target=env_name,
            dbt_profiles_path=profiles_dir,
        )

    def build_defs(self) -> Definitions:
        """Build environment-specific definitions."""
        parent_defs = super().build_defs()
        # Add environment-specific configuration
        # As written, the five collections below are forwarded unchanged
        # from the parent's definitions into a new Definitions object.
        forwarded = {
            "assets": parent_defs.assets,
            "resources": parent_defs.resources,
            "jobs": parent_defs.jobs,
            "schedules": parent_defs.schedules,
            "sensors": parent_defs.sensors,
        }
        return Definitions(**forwarded)
# Instantiate the environment-aware component for the project directory,
# then build its definitions.
component = EnvironmentDbtComponent("./my_dbt_project")
defs = component.build_defs()

from dagster import asset, AssetExecutionContext
from dagster_dbt.components import DbtProjectComponent
from dagster_dbt import DbtCliResource
class CustomDbtComponent(DbtProjectComponent):
    """dbt component that surrounds the parent's assets with two extra assets."""

    def build_assets(self) -> Sequence[AssetsDefinition]:
        """Build assets with custom pre/post processing."""
        # Start from whatever the parent component builds.
        base_assets = super().build_assets()

        # Declared with a bare @asset: nothing in this block attaches
        # dependencies to it or makes the dbt assets depend on it.
        @asset
        def preprocess_data(context: AssetExecutionContext):
            """Preprocess data before dbt run."""
            context.log.info("Preprocessing data for dbt")
            # Custom preprocessing logic
            return {"status": "preprocessed"}

        # The parent-built assets are handed to this asset as its deps.
        @asset(deps=base_assets)
        def postprocess_results(context: AssetExecutionContext):
            """Postprocess dbt results."""
            context.log.info("Postprocessing dbt results")
            # Custom postprocessing logic
            return {"status": "postprocessed"}

        # Result order: preprocessing asset, parent assets, postprocessing asset.
        combined = [preprocess_data]
        combined.extend(base_assets)
        combined.append(postprocess_results)
        return combined
# Build the customised component against the "prod" target and collect
# its definitions.
component = CustomDbtComponent(
    dbt_project_path="./my_dbt_project",
    dbt_target="prod",
)
defs = component.build_defs()

from dagster_dbt.components import DbtProjectComponent
from typing import Dict, Any
def create_dbt_component(config: Dict[str, Any]) -> DbtProjectComponent:
    """
    Factory function for creating dbt components from configuration.

    Args:
        config: Settings for one project. "project_path" is required
            (a missing key raises KeyError). "profiles_path" defaults
            to None, "target" to "dev" and "parse_on_load" to True.

    Returns:
        A DbtProjectComponent constructed from those settings.
    """
    project_dir = config["project_path"]
    profiles_dir = config.get("profiles_path")
    target_name = config.get("target", "dev")
    parse_eagerly = config.get("parse_on_load", True)
    return DbtProjectComponent(
        dbt_project_path=project_dir,
        dbt_profiles_path=profiles_dir,
        dbt_target=target_name,
        dbt_parse_on_load=parse_eagerly,
    )
# Configuration-driven component creation
# Each entry describes one dbt project; keys left out fall back to the
# defaults applied by create_dbt_component.
project_configs = [
    {
        "project_path": "./analytics",
        "target": "prod",
        "profiles_path": "./profiles/prod",
    },
    {
        "project_path": "./ml_models",
        "target": "dev",
        "parse_on_load": False,
    },
]

# One component per configuration entry, then one set of definitions per
# component, both in the order the entries are listed.
components = list(map(create_dbt_component, project_configs))
all_defs = [built.build_defs() for built in components]
# Combine all definitions
from dagster import Definitions

# Flatten the assets of every per-project Definitions object into one list.
# Comprehensions are used instead of a module-level `for` loop because a
# loop variable would stay bound after the loop, leaving a second
# Definitions object at module scope next to `final_defs`.
# NOTE(review): Dagster's module loader is expected to reject more than one
# module-level Definitions object - confirm against the Dagster docs.
combined_assets = [
    asset_def
    for project_defs in all_defs
    for asset_def in (project_defs.assets or [])
]

# Merge the resources; when two projects use the same resource key, the
# project listed later wins (same outcome as successive dict.update calls).
combined_resources = {
    resource_key: resource
    for project_defs in all_defs
    for resource_key, resource in (project_defs.resources or {}).items()
}

# Only assets and resources are carried over into the combined object.
final_defs = Definitions(
    assets=combined_assets,
    resources=combined_resources,
)

from dagster import Component, Definitions, AssetsDefinition, ResourceDefinition
from typing import Optional, Sequence, Mapping

Install with Tessl CLI:

npx tessl i tessl/pypi-dagster-dbt