A Dagster integration for dbt that orchestrates dbt models as Dagster assets, with full lineage tracking and metadata propagation.
—
Utility functions for asset selection, naming conventions, metadata handling, and manifest operations. These utilities provide the common functionality needed to integrate dbt with Dagster.
Selects dbt resources by their unique IDs using dbt's selection syntax.
def select_unique_ids(
    manifest: Mapping[str, Any],
    select: str,
    exclude: str = ""
) -> Set[str]:
    """
    Select dbt resources by unique IDs using dbt selection syntax.

    Parameters:
    - manifest: Parsed dbt manifest dictionary
    - select: dbt selection string (e.g., "tag:daily", "my_model+")
    - exclude: dbt exclusion string (defaults to "")
    Returns:
    Set of unique IDs for selected dbt resources

    NOTE(review): signature-only API stub; the implementation is not shown
    here. Confirm the exact signature against the installed dagster-dbt
    version before relying on it.
    """


# Generates Dagster-compatible names from dbt resource properties.
def dagster_name_fn(dbt_resource_props: Mapping[str, Any]) -> str:
    """
    Generate Dagster name from dbt resource properties.

    Creates a valid Dagster asset name from dbt resource information,
    handling special characters and ensuring uniqueness.

    Parameters:
    - dbt_resource_props: dbt resource properties from manifest
    Returns:
    Valid Dagster asset name string

    NOTE(review): signature-only API stub; the sanitizing/uniqueness
    behavior described above is not visible here. Confirm against the
    installed dagster-dbt version.
    """


# Default function for converting dbt node information to Dagster asset keys.
def default_node_info_to_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    """
    Convert dbt node info to asset key using default logic.

    Parameters:
    - node_info: dbt node information dictionary
    Returns:
    AssetKey for the dbt node

    NOTE(review): signature-only API stub; the exact key layout produced by
    the "default logic" is not shown here. Confirm against dagster-dbt.
    """


# Metadata entries specifically for dbt objects and their integration with Dagster.
class DbtMetadataSet:
    """
    Collection of metadata entries for dbt objects.

    Provides standardized metadata that can be attached to Dagster
    assets representing dbt models, sources, and tests.

    NOTE(review): this stub shows no constructor arguments, so it is unclear
    how an instance is tied to a particular dbt resource. Confirm against
    the installed dagster-dbt version.
    """

    @property
    def materialization_type(self) -> MetadataEntry:
        """
        Metadata entry for dbt materialization type.

        Returns:
        MetadataEntry indicating the dbt materialization (table, view, etc.)
        """


# Validates and loads dbt manifest from various input formats.
def validate_manifest(manifest: DbtManifestParam) -> Mapping[str, Any]:
    """
    Validate and load dbt manifest from various formats.

    Parameters:
    - manifest: Manifest as dict, file path, or JSON string
      (DbtManifestParam = Union[Mapping[str, Any], str, Path])
    Returns:
    Validated manifest dictionary
    Raises:
    DagsterDbtError: If manifest is invalid

    NOTE(review): signature-only stub; how a plain `str` is interpreted
    (path vs. JSON text) is not visible here. Confirm against dagster-dbt.
    """


# Reads and caches manifest content from file path.
def read_manifest_path(manifest_path: str) -> Mapping[str, Any]:
    """
    Read manifest from file path with caching.

    Uses internal caching to avoid repeated file I/O for the same
    manifest file within a single process.

    Parameters:
    - manifest_path: Path to manifest.json file
    Returns:
    Parsed manifest dictionary
    Raises:
    DagsterDbtManifestNotFoundError: If file doesn't exist

    NOTE(review): signature-only stub; the caching described above is not
    visible here (e.g. whether the cache is bounded or invalidated when the
    file changes). Confirm against dagster-dbt.
    """


from dagster import AssetSelection
from dagster import AssetKey
from dagster_dbt.utils import select_unique_ids
from dagster_dbt.dbt_manifest import validate_manifest


