A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's automation system, including sensors, schedules, and declarative automation policies. These systems enable event-driven and time-based execution of pipelines with comprehensive monitoring and failure handling.
Schedules provide time-based execution of jobs and asset materializations using cron expressions and partition-based scheduling.
@schedule { .api }
Module: dagster._core.definitions.decorators.schedule_decorator
Type: Function decorator
Define a time-based schedule for job execution.
from dagster import schedule, job, op, asset, Config, RunRequest, SkipReason
from dagster import DailyPartitionsDefinition, WeeklyPartitionsDefinition
import pandas as pd
@op
def extract_data() -> pd.DataFrame:
return pd.read_sql("SELECT * FROM daily_transactions", connection)
@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
return df.dropna().reset_index(drop=True)
@job
def daily_etl_job():
"""Daily ETL job."""
transform_data(extract_data())
# Basic daily schedule
@schedule(
job=daily_etl_job,
cron_schedule="0 2 * * *" # 2 AM daily
)
def daily_etl_schedule():
"""Schedule daily ETL at 2 AM."""
return {}
# Schedule with dynamic configuration
@schedule(
job=daily_etl_job,
cron_schedule="0 6 * * 1" # 6 AM every Monday
)
def weekly_full_refresh(context):
"""Weekly full refresh with dynamic config."""
# Generate run configuration based on schedule context
run_config = {
"ops": {
"extract_data": {
"config": {
"full_refresh": True,
"batch_size": 10000,
"run_date": context.scheduled_execution_time.strftime("%Y-%m-%d")
}
}
},
"resources": {
"warehouse": {
"config": {
"connection_timeout": 300 # Longer timeout for full refresh
}
}
}
}
return RunRequest(
run_key=f"weekly_refresh_{context.scheduled_execution_time.strftime('%Y_%m_%d')}",
run_config=run_config,
tags={
"schedule": "weekly_full_refresh",
"refresh_type": "full",
"execution_date": context.scheduled_execution_time.isoformat()
}
)
# Conditional schedule execution
@schedule(
job=daily_etl_job,
cron_schedule="0 */4 * * *" # Every 4 hours
)
def conditional_data_refresh(context):
"""Schedule that runs conditionally based on data availability."""
# Check if new data is available
last_update = get_last_data_update_time()
last_run = get_last_successful_run_time("daily_etl_job")
if last_update > last_run:
context.log.info(f"New data available since {last_update}, scheduling run")
return RunRequest(
run_key=f"data_refresh_{int(last_update.timestamp())}",
tags={
"trigger": "new_data_available",
"last_update": last_update.isoformat()
}
)
else:
return SkipReason(f"No new data since last run at {last_run}")
# Multiple run requests from single schedule
@schedule(
cron_schedule="0 1 * * *" # 1 AM daily
)
def multi_environment_deploy():
"""Schedule that deploys to multiple environments."""
environments = ["staging", "prod", "dr"]
run_requests = []
for env in environments:
run_requests.append(
RunRequest(
run_key=f"deploy_{env}_{datetime.now().strftime('%Y%m%d')}",
job_name="deployment_job",
run_config={
"resources": {
"deployment_target": {
"config": {"environment": env}
}
}
},
tags={
"environment": env,
"deployment_type": "scheduled"
}
)
)
return run_requests

Parameters:
job: Union[JobDefinition, UnresolvedAssetJobDefinition] - Job to schedule
cron_schedule: str - Cron expression for schedule timing
name: Optional[str] - Schedule name (defaults to function name)
execution_timezone: Optional[str] - Timezone for schedule execution
description: Optional[str] - Schedule description
default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED - Default schedule status
tags: Optional[Dict[str, str]] - Schedule tags

build_schedule_from_partitioned_job { .api }
Module: dagster._core.definitions.partitions.partitioned_schedule
Type: Function
Create a schedule from a partitioned job that runs for each partition.
from dagster import build_schedule_from_partitioned_job, DailyPartitionsDefinition
# Partitioned job
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
@asset(partitions_def=daily_partitions)
def daily_sales_data(context) -> pd.DataFrame:
"""Daily sales data asset."""
partition_date = context.partition_key
# Load data for specific date
query = f"""
SELECT * FROM sales
WHERE date = '{partition_date}'
"""
return pd.read_sql(query, connection)
@job(partitions_def=daily_partitions)
def daily_sales_job():
daily_sales_data()
# Create schedule that runs daily for the previous day's partition
daily_sales_schedule = build_schedule_from_partitioned_job(
job=daily_sales_job,
name="daily_sales_schedule",
description="Process daily sales data",
hour_of_day=2, # Run at 2 AM
minute_of_hour=0,
timezone="America/New_York"
)
# Custom partition schedule with multiple partitions
@schedule(
cron_schedule="0 3 * * *" # 3 AM daily
)
def backfill_schedule(context):
"""Schedule that processes multiple partitions for backfill."""
# Determine partitions to process
execution_date = context.scheduled_execution_time.date()
# Process last 7 days of partitions
partitions_to_run = []
for i in range(7):
partition_date = execution_date - timedelta(days=i+1)
partitions_to_run.append(partition_date.strftime("%Y-%m-%d"))
run_requests = []
for partition_key in partitions_to_run:
run_requests.append(
RunRequest(
run_key=f"backfill_{partition_key}",
partition_key=partition_key,
tags={
"partition": partition_key,
"run_type": "backfill"
}
)
)
return run_requests

ScheduleEvaluationContext { .api }
Module: dagster._core.definitions.schedule_definition
Type: Class
Context provided to schedule functions with execution information.
from dagster import ScheduleEvaluationContext, build_schedule_context
@schedule(
job=daily_etl_job,
cron_schedule="0 8 * * 1-5" # 8 AM weekdays
)
def business_days_schedule(context: ScheduleEvaluationContext):
"""Schedule with comprehensive context usage."""
# Access schedule execution time
scheduled_time = context.scheduled_execution_time
execution_date = scheduled_time.date()
# Access instance and run information
instance = context.instance
# Check for recent failures
recent_runs = instance.get_runs(
filters=RunsFilter(
job_name="daily_etl_job",
created_after=scheduled_time - timedelta(days=1)
),
limit=5
)
recent_failures = [run for run in recent_runs if run.status == DagsterRunStatus.FAILURE]
if len(recent_failures) >= 3:
context.log.warning(f"Found {len(recent_failures)} recent failures, skipping execution")
return SkipReason(f"Too many recent failures ({len(recent_failures)})")
# Dynamic configuration based on day of week
is_monday = scheduled_time.weekday() == 0
run_config = {
"ops": {
"extract_data": {
"config": {
"full_refresh": is_monday, # Full refresh on Mondays
"batch_size": 5000 if is_monday else 1000,
"execution_date": execution_date.isoformat()
}
}
}
}
# Generate unique run key
run_key = f"etl_{execution_date.strftime('%Y%m%d')}_{scheduled_time.hour}"
context.log.info(f"Scheduling ETL for {execution_date} (Monday: {is_monday})")
return RunRequest(
run_key=run_key,
run_config=run_config,
tags={
"execution_date": execution_date.isoformat(),
"is_monday": str(is_monday),
"schedule_time": scheduled_time.isoformat()
}
)
# Build schedule context for testing
test_context = build_schedule_context(
scheduled_execution_time=datetime(2023, 1, 16, 8, 0), # Monday 8 AM
instance=DagsterInstance.ephemeral()
)
# Test schedule function
result = business_days_schedule(test_context)
print(f"Schedule result: {result}")

Key Properties:
scheduled_execution_time: datetime - When schedule was supposed to execute
instance: DagsterInstance - Dagster instance
log: DagsterLogManager - Logger for schedule evaluation

Sensors enable event-driven pipeline execution based on external events, asset changes, or custom conditions.
@sensor { .api }
Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator
Define a sensor for event-driven execution.
from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import os
import glob
from pathlib import Path
@job
def file_processing_job():
process_new_files()
# File-based sensor
@sensor(
job=file_processing_job,
minimum_interval_seconds=30 # Check every 30 seconds
)
def file_arrival_sensor(context: SensorEvaluationContext):
"""Sensor that triggers on new file arrivals."""
watch_directory = "/data/incoming"
processed_directory = "/data/processed"
# Find new files
new_files = []
for file_path in glob.glob(f"{watch_directory}/*.csv"):
file_name = os.path.basename(file_path)
processed_path = os.path.join(processed_directory, file_name)
# Check if file hasn't been processed
if not os.path.exists(processed_path):
file_stats = os.stat(file_path)
new_files.append({
"path": file_path,
"name": file_name,
"size": file_stats.st_size,
"modified": file_stats.st_mtime
})
if not new_files:
return SkipReason("No new files found")
context.log.info(f"Found {len(new_files)} new files to process")
run_requests = []
for file_info in new_files:
run_requests.append(
RunRequest(
run_key=f"file_{file_info['name']}_{int(file_info['modified'])}",
run_config={
"ops": {
"process_new_files": {
"config": {
"input_file": file_info["path"],
"output_directory": processed_directory
}
}
}
},
tags={
"file_name": file_info["name"],
"file_size": str(file_info["size"]),
"trigger": "file_arrival"
}
)
)
return run_requests
# API-based sensor
@sensor(
job=api_sync_job,
minimum_interval_seconds=300 # Check every 5 minutes
)
def api_data_sensor(context):
"""Sensor that monitors external API for new data."""
# Check API for new data
try:
api_response = requests.get(
"https://api.example.com/v1/status",
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30
)
api_response.raise_for_status()
status_data = api_response.json()
last_update = datetime.fromisoformat(status_data["last_update"])
# Get cursor from sensor context (persistent state)
last_processed = context.cursor
if last_processed:
last_processed_time = datetime.fromisoformat(last_processed)
else:
# First run, start from 1 hour ago
last_processed_time = datetime.now() - timedelta(hours=1)
if last_update > last_processed_time:
context.log.info(f"New data available since {last_update}")
# Update cursor to track progress
context.update_cursor(last_update.isoformat())
return RunRequest(
run_key=f"api_sync_{int(last_update.timestamp())}",
run_config={
"resources": {
"api_client": {
"config": {
"sync_from": last_processed_time.isoformat(),
"sync_to": last_update.isoformat()
}
}
}
},
tags={
"last_update": last_update.isoformat(),
"data_available": "true"
}
)
else:
return SkipReason(f"No new data since {last_processed_time}")
except Exception as e:
context.log.error(f"Failed to check API status: {str(e)}")
return SkipReason(f"API check failed: {str(e)}")
# Database change sensor
@sensor(
job=incremental_sync_job,
minimum_interval_seconds=60
)
def database_change_sensor(context):
"""Sensor that monitors database changes."""
# Query change tracking table
query = """
SELECT table_name, last_modified, change_count
FROM change_tracking
WHERE last_modified > %s
ORDER BY last_modified DESC
"""
# Use cursor to track last processed time
last_check = context.cursor or (datetime.now() - timedelta(minutes=5)).isoformat()
changes = execute_query(query, [last_check])
if not changes:
return SkipReason(f"No database changes since {last_check}")
# Update cursor to latest change time
latest_change = max(change["last_modified"] for change in changes)
context.update_cursor(latest_change.isoformat())
# Group changes by table for efficient processing
tables_changed = {}
for change in changes:
table = change["table_name"]
if table not in tables_changed:
tables_changed[table] = []
tables_changed[table].append(change)
context.log.info(f"Found changes in {len(tables_changed)} tables")
run_requests = []
for table_name, table_changes in tables_changed.items():
total_changes = sum(change["change_count"] for change in table_changes)
run_requests.append(
RunRequest(
run_key=f"sync_{table_name}_{int(latest_change.timestamp())}",
run_config={
"ops": {
"incremental_sync": {
"config": {
"table_name": table_name,
"sync_from": last_check,
"change_count": total_changes
}
}
}
},
tags={
"table": table_name,
"change_count": str(total_changes),
"trigger": "database_change"
}
)
)
return run_requests

