A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets, with full lineage tracking and metadata propagation.
—
Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management. This module provides the core interface for running dbt commands within Dagster ops and assets.
The main resource for executing dbt CLI commands within Dagster.
class DbtCliResource(ConfigurableResource):
    """Dagster resource that runs dbt CLI commands.

    Attributes:
        project_dir: Path to the dbt project directory.
        profiles_dir: Path to the dbt profiles directory (optional).
        profile: Name of the dbt profile to use (optional).
        target: Name of the dbt target to use (optional).
        global_config_flags: Global dbt flags to apply.
    """

    project_dir: str
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    global_config_flags: List[str] = []

    def cli(
        self,
        args: List[str],
        context: Optional[AssetExecutionContext | OpExecutionContext] = None,
        **kwargs,
    ) -> DbtCliInvocation:
        """Run a dbt CLI command.

        Args:
            args: dbt command arguments, e.g. ``["build", "--select", "my_model"]``.
            context: Dagster execution context used for logging and metadata.
            **kwargs: Additional arguments forwarded to the subprocess.

        Returns:
            A DbtCliInvocation for streaming results and accessing artifacts.
        """


from dagster import asset, AssetExecutionContext
import os

from dagster import Definitions
from dagster_dbt import DbtCliResource

# "~" is expanded here instead of relying on whatever consumes profiles_dir
# downstream to expand it.
dbt_resource = DbtCliResource(
    project_dir="./my_dbt_project",
    profiles_dir=os.path.expanduser("~/.dbt"),
    target="dev",
)


@asset
def run_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run the dbt models tagged ``daily``, excluding those tagged ``slow``.

    Every streamed dbt event is logged through the Dagster context, then the
    number of entries in run_results.json is reported.

    Args:
        context: Asset execution context, used for logging and passed to dbt.
        dbt: The dbt CLI resource, registered under the key "dbt" in ``defs``.

    Returns:
        A dict whose "models_run" key is the number of results recorded in
        run_results.json.

    Raises:
        RuntimeError: If run_results.json is not available after the run.
    """
    # Run specific models
    dbt_run = dbt.cli(
        ["run", "--select", "tag:daily", "--exclude", "tag:slow"],
        context=context,
    )
    # Stream events and get results
    for event in dbt_run.stream():
        context.log.info(f"dbt: {event}")
    # get_artifact returns None when the artifact is not found; fail with a
    # clear message rather than an AttributeError on None.
    run_results = dbt_run.get_artifact("run_results.json")
    if run_results is None:
        raise RuntimeError("dbt did not produce run_results.json")
    return {"models_run": len(run_results.get("results", []))}


# Dagster supplies resources to asset parameters by resource key, so the
# resource is registered under "dbt" to match run_dbt_models' `dbt` parameter.
defs = Definitions(assets=[run_dbt_models], resources={"dbt": dbt_resource})

# Represents a dbt CLI command invocation with methods for streaming events
# and accessing artifacts.
class DbtCliInvocation:
    """A single dbt CLI command invocation.

    Offers methods to stream execution events and to read the artifacts
    dbt generates, such as run_results.json and manifest.json.
    """

    def stream(self) -> Iterator[DbtCliEventMessage]:
        """Yield dbt CLI events as they occur.

        Yields:
            DbtCliEventMessage objects carrying the event data.
        """

    def stream_raw_events(self) -> Iterator[dict]:
        """Yield raw dbt event dictionaries.

        Yields:
            Raw dbt event dictionaries, without parsing.
        """

    def get_artifact(self, artifact_name: str) -> Optional[dict]:
        """Look up a dbt artifact by name.

        Args:
            artifact_name: Artifact name, e.g. "run_results.json" or
                "manifest.json".

        Returns:
            The parsed artifact dictionary, or None if it was not found.
        """

    def wait(self) -> CompletedProcess:
        """Block until the dbt command completes.

        Returns:
            A CompletedProcess holding the return code and outputs.
        """

    @property
    def is_successful(self) -> bool:
        """Whether the dbt command completed successfully.

        Returns:
            True if the command succeeded, otherwise False.
        """


# Base class for dbt CLI event messages with common event handling.
class DbtCliEventMessage:
    """Base class for dbt CLI event messages.

    Attributes:
        raw_event: The raw event dictionary from dbt.
        event_type: The dbt event type.
        log_level: The logging level of the event.
    """

    # Raw event dictionary as received from dbt.
    raw_event: dict

    @property
    def event_type(self) -> str:
        """The dbt event type."""

    @property
    def log_level(self) -> str:
        """The event's log level."""

    def to_default_asset_events(
        self,
        context: AssetExecutionContext,
        manifest: dict,
        **kwargs,
    ) -> Iterator[AssetMaterialization | AssetObservation]:
        """Translate this event into Dagster asset events.

        Args:
            context: The asset execution context.
            manifest: The dbt manifest dictionary.
            **kwargs: Additional conversion parameters.

        Yields:
            AssetMaterialization or AssetObservation events.
        """


# Event message implementation for dbt Core CLI commands.
class DbtCoreCliEventMessage(DbtCliEventMessage):
    """Event message for commands executed through the dbt Core CLI.

    Parses events from dbt Core CLI execution and generates asset events
    using Core-specific handling.
    """


# Event message implementation for dbt Fusion CLI commands.
class DbtFusionCliEventMessage(DbtCliEventMessage):
    """Event message for commands executed through the dbt Fusion CLI.

    Parses events from dbt Fusion CLI execution using Fusion-specific
    handling, with optimized performance characteristics.
    """


from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource


@asset
def process_dbt_with_custom_events(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    """Run ``dbt test``, collect per-test results, and log warnings/errors.

    Args:
        context: Asset execution context, used for logging and passed to dbt.
        dbt: The dbt CLI resource.

    Returns:
        A dict whose "test_results" key lists one entry per "test_result"
        event, holding the event's "node_name", "status", and
        "execution_time" from its "data" payload (None when a field is absent).
    """
    dbt_run = dbt.cli(["test"], context=context)
    test_results = []
    for event in dbt_run.stream():
        if event.event_type == "test_result":
            # Read the payload once; a missing or null "data" is treated as empty.
            data = event.raw_event.get("data") or {}
            test_results.append(
                {
                    "test_name": data.get("node_name"),
                    "status": data.get("status"),
                    "execution_time": data.get("execution_time"),
                }
            )
        # Log important events at a matching severity so dbt errors are not
        # downgraded to warnings.
        if event.log_level == "error":
            context.log.error(f"dbt {event.log_level}: {event.raw_event}")
        elif event.log_level == "warn":
            context.log.warning(f"dbt {event.log_level}: {event.raw_event}")
    return {"test_results": test_results}


from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource


@asset
def analyze_dbt_run_results(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    """Run the dbt models tagged ``important`` and summarize the outcome.

    Only run_results.json is needed for this summary, so a missing
    manifest.json no longer turns the result into an error.

    Args:
        context: Asset execution context, passed to dbt.
        dbt: The dbt CLI resource.

    Returns:
        If run_results.json is available and non-empty: a dict with
        "successful_models" (the unique IDs of results whose status is
        "success") and "total_execution_time" (the artifact's "elapsed_time",
        defaulting to 0). Otherwise: {"error": "Could not access dbt artifacts"}.
    """
    invocation = dbt.cli(["run", "--select", "tag:important"], context=context)
    # Consume the event stream before reading artifacts; process events here
    # as needed.
    for _event in invocation.stream():
        pass
    run_results = invocation.get_artifact("run_results.json")
    # get_artifact may return None; an empty artifact is treated as missing too.
    if not run_results:
        return {"error": "Could not access dbt artifacts"}
    successful_models = [
        result["unique_id"]
        for result in run_results.get("results", [])
        if result["status"] == "success"
    ]
    return {
        "successful_models": successful_models,
        "total_execution_time": run_results.get("elapsed_time", 0),
    }


# Import types for type hints
# `dict` is a builtin and is not exported by `typing`; importing it from
# `typing` raises ImportError, so it is omitted here.
from subprocess import CompletedProcess
from typing import Iterator, List, Optional, Union

# Dagster names referenced by the class stubs' bases and annotations.
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    AssetObservation,
    ConfigurableResource,
    OpExecutionContext,
)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dagster-dbt