def create_custom_asset_selection(
    manifest_path: str,
    criteria: str,
    exclude: str = "tag:deprecated",
) -> AssetSelection:
    """Create an asset selection from dbt selection criteria.

    Parameters:
    - manifest_path: Path to a dbt manifest.json file
    - criteria: dbt selection string (e.g. "tag:daily")
    - exclude: dbt exclusion string; defaults to "tag:deprecated"
    Returns:
    AssetSelection over [schema, name] asset keys of the selected resources
    """
    manifest = validate_manifest(manifest_path)
    # Get unique IDs for the selected dbt resources
    selected_ids = select_unique_ids(
        manifest=manifest,
        select=criteria,
        exclude=exclude,
    )
    # A selection can include sources as well as nodes, so look IDs up in
    # both sections instead of assuming every ID lives under "nodes".
    # IDs without a matching entry (nothing to build a key from) are skipped.
    resources = {**manifest.get("sources", {}), **manifest.get("nodes", {})}
    # Convert to asset keys (assuming [schema, name] naming);
    # sorted for a deterministic order.
    asset_keys = [
        AssetKey([resources[unique_id]["schema"], resources[unique_id]["name"]])
        for unique_id in sorted(selected_ids)
        if unique_id in resources
    ]
    return AssetSelection.keys(*asset_keys)


# Use custom selection
selection = create_custom_asset_selection(
    "./target/manifest.json",
    "tag:daily +models/marts/",
)
from dagster import AssetKey
import os

from dagster_dbt.utils import dagster_name_fn, default_node_info_to_asset_key


def custom_asset_key_generator(dbt_resource_props: dict) -> AssetKey:
    """Generate an asset key prefixed with the dbt target environment.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest
    Returns:
    AssetKey of the form [environment, *default_key_path], where environment
    comes from the DBT_TARGET environment variable ("dev" if unset or empty)
    """
    # Use utility to get base asset key
    base_key = default_node_info_to_asset_key(dbt_resource_props)
    # `or` (rather than a getenv default) also covers DBT_TARGET being set
    # to an empty string, which would otherwise yield an empty key component.
    environment = os.getenv("DBT_TARGET") or "dev"
    return AssetKey([environment, *base_key.path])
def custom_naming_function(dbt_resource_props: dict) -> str:
    """Build a Dagster asset name suffixed with the dbt materialization.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest
    Returns:
    "<base name>_<materialization>", where the materialization falls back
    to "view" when the resource config does not specify one
    """
    name = dagster_name_fn(dbt_resource_props)
    resource_config = dbt_resource_props.get("config", {})
    suffix = resource_config.get("materialized", "view")
    return "{}_{}".format(name, suffix)


from dagster import MetadataEntry
from dagster import MetadataValue
from dagster_dbt.metadata_set import DbtMetadataSet


def enhanced_dbt_metadata(dbt_resource_props: dict) -> dict:
    """Generate enhanced metadata for a dbt asset.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest
    Returns:
    Dict mapping metadata labels to Dagster metadata values. Each key is
    present only when the corresponding information exists on the resource:
    "materialization", "description", "column_count",
    "documentation_coverage" (fraction 0.0-1.0) and "test_type".
    """
    metadata = {}
    config = dbt_resource_props.get("config") or {}

    # Record this resource's own materialization. DbtMetadataSet() is built
    # without any per-resource input, so its materialization_type entry could
    # not reflect `config`; read the value from the resource directly.
    if "materialized" in config:
        metadata["materialization"] = MetadataValue.text(str(config["materialized"]))

    # Skip empty descriptions rather than attaching a blank text entry.
    description = dbt_resource_props.get("description")
    if description:
        metadata["description"] = MetadataValue.text(description)

    # Column count and documentation coverage only make sense when the
    # resource declares columns (this also avoids dividing by zero).
    columns = dbt_resource_props.get("columns") or {}
    if columns:
        metadata["column_count"] = MetadataValue.int(len(columns))
        documented_columns = sum(1 for col in columns.values() if col.get("description"))
        metadata["documentation_coverage"] = MetadataValue.float(
            documented_columns / len(columns)
        )

    # Exact match, so other resource types whose name merely contains
    # "test" are not treated as tests.
    if dbt_resource_props.get("resource_type") == "test":
        test_config = dbt_resource_props.get("test_metadata") or {}
        metadata["test_type"] = MetadataValue.text(test_config.get("name", "unknown"))

    return metadata


from dagster_dbt.utils import validate_manifest, read_manifest_path
import json
from collections import Counter

from dagster_dbt.errors import DagsterDbtManifestNotFoundError


def analyze_dbt_project(manifest_input) -> dict:
    """Summarize the contents of a dbt project manifest.

    Parameters:
    - manifest_input: Manifest in any form accepted by validate_manifest
    Returns:
    Dict with "total_nodes", "total_sources", per-type counts ("models",
    "tests", "snapshots", "seeds"), "materializations" (count per model
    materialization, "view" when unspecified), "unique_tags" (sorted) and
    "tag_count" over all nodes. On failure, returns {"error": <message>}
    instead, so callers must check for the "error" key.
    """
    try:
        manifest = validate_manifest(manifest_input)
        nodes = manifest.get("nodes", {})
        sources = manifest.get("sources", {})

        # Count every resource type in a single pass over the nodes.
        type_counts = Counter(node.get("resource_type") for node in nodes.values())
        materializations = Counter(
            node.get("config", {}).get("materialized", "view")
            for node in nodes.values()
            if node.get("resource_type") == "model"
        )
        all_tags = {tag for node in nodes.values() for tag in node.get("tags", [])}

        return {
            "total_nodes": len(nodes),
            "total_sources": len(sources),
            "models": type_counts["model"],
            "tests": type_counts["test"],
            "snapshots": type_counts["snapshot"],
            "seeds": type_counts["seed"],
            "materializations": dict(materializations),
            "unique_tags": sorted(all_tags),
            "tag_count": len(all_tags),
        }
    except DagsterDbtManifestNotFoundError as e:
        return {"error": f"Manifest not found: {e}"}
    except Exception as e:
        # Deliberately best-effort: report any other failure instead of raising.
        return {"error": f"Analysis failed: {e}"}


# Analyze from file path
analysis = analyze_dbt_project("./target/manifest.json")
# On failure the result only holds "error", so check before reading counts.
if "error" in analysis:
    print(analysis["error"])
else:
    print(f"Found {analysis['models']} models and {analysis['tests']} tests")
# Analyze from manifest dict
with open("./target/manifest.json", encoding="utf-8") as f:
    manifest_dict = json.load(f)
analysis = analyze_dbt_project(manifest_dict)
from dagster_dbt.utils import select_unique_ids
import json
from collections import Counter


def validate_dbt_selection(manifest: dict, select: str, exclude: str = "") -> dict:
    """Validate dbt selection criteria and return statistics.

    Parameters:
    - manifest: Parsed dbt manifest dictionary
    - select: dbt selection string
    - exclude: dbt exclusion string (defaults to "")
    Returns:
    If the selection resolves: {"valid": True, "selected_count": int,
    "by_type": {resource_type: count}, "selected_models": [model names]}.
    If select_unique_ids raises: {"valid": False, "error": <message>}.
    """
    try:
        selected_ids = select_unique_ids(
            manifest=manifest,
            select=select,
            exclude=exclude,
        )
    except Exception as e:
        # Any failure to resolve the selection marks it as invalid.
        return {"valid": False, "error": str(e)}

    # Look IDs up in both nodes and sources, so that by_type also counts
    # the selected sources that selected_count includes.
    resources = {**manifest.get("sources", {}), **manifest.get("nodes", {})}
    selected = [resources[uid] for uid in sorted(selected_ids) if uid in resources]

    by_type = dict(Counter(r.get("resource_type", "unknown") for r in selected))
    return {
        "valid": True,
        "selected_count": len(selected_ids),
        "by_type": by_type,
        "selected_models": [
            r["name"] for r in selected if r.get("resource_type") == "model"
        ],
    }


# Validate selection (load the manifest first; this snippet defines no
# `manifest` of its own)
with open("./target/manifest.json", encoding="utf-8") as f:
    manifest = json.load(f)
result = validate_dbt_selection(
    manifest=manifest,
    select="tag:daily",
    exclude="tag:slow",
)
if result["valid"]:
    print(f"Selection is valid, found {result['selected_count']} resources")
else:
    print(f"Selection is invalid: {result['error']}")
# Asset resource types that are treated as assets
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]

# Names referenced by the API signatures in this reference.
from pathlib import Path
from typing import Any, Mapping, Set, Union

from dagster import AssetKey
from dagster import MetadataEntry

# What may be passed wherever a dbt manifest is expected: an already-parsed
# mapping, or a `str` / `Path`.
DbtManifestParam = Union[Mapping[str, Any], str, Path]

# Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt