A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys. The translation system provides a flexible interface for controlling how dbt models, tests, and sources are represented as Dagster assets.
Main translator class that maps dbt resources to Dagster asset specifications.
class DagsterDbtTranslator:
    """
    Maps dbt resources to Dagster assets with customizable translation logic.

    Each ``get_*`` method controls one aspect of how dbt models, sources, and
    tests are converted into Dagster assets (asset keys, metadata, groups, and
    other asset properties). Subclass and override the methods you need.
    """

    # The settings type is quoted because the class is declared after this one.
    def __init__(self, settings: Optional["DagsterDbtTranslatorSettings"] = None): ...

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> AssetSpec:
        """
        Get the AssetSpec for a dbt resource.

        Parameters:
        - manifest: Complete dbt manifest dictionary
        - unique_id: Unique ID of the dbt resource
        - project: DbtProject instance, or None

        Returns:
        AssetSpec object for the dbt resource
        """

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """
        Get the asset key for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        AssetKey for the dbt resource
        """

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """
        Get the group name for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Group name string, or None
        """

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Get the metadata for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Mapping of Dagster metadata
        """

    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """
        Get the tags for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Mapping of asset tags
        """

    def get_freshness_policy(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[FreshnessPolicy]:
        """
        Get the freshness policy for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        FreshnessPolicy object, or None
        """

    def get_auto_materialize_policy(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[AutoMaterializePolicy]:
        """
        Get the auto-materialize policy for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        AutoMaterializePolicy object, or None
        """

    def get_deps_asset_keys(
        self,
        dbt_resource_props: Mapping[str, Any],
        manifest: Mapping[str, Any],
    ) -> Iterable[AssetKey]:
        """
        Get the dependency asset keys for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest
        - manifest: Complete dbt manifest dictionary

        Returns:
        Iterable of AssetKey objects representing dependencies
        """

    def get_asset_check_spec(
        self,
        asset_spec: AssetSpec,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> Optional[AssetCheckSpec]:
        """
        Get the AssetCheckSpec for a dbt test.

        Parameters:
        - asset_spec: AssetSpec for the target asset
        - manifest: Complete dbt manifest dictionary
        - unique_id: Unique ID of the dbt test
        - project: DbtProject instance, or None

        Returns:
        AssetCheckSpec object for the dbt test, or None
        """

    def get_partition_mapping(
        self,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        """
        Get the partition mapping between a dbt resource and one of its parents.

        Parameters:
        - dbt_resource_props: Child dbt resource properties
        - dbt_parent_resource_props: Parent dbt resource properties

        Returns:
        PartitionMapping, or None
        """

    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        """
        Get the description for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Description string
        """

    def get_code_version(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """
        Get the code version for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Code version string, or None
        """

    def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]:
        """
        Get the owners for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        Sequence of owner strings, or None
        """

    def get_automation_condition(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[AutomationCondition]:
        """
        Get the automation condition for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        AutomationCondition, or None
        """

    def get_partitions_def(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionsDefinition]:
        """
        Get the partitions definition for a dbt resource.

        Parameters:
        - dbt_resource_props: dbt resource properties from the manifest

        Returns:
        PartitionsDefinition, or None
        """


# Configuration settings that control translator behavior and feature enablement.
@dataclass(frozen=True)
class DagsterDbtTranslatorSettings(Resolvable):
    """
    Settings for controlling DagsterDbtTranslator behavior.

    The dataclass is frozen, so instances cannot be modified after creation.

    Attributes:
    - enable_asset_checks: Whether to create asset checks from dbt tests
    - enable_duplicate_source_asset_keys: Allow duplicate asset keys for sources
    - enable_code_references: Whether to include code references in metadata
    - enable_dbt_selection_by_name: Enable selection by name instead of unique_id
    - enable_source_tests_as_checks: Create checks for source tests
    """

    enable_asset_checks: bool = True
    enable_duplicate_source_asset_keys: bool = False
    enable_code_references: bool = False
    enable_dbt_selection_by_name: bool = False
    enable_source_tests_as_checks: bool = False


from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
class CustomAssetKeyTranslator(DagsterDbtTranslator):
    """Example translator with database-prefixed asset keys and package/folder-based groups."""

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """
        Generate custom asset keys with database prefix.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey of the form [database, schema, name]
        """
        # `or` instead of a .get() default: the key may be present with a null
        # value, which would otherwise put None into the asset key path.
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"
        name = dbt_resource_props["name"]
        return AssetKey([database, schema, name])

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        """
        Group assets by dbt package.

        Resources from packages other than "my_project" are grouped per
        package; resources of the main project are grouped by their first
        subdirectory, falling back to "default".

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Group name string
        """
        package_name = dbt_resource_props.get("package_name")
        if package_name and package_name != "my_project":
            return f"dbt_package_{package_name}"

        # Use directory structure for main project.
        # fqn is typically [package, *subdirectories, resource_name], so a
        # subdirectory is only present when there are more than two parts;
        # with exactly two parts fqn[1] would be the resource name itself.
        fqn = dbt_resource_props.get("fqn") or []
        if len(fqn) > 2:
            return fqn[1]  # First subdirectory
        return "default"
# Use custom translator
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=CustomAssetKeyTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` and yield everything the CLI invocation streams.

    Parameters:
    - context: Dagster execution context passed through to the dbt CLI call
    - dbt: dbt CLI resource; the type annotation is what lets Dagster inject
      it as a resource rather than treating it as an upstream input
    """
    yield from dbt.cli(["build"], context=context).stream()


import os
from dagster_dbt import DagsterDbtTranslator
class EnvironmentDbtTranslator(DagsterDbtTranslator):
    """Example translator that stamps every asset with a deployment environment."""

    def __init__(self, environment: str = "dev"):
        """
        Parameters:
        - environment: Environment name used as key prefix, metadata, and tag
        """
        # Run the base initializer so that state it sets up (it accepts an
        # optional settings object) exists on this instance.
        super().__init__()
        self.environment = environment

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """
        Prefix asset keys with environment.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey whose path is the environment followed by the base key path
        """
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.environment, *base_key.path])

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """
        Add environment metadata.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Base metadata plus "environment" and "target_database" entries
        """
        # Copy first: the base method is typed as returning a Mapping, which
        # may be read-only or shared.
        metadata = dict(super().get_metadata(dbt_resource_props))
        metadata["environment"] = self.environment
        metadata["target_database"] = dbt_resource_props.get("database")
        return metadata

    def get_tags(self, dbt_resource_props: dict) -> dict:
        """
        Add environment and materialization tags.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Base tags plus "environment" and, when configured, "materialization"
        """
        tags = dict(super().get_tags(dbt_resource_props))
        tags["environment"] = self.environment

        config = dbt_resource_props.get("config", {})
        if "materialized" in config:
            tags["materialization"] = config["materialized"]
        return tags
# Use environment-specific translator
environment = os.getenv("DAGSTER_ENV", "dev")
translator = EnvironmentDbtTranslator(environment=environment)


@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=translator,
)
def environment_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` using the environment-specific translator.

    Parameters:
    - context: Dagster execution context passed through to the dbt CLI call
    - dbt: dbt CLI resource; the type annotation is what lets Dagster inject
      it as a resource rather than treating it as an upstream input
    """
    yield from dbt.cli(["build"], context=context).stream()


from dagster import FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator
from datetime import timedelta
class FreshnessPolicyTranslator(DagsterDbtTranslator):
    """Example translator deriving freshness and auto-materialize policies from dbt config and tags."""

    def get_freshness_policy(self, dbt_resource_props: dict) -> Optional[FreshnessPolicy]:
        """
        Set freshness policies based on dbt configuration.

        An explicit "dagster_freshness_policy" config entry wins; otherwise a
        default is chosen from the materialization type.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        FreshnessPolicy object or None
        """
        node_config = dbt_resource_props.get("config", {})

        # Explicit per-resource configuration takes precedence.
        if "dagster_freshness_policy" in node_config:
            overrides = node_config["dagster_freshness_policy"]
            lag_minutes = overrides.get("maximum_lag_minutes", 60)
            schedule = overrides.get("cron_schedule")
            return FreshnessPolicy(
                maximum_lag_minutes=lag_minutes,
                cron_schedule=schedule,
            )

        # Otherwise fall back to a default per materialization type.
        strategy = node_config.get("materialized")
        if strategy == "incremental":
            return FreshnessPolicy(maximum_lag_minutes=30)
        if strategy == "snapshot":
            return FreshnessPolicy(maximum_lag_minutes=60 * 24)  # 24 hours
        return None

    def get_auto_materialize_policy(
        self, dbt_resource_props: dict
    ) -> Optional[AutoMaterializePolicy]:
        """
        Set auto-materialize policies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Eager policy for the "auto_materialize" tag, lazy policy for the
        "lazy_materialize" tag, otherwise None
        """
        resource_tags = dbt_resource_props.get("tags", [])
        if "auto_materialize" in resource_tags:
            return AutoMaterializePolicy.eager()
        if "lazy_materialize" in resource_tags:
            return AutoMaterializePolicy.lazy()
        return None
@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=FreshnessPolicyTranslator(),
)
def fresh_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """
    Run `dbt build` using the freshness-policy translator.

    Parameters:
    - context: Dagster execution context passed through to the dbt CLI call
    - dbt: dbt CLI resource; the type annotation is what lets Dagster inject
      it as a resource rather than treating it as an upstream input
    """
    yield from dbt.cli(["build"], context=context).stream()


from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
class MultiTenantTranslator(DagsterDbtTranslator):
    """Example translator that namespaces asset keys, metadata, and groups by tenant."""

    def __init__(self, tenant_id: str):
        """
        Parameters:
        - tenant_id: Tenant identifier used as prefix for keys, names, and groups
        """
        # Run the base initializer so that state it sets up (it accepts an
        # optional settings object) exists on this instance.
        super().__init__()
        self.tenant_id = tenant_id

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """
        Create tenant-specific asset keys.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey whose path is the tenant id followed by the base key path
        """
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.tenant_id, *base_key.path])

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """
        Add tenant metadata.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Base metadata plus "tenant_id", "tenant_database", and "tenant_schema"
        """
        # `or` also covers keys that are present with a null value, which
        # would otherwise be rendered as "<tenant>_None".
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"

        # Build a new dict: the base method is typed as returning a Mapping,
        # which may be read-only or shared.
        return {
            **super().get_metadata(dbt_resource_props),
            "tenant_id": self.tenant_id,
            "tenant_database": f"{self.tenant_id}_{database}",
            "tenant_schema": f"{self.tenant_id}_{schema}",
        }

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        """
        Group by tenant and model type.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        "<tenant_id>_<base group>" when the base translator yields a group,
        otherwise the tenant id alone
        """
        base_group = super().get_group_name(dbt_resource_props)
        return f"{self.tenant_id}_{base_group}" if base_group else self.tenant_id
# Create tenant-specific assets
def create_tenant_assets(tenant_id: str):
    """
    Build the dbt assets definition for a single tenant.

    Parameters:
    - tenant_id: Tenant identifier; used for the asset definition name, the
      manifest path, and the dbt target

    Returns:
    The `@dbt_assets`-decorated definition for that tenant
    """
    translator = MultiTenantTranslator(tenant_id=tenant_id)

    @dbt_assets(
        name=f"{tenant_id}_dbt_assets",
        manifest=f"./tenants/{tenant_id}/target/manifest.json",
        dagster_dbt_translator=translator,
    )
    def tenant_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        # The resource parameter is annotated so Dagster injects it as a
        # resource rather than treating it as an upstream input.
        yield from dbt.cli(
            ["build", "--target", tenant_id],
            context=context,
        ).stream()

    return tenant_dbt_assets
# Create assets for multiple tenants
tenant_assets = [
    create_tenant_assets("client_a"),
    create_tenant_assets("client_b"),
    create_tenant_assets("client_c"),
]


from dagster_dbt import DagsterDbtTranslator
class DependencyOverrideTranslator(DagsterDbtTranslator):
    """Example translator that adds extra upstream dependencies for selected models."""

    def get_deps_asset_keys(self, dbt_resource_props: dict, manifest: dict) -> Iterable[AssetKey]:
        """
        Override dependencies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest
        - manifest: Complete dbt manifest dictionary

        Returns:
        Iterable of AssetKey objects: the extra key for "critical_summary"
        (if applicable) followed by the base translator's dependencies
        """
        model_name = dbt_resource_props["name"]

        # Custom dependency logic for specific models
        if model_name == "critical_summary":
            # Force dependency on external data source
            yield AssetKey(["external", "data_feed"])

        # Include normal dbt dependencies
        yield from super().get_deps_asset_keys(dbt_resource_props, manifest)


from dagster_dbt import DagsterDbtTranslator
class CodeReferenceTranslator(DagsterDbtTranslator):
    """Example translator that enriches metadata with source links and documentation stats."""

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """
        Enhanced code references and documentation.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Base metadata plus, when available, the model path and GitHub link,
        the model description, and column documentation counts
        """
        # Copy first: the base method is typed as returning a Mapping, which
        # may be read-only or shared.
        metadata = dict(super().get_metadata(dbt_resource_props))

        # Add enhanced code references
        original_file_path = dbt_resource_props.get("original_file_path")
        if original_file_path:
            metadata["dbt_model_path"] = original_file_path
            metadata["github_link"] = (
                f"https://github.com/myorg/dbt-project/blob/main/{original_file_path}"
            )

        # Add documentation metadata
        description = dbt_resource_props.get("description")
        if description:
            metadata["model_description"] = description

        # Add column information
        columns = dbt_resource_props.get("columns", {})
        if columns:
            metadata["column_count"] = len(columns)
            # Count documented columns without building an intermediate list.
            metadata["documented_columns"] = sum(
                1 for col in columns.values() if col.get("description")
            )

        return metadata


from dagster import AssetKey, AssetSpec, FreshnessPolicy, AutoMaterializePolicy
from typing import Mapping, Any, Optional, Iterable
Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt