CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

component-system.mddocs/

Component System

Integration with Dagster's component system for declarative dbt project configuration. The component system provides a way to define dbt projects as reusable components within Dagster code locations.

Capabilities

dbt Project Component

DbtProjectComponent

Dagster component for declarative dbt project configuration and management.

class DbtProjectComponent(Component):
    """
    Dagster component for dbt project integration.
    
    Provides declarative configuration for dbt projects within Dagster
    code locations, enabling automatic asset discovery and resource
    management.
    
    Attributes:
    - dbt_project_path: Path to the dbt project directory
    - dbt_profiles_path: Path to the dbt profiles directory (optional)
    - dbt_target: dbt target name (optional)
    - dbt_parse_on_load: Whether to parse project on component load
    """
    
    dbt_project_path: str
    dbt_profiles_path: Optional[str] = None
    dbt_target: Optional[str] = None
    dbt_parse_on_load: bool = True
    
    def build_defs(self) -> Definitions:
        """
        Build Dagster definitions from dbt project.
        
        Returns:
        Definitions object containing dbt assets and resources
        """
    
    def build_assets(self) -> Sequence[AssetsDefinition]:
        """
        Build asset definitions from dbt project.
        
        Returns:
        Sequence of AssetsDefinition objects for dbt models
        """
    
    def build_resources(self) -> Mapping[str, ResourceDefinition]:
        """
        Build resource definitions for dbt integration.
        
        Returns:
        Mapping of resource names to ResourceDefinition objects
        """

Usage Examples

Basic Component Configuration

from dagster import Definitions
from dagster_dbt.components import DbtProjectComponent

# Define dbt project component
dbt_component = DbtProjectComponent(
    dbt_project_path="./my_dbt_project",
    dbt_target="dev"
)

# Build definitions from component
defs = dbt_component.build_defs()

Multi-Environment Component

import os
from dagster_dbt.components import DbtProjectComponent

class EnvironmentDbtComponent(DbtProjectComponent):
    def __init__(self, project_path: str):
        environment = os.getenv("DAGSTER_ENV", "dev")
        
        super().__init__(
            dbt_project_path=project_path,
            dbt_target=environment,
            dbt_profiles_path=f"./profiles/{environment}"
        )
    
    def build_defs(self) -> Definitions:
        """Build environment-specific definitions."""
        base_defs = super().build_defs()
        
        # Add environment-specific configuration
        return Definitions(
            assets=base_defs.assets,
            resources=base_defs.resources,
            jobs=base_defs.jobs,
            schedules=base_defs.schedules,
            sensors=base_defs.sensors
        )

# Use environment component
component = EnvironmentDbtComponent("./my_dbt_project")
defs = component.build_defs()

Component with Custom Assets

from dagster import asset, AssetExecutionContext
from dagster_dbt.components import DbtProjectComponent
from dagster_dbt import DbtCliResource

class CustomDbtComponent(DbtProjectComponent):
    def build_assets(self) -> Sequence[AssetsDefinition]:
        """Build assets with custom pre/post processing."""
        # Get base dbt assets
        dbt_assets = super().build_assets()
        
        # Add custom preprocessing asset
        @asset
        def preprocess_data(context: AssetExecutionContext):
            """Preprocess data before dbt run."""
            context.log.info("Preprocessing data for dbt")
            # Custom preprocessing logic
            return {"status": "preprocessed"}
        
        # Add custom postprocessing asset
        @asset(deps=dbt_assets)
        def postprocess_results(context: AssetExecutionContext):
            """Postprocess dbt results."""
            context.log.info("Postprocessing dbt results")
            # Custom postprocessing logic
            return {"status": "postprocessed"}
        
        return [preprocess_data, *dbt_assets, postprocess_results]

component = CustomDbtComponent(
    dbt_project_path="./my_dbt_project",
    dbt_target="prod"
)
defs = component.build_defs()

Component Factory Pattern

from dagster_dbt.components import DbtProjectComponent
from typing import Dict, Any

def create_dbt_component(config: Dict[str, Any]) -> DbtProjectComponent:
    """Factory function for creating dbt components from configuration."""
    return DbtProjectComponent(
        dbt_project_path=config["project_path"],
        dbt_profiles_path=config.get("profiles_path"),
        dbt_target=config.get("target", "dev"),
        dbt_parse_on_load=config.get("parse_on_load", True)
    )

# Configuration-driven component creation
project_configs = [
    {
        "project_path": "./analytics",
        "target": "prod",
        "profiles_path": "./profiles/prod"
    },
    {
        "project_path": "./ml_models", 
        "target": "dev",
        "parse_on_load": False
    }
]

components = [create_dbt_component(config) for config in project_configs]
all_defs = [component.build_defs() for component in components]

# Combine all definitions
from dagster import Definitions

combined_assets = []
combined_resources = {}

for defs in all_defs:
    combined_assets.extend(defs.assets or [])
    combined_resources.update(defs.resources or {})

final_defs = Definitions(
    assets=combined_assets,
    resources=combined_resources
)

Type Definitions

from dagster import Component, Definitions, AssetsDefinition, ResourceDefinition
from typing import Optional, Sequence, Mapping

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json