A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets, with full lineage tracking and metadata propagation.
—
Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors. This is the recommended approach for new dbt Cloud integrations, providing better composability and more flexible configuration options.
Dataclass for dbt Cloud API authentication credentials.
@dataclass
class DbtCloudCredentials(Resolvable):
    """
    dbt Cloud API credentials.

    The field names match how the usage examples construct this object:
    ``DbtCloudCredentials(account_id=..., token=..., access_url=...)``.

    Attributes:
    - account_id: dbt Cloud account ID
    - token: dbt Cloud API token used for authentication
    - access_url: Base URL of the dbt Cloud deployment (e.g. "https://cloud.getdbt.com")
    """

    account_id: int
    token: str
    access_url: str


# Configurable resource representing a dbt Cloud workspace with project and environment context.
class DbtCloudWorkspace(ConfigurableResource):
    """
    dbt Cloud workspace resource for project and environment management.

    Dagster's ConfigurableResource is a Pydantic-backed config class, so fields are
    declared as plain annotated class attributes. The class must not also be
    decorated with ``@dataclass``, which would replace its validating constructor.

    Attributes:
    - credentials: DbtCloudCredentials instance
    - project_id: dbt Cloud project ID
    - environment_id: dbt Cloud environment ID
    """

    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int

    def run_job(
        self,
        job_id: int,
        cause: str = "Triggered by Dagster",
        steps_override: Optional[List[str]] = None,
        schema_override: Optional[str] = None,
        git_sha: Optional[str] = None,
        **kwargs,
    ) -> DbtCloudRun:
        """
        Run a dbt Cloud job.

        Parameters:
        - job_id: dbt Cloud job ID to execute
        - cause: Description of why the job was triggered
        - steps_override: Custom dbt commands to run instead of the job's configured steps
        - schema_override: Override target schema
        - git_sha: Specific git SHA to run against
        - **kwargs: Additional job trigger parameters
        Returns:
        DbtCloudRun object representing the triggered run
        """

    def get_job(self, job_id: int) -> DbtCloudJob:
        """
        Get dbt Cloud job details.

        Parameters:
        - job_id: dbt Cloud job ID
        Returns:
        DbtCloudJob object with job details
        """

    def get_run(self, run_id: int) -> DbtCloudRun:
        """
        Get dbt Cloud run details.

        Parameters:
        - run_id: dbt Cloud run ID
        Returns:
        DbtCloudRun object with run details
        """

    def get_manifest(self, job_id: int) -> dict:
        """
        Get dbt manifest for a job.

        Parameters:
        - job_id: dbt Cloud job ID
        Returns:
        Parsed manifest.json dictionary
        """


# Loads asset specifications from a dbt Cloud job without creating executable assets.
def load_dbt_cloud_asset_specs(
    workspace: DbtCloudWorkspace,
    job_id: int,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs,
) -> Sequence[AssetSpec]:
    """
    Load asset specs from dbt Cloud job.

    The returned specs only describe the assets; they are not executable
    definitions.

    Parameters:
    - workspace: DbtCloudWorkspace resource
    - job_id: dbt Cloud job ID
    - dagster_dbt_translator: Optional custom translator for asset mapping
    - **kwargs: Additional parameters for spec generation
    Returns:
    Sequence of AssetSpec objects representing dbt models
    """


# Loads data quality check specifications from a dbt Cloud job.
def load_dbt_cloud_check_specs(
    workspace: DbtCloudWorkspace,
    job_id: int,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs,
) -> Sequence[AssetCheckSpec]:
    """
    Load check specs from dbt Cloud job.

    Parameters:
    - workspace: DbtCloudWorkspace resource
    - job_id: dbt Cloud job ID
    - dagster_dbt_translator: Optional custom translator for check mapping
    - **kwargs: Additional parameters for spec generation
    Returns:
    Sequence of AssetCheckSpec objects for dbt tests
    """


# Modern decorator for creating assets from dbt Cloud jobs.
def dbt_cloud_assets(
    job_id: int,
    workspace: DbtCloudWorkspace,
    name: Optional[str] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    **kwargs,
) -> Callable[..., AssetsDefinition]:
    """
    Create Dagster assets from dbt Cloud job.

    Used as a decorator factory: ``@dbt_cloud_assets(job_id=..., workspace=...)``
    wraps a materialization function.

    Parameters:
    - job_id: dbt Cloud job ID
    - workspace: DbtCloudWorkspace resource
    - name: Optional name for the generated definition
    - dagster_dbt_translator: Optional custom translator for asset mapping
    - partitions_def: Optional partitions definition for the generated assets
    - backfill_policy: Backfill policy for assets
    - op_tags: Tags to apply to the underlying op
    - **kwargs: Additional decorator parameters
    Returns:
    A decorator that turns the wrapped function into an AssetsDefinition
    materializing the dbt Cloud job's assets
    """


# Creates a sensor that polls dbt Cloud for job completion and triggers downstream assets.
def build_dbt_cloud_polling_sensor(
    workspace: DbtCloudWorkspace,
    job_id: int,
    sensor_name: str,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    minimum_interval_seconds: Optional[int] = None,
    run_tags: Optional[Mapping[str, str]] = None,
) -> SensorDefinition:
    """
    Build polling sensor for dbt Cloud job.

    Parameters:
    - workspace: DbtCloudWorkspace resource
    - job_id: dbt Cloud job ID to monitor
    - sensor_name: Name of the generated sensor (required)
    - dagster_dbt_translator: Optional custom translator for asset mapping
    - minimum_interval_seconds: Minimum time between sensor evaluations
    - run_tags: Optional tags to attach to runs requested by the sensor
    Returns:
    SensorDefinition that monitors dbt Cloud job completion
    """


# Represents a dbt Cloud account with metadata and configuration.
class DbtCloudAccount:
    """
    dbt Cloud account representation.

    Attributes:
    - id: Account ID
    - name: Account name
    - plan: Account plan type
    - state: Account state
    """

    id: int
    name: str
    plan: str
    state: int


# Represents a dbt Cloud project within an account.
class DbtCloudProject:
    """
    dbt Cloud project representation.

    Attributes:
    - id: Project ID
    - name: Project name
    - account_id: Parent account ID
    - repository_id: Associated repository ID (None when no repository is linked)
    - state: Project state
    """

    id: int
    name: str
    account_id: int
    repository_id: Optional[int]
    state: int


# Represents a dbt Cloud environment configuration.
class DbtCloudEnvironment:
    """
    dbt Cloud environment representation.

    Attributes:
    - id: Environment ID
    - name: Environment name
    - project_id: Parent project ID
    - type: Environment type (development/deployment)
    - state: Environment state
    """

    id: int
    name: str
    project_id: int
    type: str
    state: int


# Represents a dbt Cloud job configuration.
class DbtCloudJob:
    """
    dbt Cloud job representation.

    Attributes:
    - id: Job ID
    - name: Job name
    - project_id: Parent project ID
    - environment_id: Target environment ID
    - execute_steps: List of dbt commands to execute
    - triggers: Job trigger configuration
    - state: Job state
    """

    id: int
    name: str
    project_id: int
    environment_id: int
    execute_steps: List[str]
    triggers: dict
    state: int


# Represents a dbt Cloud job run instance.
class DbtCloudRun:
    """
    dbt Cloud run representation.

    Attributes:
    - id: Run ID
    - job_id: Parent job ID
    - trigger: Run trigger information
    - status: Current run status as a numeric code (presumably one of the
      DbtCloudJobRunStatusType values — confirm)
    - started_at: Run start timestamp, None if not started
    - finished_at: Run completion timestamp, None if not finished
    - duration: Run duration in seconds, None if unavailable
    """

    id: int
    job_id: int
    trigger: dict
    status: int
    started_at: Optional[str]
    finished_at: Optional[str]
    duration: Optional[int]

    @property
    def status_humanized(self) -> str:
        """Get human-readable status string."""

    @property
    def is_success(self) -> bool:
        """Check if run completed successfully."""

    @property
    def is_complete(self) -> bool:
        """Check if run has completed (success or failure)."""


# Enumeration of dbt Cloud job run statuses.
class DbtCloudJobRunStatusType(Enum):
    """
    dbt Cloud job run status enumeration.

    Members carry numeric codes, presumably the same codes stored in the integer
    ``DbtCloudRun.status`` field (confirm against the dbt Cloud API).
    """

    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


from dagster import Definitions, AssetExecutionContext
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets,
)

# Configure credentials and workspace
credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com",
)
# DbtCloudWorkspace declares project_id and environment_id as required fields.
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321,
)


# Create assets from dbt Cloud job
@dbt_cloud_assets(
    job_id=123,
    workspace=workspace,
)
def my_dbt_cloud_assets(context: AssetExecutionContext):
    # Trigger and monitor dbt Cloud job
    run = workspace.run_job(
        job_id=123,
        cause="Triggered by Dagster asset materialization",
    )
    # Poll for completion and yield events.
    # NOTE(review): the DbtCloudRun type documented in this spec declares no
    # `stream_events` method — confirm this call against the installed dagster-dbt.
    yield from run.stream_events(context=context)


defs = Definitions(
    assets=[my_dbt_cloud_assets],
    resources={"workspace": workspace},
)

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    load_dbt_cloud_asset_specs,
)

credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com",
)
# DbtCloudWorkspace declares project_id and environment_id as required fields.
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321,
)

# Load asset specs without creating executable assets
asset_specs = load_dbt_cloud_asset_specs(
    workspace=workspace,
    job_id=123,
)

# Use specs to create custom assets or for analysis
defs = Definitions(assets=asset_specs)

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    build_dbt_cloud_polling_sensor,
)

credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com",
)
# DbtCloudWorkspace declares project_id and environment_id as required fields.
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321,
)

# Create sensor to monitor dbt Cloud job.
# sensor_name has no default in build_dbt_cloud_polling_sensor, so it must be passed.
dbt_cloud_sensor = build_dbt_cloud_polling_sensor(
    job_id=123,
    workspace=workspace,
    sensor_name="dbt_cloud_job_123_sensor",
    minimum_interval_seconds=120,
)

defs = Definitions(sensors=[dbt_cloud_sensor])

from dagster import AssetKey
# Optional appears in a method annotation, which Python evaluates when the class
# body runs, so it must be imported.
from typing import Optional

from dagster_dbt import DagsterDbtTranslator
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets,
)

# Define the workspace here so this snippet runs on its own.
credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com",
)
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321,
)


class CustomDbtCloudTranslator(DagsterDbtTranslator):
    """Translator that namespaces dbt Cloud assets and groups them by dbt group."""

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """Build the key ["dbt_cloud", database, schema, name] for a dbt resource."""
        return AssetKey([
            "dbt_cloud",
            dbt_resource_props["database"],
            dbt_resource_props["schema"],
            dbt_resource_props["name"],
        ])

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        """Use the resource's configured dbt group, falling back to "default"."""
        # `or` rather than a .get() default: the config may contain "group": None,
        # in which case .get("group", "default") would return None.
        return dbt_resource_props.get("config", {}).get("group") or "default"


@dbt_cloud_assets(
    job_id=123,
    workspace=workspace,
    dagster_dbt_translator=CustomDbtCloudTranslator(),
)
def my_custom_assets(context):
    # NOTE(review): the DbtCloudRun type documented in this spec declares no
    # `stream_events` method — confirm this call against the installed dagster-dbt.
    yield from workspace.run_job(job_id=123).stream_events(context=context)


# Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt