CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/translation-system.md

Translation System

Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys. The translation system provides a flexible interface for controlling how dbt models, tests, and sources are represented as Dagster assets.

Capabilities

Core Translator

DagsterDbtTranslator

Main translator class that maps dbt resources to Dagster asset specifications.

class DagsterDbtTranslator:
    """
    Maps dbt resources to Dagster assets with customizable translation logic.

    Each ``get_*`` method below is an override point: subclass this class and
    override the methods whose result you want to change. Most methods receive
    ``dbt_resource_props``, the properties of a single dbt resource taken from
    the manifest; a few receive the whole manifest plus a ``unique_id``.

    This listing is an interface summary: the method bodies are omitted and
    only the signatures and contracts are shown.
    """

    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"]
    ) -> AssetSpec:
        """
        Get the AssetSpec for a dbt resource.

        Parameters:
        - manifest: Complete dbt manifest dictionary
        - unique_id: Unique ID of the dbt resource
        - project: DbtProject instance, or None. The parameter has no default,
          so it must always be passed explicitly.

        Returns:
        AssetSpec object for the dbt resource
        """

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """
        Get the asset key for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey for the dbt resource
        """

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """
        Get the group name for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Group name string, or None
        """

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Get the metadata for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Mapping of Dagster metadata. The declared type is a read-only Mapping,
        so overrides that extend the result should build a new dict instead of
        mutating the value returned by super().
        """

    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """
        Get the tags for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Mapping of tag key to string tag value
        """

    def get_freshness_policy(
        self,
        dbt_resource_props: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        """
        Get the freshness policy for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        FreshnessPolicy object, or None
        """

    def get_auto_materialize_policy(
        self,
        dbt_resource_props: Mapping[str, Any]
    ) -> Optional[AutoMaterializePolicy]:
        """
        Get the auto-materialize policy for a dbt resource.

        NOTE(review): get_automation_condition below covers a similar concern;
        confirm which of the two the installed dagster-dbt version prefers.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AutoMaterializePolicy object, or None
        """

    def get_deps_asset_keys(
        self,
        dbt_resource_props: Mapping[str, Any],
        manifest: Mapping[str, Any]
    ) -> Iterable[AssetKey]:
        """
        Get the dependency asset keys for a dbt resource.

        NOTE(review): confirm that the installed dagster-dbt version exposes
        this method; if it does not, dependencies would have to be adjusted on
        the AssetSpec returned by get_asset_spec instead.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest
        - manifest: Complete dbt manifest dictionary

        Returns:
        Iterable of AssetKey objects representing dependencies
        """

    def get_asset_check_spec(
        self,
        asset_spec: AssetSpec,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"]
    ) -> Optional[AssetCheckSpec]:
        """
        Get the AssetCheckSpec for a dbt test.

        Parameters:
        - asset_spec: AssetSpec for the target asset
        - manifest: Complete dbt manifest dictionary
        - unique_id: Unique ID of the dbt test
        - project: DbtProject instance, or None. The parameter has no default,
          so it must always be passed explicitly.

        Returns:
        AssetCheckSpec object for the dbt test, or None
        """

    def get_partition_mapping(
        self,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any]
    ) -> Optional[PartitionMapping]:
        """
        Get the partition mapping between a dbt resource and one of its parents.

        Parameters:
        - dbt_resource_props: Child dbt resource properties
        - dbt_parent_resource_props: Parent dbt resource properties

        Returns:
        PartitionMapping, or None
        """

    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        """
        Get the description for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Description string (the return type is not Optional)
        """

    def get_code_version(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """
        Get the code version for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Code version string, or None
        """

    def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]:
        """
        Get the owners for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Sequence of owner strings, or None
        """

    def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
        """
        Get the automation condition for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AutomationCondition, or None
        """

    def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]:
        """
        Get the partitions definition for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        PartitionsDefinition, or None
        """

Translator Settings

DagsterDbtTranslatorSettings

Configuration settings that control translator behavior and feature enablement.

@dataclass(frozen=True)
class DagsterDbtTranslatorSettings(Resolvable):
    """
    Feature flags that control DagsterDbtTranslator behavior.

    The dataclass is frozen, so an instance cannot be modified after it is
    created; build a new instance to use different flags.

    Attributes:
    - enable_asset_checks: Whether to create asset checks from dbt tests.
      Defaults to True.
    - enable_duplicate_source_asset_keys: Allow duplicate asset keys for
      sources. Defaults to False.
    - enable_code_references: Whether to include code references in metadata.
      Defaults to False.
    - enable_dbt_selection_by_name: Enable selection of dbt resources by name.
      Defaults to False.
      NOTE(review): this page previously said "instead of unique_id"; the
      upstream dagster-dbt docstring appears to say "rather than fully
      qualified name" - confirm which identifier is being replaced.
    - enable_source_tests_as_checks: Create checks for source tests.
      Defaults to False.
    """

    enable_asset_checks: bool = True
    enable_duplicate_source_asset_keys: bool = False
    enable_code_references: bool = False
    enable_dbt_selection_by_name: bool = False
    enable_source_tests_as_checks: bool = False

Usage Examples

Custom Asset Key Generation

from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class CustomAssetKeyTranslator(DagsterDbtTranslator):
    """Key assets by database/schema/name and group them by package or folder."""

    # Name of the root dbt project. Resources that belong to any other package
    # are placed in a per-package group. Override in a subclass if needed.
    project_name = "my_project"

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """
        Generate custom asset keys with database prefix.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey of the form [database, schema, name]. "default" is used for a
        database or schema that is missing or null.

        Raises:
        KeyError if the resource has no "name" entry.
        """
        # `or` instead of a .get() default: a key that is present with a null
        # value must also fall back, otherwise None would end up in the key path.
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"
        name = dbt_resource_props["name"]

        return AssetKey([database, schema, name])

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """
        Group assets by dbt package, or by first subdirectory in the main project.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        "dbt_package_<package>" for resources of other packages, the first
        subdirectory for nested resources of the main project, and "default"
        for resources that sit directly at the project root.
        """
        package_name = dbt_resource_props.get("package_name")
        if package_name and package_name != self.project_name:
            return f"dbt_package_{package_name}"

        # Use directory structure for main project. The fqn has the shape
        # [project, *subdirectories, resource_name], so a subdirectory exists
        # only when there are more than two parts; with exactly two parts,
        # fqn[1] would be the resource's own name.
        fqn = dbt_resource_props.get("fqn") or []
        if len(fqn) > 2:
            return fqn[1]  # First subdirectory

        return "default"

# Use custom translator
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=CustomAssetKeyTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` and stream its events back to Dagster.

    The `dbt` parameter carries a DbtCliResource annotation because Dagster
    identifies resource parameters by their type annotation.
    """
    yield from dbt.cli(["build"], context=context).stream()

Environment-Specific Translation

import os
from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

class EnvironmentDbtTranslator(DagsterDbtTranslator):
    """Translator that stamps every asset with the deployment environment."""

    def __init__(
        self,
        environment: str = "dev",
        settings: Optional[DagsterDbtTranslatorSettings] = None,
    ):
        """
        Parameters:
        - environment: Environment name used as key prefix, metadata and tag
        - settings: Optional translator settings forwarded to the base class
        """
        # The base initializer must run so that the translator's settings
        # (and any other base-class state) are set up.
        super().__init__(settings=settings)
        self.environment = environment

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Prefix asset keys with environment."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.environment, *base_key.path])

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Add environment metadata."""
        # Build a new dict: the base method is typed as returning a read-only
        # Mapping, so its result must not be mutated in place.
        return {
            **super().get_metadata(dbt_resource_props),
            "environment": self.environment,
            "target_database": dbt_resource_props.get("database"),
        }

    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """Add environment and materialization tags."""
        tags = {
            **super().get_tags(dbt_resource_props),
            "environment": self.environment,
        }

        # Skip the tag when the materialization is absent or null, so that a
        # non-string value never ends up as a tag value.
        config = dbt_resource_props.get("config") or {}
        materialized = config.get("materialized")
        if materialized:
            tags["materialization"] = materialized

        return tags

# Use environment-specific translator
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

environment = os.getenv("DAGSTER_ENV", "dev")
translator = EnvironmentDbtTranslator(environment=environment)

@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=translator,
)
def environment_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` with the environment-aware translator.

    The `dbt` parameter carries a DbtCliResource annotation because Dagster
    identifies resource parameters by their type annotation.
    """
    yield from dbt.cli(["build"], context=context).stream()

