CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/freshness-checks.md

Freshness Checks

Build asset checks for dbt source freshness validation. This module provides functionality to automatically generate Dagster asset checks from dbt source freshness configurations, enabling data quality monitoring within Dagster pipelines.

Capabilities

Freshness Check Builder

build_freshness_checks_from_dbt_assets

Creates asset checks from dbt source freshness configurations defined in dbt assets.

def build_freshness_checks_from_dbt_assets(
    dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]:
    """
    Build freshness checks from dbt assets with source freshness configurations.

    This function analyzes dbt assets to identify sources with freshness policies
    and creates corresponding Dagster asset checks that validate data freshness
    according to dbt source configurations.

    Call it with the argument named, e.g.
    ``build_freshness_checks_from_dbt_assets(dbt_assets=[my_dbt_assets])``.
    NOTE(review): upstream dagster-dbt declares ``dbt_assets`` keyword-only
    (``*, dbt_assets``), so a positional call may raise TypeError. It may also
    read per-model ``meta.dagster.freshness_check`` config rather than dbt
    source ``freshness`` blocks. Confirm both against the installed version.

    Parameters:
    - dbt_assets: Sequence of AssetsDefinition objects created from dbt models
      (for example, functions decorated with ``@dbt_assets``).

    Returns:
    Sequence of AssetChecksDefinition objects that validate source freshness

    Raises:
    DagsterDbtError: If dbt assets don't contain required metadata
    """

Usage Examples

Basic Freshness Check Creation

from dagster import Definitions, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets

# Create dbt assets with CLI resource
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")


@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and report its results to Dagster as asset events."""
    # stream() yields Dagster events as dbt produces them, instead of waiting for the end.
    yield from dbt.cli(["build"], context=context).stream()


# Build freshness checks from the dbt assets.
# Pass `dbt_assets` by keyword: upstream dagster-dbt declares it keyword-only,
# so a positional call raises TypeError there (the keyword form works everywhere).
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=[my_dbt_assets])

# Include in definitions. The "dbt" resource key matches the `dbt` parameter above.
defs = Definitions(
    assets=[my_dbt_assets],
    asset_checks=freshness_checks,
    resources={"dbt": dbt_cli_resource},
)

dbt Source Configuration

Configure source freshness in your dbt `schema.yml` files. Table-level `freshness` settings override the source-level defaults:

version: 2

sources:
  - name: raw_data
    description: Raw data from external systems
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: users
        description: User data
        freshness:
          warn_after: {count: 2, period: hour}
          error_after: {count: 6, period: hour}
      - name: orders
        description: Order data
        freshness:
          warn_after: {count: 1, period: hour}
          error_after: {count: 3, period: hour}

Custom Freshness Check Configuration

from dagster import Definitions, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets, DagsterDbtTranslator
from dagster import AssetKey

class CustomFreshnessTranslator(DagsterDbtTranslator):
    """Translator that assigns freshness targets based on dbt resource tags."""

    def get_freshness_policy(self, dbt_resource_props):
        """Return a freshness target chosen from the resource's ``tags``.

        - ``critical``: warn after 30 minutes, error after 60 minutes.
        - ``daily``: warn after 25 hours, error after 30 hours.
        - anything else: ``None`` (no freshness target).

        ``critical`` takes precedence when a resource carries both tags.

        NOTE(review): the plain dicts below mirror dbt's ``freshness`` syntax.
        Check what your dagster-dbt version expects this hook to return; the base
        class may require a Dagster ``FreshnessPolicy`` object instead.
        """
        # `or []` guards against an explicit null `tags` entry.
        tags = dbt_resource_props.get("tags") or []

        if "critical" in tags:
            return {
                "warn_after": {"count": 30, "period": "minute"},
                "error_after": {"count": 60, "period": "minute"},
            }
        if "daily" in tags:
            return {
                "warn_after": {"count": 25, "period": "hour"},
                "error_after": {"count": 30, "period": "hour"},
            }
        return None


# Defined here so this example runs on its own.
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")


@dbt_assets(
    manifest="./my_dbt_project/target/manifest.json",
    dagster_dbt_translator=CustomFreshnessTranslator(),
)
def my_custom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` for assets translated by ``CustomFreshnessTranslator``."""
    yield from dbt.cli(["build"], context=context).stream()


# Build freshness checks with custom configuration (keyword argument, see basic example).
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=[my_custom_dbt_assets])

defs = Definitions(
    assets=[my_custom_dbt_assets],
    asset_checks=freshness_checks,
    resources={"dbt": dbt_cli_resource},
)

Running Freshness Checks

from dagster import job, op, Definitions
from dagster_dbt import DbtCliResource, build_freshness_checks_from_dbt_assets

from dagster import ScheduleDefinition

# Defined here so this example runs on its own.
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")


@op
def run_freshness_checks(context, dbt: DbtCliResource) -> bool:
    """Execute ``dbt source freshness`` and report whether it succeeded.

    Returns:
        ``True`` when the dbt invocation succeeded. This returns a bool, not the
        ``DbtCliInvocation``, because the invocation wraps a live subprocess.
        Dagster's default (pickling) IO manager cannot store it, so returning it
        would fail the op after dbt had already finished.
    """
    result = dbt.cli(["source", "freshness"], context=context)
    succeeded = result.is_successful()
    context.log.info(f"Freshness check completed: {succeeded}")
    return succeeded


@job
def daily_freshness_job():
    """Job to run daily freshness checks."""
    run_freshness_checks()


# Combine with scheduled execution
freshness_schedule = ScheduleDefinition(
    job=daily_freshness_job,
    cron_schedule="0 6 * * *",  # Run at 6 AM daily
    name="daily_freshness_schedule",
)

defs = Definitions(
    jobs=[daily_freshness_job],
    schedules=[freshness_schedule],
    resources={"dbt": dbt_cli_resource},
)

Monitoring Freshness Results

from dagster import sensor, RunRequest, SkipReason, SensorDefinition
from dagster_dbt import DbtCliResource

# Statuses in sources.json ``results[].status`` that count as a failure.
# "runtime error" means dbt could not evaluate freshness at all, which is also alert-worthy.
_FAILING_FRESHNESS_STATUSES = frozenset({"error", "runtime error"})

# Defined here so this example runs on its own.
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")


@sensor(asset_selection="*")
def freshness_alert_sensor(context, dbt: DbtCliResource):
    """Request a run when any dbt source fails its freshness check.

    On each tick this runs ``dbt source freshness`` and reads the
    ``sources.json`` artifact it writes. It then collects every source whose
    result status is in ``_FAILING_FRESHNESS_STATUSES``.

    Returns:
        A ``RunRequest`` whose run key encodes the sorted set of failing sources.
        Dagster does not launch a run key twice, so each distinct set of failures
        triggers one run. Otherwise a ``SkipReason``, when all sources passed or
        dbt could not be run.
    """
    try:
        # raise_on_error=False: dbt exits non-zero when a source breaches
        # error_after, which is exactly the case this sensor must inspect.
        # wait() blocks until dbt exits so sources.json is complete before it is read.
        invocation = dbt.cli(["source", "freshness"], raise_on_error=False).wait()
        freshness_data = invocation.get_artifact("sources.json")
    except Exception as e:
        context.log.error(f"Error checking freshness: {e}")
        return SkipReason(f"Error checking freshness: {e}")

    # sources.json has one entry per source table under "results", each with
    # a "unique_id" (e.g. "source.<project>.raw_data.users") and a "status".
    failed_sources = sorted(
        result["unique_id"]
        for result in freshness_data.get("results", [])
        if result.get("status") in _FAILING_FRESHNESS_STATUSES
    )

    if not failed_sources:
        return SkipReason("All freshness checks passed")

    context.log.warning(f"Freshness check failures: {failed_sources}")
    # NOTE: with asset_selection="*" this request launches a run of every asset.
    # Target a dedicated alerting job instead if that is not intended.
    run_key = "freshness_alert_" + ",".join(failed_sources)
    return RunRequest(run_key=run_key)


defs = Definitions(
    sensors=[freshness_alert_sensor],
    resources={"dbt": dbt_cli_resource},
)

Integration with dbt Commands

Source Freshness Command

The freshness checks integrate with dbt's built-in source freshness functionality:

# Run freshness checks directly with dbt CLI
dbt source freshness

# Run specific source freshness
dbt source freshness --select source:raw_data

# Write the JSON freshness results to a custom path (default: target/sources.json)
dbt source freshness --output target/source_freshness.json

Integration with dbt Tests

Combine freshness checks with other dbt data quality tests:

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, build_freshness_checks_from_dbt_assets, dbt_assets

# Defined here so this example runs on its own.
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")


@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def comprehensive_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and then ``dbt source freshness`` in the same step."""
    # Run models and tests
    yield from dbt.cli(["build"], context=context).stream()

    # Run source freshness checks.
    # NOTE(review): dbt exits non-zero when a source breaches error_after. If
    # dagster-dbt's raise_on_error defaults to True, that fails this step even
    # though the build above succeeded. Pass raise_on_error=False if unwanted.
    yield from dbt.cli(["source", "freshness"], context=context).stream()


# Build both model assets and freshness checks
model_assets = [comprehensive_dbt_assets]
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=model_assets)

defs = Definitions(
    assets=model_assets,
    asset_checks=freshness_checks,
    resources={"dbt": dbt_cli_resource},
)

Configuration Options

Freshness Policy Configuration

Control how freshness policies are interpreted and applied:

from dagster_dbt import DagsterDbtTranslator

class FreshnessPolicyTranslator(DagsterDbtTranslator):
    """Translator that derives a maximum-lag target from a dbt ``freshness`` block."""

    # Minutes per dbt freshness ``period`` unit.
    _PERIOD_MINUTES = {"minute": 1, "hour": 60, "day": 60 * 24}

    def get_freshness_policy(self, dbt_resource_props):
        """Convert dbt freshness config to ``{"maximum_lag_minutes": ...}``.

        Only ``error_after`` is used; ``warn_after`` has no equivalent here.

        Returns ``None`` when the resource has no ``freshness`` block or when
        ``error_after`` is unset. dbt manifests may contain unset thresholds as
        ``{"count": null, "period": null}``. Those dicts are truthy, so they must
        be treated as "not configured" rather than multiplied, which would raise
        TypeError.

        NOTE(review): check what your dagster-dbt version expects this hook to
        return; the base class may require a Dagster ``FreshnessPolicy`` object
        rather than a dict.
        """
        freshness_config = dbt_resource_props.get("freshness")
        if not freshness_config:
            return None

        maximum_lag_minutes = self._threshold_to_minutes(freshness_config.get("error_after"))
        if maximum_lag_minutes is None:
            return None
        return {"maximum_lag_minutes": maximum_lag_minutes}

    @classmethod
    def _threshold_to_minutes(cls, threshold):
        """Convert a dbt ``{"count", "period"}`` threshold to minutes, or None if unset."""
        # A missing or null count means the threshold is not configured. Returning
        # None avoids both a TypeError and a meaningless 0-minute lag.
        if not threshold or threshold.get("count") is None:
            return None
        # A missing/null or unrecognised period falls back to hours.
        period = threshold.get("period") or "hour"
        return threshold["count"] * cls._PERIOD_MINUTES.get(period, 60)

Error Handling

Handle common freshness check errors:

from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError

from dagster import op
from dagster_dbt import DbtCliResource


@op
def robust_freshness_check(context, dbt: DbtCliResource) -> bool:
    """Run ``dbt source freshness``, tolerating stale sources.

    Stale sources are logged as a warning instead of failing the op. Errors
    raised by dagster-dbt itself are logged and re-raised.

    Returns:
        ``True`` if the dbt invocation succeeded. This returns a bool, not the
        ``DbtCliInvocation``, because the invocation holds a live subprocess
        that Dagster's default (pickling) IO manager cannot store.

    Raises:
        DagsterDbtCliRuntimeError: the dbt CLI invocation failed.
        DagsterDbtError: any other dagster-dbt integration error.
    """
    try:
        result = dbt.cli(["source", "freshness"], context=context)
        succeeded = result.is_successful()
    # The CLI-specific error is caught first so it gets its own log message.
    except DagsterDbtCliRuntimeError as e:
        context.log.error(f"dbt CLI error during freshness check: {e}")
        # Could choose to fail or continue based on requirements
        raise
    except DagsterDbtError as e:
        context.log.error(f"dbt integration error: {e}")
        raise

    if not succeeded:
        # Log details but don't fail the op
        context.log.warning("Some freshness checks failed, but continuing pipeline")
    return succeeded

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json