tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/dagster-dbt@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-dbt@0.27.0


Dagster dbt Integration

dagster-dbt is an integration library that orchestrates dbt models as Dagster assets, with lineage tracking, metadata propagation, and observability. It connects dbt's SQL-based transformations to Dagster's orchestration framework, so both tools can be used together in the same data stack.

Package Information

  • Package Name: dagster-dbt
  • Language: Python
  • Installation: pip install dagster-dbt

Core Imports

from dagster_dbt import dbt_assets, DbtCliResource

For dbt Cloud integration:

from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job

For dbt Cloud v2 (recommended):

from dagster_dbt.cloud_v2 import DbtCloudCredentials, DbtCloudWorkspace, dbt_cloud_assets

Basic Usage

Local dbt Project with Assets

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

# Define dbt CLI resource
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")

# Create assets from dbt models
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Define the Dagster code location
defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": dbt_cli_resource}
)

dbt Cloud Integration

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets
)

# Configure dbt Cloud credentials
credentials = DbtCloudCredentials(
    account_id=12345,
    token="your_token",
    access_url="https://cloud.getdbt.com"
)
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111,  # placeholder ID; environment_id is a declared DbtCloudWorkspace field
)

# Create assets from dbt Cloud job
@dbt_cloud_assets(
    job_id=123,
    workspace=workspace
)
def my_dbt_cloud_assets(context):
    yield from workspace.run_job(job_id=123, context=context)

defs = Definitions(assets=[my_dbt_cloud_assets])
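
The example above hardcodes the API token for brevity. A minimal sketch of reading the same settings from environment variables instead, so the token stays out of source control. The helper name `load_dbt_cloud_settings` and the `DBT_CLOUD_*` variable names are assumptions made for illustration; they are not part of dagster-dbt.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Hypothetical variable names; pick whatever your deployment uses.
REQUIRED_VARS = (
    "DBT_CLOUD_ACCOUNT_ID",
    "DBT_CLOUD_TOKEN",
    "DBT_CLOUD_ACCESS_URL",
    "DBT_CLOUD_PROJECT_ID",
)


def load_dbt_cloud_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Collect dbt Cloud settings from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "account_id": int(env["DBT_CLOUD_ACCOUNT_ID"]),
        "token": env["DBT_CLOUD_TOKEN"],
        "access_url": env["DBT_CLOUD_ACCESS_URL"],
        "project_id": int(env["DBT_CLOUD_PROJECT_ID"]),
    }
```

The `account_id`, `token`, and `access_url` values line up with the `DbtCloudCredentials` arguments used above, and `project_id` with `DbtCloudWorkspace`.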

Architecture

The dagster-dbt integration follows a layered architecture:

  • Asset Layer: @dbt_assets decorators convert dbt models into Dagster assets
  • Resource Layer: DbtCliResource and Cloud resources handle execution and communication
  • Translation Layer: DagsterDbtTranslator maps dbt metadata to Dagster concepts
  • Project Layer: DbtProject manages dbt project structure and manifest parsing
  • Event Layer: CLI event handlers provide execution observability

Each layer has a single responsibility, which keeps dbt's transformation logic separate from Dagster's orchestration concerns.

Capabilities

Asset Creation and Management

Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities.

def dbt_assets(
    manifest: DbtManifestParam,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    required_resource_keys: Optional[set[str]] = None,
    project: Optional[DbtProject] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...

def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    project: Optional[DbtProject] = None,
) -> Sequence[AssetSpec]: ...

def build_dbt_asset_selection(
    dbt_assets: Sequence[AssetsDefinition],
    dbt_select: str = "fqn:*",
    dbt_exclude: Optional[str] = "",
    dbt_selector: Optional[str] = "",
) -> AssetSelection: ...

Full reference: asset-creation.md
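
The `manifest` argument in the signatures above is typed as `DbtManifestParam`, which the Types section defines as a mapping, a string, or a `Path`. A minimal sketch of how such a value could be normalized to a dictionary. `load_manifest` is a hypothetical helper written for illustration, not the library's implementation, and it assumes that a string is always a file path.

```python
import json
from pathlib import Path
from typing import Any, Mapping, Union

# Mirrors the DbtManifestParam alias listed under Types.
ManifestParam = Union[Mapping[str, Any], str, Path]


def load_manifest(manifest: ManifestParam) -> Mapping[str, Any]:
    """Return the manifest as a mapping, reading JSON from disk when given a path."""
    if isinstance(manifest, (str, Path)):
        return json.loads(Path(manifest).read_text(encoding="utf-8"))
    # Already parsed: hand it back unchanged.
    return manifest
```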

CLI Resource and Execution

Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management.

class DbtCliResource(ConfigurableResource):
    project_dir: str
    global_config_flags: list[str] = []
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    dbt_executable: str = "dbt"
    state_path: Optional[str] = None
    
    def cli(
        self,
        args: Sequence[str],
        raise_on_error: bool = True,
        manifest: Optional[DbtManifestParam] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
    ) -> DbtCliInvocation: ...

Full reference: cli-resource.md
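
Event streaming builds on dbt's structured logs. The sketch below does not call dagster-dbt; it only illustrates the kind of filtering involved, assuming the JSON log layout of recent dbt-core versions (an `info` object carrying the event `name` and a `data` object carrying `node_info`). The function name and the sample lines are made up for illustration.

```python
import json
from typing import Iterable, Iterator, Tuple


def finished_nodes(log_lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (unique_id, status) for each NodeFinished event in dbt JSON logs."""
    for line in log_lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any plain-text lines mixed into the output
        if not isinstance(event, dict):
            continue
        if event.get("info", {}).get("name") != "NodeFinished":
            continue
        node_info = event.get("data", {}).get("node_info", {})
        yield node_info.get("unique_id", ""), node_info.get("node_status", "")


# Illustrative log lines, not captured from a real run.
sample = [
    '{"info": {"name": "NodeFinished"}, "data": {"node_info": '
    '{"unique_id": "model.demo.orders", "node_status": "success"}}}',
    "not json",
    '{"info": {"name": "MainReportVersion"}, "data": {}}',
]
print(list(finished_nodes(sample)))  # [('model.demo.orders', 'success')]
```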

dbt Cloud Integration (Legacy)

Original dbt Cloud integration providing job execution, asset loading, and operations.

def dbt_cloud_resource(api_token: str, account_id: int): ...
def load_assets_from_dbt_cloud_job(dbt_cloud: ResourceDefinition, job_id: int): ...
def dbt_cloud_run_op(context: OpExecutionContext, dbt_cloud: DbtCloudResource): ...

Full reference: dbt-cloud-legacy.md

dbt Cloud v2 Integration

Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors.

@dataclass
class DbtCloudCredentials(Resolvable):
    account_id: int
    token: str
    access_url: str

class DbtCloudWorkspace(ConfigurableResource):
    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int

def dbt_cloud_assets(
    job_id: int,
    workspace: DbtCloudWorkspace,
    name: Optional[str] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
) -> Callable[..., AssetsDefinition]: ...

Full reference: dbt-cloud-v2.md

Translation System

Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys.

class DagsterDbtTranslator:
    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...
    
    def get_asset_spec(
        self, 
        manifest: Mapping[str, Any], 
        unique_id: str, 
        project: Optional[DbtProject]
    ) -> AssetSpec: ...
    
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: ...
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...

Full reference: translation-system.md

Project Management

dbt project handling, manifest parsing, and project preparation for integration with Dagster.

@record_custom
class DbtProject(IHaveNew):
    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: DbtProjectPreparer
    
    def prepare_if_dev(self) -> None: ...

class DbtProjectPreparer:
    def prepare(self) -> None: ...

class DagsterDbtProjectPreparer(DbtProjectPreparer):
    pass

Full reference: project-management.md

Component System

Integration with Dagster's component system for declarative dbt project configuration.

class DbtProjectComponent(Component):
    dbt_project_path: str
    dbt_profiles_path: Optional[str] = None

Full reference: component-system.md

Utilities and Helpers

Utility functions for asset selection, naming conventions, metadata handling, and manifest operations.

def get_asset_key_for_model(dbt_assets: Sequence[AssetsDefinition], model_name: str) -> AssetKey: ...
def get_asset_key_for_source(dbt_assets: Sequence[AssetsDefinition], source_name: str) -> AssetKey: ...
def get_asset_keys_by_output_name_for_source(
    dbt_assets: Sequence[AssetsDefinition], 
    source_name: str
) -> Mapping[str, AssetKey]: ...

def default_metadata_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...
def default_group_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...
def group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...

Full reference: utilities.md

Error Handling

Exception hierarchy for dbt integration errors. Every class derives from DagsterDbtError, so catching the base class covers all of them.

class DagsterDbtError(Failure, ABC): ...
class DagsterDbtCliRuntimeError(DagsterDbtError, ABC): ...
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError): ...
class DagsterDbtProjectNotFoundError(DagsterDbtError): ...
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError): ...
class DagsterDbtManifestNotFoundError(DagsterDbtError): ...
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError): ...

Full reference: error-handling.md
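
Several of these exceptions correspond to missing files: the project directory, `dbt_project.yml`, and the manifest. A minimal pre-flight sketch that reports the same categories of problem before any Dagster code loads. `check_dbt_project` is a hypothetical helper; it returns messages rather than raising the dagster-dbt exceptions, and it assumes the manifest lives at `<project>/target/manifest.json`.

```python
from pathlib import Path
from typing import List


def check_dbt_project(project_dir: str, target_path: str = "target") -> List[str]:
    """Return human-readable problems found in a local dbt project layout."""
    root = Path(project_dir)
    if not root.is_dir():
        # Nothing else can be checked without the directory itself.
        return [f"project directory not found: {root}"]
    problems = []
    if not (root / "dbt_project.yml").is_file():
        problems.append("dbt_project.yml not found")
    if not (root / target_path / "manifest.json").is_file():
        problems.append("manifest.json not found; generate it first, e.g. with `dbt parse`")
    return problems
```

An empty list means the layout looks complete; it says nothing about whether the manifest is current.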

Freshness Checks

Build asset checks for dbt source freshness validation.

def build_freshness_checks_from_dbt_assets(
    dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]: ...

Full reference: freshness-checks.md

Asset Selection

Advanced asset selection capabilities using dbt manifest information.

class DbtManifestAssetSelection(AssetSelection):
    manifest: Mapping[str, Any]
    select: str
    exclude: str
    selector: str
    dagster_dbt_translator: DagsterDbtTranslator
    project: Optional[DbtProject]

Types

# Type aliases for common dbt-related types
DbtManifestParam = Union[Mapping[str, Any], str, Path]

# CLI Event Message classes
class DbtCliEventMessage(ABC):
    raw_event: dict[str, Any]
    
    def to_default_asset_events(
        self,
        manifest: DbtManifestParam,
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
        project: Optional[DbtProject] = None,
    ) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult, AssetCheckEvaluation]]: ...

class DbtCliInvocation:
    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool
    
    def wait(self) -> "DbtCliInvocation": ...
    def is_successful(self) -> bool: ...
    def stream(self) -> Iterator[DbtCliEventMessage]: ...
    def get_artifact(self, artifact: Literal["manifest.json", "catalog.json", "run_results.json", "sources.json"]) -> dict[str, Any]: ...
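
`get_artifact` above returns the parsed artifact as a dictionary. A short sketch of summarizing a `run_results.json` payload, assuming dbt's artifact layout with a top-level `results` list whose entries carry a `status` field. `summarize_run_results` is a hypothetical helper and the payload is made up for illustration.

```python
from collections import Counter
from typing import Any, Dict, Mapping


def summarize_run_results(run_results: Mapping[str, Any]) -> Dict[str, int]:
    """Count results per status in a run_results.json payload."""
    statuses = (r.get("status", "unknown") for r in run_results.get("results", []))
    return dict(Counter(statuses))


# Illustrative payload; a real one would come from a completed invocation.
example = {
    "results": [
        {"unique_id": "model.demo.orders", "status": "success"},
        {"unique_id": "model.demo.customers", "status": "success"},
        {"unique_id": "test.demo.not_null_orders_id", "status": "fail"},
    ]
}
print(summarize_run_results(example))  # {'success': 2, 'fail': 1}
```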

Constants

Metadata Keys

  • DAGSTER_DBT_MANIFEST_METADATA_KEY = "dagster_dbt/manifest"
  • DAGSTER_DBT_TRANSLATOR_METADATA_KEY = "dagster_dbt/dagster_dbt_translator"
  • DAGSTER_DBT_PROJECT_METADATA_KEY = "dagster_dbt/project"
  • DAGSTER_DBT_SELECT_METADATA_KEY = "dagster_dbt/select"
  • DAGSTER_DBT_EXCLUDE_METADATA_KEY = "dagster_dbt/exclude"
  • DAGSTER_DBT_SELECTOR_METADATA_KEY = "dagster_dbt/selector"
  • DAGSTER_DBT_UNIQUE_ID_METADATA_KEY = "dagster_dbt/unique_id"

Selection Defaults

  • DBT_DEFAULT_SELECT = "fqn:*"
  • DBT_DEFAULT_EXCLUDE = ""
  • DBT_DEFAULT_SELECTOR = ""
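
A sketch of how these defaults could translate into dbt CLI flags. `selection_args` is a hypothetical helper, not a dagster-dbt function, and it assumes that a named selector replaces `--select`/`--exclude` rather than combining with them.

```python
from typing import List

# Values copied from the Selection Defaults list.
DBT_DEFAULT_SELECT = "fqn:*"
DBT_DEFAULT_EXCLUDE = ""
DBT_DEFAULT_SELECTOR = ""


def selection_args(
    select: str = DBT_DEFAULT_SELECT,
    exclude: str = DBT_DEFAULT_EXCLUDE,
    selector: str = DBT_DEFAULT_SELECTOR,
) -> List[str]:
    """Build dbt CLI selection flags from select/exclude/selector strings."""
    if selector:
        return ["--selector", selector]
    args = ["--select", select]
    if exclude:
        args += ["--exclude", exclude]
    return args


print(["build", *selection_args(select="tag:nightly", exclude="tag:slow")])
# ['build', '--select', 'tag:nightly', '--exclude', 'tag:slow']
```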

Environment Variables

  • DBT_INDIRECT_SELECTION_ENV = "DBT_INDIRECT_SELECTION"
  • DBT_EMPTY_INDIRECT_SELECTION = "empty"
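
`DBT_INDIRECT_SELECTION` is the environment variable dbt reads to decide which tests are pulled in alongside selected nodes. A minimal sketch of preparing a subprocess environment with the `empty` mode set. The helper name is hypothetical, and this is not necessarily how dagster-dbt applies the setting internally.

```python
import os
from typing import Dict, Mapping, Optional

# Values copied from the Environment Variables list.
DBT_INDIRECT_SELECTION_ENV = "DBT_INDIRECT_SELECTION"
DBT_EMPTY_INDIRECT_SELECTION = "empty"


def env_with_empty_indirect_selection(
    base: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Return a copy of the environment with indirect selection set to 'empty'."""
    env = dict(os.environ if base is None else base)
    env[DBT_INDIRECT_SELECTION_ENV] = DBT_EMPTY_INDIRECT_SELECTION
    return env
```

The returned dictionary can be passed as the `env` argument of `subprocess.run` without mutating the current process environment.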

Asset Types

  • ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]
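
To illustrate what this constant implies, the sketch below picks out the manifest nodes whose `resource_type` is one of the asset types. It assumes the dbt manifest layout in which `nodes` maps each unique ID to a dictionary with a `resource_type` field. `asset_node_ids` is a hypothetical helper, not the library's own filter, and the manifest is a made-up fragment.

```python
from typing import Any, List, Mapping

# Value copied from the Asset Types list.
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]


def asset_node_ids(manifest: Mapping[str, Any]) -> List[str]:
    """Return sorted unique IDs of manifest nodes that are asset resource types."""
    return sorted(
        unique_id
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in ASSET_RESOURCE_TYPES
    )


# Illustrative manifest fragment; real manifests carry many more fields.
manifest = {
    "nodes": {
        "model.demo.orders": {"resource_type": "model"},
        "seed.demo.countries": {"resource_type": "seed"},
        "test.demo.not_null_orders_id": {"resource_type": "test"},
    }
}
print(asset_node_ids(manifest))  # ['model.demo.orders', 'seed.demo.countries']
```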

Version Information

  • __version__ = "0.27.9"
  • DBT_CORE_VERSION_UPPER_BOUND = "1.11"