A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
npx @tessl/cli install tessl/pypi-dagster-dbt@0.27.0

A comprehensive integration library that enables orchestrating dbt models as Dagster assets with full lineage tracking, metadata propagation, and observability features. This package bridges the gap between dbt's SQL-based transformation capabilities and Dagster's data orchestration framework, allowing data engineers to leverage both tools' strengths in modern data stacks.
pip install dagster-dbt

from dagster_dbt import dbt_assets, DbtCliResource

For dbt Cloud integration:
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job

For dbt Cloud v2 (recommended):
from dagster_dbt.cloud_v2 import DbtCloudCredentials, DbtCloudWorkspace, dbt_cloud_assets

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets
# dbt CLI resource bound to the local dbt project directory
dbt_cli_resource = DbtCliResource(
    project_dir="./my_dbt_project",
)
# Create assets from dbt models
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Invoke ``dbt build`` through the CLI resource and yield its streamed events."""
    build_invocation = dbt.cli(["build"], context=context)
    yield from build_invocation.stream()
# Define the Dagster code location: the dbt assets plus the resource they
# request under the "dbt" key.
defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": dbt_cli_resource},
)

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
DbtCloudCredentials,
DbtCloudWorkspace,
dbt_cloud_assets
)
# dbt Cloud credentials: account, API token, and the base URL of the instance
credentials = DbtCloudCredentials(
    account_id=12345,
    token="your_token",
    access_url="https://cloud.getdbt.com",
)
# The workspace binds the credentials to one dbt Cloud project and environment.
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111,  # placeholder value; DbtCloudWorkspace declares this field without a default
)
# Create assets from dbt Cloud job
@dbt_cloud_assets(job_id=123, workspace=workspace)
def my_dbt_cloud_assets(context):
    """Run job 123 through the workspace and yield whatever the run produces."""
    job_events = workspace.run_job(job_id=123, context=context)
    yield from job_events
defs = Definitions(assets=[my_dbt_cloud_assets])

The dagster-dbt integration follows a layered architecture:
- @dbt_assets decorators convert dbt models into Dagster assets
- DbtCliResource and Cloud resources handle execution and communication
- DagsterDbtTranslator maps dbt metadata to Dagster concepts
- DbtProject manages dbt project structure and manifest parsing

This design enables seamless integration while maintaining separation of concerns between dbt's transformation logic and Dagster's orchestration capabilities.
Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities.
def dbt_assets(
    manifest: DbtManifestParam,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    required_resource_keys: Optional[set[str]] = None,
    project: Optional[DbtProject] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
    """Return a decorator that turns the decorated function into an ``AssetsDefinition``.

    Signature stub only: the implementation is elided. ``manifest`` is the
    sole required argument; every other parameter has a default.
    """
    ...
def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    project: Optional[DbtProject] = None,
) -> Sequence[AssetSpec]:
    """Return a sequence of ``AssetSpec`` objects for the given manifest.

    Signature stub only: the implementation is elided.
    """
    ...
def build_dbt_asset_selection(
dbt_assets: Sequence[AssetsDefinition],
dbt_select: str = "fqn:*",
dbt_exclude: Optional[str] = "",
dbt_selector: Optional[str] = "",
) -> AssetSelection: ...Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management.
class DbtCliResource(ConfigurableResource):
project_dir: str
global_config_flags: list[str] = []
profiles_dir: Optional[str] = None
profile: Optional[str] = None
target: Optional[str] = None
dbt_executable: str = "dbt"
state_path: Optional[str] = None
def cli(
self,
args: Sequence[str],
raise_on_error: bool = True,
manifest: Optional[DbtManifestParam] = None,
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
target_path: Optional[Path] = None,
) -> DbtCliInvocation: ...Original dbt Cloud integration providing job execution, asset loading, and operations.
def dbt_cloud_resource(api_token: str, account_id: int):
    """Stub for the original dbt Cloud resource, taking an API token and an account id.

    The implementation is elided; no return annotation is declared.
    """
    ...
def load_assets_from_dbt_cloud_job(dbt_cloud: ResourceDefinition, job_id: int):
    """Stub for loading assets from the dbt Cloud job identified by ``job_id``.

    The implementation is elided; no return annotation is declared.
    """
    ...
def dbt_cloud_run_op(context: OpExecutionContext, dbt_cloud: DbtCloudResource): ...Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors.
@dataclass
class DbtCloudCredentials(Resolvable):
    """Credentials for a dbt Cloud account (interface stub).

    Field names follow the keyword arguments used to construct the
    credentials: ``account_id``, ``token`` and ``access_url``.
    """

    account_id: int
    token: str
    # Base URL of the dbt Cloud instance, e.g. "https://cloud.getdbt.com".
    access_url: str
@dataclass
class DbtCloudWorkspace(ConfigurableResource):
    """A dbt Cloud workspace: credentials plus project and environment ids (interface stub).

    All three fields are declared without defaults.
    """

    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int
def dbt_cloud_assets(
job_id: int,
workspace: DbtCloudWorkspace,
name: Optional[str] = None,
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
partitions_def: Optional[PartitionsDefinition] = None,
) -> Callable[..., AssetsDefinition]: ...Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys.
class DagsterDbtTranslator:
def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...
def get_asset_spec(
self,
manifest: Mapping[str, Any],
unique_id: str,
project: Optional[DbtProject]
) -> AssetSpec: ...
def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: ...
def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...dbt project handling, manifest parsing, and project preparation for integration with Dagster.
@record_custom
class DbtProject(IHaveNew):
    """Record describing a dbt project: its paths, profile/target and preparer (interface stub)."""

    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: DbtProjectPreparer

    def prepare_if_dev(self) -> None:
        """Stub; implementation elided. Presumably prepares the project only in development - confirm against the library docs."""
        ...
class DbtProjectPreparer:
    """Base type for objects that prepare a dbt project (interface stub)."""

    def prepare(self) -> None:
        """Stub; implementation elided. Returns nothing."""
        ...
class DagsterDbtProjectPreparer(DbtProjectPreparer):
passIntegration with Dagster's component system for declarative dbt project configuration.
class DbtProjectComponent(Component):
dbt_project_path: str
dbt_profiles_path: Optional[str] = NoneUtility functions for asset selection, naming conventions, metadata handling, and manifest operations.
def get_asset_key_for_model(
    dbt_assets: Sequence[AssetsDefinition],
    model_name: str,
) -> AssetKey:
    """Return the ``AssetKey`` for the dbt model ``model_name`` among ``dbt_assets``.

    Signature stub only: the implementation is elided.
    """
    ...
def get_asset_key_for_source(
    dbt_assets: Sequence[AssetsDefinition],
    source_name: str,
) -> AssetKey:
    """Return the ``AssetKey`` for the dbt source ``source_name`` among ``dbt_assets``.

    Signature stub only: the implementation is elided.
    """
    ...
def get_asset_keys_by_output_name_for_source(
    dbt_assets: Sequence[AssetsDefinition],
    source_name: str,
) -> Mapping[str, AssetKey]:
    """Return a mapping of string keys to ``AssetKey`` values for the dbt source ``source_name``.

    Signature stub only: the implementation is elided.
    """
    ...
def default_metadata_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Return a metadata mapping derived from the given dbt resource properties.

    Signature stub only: the implementation is elided.
    """
    ...
def default_group_from_dbt_resource_props(
    dbt_resource_props: Mapping[str, Any],
) -> Optional[str]:
    """Return a group name for the given dbt resource properties, or ``None``.

    Signature stub only: the implementation is elided.
    """
    ...
def group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...Comprehensive exception hierarchy for dbt integration error handling and debugging.
class DagsterDbtError(Failure, ABC): ...
class DagsterDbtCliRuntimeError(DagsterDbtError, ABC): ...
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError): ...
class DagsterDbtProjectNotFoundError(DagsterDbtError): ...
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError): ...
class DagsterDbtManifestNotFoundError(DagsterDbtError): ...
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError): ...Build asset checks for dbt source freshness validation.
def build_freshness_checks_from_dbt_assets(
dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]: ...Advanced asset selection capabilities using dbt manifest information.
class DbtManifestAssetSelection(AssetSelection):
    """Asset selection carrying a dbt manifest and dbt selection strings (interface stub)."""

    manifest: Mapping[str, Any]
    select: str
    exclude: str
    selector: str
    dagster_dbt_translator: DagsterDbtTranslator
    project: Optional[DbtProject]


# Type aliases for common dbt-related types
# A manifest may be supplied as an already-loaded mapping, a string, or a Path.
DbtManifestParam = Union[
    Mapping[str, Any],
    str,
    Path,
]
# CLI Event Message classes
class DbtCliEventMessage(ABC):
    """Abstract wrapper around one raw dbt CLI event (interface stub)."""

    # The event payload as a plain dictionary.
    raw_event: dict[str, Any]

    def to_default_asset_events(
        self,
        manifest: DbtManifestParam,
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
        project: Optional[DbtProject] = None,
    ) -> Iterator[
        Union[
            Output,
            AssetMaterialization,
            AssetObservation,
            AssetCheckResult,
            AssetCheckEvaluation,
        ]
    ]:
        """Return an iterator of Dagster events derived from this message.

        Signature stub only: the implementation is elided. Only ``manifest``
        is required; the translator defaults to a ``DagsterDbtTranslator()``
        instance created when the function is defined.
        """
        ...
class DbtCliInvocation:
    """Handle on a dbt CLI invocation backed by a ``subprocess.Popen`` (interface stub)."""

    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    def wait(self) -> "DbtCliInvocation":
        """Stub; returns a ``DbtCliInvocation`` so calls can be chained."""
        ...

    def is_successful(self) -> bool:
        """Stub; reports success of the invocation as a boolean."""
        ...

    def stream(self) -> Iterator[DbtCliEventMessage]:
        """Stub; iterates over the invocation's ``DbtCliEventMessage`` objects."""
        ...

    def get_artifact(
        self,
        artifact: Literal["manifest.json", "catalog.json", "run_results.json", "sources.json"],
    ) -> dict[str, Any]:
        """Stub; returns the named dbt artifact as a dictionary."""
        ...


DAGSTER_DBT_MANIFEST_METADATA_KEY = "dagster_dbt/manifest"
DAGSTER_DBT_TRANSLATOR_METADATA_KEY = "dagster_dbt/dagster_dbt_translator"
DAGSTER_DBT_PROJECT_METADATA_KEY = "dagster_dbt/project"
DAGSTER_DBT_SELECT_METADATA_KEY = "dagster_dbt/select"
DAGSTER_DBT_EXCLUDE_METADATA_KEY = "dagster_dbt/exclude"
DAGSTER_DBT_SELECTOR_METADATA_KEY = "dagster_dbt/selector"
DAGSTER_DBT_UNIQUE_ID_METADATA_KEY = "dagster_dbt/unique_id"
DBT_DEFAULT_SELECT = "fqn:*"
DBT_DEFAULT_EXCLUDE = ""
DBT_DEFAULT_SELECTOR = ""
DBT_INDIRECT_SELECTION_ENV = "DBT_INDIRECT_SELECTION"
DBT_EMPTY_INDIRECT_SELECTION = "empty"
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]
__version__ = "0.27.9"
DBT_CORE_VERSION_UPPER_BOUND = "1.11"