Freshness Policy Translation

from datetime import timedelta
from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy, FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator

class FreshnessPolicyTranslator(DagsterDbtTranslator):
    """
    Derive freshness and auto-materialize policies from dbt config and tags.

    NOTE(review): newer Dagster releases appear to have renamed the lag-based
    policy used here (`maximum_lag_minutes`) - confirm that `FreshnessPolicy`
    still accepts these arguments in the installed version.
    """

    def get_freshness_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        """
        Set freshness policies based on dbt configuration.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        A policy built from the `dagster_freshness_policy` config entry when
        present; otherwise a 30-minute policy for incremental models, a
        24-hour policy for snapshots, and None for everything else.
        """
        config = dbt_resource_props.get("config") or {}

        # Check for custom freshness configuration
        policy_config = config.get("dagster_freshness_policy")
        if policy_config is not None:
            return FreshnessPolicy(
                maximum_lag_minutes=policy_config.get("maximum_lag_minutes", 60),
                cron_schedule=policy_config.get("cron_schedule"),
            )

        # Set default policies based on materialization
        materialized = config.get("materialized")
        if materialized == "incremental":
            return FreshnessPolicy(maximum_lag_minutes=30)
        if materialized == "snapshot":
            return FreshnessPolicy(maximum_lag_minutes=60 * 24)  # 24 hours

        return None

    def get_auto_materialize_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[AutoMaterializePolicy]:
        """
        Set auto-materialize policies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        An eager policy for resources tagged "auto_materialize", a lazy policy
        for resources tagged "lazy_materialize" (eager wins if both are
        present), and None otherwise.
        """
        tags = dbt_resource_props.get("tags") or []

        if "auto_materialize" in tags:
            return AutoMaterializePolicy.eager()
        if "lazy_materialize" in tags:
            return AutoMaterializePolicy.lazy()

        return None

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=FreshnessPolicyTranslator(),
)
def fresh_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` with the freshness-aware translator.

    The `dbt` parameter carries a DbtCliResource annotation because Dagster
    identifies resource parameters by their type annotation.
    """
    yield from dbt.cli(["build"], context=context).stream()

Multi-Tenant Translation

from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

class MultiTenantTranslator(DagsterDbtTranslator):
    """Translator that namespaces keys, metadata and groups by tenant."""

    def __init__(
        self,
        tenant_id: str,
        settings: Optional[DagsterDbtTranslatorSettings] = None,
    ):
        """
        Parameters:
        - tenant_id: Tenant identifier used as key prefix and group prefix
        - settings: Optional translator settings forwarded to the base class
        """
        # The base initializer must run so that the translator's settings
        # (and any other base-class state) are set up.
        super().__init__(settings=settings)
        self.tenant_id = tenant_id

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Create tenant-specific asset keys."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.tenant_id, *base_key.path])

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Add tenant metadata."""
        # `or` instead of a .get() default so a null database/schema in the
        # manifest does not render as the literal text "None".
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"

        # Build a new dict: the base method is typed as returning a read-only
        # Mapping, so its result must not be mutated in place.
        return {
            **super().get_metadata(dbt_resource_props),
            "tenant_id": self.tenant_id,
            "tenant_database": f"{self.tenant_id}_{database}",
            "tenant_schema": f"{self.tenant_id}_{schema}",
        }

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Group by tenant and model type."""
        base_group = super().get_group_name(dbt_resource_props)
        if base_group:
            return f"{self.tenant_id}_{base_group}"
        # No group from the base translator: fall back to the tenant alone.
        return self.tenant_id

# Create tenant-specific assets
def create_tenant_assets(tenant_id: str):
    """
    Build the dbt assets definition for a single tenant.

    Parameters:
    - tenant_id: Tenant identifier; used for the definition name, the manifest
      path, the translator and the dbt `--target` argument

    Returns:
    The decorated `tenant_dbt_assets` definition for that tenant
    """
    # Imported locally so the factory does not depend on imports made by
    # other snippets on this page.
    from dagster import AssetExecutionContext
    from dagster_dbt import DbtCliResource, dbt_assets

    translator = MultiTenantTranslator(tenant_id=tenant_id)

    @dbt_assets(
        name=f"{tenant_id}_dbt_assets",
        manifest=f"./tenants/{tenant_id}/target/manifest.json",
        dagster_dbt_translator=translator,
    )
    # `dbt` carries a DbtCliResource annotation because Dagster identifies
    # resource parameters by their type annotation.
    def tenant_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(
            ["build", "--target", tenant_id],
            context=context,
        ).stream()

    return tenant_dbt_assets

# Create assets for multiple tenants
# One assets definition per tenant, in the order the tenants are listed.
tenant_assets = [
    create_tenant_assets(tenant)
    for tenant in ("client_a", "client_b", "client_c")
]

Advanced Translation Patterns

Dependency Override

from typing import Any, Iterable, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class DependencyOverrideTranslator(DagsterDbtTranslator):
    """
    Translator that injects an extra upstream dependency for selected models.

    NOTE(review): confirm that the installed dagster-dbt version exposes
    `get_deps_asset_keys` on DagsterDbtTranslator. If it does not, this
    override is never called and the `super()` call below would fail; the
    extra dependency would then have to be added to the AssetSpec returned by
    `get_asset_spec`.
    """

    def get_deps_asset_keys(
        self,
        dbt_resource_props: Mapping[str, Any],
        manifest: Mapping[str, Any],
    ) -> Iterable[AssetKey]:
        """
        Override dependencies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest
        - manifest: Complete dbt manifest dictionary

        Yields:
        The external data feed key first for the "critical_summary" model,
        followed by the dependencies reported by the base translator.

        Raises:
        KeyError if the resource has no "name" entry.
        """
        model_name = dbt_resource_props["name"]

        # Custom dependency logic for specific models
        if model_name == "critical_summary":
            # Force dependency on external data source
            yield AssetKey(["external", "data_feed"])

        # Include normal dbt dependencies
        yield from super().get_deps_asset_keys(dbt_resource_props, manifest)

Code Reference Enhancement

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator

class CodeReferenceTranslator(DagsterDbtTranslator):
    """Translator that enriches metadata with source links and column stats."""

    # Base URL that a resource's original_file_path is appended to. Override
    # in a subclass to point at a different repository or branch.
    repo_blob_url = "https://github.com/myorg/dbt-project/blob/main"

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Enhanced code references and documentation.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        The base translator's metadata plus, when the information is present:
        `dbt_model_path`, `github_link`, `model_description`, `column_count`
        and `documented_columns`.
        """
        # Copy first: the base method is typed as returning a read-only
        # Mapping, so its result must not be mutated in place.
        metadata = dict(super().get_metadata(dbt_resource_props))

        # Add enhanced code references
        original_file_path = dbt_resource_props.get("original_file_path")
        if original_file_path:
            metadata["dbt_model_path"] = original_file_path
            metadata["github_link"] = f"{self.repo_blob_url}/{original_file_path}"

        # Add documentation metadata
        description = dbt_resource_props.get("description")
        if description:
            metadata["model_description"] = description

        # Add column information
        columns = dbt_resource_props.get("columns") or {}
        if columns:
            metadata["column_count"] = len(columns)
            # Count columns with a non-empty description without building a
            # temporary list.
            metadata["documented_columns"] = sum(
                1 for col in columns.values() if col.get("description")
            )

        return metadata

Type Definitions

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional, Sequence

from dagster import (
    AssetCheckSpec,
    AssetKey,
    AssetSpec,
    AutoMaterializePolicy,
    AutomationCondition,
    FreshnessPolicy,
    PartitionMapping,
    PartitionsDefinition,
    Resolvable,
)
from dagster_dbt import DagsterDbtTranslatorSettings, DbtProject

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json