CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

asset-creation.mddocs/

Asset Creation and Management

Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities. This module provides the primary interface for integrating dbt models into Dagster pipelines as first-class assets.

Capabilities

Asset Decorators

dbt_assets

Creates a set of Dagster assets from dbt models defined in a manifest.json file. This is the primary decorator for local dbt integration.

def dbt_assets(
    manifest: DbtManifestParam,
    select: str = DBT_DEFAULT_SELECT,
    exclude: str = DBT_DEFAULT_EXCLUDE,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[dict] = None,
    required_resource_keys: Optional[set] = None,
    **kwargs
) -> Callable:
    """
    Create Dagster assets from dbt models.
    
    Parameters:
    - manifest: Path to dbt manifest.json or parsed manifest dict
    - select: dbt selection string for included models (default: "fqn:*")
    - exclude: dbt selection string for excluded models (default: "")
    - dagster_dbt_translator: Custom translator for dbt->Dagster mapping
    - backfill_policy: Backfill policy for the assets
    - op_tags: Tags to apply to the underlying op
    - required_resource_keys: Additional required resource keys
    
    Returns:
    Decorated function that yields asset materializations
    """

Usage Example

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import dbt_assets, DbtCliResource

dbt_resource = DbtCliResource(project_dir="./my_dbt_project")

@dbt_assets(
    manifest="./my_dbt_project/target/manifest.json",
    select="tag:daily",
    exclude="tag:deprecated"
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Asset Specifications

build_dbt_asset_specs

Builds a sequence of AssetSpec objects from a dbt manifest without creating executable assets.

def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    select: str = DBT_DEFAULT_SELECT,
    exclude: str = DBT_DEFAULT_EXCLUDE,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs
) -> Sequence[AssetSpec]:
    """
    Build asset specifications from dbt manifest.
    
    Parameters:
    - manifest: Path to dbt manifest.json or parsed manifest dict
    - select: dbt selection string for included models
    - exclude: dbt selection string for excluded models
    - dagster_dbt_translator: Custom translator for dbt->Dagster mapping
    - **kwargs: Additional arguments passed to AssetSpec creation
    
    Returns:
    Sequence of AssetSpec objects representing dbt models
    """

Asset Selection

build_dbt_asset_selection

Creates a Dagster AssetSelection from dbt assets and selection criteria.

def build_dbt_asset_selection(
    dbt_assets: Sequence[AssetsDefinition],
    dbt_select: str = DBT_DEFAULT_SELECT,
    dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
    dbt_selector: Optional[str] = DBT_DEFAULT_SELECTOR
) -> AssetSelection:
    """
    Build asset selection from dbt assets and selection criteria.
    
    Parameters:
    - dbt_assets: Sequence of dbt AssetsDefinition objects
    - dbt_select: dbt selection string (default: "fqn:*")
    - dbt_exclude: dbt exclusion string (default: "")
    - dbt_selector: dbt selector string (default: "")
    
    Returns:
    AssetSelection object for use in Dagster job/schedule definitions
    """

build_schedule_from_dbt_selection

Creates a schedule definition from a dbt selection.

def build_schedule_from_dbt_selection(
    dbt_assets: Sequence[AssetsDefinition],
    job_name: str,
    cron_schedule: str,
    dbt_select: str = DBT_DEFAULT_SELECT,
    dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
    dbt_selector: str = DBT_DEFAULT_SELECTOR,
    schedule_name: Optional[str] = None,
    tags: Optional[Mapping[str, str]] = None,
    config: Optional[RunConfig] = None,
    execution_timezone: Optional[str] = None,
    default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED
) -> ScheduleDefinition:
    """
    Build schedule from dbt selection criteria.
    
    Parameters:
    - dbt_assets: Sequence of dbt assets definitions
    - job_name: Name for the generated job
    - cron_schedule: Cron expression for schedule timing
    - dbt_select: dbt selection string (default: "fqn:*")
    - dbt_exclude: dbt exclusion string (default: "")
    - dbt_selector: dbt selector string (default: "")
    - schedule_name: Name for the schedule (optional)
    - tags: Tags to apply to the job (optional)
    - config: Run configuration (optional)
    - execution_timezone: Timezone for schedule execution (optional)
    - default_status: Default schedule status (default: STOPPED)
    
    Returns:
    ScheduleDefinition for the selected dbt models
    """

Asset Key Utilities

get_asset_key_for_model

Generates the Dagster asset key for a dbt model.

def get_asset_key_for_model(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
    """
    Get asset key for dbt model.
    
    Parameters:
    - dbt_resource_props: dbt model properties from manifest
    
    Returns:
    AssetKey for the dbt model
    """

get_asset_key_for_source

Generates the Dagster asset key for a dbt source.

def get_asset_key_for_source(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
    """
    Get asset key for dbt source.
    
    Parameters:
    - dbt_resource_props: dbt source properties from manifest
    
    Returns:
    AssetKey for the dbt source
    """

get_asset_keys_by_output_name_for_source

Gets all asset keys associated with a dbt source, organized by output name.

def get_asset_keys_by_output_name_for_source(
    dbt_resource_props: Mapping[str, Any],
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None
) -> Mapping[str, AssetKey]:
    """
    Get asset keys by output name for dbt source.
    
    Parameters:
    - dbt_resource_props: dbt source properties from manifest
    - dagster_dbt_translator: Custom translator for asset key mapping
    
    Returns:
    Mapping from output names to AssetKey objects
    """

Metadata Utilities

default_metadata_from_dbt_resource_props

Generates default Dagster metadata from dbt resource properties.

def default_metadata_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any]
) -> Mapping[str, Any]:
    """
    Generate default metadata from dbt resource properties.
    
    Parameters:
    - dbt_resource_props: dbt resource properties from manifest
    
    Returns:
    Dictionary of Dagster metadata
    """

default_group_from_dbt_resource_props

Determines the default group name for a dbt resource.

def default_group_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any]
) -> Optional[str]:
    """
    Get default group name from dbt resource properties.
    
    Parameters:
    - dbt_resource_props: dbt resource properties from manifest
    
    Returns:
    Group name string or None
    """

group_from_dbt_resource_props_fallback_to_directory

Gets group name with fallback to directory-based naming.

def group_from_dbt_resource_props_fallback_to_directory(
    dbt_resource_props: Mapping[str, Any]
) -> Optional[str]:
    """
    Get group name with directory fallback.
    
    Parameters:
    - dbt_resource_props: dbt resource properties from manifest
    
    Returns:
    Group name string or None
    """

Type Definitions

# Type alias for manifest parameter
DbtManifestParam = Union[Mapping[str, Any], str, Path]

Constants

  • DBT_DEFAULT_SELECT = "fqn:*" - Default selection includes all models
  • DBT_DEFAULT_EXCLUDE = "" - Default exclusion is empty
  • ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"] - dbt resource types treated as assets

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json