A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Build asset checks for dbt source freshness validation. This module provides functionality to automatically generate Dagster asset checks from dbt source freshness configurations, enabling data quality monitoring within Dagster pipelines.
Creates asset checks from dbt source freshness configurations defined in dbt assets.
def build_freshness_checks_from_dbt_assets(
dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]:
"""
Build freshness checks from dbt assets with source freshness configurations.
This function analyzes dbt assets to identify sources with freshness policies
and creates corresponding Dagster asset checks that validate data freshness
according to dbt source configurations.
Parameters:
- dbt_assets: Sequence of AssetsDefinition objects created from dbt models
Returns:
Sequence of AssetChecksDefinition objects that validate source freshness
Raises:
DagsterDbtError: If dbt assets don't contain required metadata
    """

from dagster import Definitions, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets
# Create dbt assets with CLI resource
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Build freshness checks from the dbt assets
freshness_checks = build_freshness_checks_from_dbt_assets([my_dbt_assets])
# Include in definitions
defs = Definitions(
assets=[my_dbt_assets],
asset_checks=freshness_checks,
resources={"dbt": dbt_cli_resource}
)

Configure freshness policies in your dbt `schema.yml` files:
version: 2

sources:
  - name: raw_data
    description: Raw data from external systems
    # Source-level defaults, overridden per table below.
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: users
        description: User data
        freshness:
          warn_after: {count: 2, period: hour}
          error_after: {count: 6, period: hour}
      - name: orders
        description: Order data
        freshness:
          warn_after: {count: 1, period: hour}
          error_after: {count: 3, period: hour}

from dagster import Definitions, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets, DagsterDbtTranslator
from dagster import AssetKey
class CustomFreshnessTranslator(DagsterDbtTranslator):
def get_freshness_policy(self, dbt_resource_props):
"""Custom freshness policy based on resource configuration."""
config = dbt_resource_props.get("config", {})
# Set custom freshness based on model tags
tags = dbt_resource_props.get("tags", [])
if "critical" in tags:
return {"warn_after": {"count": 30, "period": "minute"},
"error_after": {"count": 60, "period": "minute"}}
elif "daily" in tags:
return {"warn_after": {"count": 25, "period": "hour"},
"error_after": {"count": 30, "period": "hour"}}
return None
@dbt_assets(
manifest="./my_dbt_project/target/manifest.json",
dagster_dbt_translator=CustomFreshnessTranslator()
)
def my_custom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
# Build freshness checks with custom configuration
freshness_checks = build_freshness_checks_from_dbt_assets([my_custom_dbt_assets])
defs = Definitions(
assets=[my_custom_dbt_assets],
asset_checks=freshness_checks,
resources={"dbt": dbt_cli_resource}
)

from dagster import job, op, Definitions
from dagster_dbt import DbtCliResource, build_freshness_checks_from_dbt_assets
@op
def run_freshness_checks(context, dbt: DbtCliResource):
"""Execute dbt source freshness command."""
result = dbt.cli(["source", "freshness"], context=context)
context.log.info(f"Freshness check completed: {result.is_successful()}")
return result
@job
def daily_freshness_job():
"""Job to run daily freshness checks."""
run_freshness_checks()
# Combine with scheduled execution
from dagster import ScheduleDefinition
freshness_schedule = ScheduleDefinition(
job=daily_freshness_job,
cron_schedule="0 6 * * *", # Run at 6 AM daily
name="daily_freshness_schedule"
)
defs = Definitions(
jobs=[daily_freshness_job],
schedules=[freshness_schedule],
resources={"dbt": dbt_cli_resource}
)

from dagster import sensor, RunRequest, SkipReason, SensorDefinition
from dagster_dbt import DbtCliResource
@sensor(asset_selection="*")
def freshness_alert_sensor(context, dbt: DbtCliResource):
"""Sensor to monitor freshness check failures."""
# Check most recent freshness results
try:
result = dbt.cli(["source", "freshness", "--output", "json"])
freshness_data = result.get_artifact("sources.json")
failed_sources = []
for source_name, source_data in freshness_data.get("sources", {}).items():
for table_name, table_data in source_data.get("tables", {}).items():
freshness_status = table_data.get("freshness", {}).get("status")
if freshness_status == "error":
failed_sources.append(f"{source_name}.{table_name}")
if failed_sources:
context.log.warning(f"Freshness check failures: {failed_sources}")
# Could trigger alerts, create incidents, etc.
return RunRequest(run_key=f"freshness_alert_{context.cursor}")
else:
return SkipReason("All freshness checks passed")
except Exception as e:
context.log.error(f"Error checking freshness: {e}")
return SkipReason(f"Error checking freshness: {e}")
defs = Definitions(
sensors=[freshness_alert_sensor],
resources={"dbt": dbt_cli_resource}
)

The freshness checks integrate with dbt's built-in source freshness functionality:
# Run freshness checks for all configured sources directly with the dbt CLI
dbt source freshness
# Run freshness checks only for the raw_data source
dbt source freshness --select source:raw_data
# Write freshness results as JSON into the target/ directory
dbt source freshness --output json --output-path target/

Combine freshness checks with other dbt data quality tests:
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def comprehensive_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
# Run models and tests
yield from dbt.cli(["build"], context=context).stream()
# Run source freshness checks
yield from dbt.cli(["source", "freshness"], context=context).stream()
# Build both model assets and freshness checks
model_assets = [comprehensive_dbt_assets]
freshness_checks = build_freshness_checks_from_dbt_assets(model_assets)
defs = Definitions(
assets=model_assets,
asset_checks=freshness_checks,
resources={"dbt": dbt_cli_resource}
)

Control how freshness policies are interpreted and applied:
from dagster_dbt import DagsterDbtTranslator


class FreshnessPolicyTranslator(DagsterDbtTranslator):
    def get_freshness_policy(self, dbt_resource_props):
        """Convert dbt freshness config to Dagster freshness policy."""
        freshness_config = dbt_resource_props.get("freshness")
        if not freshness_config:
            return None

        # Convert dbt time periods to minutes
        def parse_time_period(config):
            count = config.get("count", 0)
            period = config.get("period", "hour")
            period_multipliers = {
                "minute": 1,
                "hour": 60,
                "day": 60 * 24,
            }
            # Unknown periods fall back to hours.
            return count * period_multipliers.get(period, 60)

        # NOTE(review): warn_after is read but not used below — only
        # error_after drives the resulting policy in this example.
        warn_after = freshness_config.get("warn_after")
        error_after = freshness_config.get("error_after")
        if error_after:
            maximum_lag_minutes = parse_time_period(error_after)
            return {"maximum_lag_minutes": maximum_lag_minutes}
        return None

Handle common freshness check errors:
from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError


@op
def robust_freshness_check(context, dbt: DbtCliResource):
    """Freshness check with error handling."""
    try:
        result = dbt.cli(["source", "freshness"], context=context)
        if not result.is_successful():
            # Log details but don't fail the op
            context.log.warning("Some freshness checks failed, but continuing pipeline")
        return result
    except DagsterDbtCliRuntimeError as e:
        context.log.error(f"dbt CLI error during freshness check: {e}")
        # Could choose to fail or continue based on requirements
        raise
    except DagsterDbtError as e:
        context.log.error(f"dbt integration error: {e}")
        raise

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt