tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

dbt Cloud Integration (Legacy)

The original dbt Cloud integration: resources for triggering and polling dbt Cloud jobs, a loader that turns a job's models into Dagster assets, and a prebuilt op for running jobs. It follows the legacy dbt Cloud API integration patterns and is maintained for backward compatibility.

Capabilities

Cloud Resources

dbt_cloud_resource

Factory function for creating a dbt Cloud resource with API authentication.

def dbt_cloud_resource(
    api_token: str,
    account_id: int,
    disable_schedule_on_trigger: bool = True
) -> ResourceDefinition:
    """
    Create a dbt Cloud resource.
    
    Parameters:
    - api_token: dbt Cloud API token for authentication
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Whether to disable dbt Cloud schedules when triggering jobs
    
    Returns:
    ResourceDefinition for dbt Cloud integration
    """

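Hard-coding an API token in source is easy to leak, so the parameters are often read from the environment instead. The sketch below is one way to do that with the standard library only. The helper name `load_dbt_cloud_settings` and the variable names `DBT_CLOUD_API_TOKEN` and `DBT_CLOUD_ACCOUNT_ID` are assumptions made for this example, not names the library defines or reads.

```python
import os
from typing import Mapping, Optional, TypedDict


class DbtCloudSettings(TypedDict):
    api_token: str
    account_id: int
    disable_schedule_on_trigger: bool


def load_dbt_cloud_settings(env: Optional[Mapping[str, str]] = None) -> DbtCloudSettings:
    """Collect the three documented parameters from environment variables.

    The environment variable names are hypothetical and chosen for this sketch.
    """
    env = os.environ if env is None else env
    required = ("DBT_CLOUD_API_TOKEN", "DBT_CLOUD_ACCOUNT_ID")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "api_token": env["DBT_CLOUD_API_TOKEN"],
        # account_id is typed as int in the signature above
        "account_id": int(env["DBT_CLOUD_ACCOUNT_ID"]),
        # Same default as the documented signature
        "disable_schedule_on_trigger": True,
    }
```

Given the signature shown above, the returned dictionary could then be unpacked as `dbt_cloud_resource(**load_dbt_cloud_settings())`.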
DbtCloudResource

Legacy dbt Cloud resource class for job execution and management.

class DbtCloudResource:
    """
    Legacy dbt Cloud resource for job execution.
    
    Attributes:
    - api_token: dbt Cloud API token
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Whether to disable the job's dbt Cloud schedule when a run is triggered
    """
    
    def run_job_and_poll(
        self,
        job_id: int,
        cause: str = "Triggered by Dagster",
        steps_override: Optional[List[str]] = None,
        schema_override: Optional[str] = None,
        poll_interval: int = 10,
        poll_timeout: Optional[int] = None
    ) -> DbtCloudOutput:
        """
        Run a dbt Cloud job and poll for completion.
        
        Parameters:
        - job_id: dbt Cloud job ID to execute
        - cause: Description of why the job was triggered
        - steps_override: Custom dbt commands to run instead of job default
        - schema_override: Override the target schema
        - poll_interval: Seconds between polling attempts
        - poll_timeout: Maximum seconds to wait for completion
        
        Returns:
        DbtCloudOutput containing run results and metadata
        """
    
    def get_job(self, job_id: int) -> dict:
        """
        Get dbt Cloud job details.
        
        Parameters:
        - job_id: dbt Cloud job ID
        
        Returns:
        Job details dictionary from dbt Cloud API
        """
    
    def get_run(self, run_id: int) -> dict:
        """
        Get dbt Cloud run details.
        
        Parameters:
        - run_id: dbt Cloud run ID
        
        Returns:
        Run details dictionary from dbt Cloud API
        """

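To make the roles of `poll_interval` and `poll_timeout` concrete, here is a minimal sketch of a run-and-poll loop. It is not the library's implementation: `poll_until_done`, `fetch_status`, and the injectable `sleep` and `clock` parameters are hypothetical names introduced for illustration, and statuses are represented as plain strings named after the `DbtCloudRunStatus` members.

```python
import time
from typing import Callable, Optional

# Statuses treated as terminal in this sketch
TERMINAL_STATUSES = {"SUCCESS", "ERROR", "CANCELLED"}


def poll_until_done(
    fetch_status: Callable[[], str],
    poll_interval: float = 10,
    poll_timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call fetch_status until it returns a terminal status or time runs out."""
    started = clock()
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        # With poll_timeout=None the loop waits indefinitely
        if poll_timeout is not None and clock() - started > poll_timeout:
            raise TimeoutError(f"Run still {status} after {poll_timeout} seconds")
        sleep(poll_interval)


# Usage with a fake status source standing in for the dbt Cloud API
statuses = iter(["QUEUED", "RUNNING", "SUCCESS"])
final_status = poll_until_done(lambda: next(statuses), poll_interval=0)
```

Passing `sleep` and `clock` as parameters keeps the loop testable without real waiting. A production loop would also need to handle transient API errors.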
DbtCloudClientResource

Configurable resource implementation for dbt Cloud API client.

class DbtCloudClientResource(ConfigurableResource):
    """
    Configurable dbt Cloud client resource.
    
    Attributes:
    - api_token: dbt Cloud API token for authentication
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Whether to disable the job's dbt Cloud schedule when a run is triggered
    """
    
    api_token: str
    account_id: int
    disable_schedule_on_trigger: bool = True
    
    def run_job_and_poll(
        self,
        job_id: int,
        **kwargs
    ) -> DbtCloudOutput:
        """
        Run and poll dbt Cloud job.
        
        Parameters:
        - job_id: dbt Cloud job ID
        - **kwargs: Additional job execution parameters
        
        Returns:
        DbtCloudOutput with execution results
        """

Asset Loading

load_assets_from_dbt_cloud_job

Creates Dagster assets from a dbt Cloud job configuration.

def load_assets_from_dbt_cloud_job(
    dbt_cloud: ResourceDefinition,
    job_id: int,
    node_info_to_asset_key: Callable[[dict], AssetKey] = default_node_info_to_asset_key,
    node_info_to_group_fn: Callable[[dict], Optional[str]] = lambda _: None,
    node_info_to_freshness_policy_fn: Callable[[dict], Optional[FreshnessPolicy]] = lambda _: None,
    partitions_def: Optional[PartitionsDefinition] = None,
    partition_key_to_vars_fn: Optional[Callable[[str], dict]] = None,
    op_tags: Optional[dict] = None
) -> Sequence[AssetsDefinition]:
    """
    Load assets from dbt Cloud job.
    
    Parameters:
    - dbt_cloud: dbt Cloud resource definition
    - job_id: dbt Cloud job ID to load assets from
    - node_info_to_asset_key: Function to generate asset keys from dbt nodes
    - node_info_to_group_fn: Function to determine asset groups
    - node_info_to_freshness_policy_fn: Function to set freshness policies
    - partitions_def: Partitioning definition for assets
    - partition_key_to_vars_fn: Function to map partition keys to dbt vars
    - op_tags: Tags to apply to generated ops
    
    Returns:
    Sequence of AssetsDefinition objects
    """

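The `partition_key_to_vars_fn` parameter takes a partition key string and returns a dictionary of dbt vars. The following sketch shows one possible shape for such a function, assuming daily partitions whose keys are ISO dates such as `2024-01-15`. The function name and the var names `run_start` and `run_end` are hypothetical; a real project would use whatever names its models read through `var()`.

```python
from datetime import date, timedelta
from typing import Dict


def daily_partition_to_vars(partition_key: str) -> Dict[str, str]:
    """Turn a daily partition key such as "2024-01-15" into dbt vars."""
    start = date.fromisoformat(partition_key)
    end = start + timedelta(days=1)
    # Half-open window [run_start, run_end) covering one day
    return {"run_start": start.isoformat(), "run_end": end.isoformat()}
```

A function like this would be passed as `partition_key_to_vars_fn=daily_partition_to_vars` alongside a matching `partitions_def`.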
Operations

dbt_cloud_run_op

Op that runs a dbt Cloud job from within a Dagster job.

def dbt_cloud_run_op(
    context: OpExecutionContext,
    dbt_cloud: DbtCloudResource
) -> DbtCloudOutput:
    """
    Execute dbt Cloud job operation.
    
    Parameters:
    - context: Dagster operation execution context
    - dbt_cloud: dbt Cloud resource instance
    
    Returns:
    DbtCloudOutput containing job execution results
    """

Output Types

DbtCloudOutput

Output type for dbt Cloud operations containing run results and metadata.

class DbtCloudOutput:
    """
    Output from dbt Cloud job execution.
    
    Attributes:
    - run_details: Complete run details from dbt Cloud API
    - is_successful: Whether the run completed successfully
    - run_id: dbt Cloud run ID
    - job_id: dbt Cloud job ID
    """
    
    run_details: dict
    is_successful: bool
    
    @property
    def run_id(self) -> int:
        """Get the dbt Cloud run ID."""
    
    @property
    def job_id(self) -> int:
        """Get the dbt Cloud job ID."""
    
    @property
    def run_status(self) -> DbtCloudRunStatus:
        """Get the run status enum."""
    
    def get_artifact(self, artifact_name: str) -> Optional[dict]:
        """
        Get a specific artifact from the run.
        
        Parameters:
        - artifact_name: Name of the artifact to retrieve
        
        Returns:
        Artifact dictionary or None if not found
        """

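Since `get_artifact` is documented as returning a dictionary or `None`, callers usually need to handle both cases. The sketch below summarizes a `run_results.json` artifact, assuming it follows dbt's usual layout with a top-level `results` list whose entries carry `unique_id` and `status`. The helper name `summarize_run_results` is hypothetical.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple


def summarize_run_results(run_results: Optional[dict]) -> Tuple[Dict[str, int], List[str]]:
    """Count node statuses and list the unique_ids that errored or failed."""
    if run_results is None:
        # The artifact was not found for this run
        return {}, []
    results = run_results.get("results", [])
    counts = Counter(r.get("status", "unknown") for r in results)
    failed = [r["unique_id"] for r in results if r.get("status") in {"error", "fail"}]
    return dict(counts), failed


# Sample data shaped like a trimmed run_results.json
sample = {
    "results": [
        {"unique_id": "model.shop.orders", "status": "success"},
        {"unique_id": "model.shop.customers", "status": "error"},
        {"unique_id": "test.shop.not_null_orders_id", "status": "fail"},
    ]
}
counts, failed = summarize_run_results(sample)
```

With a real run, the input would come from something like `output.get_artifact("run_results.json")`, where the exact artifact name is also an assumption.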
DbtCloudRunStatus

Enumeration of possible dbt Cloud run statuses.

class DbtCloudRunStatus(Enum):
    """
    dbt Cloud run status enumeration.
    """
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30

Usage Examples

Basic dbt Cloud Job Execution

from dagster import job, op, Definitions
from dagster_dbt.cloud import dbt_cloud_resource, dbt_cloud_run_op

# Create resource
dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here",
    account_id=12345
)

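@op(required_resource_keys={"dbt_cloud"})
def run_dbt_cloud_job(context):
    # The resource is looked up under the key declared in required_resource_keys
    dbt_cloud = context.resources.dbt_cloud
    # Trigger job 67890 and block until the run finishes
    return dbt_cloud.run_job_and_poll(
        job_id=67890,
        cause="Triggered by Dagster pipeline"
    )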

@job(resource_defs={"dbt_cloud": dbt_cloud})
def dbt_cloud_job():
    run_dbt_cloud_job()

defs = Definitions(jobs=[dbt_cloud_job])

Loading Assets from dbt Cloud Job

from dagster import Definitions
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job

dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here", 
    account_id=12345
)

# Load assets from dbt Cloud job
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud,
    job_id=67890
)

defs = Definitions(
    assets=dbt_cloud_assets,
    resources={"dbt_cloud": dbt_cloud}
)

Custom Asset Key Mapping

from dagster import AssetKey
from dagster_dbt.cloud import load_assets_from_dbt_cloud_job, dbt_cloud_resource

def custom_asset_key_fn(node_info: dict) -> AssetKey:
    """Custom function to generate asset keys from dbt nodes."""
    return AssetKey([
        "dbt_cloud",
        node_info["database"],
        node_info["schema"], 
        node_info["name"]
    ])

def custom_group_fn(node_info: dict) -> str:
    """Custom function to determine asset groups."""
    return node_info.get("config", {}).get("materialized", "default")

dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here",
    account_id=12345
)

assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud,
    job_id=67890,
    node_info_to_asset_key=custom_asset_key_fn,
    node_info_to_group_fn=custom_group_fn
)
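
Mapping functions like these can be checked against a sample node dictionary before wiring them into Dagster. The sketch below repeats the same lookups without importing Dagster, returning the key path as a plain list. The sample node and the function names `key_path` and `group_name` are hypothetical, and a real node dictionary contains many more fields.

```python
from typing import List, Optional


def key_path(node_info: dict) -> List[str]:
    """Same components custom_asset_key_fn passes to AssetKey."""
    return ["dbt_cloud", node_info["database"], node_info["schema"], node_info["name"]]


def group_name(node_info: dict) -> Optional[str]:
    """Same lookup as custom_group_fn: the materialization, or "default"."""
    return node_info.get("config", {}).get("materialized", "default")


# Hypothetical node, trimmed to the fields the functions read
sample_node = {
    "database": "analytics",
    "schema": "marts",
    "name": "orders",
    "config": {"materialized": "table"},
}
path = key_path(sample_node)
group = group_name(sample_node)
```

Note that the key function indexes `database`, `schema`, and `name` directly, so a node missing any of them raises `KeyError`, while the group function falls back to `"default"`.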

Migration to dbt Cloud v2

The legacy dbt Cloud integration is maintained for backward compatibility. New projects should use the dbt Cloud v2 integration, which adds:

  • Resource management through DbtCloudCredentials and DbtCloudWorkspace
  • Asset specifications via load_dbt_cloud_asset_specs
  • The @dbt_cloud_assets decorator
  • Polling sensors via build_dbt_cloud_polling_sensor

See dbt Cloud v2 (dbt-cloud-v2.md) for the recommended integration.
