A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Original dbt Cloud integration providing job execution, asset loading, and operations. This module supports the legacy dbt Cloud API integration patterns and is maintained for backward compatibility.
Factory function for creating a dbt Cloud resource with API authentication.
def dbt_cloud_resource(
api_token: str,
account_id: int,
disable_schedule_on_trigger: bool = True
) -> ResourceDefinition:
"""
Create a dbt Cloud resource.
Parameters:
- api_token: dbt Cloud API token for authentication
- account_id: dbt Cloud account ID
- disable_schedule_on_trigger: Whether to disable dbt Cloud schedules when triggering jobs
Returns:
ResourceDefinition for dbt Cloud integration
"""

Legacy dbt Cloud resource class for job execution and management.
class DbtCloudResource:
"""
Legacy dbt Cloud resource for job execution.
Attributes:
- api_token: dbt Cloud API token
- account_id: dbt Cloud account ID
- disable_schedule_on_trigger: Schedule disable behavior
"""
def run_job_and_poll(
self,
job_id: int,
cause: str = "Triggered by Dagster",
steps_override: Optional[List[str]] = None,
schema_override: Optional[str] = None,
poll_interval: int = 10,
poll_timeout: Optional[int] = None
) -> DbtCloudOutput:
"""
Run a dbt Cloud job and poll for completion.
Parameters:
- job_id: dbt Cloud job ID to execute
- cause: Description of why the job was triggered
- steps_override: Custom dbt commands to run instead of job default
- schema_override: Override the target schema
- poll_interval: Seconds between polling attempts
- poll_timeout: Maximum seconds to wait for completion
Returns:
DbtCloudOutput containing run results and metadata
"""
def get_job(self, job_id: int) -> dict:
"""
Get dbt Cloud job details.
Parameters:
- job_id: dbt Cloud job ID
Returns:
Job details dictionary from dbt Cloud API
"""
def get_run(self, run_id: int) -> dict:
"""
Get dbt Cloud run details.
Parameters:
- run_id: dbt Cloud run ID
Returns:
Run details dictionary from dbt Cloud API
"""

Configurable resource implementation for dbt Cloud API client.
class DbtCloudClientResource(ConfigurableResource):
"""
Configurable dbt Cloud client resource.
Attributes:
- api_token: dbt Cloud API token for authentication
- account_id: dbt Cloud account ID
- disable_schedule_on_trigger: Whether to disable schedules on trigger
"""
api_token: str
account_id: int
disable_schedule_on_trigger: bool = True
def run_job_and_poll(
self,
job_id: int,
**kwargs
) -> DbtCloudOutput:
"""
Run and poll dbt Cloud job.
Parameters:
- job_id: dbt Cloud job ID
- **kwargs: Additional job execution parameters
Returns:
DbtCloudOutput with execution results
"""

Creates Dagster assets from a dbt Cloud job configuration.
def load_assets_from_dbt_cloud_job(
dbt_cloud: ResourceDefinition,
job_id: int,
node_info_to_asset_key: Callable[[dict], AssetKey] = default_node_info_to_asset_key,
node_info_to_group_fn: Callable[[dict], Optional[str]] = lambda _: None,
node_info_to_freshness_policy_fn: Callable[[dict], Optional[FreshnessPolicy]] = lambda _: None,
partitions_def: Optional[PartitionsDefinition] = None,
partition_key_to_vars_fn: Optional[Callable[[str], dict]] = None,
op_tags: Optional[dict] = None
) -> Sequence[AssetsDefinition]:
"""
Load assets from dbt Cloud job.
Parameters:
- dbt_cloud: dbt Cloud resource definition
- job_id: dbt Cloud job ID to load assets from
- node_info_to_asset_key: Function to generate asset keys from dbt nodes
- node_info_to_group_fn: Function to determine asset groups
- node_info_to_freshness_policy_fn: Function to set freshness policies
- partitions_def: Partitioning definition for assets
- partition_key_to_vars_fn: Function to map partition keys to dbt vars
- op_tags: Tags to apply to generated ops
Returns:
Sequence of AssetsDefinition objects
"""

Operation for executing dbt Cloud jobs within Dagster pipelines.
def dbt_cloud_run_op(
context: OpExecutionContext,
dbt_cloud: DbtCloudResource
) -> DbtCloudOutput:
"""
Execute dbt Cloud job operation.
Parameters:
- context: Dagster operation execution context
- dbt_cloud: dbt Cloud resource instance
Returns:
DbtCloudOutput containing job execution results
"""

Output type for dbt Cloud operations containing run results and metadata.
class DbtCloudOutput:
"""
Output from dbt Cloud job execution.
Attributes:
- run_details: Complete run details from dbt Cloud API
- is_successful: Whether the run completed successfully
- run_id: dbt Cloud run ID
- job_id: dbt Cloud job ID
"""
run_details: dict
is_successful: bool
@property
def run_id(self) -> int:
"""Get the dbt Cloud run ID."""
@property
def job_id(self) -> int:
"""Get the dbt Cloud job ID."""
@property
def run_status(self) -> DbtCloudRunStatus:
"""Get the run status enum."""
def get_artifact(self, artifact_name: str) -> Optional[dict]:
"""
Get a specific artifact from the run.
Parameters:
- artifact_name: Name of the artifact to retrieve
Returns:
Artifact dictionary or None if not found
"""

Enumeration of possible dbt Cloud run statuses.
class DbtCloudRunStatus(Enum):
"""
dbt Cloud run status enumeration.
"""
QUEUED = 1
STARTING = 2
RUNNING = 3
SUCCESS = 10
ERROR = 20
CANCELLED = 30

from dagster import job, op, Definitions
from dagster_dbt.cloud import dbt_cloud_resource, dbt_cloud_run_op
# Create resource
dbt_cloud = dbt_cloud_resource(
api_token="dbt_api_token_here",
account_id=12345
)
@op(required_resource_keys={"dbt_cloud"})
def run_dbt_cloud_job(context):
dbt_cloud = context.resources.dbt_cloud
return dbt_cloud.run_job_and_poll(
job_id=67890,
cause="Triggered by Dagster pipeline"
)
@job(resource_defs={"dbt_cloud": dbt_cloud})
def dbt_cloud_job():
run_dbt_cloud_job()
defs = Definitions(jobs=[dbt_cloud_job])

from dagster import Definitions
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job
dbt_cloud = dbt_cloud_resource(
api_token="dbt_api_token_here",
account_id=12345
)
# Load assets from dbt Cloud job
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
dbt_cloud=dbt_cloud,
job_id=67890
)
defs = Definitions(
assets=dbt_cloud_assets,
resources={"dbt_cloud": dbt_cloud}
)

from dagster import AssetKey
from dagster_dbt.cloud import load_assets_from_dbt_cloud_job, dbt_cloud_resource
def custom_asset_key_fn(node_info: dict) -> AssetKey:
"""Custom function to generate asset keys from dbt nodes."""
return AssetKey([
"dbt_cloud",
node_info["database"],
node_info["schema"],
node_info["name"]
])
def custom_group_fn(node_info: dict) -> str:
"""Custom function to determine asset groups."""
return node_info.get("config", {}).get("materialized", "default")
dbt_cloud = dbt_cloud_resource(
api_token="dbt_api_token_here",
account_id=12345
)
assets = load_assets_from_dbt_cloud_job(
dbt_cloud=dbt_cloud,
job_id=67890,
node_info_to_asset_key=custom_asset_key_fn,
node_info_to_group_fn=custom_group_fn
)

The legacy dbt Cloud integration is maintained for backward compatibility, but new projects should use the dbt Cloud v2 integration for improved features:
- DbtCloudCredentials and DbtCloudWorkspace
- load_dbt_cloud_asset_specs
- @dbt_cloud_assets decorator
- build_dbt_cloud_polling_sensor

See dbt Cloud v2 for the recommended modern integration.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt