CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/utilities.md

Utilities and Helpers

Utility functions for asset selection, naming conventions, metadata handling, and manifest operations. These utilities provide common functionality needed for dbt-Dagster integration.

Capabilities

Asset Selection Utilities

select_unique_ids

Returns the unique IDs of the dbt resources matched by a dbt selection string.

def select_unique_ids(
    manifest: Mapping[str, Any], 
    select: str, 
    exclude: str = ""
) -> Set[str]:
    """
    Return the unique IDs of the dbt resources matched by a selection.

    Parameters:
    - manifest: Parsed dbt manifest dictionary
    - select: dbt selection string (e.g., "tag:daily", "my_model+")
    - exclude: dbt exclusion string; defaults to "" (presumably meaning
      that nothing is excluded -- confirm against the implementation)

    Returns:
    Set of unique IDs for the selected dbt resources
    """

Naming Utilities

dagster_name_fn

Generates Dagster-compatible names from dbt resource properties.

def dagster_name_fn(dbt_resource_props: Mapping[str, Any]) -> str:
    """
    Return a Dagster-compatible name for a dbt resource.

    Builds a valid Dagster asset name from the dbt resource information,
    handling special characters and ensuring uniqueness.
    NOTE(review): this page does not show which properties are read or how
    special characters are handled -- confirm against the implementation.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
    Valid Dagster asset name string
    """

Asset Key Utilities

default_node_info_to_asset_key

Default function for converting dbt node information to Dagster asset keys.

def default_node_info_to_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    """
    Convert dbt node information to a Dagster asset key using the default logic.

    Parameters:
    - node_info: dbt node information dictionary

    Returns:
    AssetKey for the dbt node
    """

Metadata Utilities

DbtMetadataSet

Metadata entries specifically for dbt objects and their integration with Dagster.

class DbtMetadataSet:
    """
    Collection of metadata entries for dbt objects.

    Provides standardized metadata that can be attached to Dagster
    assets representing dbt models, sources, and tests.
    """
    
    @property
    def materialization_type(self) -> MetadataEntry:
        """
        Metadata entry for the dbt materialization type.

        NOTE(review): recent Dagster releases express metadata through
        MetadataValue rather than MetadataEntry -- confirm the return type
        against the installed version.

        Returns:
        MetadataEntry indicating the dbt materialization (table, view, etc.)
        """

Manifest Utilities

validate_manifest

Validates and loads a dbt manifest from one of several input formats.

def validate_manifest(manifest: DbtManifestParam) -> Mapping[str, Any]:
    """
    Validate a dbt manifest given in one of several formats and load it.

    Parameters:
    - manifest: Manifest as dict, file path, or JSON string
      NOTE(review): DbtManifestParam admits Mapping, str and Path; confirm
      whether a str is read as a file path or parsed as raw JSON.

    Returns:
    Validated manifest dictionary

    Raises:
    DagsterDbtError: If the manifest is invalid
    """

read_manifest_path

Reads manifest content from a file path and caches it.

def read_manifest_path(manifest_path: str) -> Mapping[str, Any]:
    """
    Read and parse the manifest at a file path, with caching.

    Uses internal caching to avoid repeated file I/O for the same
    manifest file within a single process.

    Parameters:
    - manifest_path: Path to the manifest.json file

    Returns:
    Parsed manifest dictionary

    Raises:
    DagsterDbtManifestNotFoundError: If the file doesn't exist
    """

Usage Examples

Custom Asset Selection

from dagster import AssetKey, AssetSelection
from dagster_dbt.utils import select_unique_ids
from dagster_dbt.dbt_manifest import validate_manifest

def create_custom_asset_selection(manifest_path: str, criteria: str) -> AssetSelection:
    """
    Create an asset selection from dbt selection criteria.

    Resources tagged "deprecated" are always excluded.

    Parameters:
    - manifest_path: Path to the dbt manifest.json file
    - criteria: dbt selection string choosing the resources to include

    Returns:
    AssetSelection over the asset keys of the selected manifest nodes
    """
    manifest = validate_manifest(manifest_path)

    # Get unique IDs for selected resources
    selected_ids = select_unique_ids(
        manifest=manifest,
        select=criteria,
        exclude="tag:deprecated",
    )

    # Convert to asset keys (assuming standard [schema, name] naming).
    # A selection may match resources that are not listed under "nodes"
    # (the manifest keeps sources in a separate section), so IDs without a
    # node entry are skipped instead of raising KeyError.
    nodes = manifest["nodes"]
    asset_keys = [
        AssetKey([nodes[unique_id]["schema"], nodes[unique_id]["name"]])
        for unique_id in selected_ids
        if unique_id in nodes
    ]

    return AssetSelection.keys(*asset_keys)

# Example: build a selection from the manifest at ./target/manifest.json
selection = create_custom_asset_selection(
    manifest_path="./target/manifest.json",
    criteria="tag:daily +models/marts/",
)

Asset Key Generation

import os

from dagster import AssetKey
from dagster_dbt.utils import dagster_name_fn, default_node_info_to_asset_key

def custom_asset_key_generator(dbt_resource_props: dict) -> AssetKey:
    """
    Generate an asset key prefixed with the current environment name.

    The environment is read from the DBT_TARGET environment variable and
    falls back to "dev" when the variable is not set.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
    AssetKey whose path is the environment followed by the default key path
    """
    # Use utility to get base asset key
    base_key = default_node_info_to_asset_key(dbt_resource_props)

    # Add environment prefix
    environment = os.getenv("DBT_TARGET", "dev")
    return AssetKey([environment, *base_key.path])

def custom_naming_function(dbt_resource_props: dict) -> str:
    """
    Build a Dagster asset name that ends with the dbt materialization.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
    The name from dagster_name_fn, an underscore, then the configured
    materialization ("view" when none is configured)
    """
    # Start from the standard Dagster name for this resource
    prefix = dagster_name_fn(dbt_resource_props)

    # Look up the materialization, defaulting to "view"
    config = dbt_resource_props.get("config", {})
    suffix = config.get("materialized", "view")

    return f"{prefix}_{suffix}"

Metadata Enhancement

from dagster import MetadataEntry
from dagster_dbt.metadata_set import DbtMetadataSet

def enhanced_dbt_metadata(dbt_resource_props: dict) -> dict:
    """
    Generate enhanced metadata for a dbt asset.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
    Dict of metadata entries. The keys "materialization", "description",
    "column_count", "documentation_coverage" and "test_type" are present
    only when the corresponding resource properties are.
    """
    metadata = {}

    # Use DbtMetadataSet for standard entries
    # NOTE(review): the entry is read from an empty DbtMetadataSet, not from
    # the configured "materialized" value -- confirm this is intended.
    dbt_metadata = DbtMetadataSet()
    if "materialized" in dbt_resource_props.get("config", {}):
        metadata["materialization"] = dbt_metadata.materialization_type

    # Add custom metadata
    # NOTE(review): recent Dagster releases build values with MetadataValue
    # rather than MetadataEntry -- confirm against the installed version.
    if "description" in dbt_resource_props:
        metadata["description"] = MetadataEntry.text(dbt_resource_props["description"])

    # Add column count
    columns = dbt_resource_props.get("columns", {})
    if columns:
        column_total = len(columns)
        metadata["column_count"] = MetadataEntry.int(column_total)

        # Fraction of columns with a non-empty description. `columns` is
        # non-empty in this branch, so the division cannot hit zero and no
        # fallback value is needed.
        documented_columns = sum(
            1 for col in columns.values()
            if col.get("description")
        )
        metadata["documentation_coverage"] = MetadataEntry.float(
            documented_columns / column_total
        )

    # Add test information. This is a substring check, so any resource type
    # whose name contains "test" qualifies.
    if "test" in dbt_resource_props.get("resource_type", ""):
        test_config = dbt_resource_props.get("test_metadata", {})
        metadata["test_type"] = MetadataEntry.text(
            test_config.get("name", "unknown")
        )

    return metadata

Manifest Processing

from collections import Counter

from dagster_dbt.dbt_manifest import read_manifest_path, validate_manifest
from dagster_dbt.errors import DagsterDbtManifestNotFoundError

def analyze_dbt_project(manifest_input) -> dict:
    """
    Summarize a dbt project from its manifest.

    Parameters:
    - manifest_input: Manifest in any form accepted by validate_manifest

    Returns:
    Dict with node and source totals, per-resource-type counts ("models",
    "tests", "snapshots", "seeds"), model counts per materialization, and
    the sorted unique tags with their count. On failure, a dict with a
    single "error" key describing the problem is returned instead.
    """
    try:
        # Validate and load manifest
        manifest = validate_manifest(manifest_input)

        nodes = manifest.get("nodes", {})
        sources = manifest.get("sources", {})

        # Count every resource type in one pass; a Counter returns 0 for
        # types that never occur.
        type_counts = Counter(node.get("resource_type") for node in nodes.values())

        # Count models per materialization; dbt models without an explicit
        # setting are counted as "view".
        materializations = Counter(
            node.get("config", {}).get("materialized", "view")
            for node in nodes.values()
            if node.get("resource_type") == "model"
        )

        # Collect the distinct tags used across all nodes
        all_tags = set()
        for node in nodes.values():
            all_tags.update(node.get("tags", []))

        return {
            "total_nodes": len(nodes),
            "total_sources": len(sources),
            "models": type_counts["model"],
            "tests": type_counts["test"],
            "snapshots": type_counts["snapshot"],
            "seeds": type_counts["seed"],
            "materializations": dict(materializations),
            "unique_tags": sorted(all_tags),
            "tag_count": len(all_tags),
        }

    except DagsterDbtManifestNotFoundError as e:
        return {"error": f"Manifest not found: {e}"}
    except Exception as e:
        # Deliberately broad: this helper reports failures to the caller as
        # an "error" entry instead of raising.
        return {"error": f"Analysis failed: {e}"}

import json

# Analyze from file path
analysis = analyze_dbt_project("./target/manifest.json")
# analyze_dbt_project reports failures as a dict with only an "error" key,
# so check for it before reading the counts.
if "error" in analysis:
    print(analysis["error"])
else:
    print(f"Found {analysis['models']} models and {analysis['tests']} tests")

# Analyze from manifest dict
with open("./target/manifest.json", encoding="utf-8") as f:
    manifest_dict = json.load(f)
analysis = analyze_dbt_project(manifest_dict)

Selection Validation

from dagster_dbt.utils import select_unique_ids

def validate_dbt_selection(manifest: dict, select: str, exclude: str = "") -> dict:
    """
    Check dbt selection criteria against a manifest and report statistics.

    Parameters:
    - manifest: Parsed dbt manifest dictionary
    - select: dbt selection string
    - exclude: dbt exclusion string

    Returns:
    On success, a dict with "valid" set to True, the number of selected
    IDs, node counts per resource type, and the names of selected models.
    If anything raises, a dict with "valid" set to False and the error text.
    """
    try:
        selected_ids = select_unique_ids(
            manifest=manifest,
            select=select,
            exclude=exclude
        )

        # Keep only the selected IDs that have an entry under "nodes"
        selected_nodes = {}
        for unique_id in selected_ids:
            if unique_id in manifest["nodes"]:
                selected_nodes[unique_id] = manifest["nodes"][unique_id]

        # Tally the kept nodes per resource type
        by_type = {}
        for entry in selected_nodes.values():
            kind = entry.get("resource_type", "unknown")
            by_type[kind] = by_type.get(kind, 0) + 1

        # Names of the kept nodes that are models
        model_names = []
        for entry in selected_nodes.values():
            if entry.get("resource_type") == "model":
                model_names.append(entry["name"])

        report = {"valid": True}
        report["selected_count"] = len(selected_ids)
        report["by_type"] = by_type
        report["selected_models"] = model_names
        return report

    except Exception as e:
        failure = {"valid": False}
        failure["error"] = str(e)
        return failure

import json

# Load the parsed manifest that the selection is validated against
with open("./target/manifest.json", encoding="utf-8") as f:
    manifest = json.load(f)

# Validate selection
result = validate_dbt_selection(
    manifest=manifest,
    select="tag:daily",
    exclude="tag:slow",
)

if result["valid"]:
    print(f"Selection is valid, found {result['selected_count']} resources")
else:
    print(f"Selection is invalid: {result['error']}")

Constants

# dbt resource types that are treated as assets
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]

Type Definitions

from dagster import AssetKey, MetadataEntry
from typing import Mapping, Any, Set, Union
from pathlib import Path

# Type alias for a manifest argument: an already-parsed mapping, a str, or a
# Path (this is the parameter type of validate_manifest)
DbtManifestParam = Union[Mapping[str, Any], str, Path]

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json