A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities. This module provides the primary interface for integrating dbt models into Dagster pipelines as first-class assets.
Creates a set of Dagster assets from dbt models defined in a manifest.json file. This is the primary decorator for local dbt integration.
def dbt_assets(
    manifest: DbtManifestParam,
    select: str = DBT_DEFAULT_SELECT,
    exclude: str = DBT_DEFAULT_EXCLUDE,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[dict] = None,
    required_resource_keys: Optional[set] = None,
    **kwargs
) -> Callable:
    """
    Create Dagster assets from dbt models.

    This is a decorator factory: call it with the options below, then apply the
    returned decorator to a compute function (see the usage example below).

    Parameters:
    - manifest: Path to dbt manifest.json or parsed manifest dict
    - select: dbt selection string for included models (default: "fqn:*")
    - exclude: dbt selection string for excluded models (default: "")
    - dagster_dbt_translator: Custom translator for dbt->Dagster mapping
    - backfill_policy: Backfill policy for the assets
    - op_tags: Tags to apply to the underlying op
    - required_resource_keys: Additional required resource keys
    - **kwargs: Further keyword options (not enumerated in this summary)

    Returns:
        A decorator. Applying it to a compute function that yields the results
        of a dbt invocation (e.g. ``dbt.cli(["build"], context=context).stream()``)
        produces the Dagster assets definition for the selected dbt models.
    """


# Example usage (imports the real decorator from dagster_dbt):
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import dbt_assets, DbtCliResource

dbt_resource = DbtCliResource(project_dir="./my_dbt_project")


@dbt_assets(
    manifest="./my_dbt_project/target/manifest.json",
    select="tag:daily",
    exclude="tag:deprecated"
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Builds a sequence of AssetSpec objects from a dbt manifest without creating executable assets.
def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    select: str = DBT_DEFAULT_SELECT,
    exclude: str = DBT_DEFAULT_EXCLUDE,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs
) -> Sequence[AssetSpec]:
    """
    Build asset specifications from a dbt manifest.

    Only describes the assets; nothing executable is created.

    Parameters:
    - manifest: Path to dbt manifest.json or parsed manifest dict
    - select: dbt selection string for included models (default: "fqn:*")
    - exclude: dbt selection string for excluded models (default: "")
    - dagster_dbt_translator: Custom translator for dbt->Dagster mapping
    - **kwargs: Additional arguments passed to AssetSpec creation

    Returns:
        Sequence of AssetSpec objects representing dbt models
    """


# Creates a Dagster AssetSelection from dbt assets and selection criteria.
def build_dbt_asset_selection(
    dbt_assets: Sequence[AssetsDefinition],
    dbt_select: str = DBT_DEFAULT_SELECT,
    dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
    dbt_selector: Optional[str] = DBT_DEFAULT_SELECTOR
) -> AssetSelection:
    """
    Build a Dagster asset selection from dbt assets and dbt selection criteria.

    Parameters:
    - dbt_assets: Sequence of dbt AssetsDefinition objects
    - dbt_select: dbt selection string (default: "fqn:*")
    - dbt_exclude: dbt exclusion string (default: "")
    - dbt_selector: dbt selector string (default: "")

    Returns:
        AssetSelection object for use in Dagster job/schedule definitions
    """


# Creates a schedule definition from a dbt selection.
def build_schedule_from_dbt_selection(
    dbt_assets: Sequence[AssetsDefinition],
    job_name: str,
    cron_schedule: str,
    dbt_select: str = DBT_DEFAULT_SELECT,
    dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
    dbt_selector: str = DBT_DEFAULT_SELECTOR,
    schedule_name: Optional[str] = None,
    tags: Optional[Mapping[str, str]] = None,
    config: Optional[RunConfig] = None,
    execution_timezone: Optional[str] = None,
    default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED
) -> ScheduleDefinition:
    """
    Build a schedule that runs the dbt models matched by the selection criteria.

    Parameters:
    - dbt_assets: Sequence of dbt assets definitions
    - job_name: Name for the generated job
    - cron_schedule: Cron expression for schedule timing
    - dbt_select: dbt selection string (default: "fqn:*")
    - dbt_exclude: dbt exclusion string (default: "")
    - dbt_selector: dbt selector string (default: "")
    - schedule_name: Name for the schedule (optional)
    - tags: Tags to apply to the job (optional)
    - config: Run configuration (optional)
    - execution_timezone: Timezone for schedule execution (optional)
    - default_status: Default schedule status (default: STOPPED)

    Returns:
        ScheduleDefinition for the selected dbt models
    """


# Generates the Dagster asset key for a dbt model.
def get_asset_key_for_model(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
    """
    Get the Dagster asset key for a dbt model.

    Parameters:
    - dbt_resource_props: dbt model properties from the manifest

    Returns:
        AssetKey for the dbt model
    """


# Generates the Dagster asset key for a dbt source.
def get_asset_key_for_source(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
    """
    Get the Dagster asset key for a dbt source.

    Parameters:
    - dbt_resource_props: dbt source properties from the manifest

    Returns:
        AssetKey for the dbt source
    """


# Gets all asset keys associated with a dbt source, organized by output name.
def get_asset_keys_by_output_name_for_source(
    dbt_resource_props: Mapping[str, Any],
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None
) -> Mapping[str, AssetKey]:
    """
    Get the asset keys of a dbt source, keyed by output name.

    Parameters:
    - dbt_resource_props: dbt source properties from the manifest
    - dagster_dbt_translator: Custom translator for asset key mapping (optional)

    Returns:
        Mapping from output names to AssetKey objects
    """


# Generates default Dagster metadata from dbt resource properties.
def default_metadata_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any]
) -> Mapping[str, Any]:
    """
    Generate the default Dagster metadata for a dbt resource.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
        Dictionary of Dagster metadata
    """


# Determines the default group name for a dbt resource.
def default_group_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any]
) -> Optional[str]:
    """
    Get the default Dagster group name for a dbt resource.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
        Group name string or None
    """


# Gets group name with fallback to directory-based naming.
def group_from_dbt_resource_props_fallback_to_directory(
    dbt_resource_props: Mapping[str, Any]
) -> Optional[str]:
    """
    Get the Dagster group name for a dbt resource, falling back to
    directory-based naming.

    Parameters:
    - dbt_resource_props: dbt resource properties from the manifest

    Returns:
        Group name string or None
    """


# Type alias for manifest parameter
DbtManifestParam = Union[Mapping[str, Any], str, Path]

# Default selection includes all models.
DBT_DEFAULT_SELECT = "fqn:*"
# Default exclusion is empty.
DBT_DEFAULT_EXCLUDE = ""
# Default dbt selector is empty; used as the `dbt_selector` default of the
# selection/schedule builders, whose documented default is "".
DBT_DEFAULT_SELECTOR = ""
# dbt resource types treated as assets.
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dagster-dbt