Parameters:
job: Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]] - Job to execute
name: Optional[str] - Sensor name (defaults to function name)
minimum_interval_seconds: int = 30 - Minimum interval between evaluations
description: Optional[str] - Sensor description
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED - Default sensor status

@asset_sensor { .api }
Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator
Define a sensor that monitors specific assets for changes.
from dagster import asset_sensor, AssetKey, EventLogEntry, DagsterEventType
@asset
def raw_sales_data() -> pd.DataFrame:
"""Raw sales data asset."""
return pd.read_csv("/data/sales.csv")
@asset
def processed_sales_data() -> pd.DataFrame:
"""Processed sales data asset."""
return process_sales_data()
@job
def sales_analytics_job():
generate_sales_analytics()
# Asset sensor monitoring single asset
@asset_sensor(
asset_key=AssetKey("raw_sales_data"),
job=sales_analytics_job
)
def sales_data_sensor(context, asset_event: EventLogEntry):
"""Sensor that triggers when raw sales data is materialized."""
# Access asset event information
asset_key = asset_event.dagster_event.asset_key
materialization = asset_event.dagster_event.step_materialization_data.materialization
# Extract metadata from materialization
metadata = materialization.metadata_entries
record_count = None
for entry in metadata:
if entry.label == "records":
record_count = entry.entry_data.int_value
break
# Conditional execution based on data size
if record_count and record_count > 1000:
context.log.info(f"Large dataset materialized ({record_count} records), triggering analytics")
return RunRequest(
run_key=f"analytics_{asset_event.storage_id}_{asset_event.timestamp}",
run_config={
"ops": {
"generate_sales_analytics": {
"config": {
"input_asset_key": str(asset_key),
"record_count": record_count,
"processing_mode": "large_dataset"
}
}
}
},
tags={
"triggered_by": str(asset_key),
"record_count": str(record_count),
"event_id": str(asset_event.storage_id)
}
)
else:
return SkipReason(f"Dataset too small ({record_count} records), skipping analytics")
# Multi-asset sensor
@asset_sensor(
asset_key=AssetKey("processed_sales_data"),
job=downstream_analytics_job
)
def processed_sales_sensor(context, asset_event):
"""Sensor for processed sales data with dependency checking."""
# Check if all required dependencies are fresh
required_assets = [
AssetKey("customer_data"),
AssetKey("product_catalog"),
AssetKey("processed_sales_data")
]
# Query materialization times for dependencies
instance = context.instance
fresh_assets = []
for asset_key in required_assets:
latest_materialization = instance.get_latest_materialization_event(asset_key)
if latest_materialization:
# Consider fresh if materialized within last 6 hours
materialization_time = datetime.fromtimestamp(latest_materialization.timestamp)
if (datetime.now() - materialization_time) < timedelta(hours=6):
fresh_assets.append(asset_key)
if len(fresh_assets) == len(required_assets):
context.log.info("All dependencies are fresh, triggering downstream analytics")
return RunRequest(
run_key=f"downstream_{int(asset_event.timestamp)}",
tags={
"trigger_asset": str(asset_event.dagster_event.asset_key),
"fresh_dependencies": ",".join([str(k) for k in fresh_assets])
}
)
else:
stale_assets = set(required_assets) - set(fresh_assets)
return SkipReason(f"Stale dependencies: {[str(k) for k in stale_assets]}")

@multi_asset_sensor { .api }
Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator
Define a sensor that monitors multiple assets simultaneously.
from dagster import multi_asset_sensor, MultiAssetSensorEvaluationContext
@multi_asset_sensor(
monitored_assets=[
AssetKey(["raw", "users"]),
AssetKey(["raw", "orders"]),
AssetKey(["raw", "products"])
],
job=complete_etl_job
)
def multi_table_sensor(context: MultiAssetSensorEvaluationContext):
"""Sensor that waits for multiple assets before triggering."""
# Get materialization events for all monitored assets
asset_events = context.latest_materialization_records_by_key()
# Check if all assets have been materialized recently (within 1 hour)
cutoff_time = datetime.now() - timedelta(hours=1)
fresh_assets = []
for asset_key, event_record in asset_events.items():
if event_record:
materialization_time = datetime.fromtimestamp(event_record.timestamp)
if materialization_time > cutoff_time:
fresh_assets.append(asset_key)
context.log.info(f"Fresh materialization: {asset_key} at {materialization_time}")
monitored_keys = {
AssetKey(["raw", "users"]),
AssetKey(["raw", "orders"]),
AssetKey(["raw", "products"])
}
if set(fresh_assets) == monitored_keys:
context.log.info("All raw tables are fresh, triggering complete ETL")
# Create run key from latest materialization timestamps
latest_timestamp = max(
event_record.timestamp
for event_record in asset_events.values()
if event_record
)
return RunRequest(
run_key=f"complete_etl_{int(latest_timestamp)}",
run_config={
"ops": {
"etl_processor": {
"config": {
"mode": "complete_refresh",
"source_tables": ["users", "orders", "products"]
}
}
}
},
tags={
"trigger": "all_raw_tables_fresh",
"fresh_assets": ",".join([str(k) for k in fresh_assets]),
"latest_materialization": str(latest_timestamp)
}
)
else:
missing_assets = monitored_keys - set(fresh_assets)
return SkipReason(f"Waiting for fresh materializations: {[str(k) for k in missing_assets]}")
# Asset sensor with partition awareness
@asset_sensor(
asset_key=AssetKey("partitioned_sales"),
job=partition_analytics_job
)
def partitioned_sales_sensor(context, asset_event):
"""Sensor that handles partitioned assets."""
# Extract partition information from event
materialization = asset_event.dagster_event.step_materialization_data.materialization
partition_key = materialization.partition if materialization else None
if partition_key:
# Check if this partition is part of a complete set
# (e.g., all partitions for current month)
current_month = datetime.now().strftime("%Y-%m")
if partition_key.startswith(current_month):
context.log.info(f"Current month partition materialized: {partition_key}")
return RunRequest(
run_key=f"analytics_{partition_key}",
partition_key=partition_key,
tags={
"partition": partition_key,
"month": current_month,
"trigger": "current_month_partition"
}
)
else:
return SkipReason(f"Historical partition {partition_key}, no immediate action needed")
else:
return SkipReason("No partition information in materialization")

@run_failure_sensor { .api }
Module: dagster._core.definitions.run_status_sensor_definition
Type: Function decorator
Define a sensor that triggers on job run failures.
from dagster import run_failure_sensor, RunFailureSensorContext, DefaultSensorStatus
import smtplib
from email.mime.text import MIMEText
@run_failure_sensor(
monitored_jobs=[daily_etl_job, weekly_analytics_job],
default_status=DefaultSensorStatus.RUNNING
)
def job_failure_alert(context: RunFailureSensorContext):
"""Send alerts when critical jobs fail."""
# Access failure information
failed_run = context.dagster_run
failure_event = context.failure_event
job_name = failed_run.job_name
run_id = failed_run.run_id
failure_time = datetime.fromtimestamp(failure_event.timestamp)
# Extract error information
error_info = failure_event.dagster_event.engine_event_data
error_message = error_info.error.message if error_info and error_info.error else "Unknown error"
context.log.error(f"Job {job_name} failed with error: {error_message}")
# Send email notification
alert_message = f"""
Job Failure Alert
Job: {job_name}
Run ID: {run_id}
Failure Time: {failure_time.strftime('%Y-%m-%d %H:%M:%S')}
Error: {error_message}
Dagster UI: {context.instance.get_run_url(run_id)}
"""
try:
send_email(
subject=f"[DAGSTER ALERT] Job {job_name} Failed",
body=alert_message,
recipients=["data-team@company.com", "ops-team@company.com"]
)
context.log.info(f"Failure alert sent for job {job_name}")
except Exception as e:
context.log.error(f"Failed to send alert email: {str(e)}")
# Create Slack notification
try:
slack_message = {
"text": f"🚨 Dagster Job Failure: {job_name}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Job:* `{job_name}`\n*Run ID:* `{run_id}`\n*Time:* {failure_time.strftime('%Y-%m-%d %H:%M:%S')}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Error:*\n```{error_message[:500]}```"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "View in Dagster"},
"url": context.instance.get_run_url(run_id)
}
]
}
]
}
send_slack_message(slack_message)
context.log.info(f"Slack alert sent for job {job_name}")
except Exception as e:
context.log.error(f"Failed to send Slack alert: {str(e)}")
# Run status sensor for success notifications
@run_status_sensor(
monitored_jobs=[critical_daily_job],
run_status=DagsterRunStatus.SUCCESS,
default_status=DefaultSensorStatus.RUNNING
)
def job_success_notification(context):
"""Notify on successful completion of critical jobs."""
successful_run = context.dagster_run
job_name = successful_run.job_name
# Only notify for long-running jobs
start_time = successful_run.start_time
end_time = successful_run.end_time
if start_time and end_time:
duration = end_time - start_time
if duration > timedelta(minutes=30): # Only for jobs > 30 minutes
context.log.info(f"Long-running job {job_name} completed successfully in {duration}")
# Send success notification
send_slack_message({
"text": f"✅ Long-running job `{job_name}` completed successfully",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Job:* `{job_name}`\n*Duration:* {str(duration)}\n*Status:* Success ✅"
}
}
]
})

@run_status_sensor { .api }
Module: dagster._core.definitions.run_status_sensor_definition
Type: Function decorator
Define a sensor for any run status change.
from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus
@run_status_sensor(
run_status=DagsterRunStatus.STARTED,
monitored_jobs=[expensive_ml_job]
)
def job_start_monitor(context: RunStatusSensorContext):
"""Monitor job starts for resource scaling."""
started_run = context.dagster_run
job_name = started_run.job_name
# Scale up resources for expensive jobs
if job_name == "expensive_ml_job":
context.log.info(f"Expensive job {job_name} started, scaling up resources")
# Trigger infrastructure scaling
try:
scale_compute_resources(
job_name=job_name,
run_id=started_run.run_id,
scale_action="up"
)
context.log.info("Successfully scaled up resources")
except Exception as e:
context.log.error(f"Failed to scale resources: {str(e)}")
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
monitored_jobs=[expensive_ml_job]
)
def job_completion_cleanup(context):
"""Clean up resources after job completion."""
completed_run = context.dagster_run
job_name = completed_run.job_name
context.log.info(f"Job {job_name} completed, scaling down resources")
# Scale down resources
try:
scale_compute_resources(
job_name=job_name,
run_id=completed_run.run_id,
scale_action="down"
)
context.log.info("Successfully scaled down resources")
except Exception as e:
context.log.error(f"Failed to scale down resources: {str(e)}")

AutoMaterializePolicy { .api }
Module: dagster._core.definitions.auto_materialize_policy
Type: Class
Policy for automatic asset materialization based on upstream changes.
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset
# Eager materialization - materialize immediately when upstream changes
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def eager_asset(upstream_data):
"""Asset that materializes immediately when upstream changes."""
return process_data(upstream_data)
# Lazy materialization - materialize only when downstream requests it
@asset(auto_materialize_policy=AutoMaterializePolicy.lazy())
def lazy_asset(source_data):
"""Asset that materializes only when needed."""
return expensive_computation(source_data)
# Custom auto-materialization rules
@asset(
auto_materialize_policy=AutoMaterializePolicy.from_rules([
AutoMaterializeRule.materialize_on_parent_updated(),
AutoMaterializeRule.materialize_on_required_for_freshness(),
AutoMaterializeRule.skip_on_parent_missing(),
AutoMaterializeRule.skip_on_parent_outdated()
])
)
def custom_policy_asset(upstream_asset):
"""Asset with custom auto-materialization policy."""
return transform_data(upstream_asset)
# Time-based auto-materialization
@asset(
auto_materialize_policy=AutoMaterializePolicy.from_rules([
# Materialize daily at 2 AM
AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
# But skip if upstream data is too old
AutoMaterializeRule.skip_on_parent_outdated(minutes=360) # 6 hours
])
)
def daily_summary(daily_transactions):
"""Asset that auto-materializes daily with freshness checks."""
return summarize_daily_transactions(daily_transactions)
# Conditional auto-materialization
@asset(
auto_materialize_policy=AutoMaterializePolicy.from_rules([
AutoMaterializeRule.materialize_on_parent_updated(),
# Skip during business hours to avoid impacting production
AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron("0 18 * * *") # 6 PM
])
)
def batch_processed_asset(real_time_data):
"""Asset that processes in batches outside business hours."""
return batch_process(real_time_data)

