CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/dbt-cloud-v2.md

dbt Cloud v2 Integration

Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors. This is the recommended approach for new dbt Cloud integrations, providing better composability and more flexible configuration options.

Capabilities

Credentials and Workspace Management

DbtCloudCredentials

Dataclass for dbt Cloud API authentication credentials.

@dataclass
class DbtCloudCredentials(Resolvable):
    """
    dbt Cloud API credentials.

    The field names match the keyword arguments used when constructing
    credentials: ``DbtCloudCredentials(account_id=..., token=..., access_url=...)``.

    Attributes:
    - account_id: dbt Cloud account ID
    - token: dbt Cloud API token used for authentication
    - access_url: Base URL of the dbt Cloud instance,
      e.g. "https://cloud.getdbt.com"
    """

    account_id: int
    token: str
    access_url: str

DbtCloudWorkspace

Configurable resource representing a dbt Cloud workspace with project and environment context.

# NOTE: no @dataclass decorator here. ConfigurableResource is a Pydantic-based
# config class that generates its own constructor from the annotated fields.
class DbtCloudWorkspace(ConfigurableResource):
    """
    dbt Cloud workspace resource for project and environment management.

    All three attributes are required; none has a default value.

    Attributes:
    - credentials: DbtCloudCredentials instance
    - project_id: dbt Cloud project ID
    - environment_id: dbt Cloud environment ID
    """

    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int

    def run_job(
        self,
        job_id: int,
        cause: str = "Triggered by Dagster",
        steps_override: Optional[List[str]] = None,
        schema_override: Optional[str] = None,
        git_sha: Optional[str] = None,
        **kwargs
    ) -> DbtCloudRun:
        """
        Run a dbt Cloud job.

        Parameters:
        - job_id: dbt Cloud job ID to execute
        - cause: Description of why the job was triggered
        - steps_override: Custom dbt commands to run
        - schema_override: Override target schema
        - git_sha: Specific git SHA to run against
        - **kwargs: Additional job trigger parameters

        Returns:
        DbtCloudRun object representing the triggered run
        """

    def get_job(self, job_id: int) -> DbtCloudJob:
        """
        Get dbt Cloud job details.

        Parameters:
        - job_id: dbt Cloud job ID

        Returns:
        DbtCloudJob object with job details
        """

    def get_run(self, run_id: int) -> DbtCloudRun:
        """
        Get dbt Cloud run details.

        Parameters:
        - run_id: dbt Cloud run ID

        Returns:
        DbtCloudRun object with run details
        """

    def get_manifest(self, job_id: int) -> dict:
        """
        Get dbt manifest for a job.

        Parameters:
        - job_id: dbt Cloud job ID

        Returns:
        Parsed manifest.json dictionary
        """

Asset Loading and Specifications

load_dbt_cloud_asset_specs

Loads asset specifications from a dbt Cloud job without creating executable assets.

def load_dbt_cloud_asset_specs(
    workspace: DbtCloudWorkspace,
    job_id: int,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs
) -> Sequence[AssetSpec]:
    """
    Build asset specifications for a dbt Cloud job.

    Only specifications are produced; no executable assets are created.

    Parameters:
    - workspace: DbtCloudWorkspace resource the job belongs to
    - job_id: ID of the dbt Cloud job to read
    - dagster_dbt_translator: Optional custom translator used for asset mapping
    - **kwargs: Extra parameters forwarded to spec generation

    Returns:
    Sequence of AssetSpec objects that represent the dbt models
    """

load_dbt_cloud_check_specs

Loads data quality check specifications from a dbt Cloud job.

def load_dbt_cloud_check_specs(
    workspace: DbtCloudWorkspace,
    job_id: int,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    **kwargs
) -> Sequence[AssetCheckSpec]:
    """
    Build data quality check specifications for a dbt Cloud job.

    Parameters:
    - workspace: DbtCloudWorkspace resource the job belongs to
    - job_id: ID of the dbt Cloud job to read
    - dagster_dbt_translator: Optional custom translator used for check mapping
    - **kwargs: Extra parameters forwarded to spec generation

    Returns:
    Sequence of AssetCheckSpec objects that correspond to dbt tests
    """

Asset Decorators

dbt_cloud_assets

Modern decorator for creating assets from dbt Cloud jobs.

def dbt_cloud_assets(
    job_id: int,
    workspace: DbtCloudWorkspace,
    name: Optional[str] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    **kwargs
) -> Callable[..., AssetsDefinition]:
    """
    Create Dagster assets from dbt Cloud job.

    Parameters:
    - job_id: dbt Cloud job ID
    - workspace: DbtCloudWorkspace resource
    - name: Optional name for the resulting definition
      (presumably the name of the underlying op; confirm against the
      dagster-dbt API reference)
    - dagster_dbt_translator: Custom translator for asset mapping
    - partitions_def: Partitions definition to apply to the assets
    - backfill_policy: Backfill policy for assets
    - op_tags: Tags to apply to the underlying op
    - **kwargs: Additional decorator parameters

    Returns:
    Decorated function that materializes dbt Cloud assets
    """

Sensor Integration

build_dbt_cloud_polling_sensor

Creates a sensor that polls dbt Cloud for job completion and triggers downstream assets.

def build_dbt_cloud_polling_sensor(
    workspace: DbtCloudWorkspace,
    job_id: int,
    sensor_name: str,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    minimum_interval_seconds: Optional[int] = None,
    run_tags: Optional[Mapping[str, str]] = None,
) -> SensorDefinition:
    """
    Build polling sensor for dbt Cloud job.

    Parameters (in signature order; the first three are required):
    - workspace: DbtCloudWorkspace resource
    - job_id: dbt Cloud job ID to monitor
    - sensor_name: Name to give the created sensor
    - dagster_dbt_translator: Custom translator for asset mapping
    - minimum_interval_seconds: Minimum time between sensor evaluations
    - run_tags: Optional string-to-string tags
      (presumably attached to runs the sensor launches; confirm against
      the dagster-dbt API reference)

    Returns:
    SensorDefinition that monitors dbt Cloud job completion
    """

Data Types

DbtCloudAccount

Represents a dbt Cloud account with metadata and configuration.

class DbtCloudAccount:
    """
    A dbt Cloud account and its descriptive metadata.

    Attributes:
    - id: Account ID (integer)
    - name: Account name
    - plan: Account plan type
    - state: Account state (integer code)
    """

    id: int
    name: str
    plan: str
    state: int

DbtCloudProject

Represents a dbt Cloud project within an account.

class DbtCloudProject:
    """
    A dbt Cloud project that lives inside an account.

    Attributes:
    - id: Project ID (integer)
    - name: Project name
    - account_id: ID of the account that owns the project
    - repository_id: ID of the associated repository, or None
    - state: Project state (integer code)
    """

    id: int
    name: str
    account_id: int
    repository_id: Optional[int]
    state: int

DbtCloudEnvironment

Represents a dbt Cloud environment configuration.

class DbtCloudEnvironment:
    """
    Configuration of a single dbt Cloud environment.

    Attributes:
    - id: Environment ID (integer)
    - name: Environment name
    - project_id: ID of the project that owns the environment
    - type: Environment type (development/deployment)
    - state: Environment state (integer code)
    """

    id: int
    name: str
    project_id: int
    type: str
    state: int

DbtCloudJob

Represents a dbt Cloud job configuration.

class DbtCloudJob:
    """
    Configuration of a dbt Cloud job.

    Attributes:
    - id: Job ID (integer)
    - name: Job name
    - project_id: ID of the project that owns the job
    - environment_id: ID of the environment the job targets
    - execute_steps: dbt commands the job executes, as a list of strings
    - triggers: Job trigger configuration (dictionary)
    - state: Job state (integer code)
    """

    id: int
    name: str
    project_id: int
    environment_id: int
    execute_steps: List[str]
    triggers: dict
    state: int

DbtCloudRun

Represents a dbt Cloud job run instance.

class DbtCloudRun:
    """
    A single run of a dbt Cloud job.

    Attributes:
    - id: Run ID (integer)
    - job_id: ID of the job this run belongs to
    - trigger: Run trigger information (dictionary)
    - status: Current run status (integer code)
    - started_at: Run start timestamp, or None
    - finished_at: Run completion timestamp, or None
    - duration: Run duration in seconds, or None
    """

    id: int
    job_id: int
    trigger: dict
    status: int
    started_at: Optional[str]
    finished_at: Optional[str]
    duration: Optional[int]

    @property
    def status_humanized(self) -> str:
        """Human-readable string form of the run status."""

    @property
    def is_success(self) -> bool:
        """Whether the run completed successfully."""

    @property
    def is_complete(self) -> bool:
        """Whether the run has completed, either in success or in failure."""

DbtCloudJobRunStatusType

Enumeration of dbt Cloud job run statuses.

class DbtCloudJobRunStatusType(Enum):
    """
    Integer status codes of a dbt Cloud job run.

    Each member's value is the integer code for that status.
    """

    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30

Usage Examples

Basic dbt Cloud v2 Asset Creation

from dagster import Definitions, AssetExecutionContext
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets
)

# Configure credentials and workspace
credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com"
)
# project_id and environment_id are both required workspace fields
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111
)

# Create assets from dbt Cloud job
@dbt_cloud_assets(
    job_id=123,
    workspace=workspace
)
def my_dbt_cloud_assets(context: AssetExecutionContext):
    # Trigger and monitor dbt Cloud job
    run = workspace.run_job(
        job_id=123,
        cause="Triggered by Dagster asset materialization"
    )

    # Poll for completion and yield events
    yield from run.stream_events(context=context)

defs = Definitions(
    assets=[my_dbt_cloud_assets],
    resources={"workspace": workspace}
)

Using Asset Specifications

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    load_dbt_cloud_asset_specs
)

credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com"
)
# project_id and environment_id are both required workspace fields
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111
)

# Load asset specs without creating executable assets
asset_specs = load_dbt_cloud_asset_specs(
    workspace=workspace,
    job_id=123
)

# Use specs to create custom assets or for analysis
defs = Definitions(assets=asset_specs)

Polling Sensor Integration

from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    build_dbt_cloud_polling_sensor
)

credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com"
)
# project_id and environment_id are both required workspace fields
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111
)

# Create sensor to monitor dbt Cloud job
# (workspace, job_id and sensor_name are required arguments)
dbt_cloud_sensor = build_dbt_cloud_polling_sensor(
    workspace=workspace,
    job_id=123,
    sensor_name="dbt_cloud_job_123_sensor",
    minimum_interval_seconds=120
)

defs = Definitions(sensors=[dbt_cloud_sensor])

Custom Translation

from typing import Optional

from dagster import AssetKey
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets
)
from dagster_dbt import DagsterDbtTranslator

# The decorator below needs a concrete workspace, so define it in this snippet
credentials = DbtCloudCredentials(
    account_id=12345,
    token="dbt_api_token_here",
    access_url="https://cloud.getdbt.com"
)
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111
)

class CustomDbtCloudTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        # Prefix every key with "dbt_cloud", then database / schema / model name
        return AssetKey([
            "dbt_cloud",
            dbt_resource_props["database"],
            dbt_resource_props["schema"],
            dbt_resource_props["name"]
        ])

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        # Fall back to "default" when the dbt config declares no group
        return dbt_resource_props.get("config", {}).get("group", "default")

@dbt_cloud_assets(
    job_id=123,
    workspace=workspace,
    dagster_dbt_translator=CustomDbtCloudTranslator()
)
def my_custom_assets(context):
    yield from workspace.run_job(job_id=123).stream_events(context=context)

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json