CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/cli-resource.md

CLI Resource and Execution

Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management. This module provides the core interface for running dbt commands within Dagster ops and assets.

Capabilities

CLI Resource

DbtCliResource

The main resource for executing dbt CLI commands within Dagster.

class DbtCliResource(ConfigurableResource):
    """Dagster resource used to run dbt CLI commands.

    Attributes:
        project_dir: Path to the dbt project directory.
        profiles_dir: Path to the dbt profiles directory (optional).
        profile: Name of the dbt profile to use (optional).
        target: Name of the dbt target to use (optional).
        global_config_flags: Global dbt flags applied to invocations.
    """

    project_dir: str
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    global_config_flags: List[str] = []

    def cli(
        self,
        args: List[str],
        context: Optional[AssetExecutionContext | OpExecutionContext] = None,
        **kwargs
    ) -> DbtCliInvocation:
        """Run a dbt CLI command.

        Args:
            args: The dbt command and its arguments, for example
                ``["build", "--select", "my_model"]``.
            context: Dagster execution context used for logging and
                metadata.
            **kwargs: Additional arguments passed to subprocess.

        Returns:
            A DbtCliInvocation that can stream results and give access
            to the artifacts of the run.
        """

Usage Example

from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource

import os

# "~" is only expanded by a shell, never by Python path handling, so resolve
# the home directory explicitly before handing the path to the resource.
dbt_resource = DbtCliResource(
    project_dir="./my_dbt_project",
    profiles_dir=os.path.expanduser("~/.dbt"),
    target="dev",
)

@asset
def run_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run the daily-tagged dbt models and report how many results were produced.

    Args:
        context: Dagster execution context; used here for logging.
        dbt: The dbt CLI resource that executes the command.

    Returns:
        A dict with a single key, ``"models_run"``, holding the number of
        entries under ``"results"`` in ``run_results.json`` (0 when the
        artifact is unavailable).
    """
    # Run specific models
    dbt_run = dbt.cli([
        "run",
        "--select", "tag:daily",
        "--exclude", "tag:slow"
    ], context=context)

    # Stream events and log each one as it arrives
    for event in dbt_run.stream():
        context.log.info(f"dbt: {event}")

    # get_artifact() is documented as returning None when the artifact is
    # not found, so fall back to an empty dict instead of failing on .get().
    run_results = dbt_run.get_artifact("run_results.json") or {}
    return {"models_run": len(run_results.get("results", []))}

CLI Invocation

DbtCliInvocation

Represents a dbt CLI command invocation with methods for streaming events and accessing artifacts.

class DbtCliInvocation:
    """Handle on a single dbt CLI command invocation.

    Exposes methods to stream execution events and to read the artifacts
    the command generates, such as run_results.json and manifest.json.
    """

    def stream(self) -> Iterator[DbtCliEventMessage]:
        """Stream dbt CLI events as they occur.

        Yields:
            DbtCliEventMessage objects carrying the event data.
        """

    def stream_raw_events(self) -> Iterator[dict]:
        """Stream the raw dbt event dictionaries.

        Yields:
            Raw, unparsed dbt event dictionaries.
        """

    def get_artifact(self, artifact_name: str) -> Optional[dict]:
        """Look up a dbt artifact by name.

        Args:
            artifact_name: Artifact file name, for example
                ``"run_results.json"`` or ``"manifest.json"``.

        Returns:
            The parsed artifact as a dictionary, or None if it was not
            found.
        """

    def wait(self) -> CompletedProcess:
        """Wait until the dbt command has completed.

        Returns:
            A CompletedProcess with the return code and outputs.
        """

    @property
    def is_successful(self) -> bool:
        """Whether the dbt command completed successfully.

        Returns:
            True if the command succeeded, False otherwise.
        """

CLI Event Messages

DbtCliEventMessage

Base class for dbt CLI event messages with common event handling.

class DbtCliEventMessage:
    """Common base for dbt CLI event messages.

    Attributes:
        raw_event: The raw event dictionary from dbt.
        event_type: The type of the dbt event.
        log_level: The logging level of the event.
    """

    raw_event: dict

    @property
    def event_type(self) -> str:
        """Return the dbt event type."""

    @property
    def log_level(self) -> str:
        """Return the log level of the event."""

    def to_default_asset_events(
        self,
        context: AssetExecutionContext,
        manifest: dict,
        **kwargs
    ) -> Iterator[AssetMaterialization | AssetObservation]:
        """Convert this event into Dagster asset events.

        Args:
            context: The asset execution context.
            manifest: The dbt manifest dictionary.
            **kwargs: Additional conversion parameters.

        Yields:
            AssetMaterialization or AssetObservation events.
        """

DbtCoreCliEventMessage

Event message implementation for dbt Core CLI commands.

class DbtCoreCliEventMessage(DbtCliEventMessage):
    """Event message for commands run through the dbt Core CLI.

    Covers events emitted by dbt Core CLI execution, with Core-specific
    event parsing and asset event generation.
    """

DbtFusionCliEventMessage

Event message implementation for dbt Fusion CLI commands.

class DbtFusionCliEventMessage(DbtCliEventMessage):
    """Event message for commands run through the dbt Fusion CLI.

    Covers events emitted by dbt Fusion CLI execution, with Fusion-specific
    event parsing and optimized performance characteristics.
    """

Advanced Usage Examples

Custom Event Handling

from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource

@asset
def process_dbt_with_custom_events(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    """Run ``dbt test`` and collect a summary of each test result event.

    Args:
        context: Dagster execution context; used here for logging.
        dbt: The dbt CLI resource that executes the command.

    Returns:
        A dict with key ``"test_results"`` holding one entry per
        ``"test_result"`` event, each with the test name, status and
        execution time read from the event's ``"data"`` payload.
    """
    invocation = dbt.cli(["test"], context=context)

    collected = []
    for event in invocation.stream():
        if event.event_type == "test_result":
            # All three fields come from the same "data" payload.
            payload = event.raw_event.get("data", {})
            collected.append({
                "test_name": payload.get("node_name"),
                "status": payload.get("status"),
                "execution_time": payload.get("execution_time")
            })

        # Surface warn- and error-level events in the Dagster log.
        level = event.log_level
        if level in ("error", "warn"):
            context.log.warning(f"dbt {level}: {event.raw_event}")

    return {"test_results": collected}

Artifact Access

from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource

@asset
def analyze_dbt_run_results(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    """Run the important-tagged models and summarize the run results.

    Args:
        context: Dagster execution context passed to the dbt invocation.
        dbt: The dbt CLI resource that executes the command.

    Returns:
        A dict with the unique ids of the successful models and the total
        elapsed time, or a dict with an ``"error"`` message when either
        artifact is missing or empty.
    """
    invocation = dbt.cli(["run", "--select", "tag:important"], context=context)

    # Consume every event before reading the artifacts.
    for _ in invocation.stream():
        pass

    run_results = invocation.get_artifact("run_results.json")
    manifest = invocation.get_artifact("manifest.json")

    # Guard clause: both artifacts are required for the summary.
    if not (run_results and manifest):
        return {"error": "Could not access dbt artifacts"}

    succeeded = []
    for entry in run_results.get("results", []):
        if entry["status"] == "success":
            succeeded.append(entry["unique_id"])

    elapsed = run_results.get("elapsed_time", 0)
    return {
        "successful_models": succeeded,
        "total_execution_time": elapsed
    }

Type Definitions

# Import types for type hints
from dagster import AssetExecutionContext, OpExecutionContext
from subprocess import CompletedProcess
# `dict` is a builtin and is not exported by `typing`; importing it from there
# raises ImportError, so it is intentionally absent from this list.
from typing import Iterator, List, Optional, Union

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json