AutomationCondition { .api }
Module: dagster._core.definitions.declarative_automation.automation_condition
Type: Class
Declarative conditions for sophisticated automation logic.
from dagster import AutomationCondition, asset
# Complex automation condition
@asset(
automation_condition=(
# Materialize when parent is updated
AutomationCondition.parent_newer()
# But only during off-hours
& AutomationCondition.cron_tick_passed("0 0 * * *") # Midnight
# And not if recently materialized
& ~AutomationCondition.materialized_since_cron("0 18 * * *") # 6 PM
# And only if all dependencies are fresh
& AutomationCondition.all_deps_blocking_checks_passed()
)
)
def sophisticated_asset(upstream_data):
"""Asset with sophisticated automation logic."""
return complex_processing(upstream_data)
# Data quality gated automation
@asset(
automation_condition=(
AutomationCondition.parent_newer()
# Only materialize if upstream passes quality checks
& AutomationCondition.all_deps_blocking_checks_passed()
# And if we haven't failed recently
& ~AutomationCondition.failed_since_cron("0 0 * * *")
)
)
def quality_gated_asset(validated_data):
"""Asset that only materializes with high-quality upstream data."""
return process_validated_data(validated_data)
# Backfill automation
@asset(
automation_condition=(
# Normal condition for regular updates
AutomationCondition.parent_newer()
# OR backfill condition for missing partitions
| (
AutomationCondition.missing()
& AutomationCondition.parent_materialized()
# Only backfill during maintenance windows
& AutomationCondition.cron_tick_passed("0 2 * * 6") # Saturday 2 AM
)
)
)
def backfill_aware_asset(source_data):
"""Asset with automatic backfill logic."""
return process_data_with_backfill(source_data)

from dagster import build_sensor_context, build_schedule_context, build_run_status_sensor_context
# Test sensor function
def test_file_arrival_sensor():
"""Test sensor with mock context."""
# Create test files
test_files = ["/tmp/test1.csv", "/tmp/test2.csv"]
for file_path in test_files:
Path(file_path).touch()
# Build sensor context
context = build_sensor_context(
cursor="2023-01-01T00:00:00",
instance=DagsterInstance.ephemeral()
)
# Test sensor
result = file_arrival_sensor(context)
# Verify results
assert isinstance(result, list)
assert len(result) == 2
assert all(isinstance(req, RunRequest) for req in result)
# Test schedule function
def test_business_days_schedule():
"""Test schedule with different times."""
# Test Monday execution
monday_context = build_schedule_context(
scheduled_execution_time=datetime(2023, 1, 16, 8, 0), # Monday
instance=DagsterInstance.ephemeral()
)
monday_result = business_days_schedule(monday_context)
assert isinstance(monday_result, RunRequest)
assert monday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] == True
# Test Tuesday execution
tuesday_context = build_schedule_context(
scheduled_execution_time=datetime(2023, 1, 17, 8, 0) # Tuesday
)
tuesday_result = business_days_schedule(tuesday_context)
assert tuesday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] == False
# Test failure sensor
def test_job_failure_alert():
"""Test failure sensor with mock failure."""
# Create mock failure context
context = build_run_status_sensor_context(
dagster_run=mock_failed_run,
dagster_event=mock_failure_event,
instance=DagsterInstance.ephemeral()
)
# Test failure sensor (should not raise)
job_failure_alert(context)

This comprehensive sensor and schedule system enables sophisticated automation patterns with event-driven execution, time-based scheduling, failure handling, and declarative automation policies. The system provides rich context information and flexible configuration options for building robust data pipeline automation.
For partitioned execution with sensors and schedules, see Partitions System. For error handling and failure management, see Error Handling.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster