CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster

A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.

Pending
Overview
Eval results
Files

docs/sensors-schedules.md

Sensors and Schedules

This document covers Dagster's automation system, including sensors, schedules, and declarative automation policies. These systems enable event-driven and time-based execution of pipelines with comprehensive monitoring and failure handling.

Schedule System

Schedules provide time-based execution of jobs and asset materializations using cron expressions and partition-based scheduling.

Schedule Decorator

@schedule { .api }

Module: dagster._core.definitions.decorators.schedule_decorator
Type: Function decorator

Define a time-based schedule for job execution.

from dagster import schedule, job, op, asset, Config, RunRequest, SkipReason
from dagster import DailyPartitionsDefinition, WeeklyPartitionsDefinition
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    """Extract all rows of the daily_transactions table as a DataFrame."""
    # NOTE(review): `connection` is assumed to be a module-level DB connection
    # defined elsewhere — confirm.
    return pd.read_sql("SELECT * FROM daily_transactions", connection)

@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows containing nulls and renumber the index from zero."""
    cleaned = df.dropna()
    return cleaned.reset_index(drop=True)

@job
def daily_etl_job():
    """Daily ETL job: extract daily transactions, then clean them."""
    transform_data(extract_data())

# Basic daily schedule
@schedule(
    job=daily_etl_job,
    cron_schedule="0 2 * * *"  # 2 AM daily
)
def daily_etl_schedule():
    """Schedule daily ETL at 2 AM.

    Returns an empty run config, so the job executes with its defaults.
    """
    return {}

# Schedule with dynamic configuration
@schedule(
    job=daily_etl_job,
    cron_schedule="0 6 * * 1"  # 6 AM every Monday
)
def weekly_full_refresh(context):
    """Weekly full refresh with dynamic config.

    Builds op/resource run config from the tick's scheduled execution time and
    returns a RunRequest keyed by that date, so re-evaluating the same tick is
    idempotent.
    """

    # Generate run configuration based on schedule context
    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": True,
                    "batch_size": 10000,
                    "run_date": context.scheduled_execution_time.strftime("%Y-%m-%d")
                }
            }
        },
        "resources": {
            "warehouse": {
                "config": {
                    "connection_timeout": 300  # Longer timeout for full refresh
                }
            }
        }
    }

    return RunRequest(
        run_key=f"weekly_refresh_{context.scheduled_execution_time.strftime('%Y_%m_%d')}",
        run_config=run_config,
        tags={
            "schedule": "weekly_full_refresh",
            "refresh_type": "full",
            "execution_date": context.scheduled_execution_time.isoformat()
        }
    )

# Conditional schedule execution
@schedule(
    job=daily_etl_job,
    cron_schedule="0 */4 * * *"  # Every 4 hours
)
def conditional_data_refresh(context):
    """Schedule that runs conditionally based on data availability."""

    # Check if new data is available.
    # NOTE(review): get_last_data_update_time / get_last_successful_run_time are
    # helpers defined elsewhere; both are assumed to return comparable
    # datetimes — confirm.
    last_update = get_last_data_update_time()
    last_run = get_last_successful_run_time("daily_etl_job")

    if last_update > last_run:
        context.log.info(f"New data available since {last_update}, scheduling run")

        # Timestamp-based run key: re-evaluating the same data state does not
        # launch a duplicate run.
        return RunRequest(
            run_key=f"data_refresh_{int(last_update.timestamp())}",
            tags={
                "trigger": "new_data_available",
                "last_update": last_update.isoformat()
            }
        )
    else:
        return SkipReason(f"No new data since last run at {last_run}")

# Multiple run requests from single schedule
@schedule(
    cron_schedule="0 1 * * *"  # 1 AM daily
)
def multi_environment_deploy(context):
    """Fan a single schedule tick out into one deployment run per environment.

    Uses the tick's scheduled execution time instead of wall-clock
    ``datetime.now()`` (which was also unimported in this example), so run
    keys are deterministic and re-evaluating the same tick is idempotent.
    """
    deploy_date = context.scheduled_execution_time.strftime("%Y%m%d")
    environments = ["staging", "prod", "dr"]

    return [
        RunRequest(
            # Deterministic per (environment, tick) — guards against duplicates.
            run_key=f"deploy_{env}_{deploy_date}",
            job_name="deployment_job",
            run_config={
                "resources": {
                    "deployment_target": {
                        "config": {"environment": env}
                    }
                }
            },
            tags={
                "environment": env,
                "deployment_type": "scheduled"
            }
        )
        for env in environments
    ]

Parameters:

  • job: Union[JobDefinition, UnresolvedAssetJobDefinition] - Job to schedule
  • cron_schedule: str - Cron expression for schedule timing
  • name: Optional[str] - Schedule name (defaults to function name)
  • execution_timezone: Optional[str] - Timezone for schedule execution
  • description: Optional[str] - Schedule description
  • default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED - Default schedule status
  • tags: Optional[Dict[str, str]] - Schedule tags

Partitioned Schedules

build_schedule_from_partitioned_job { .api }

Module: dagster._core.definitions.partitions.partitioned_schedule
Type: Function

Create a schedule from a partitioned job that runs for each partition.

from dagster import build_schedule_from_partitioned_job, DailyPartitionsDefinition

# Partitioned job
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_sales_data(context) -> pd.DataFrame:
    """Daily sales data asset.

    Loads the sales rows for this run's partition date. The date is passed as
    a bound query parameter rather than interpolated into the SQL string, so a
    malformed or hostile partition key cannot inject SQL.
    """
    partition_date = context.partition_key

    # NOTE(review): %(name)s placeholders assume a pyformat-style DB driver
    # (e.g. psycopg2); adjust the paramstyle to match `connection`'s driver.
    query = "SELECT * FROM sales WHERE date = %(partition_date)s"

    return pd.read_sql(query, connection, params={"partition_date": partition_date})

@job(partitions_def=daily_partitions)
def daily_sales_job():
    """Partitioned job that materializes the daily sales asset."""
    daily_sales_data()

# Create schedule that runs daily for the previous day's partition
daily_sales_schedule = build_schedule_from_partitioned_job(
    job=daily_sales_job,
    name="daily_sales_schedule",
    description="Process daily sales data",
    hour_of_day=2,  # Run at 2 AM
    minute_of_hour=0,
    timezone="America/New_York"
)

# Custom partition schedule with multiple partitions
@schedule(
    cron_schedule="0 3 * * *"  # 3 AM daily
)
def backfill_schedule(context):
    """Emit one backfill run per partition for the trailing seven days."""

    # Partition keys for the 7 days preceding the scheduled execution date,
    # most recent first.
    anchor_date = context.scheduled_execution_time.date()
    partition_keys = [
        (anchor_date - timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range(1, 8)
    ]

    return [
        RunRequest(
            run_key=f"backfill_{partition_key}",
            partition_key=partition_key,
            tags={
                "partition": partition_key,
                "run_type": "backfill"
            }
        )
        for partition_key in partition_keys
    ]

Schedule Context

ScheduleEvaluationContext { .api }

Module: dagster._core.definitions.schedule_definition
Type: Class

Context provided to schedule functions with execution information.

from dagster import ScheduleEvaluationContext, build_schedule_context

@schedule(
    job=daily_etl_job,
    cron_schedule="0 8 * * 1-5"  # 8 AM weekdays
)
def business_days_schedule(context: ScheduleEvaluationContext):
    """Schedule with comprehensive context usage.

    Skips the tick when the job has failed repeatedly over the last day;
    otherwise requests a run whose config varies by day of week.
    """

    # Access schedule execution time
    scheduled_time = context.scheduled_execution_time
    execution_date = scheduled_time.date()

    # Access instance and run information
    instance = context.instance

    # Check for recent failures.
    # NOTE(review): RunsFilter, DagsterRunStatus and timedelta are used below
    # but not imported in this snippet — assumed imported at module level.
    recent_runs = instance.get_runs(
        filters=RunsFilter(
            job_name="daily_etl_job",
            created_after=scheduled_time - timedelta(days=1)
        ),
        limit=5
    )

    recent_failures = [run for run in recent_runs if run.status == DagsterRunStatus.FAILURE]

    # Circuit breaker: stop scheduling after 3+ failures in the lookback window.
    if len(recent_failures) >= 3:
        context.log.warning(f"Found {len(recent_failures)} recent failures, skipping execution")
        return SkipReason(f"Too many recent failures ({len(recent_failures)})")

    # Dynamic configuration based on day of week (Monday == 0)
    is_monday = scheduled_time.weekday() == 0

    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": is_monday,  # Full refresh on Mondays
                    "batch_size": 5000 if is_monday else 1000,
                    "execution_date": execution_date.isoformat()
                }
            }
        }
    }

    # Generate unique run key (date + hour keeps distinct ticks distinct)
    run_key = f"etl_{execution_date.strftime('%Y%m%d')}_{scheduled_time.hour}"

    context.log.info(f"Scheduling ETL for {execution_date} (Monday: {is_monday})")

    return RunRequest(
        run_key=run_key,
        run_config=run_config,
        tags={
            "execution_date": execution_date.isoformat(),
            "is_monday": str(is_monday),
            "schedule_time": scheduled_time.isoformat()
        }
    )

# Build schedule context for testing.
# NOTE(review): DagsterInstance and datetime are not imported in this snippet —
# assumed available at module level.
test_context = build_schedule_context(
    scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday 8 AM
    instance=DagsterInstance.ephemeral()
)

# Test schedule function
result = business_days_schedule(test_context)
print(f"Schedule result: {result}")

Key Properties:

  • scheduled_execution_time: datetime - When schedule was supposed to execute
  • instance: DagsterInstance - Dagster instance
  • log: DagsterLogManager - Logger for schedule evaluation

Sensor System

Sensors enable event-driven pipeline execution based on external events, asset changes, or custom conditions.

Sensor Decorator

@sensor { .api }

Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator

Define a sensor for event-driven execution.

from dagster import sensor, RunRequest, SkipReason, SensorEvaluationContext
import os
import glob
from pathlib import Path

@job
def file_processing_job():
    """Job that processes newly arrived files."""
    # NOTE(review): process_new_files is an op defined elsewhere — confirm.
    process_new_files()

# File-based sensor
@sensor(
    job=file_processing_job,
    minimum_interval_seconds=30  # Check every 30 seconds
)
def file_arrival_sensor(context: SensorEvaluationContext):
    """Trigger one processing run for each unprocessed CSV in the landing dir."""

    watch_directory = "/data/incoming"
    processed_directory = "/data/processed"

    # A file counts as "new" until a same-named file exists in the processed
    # directory.
    pending = []
    for candidate in Path(watch_directory).glob("*.csv"):
        if (Path(processed_directory) / candidate.name).exists():
            continue
        stats = candidate.stat()
        pending.append({
            "path": str(candidate),
            "name": candidate.name,
            "size": stats.st_size,
            "modified": stats.st_mtime
        })

    if not pending:
        return SkipReason("No new files found")

    context.log.info(f"Found {len(pending)} new files to process")

    return [
        RunRequest(
            run_key=f"file_{info['name']}_{int(info['modified'])}",
            run_config={
                "ops": {
                    "process_new_files": {
                        "config": {
                            "input_file": info["path"],
                            "output_directory": processed_directory
                        }
                    }
                }
            },
            tags={
                "file_name": info["name"],
                "file_size": str(info["size"]),
                "trigger": "file_arrival"
            }
        )
        for info in pending
    ]

# API-based sensor
@sensor(
    job=api_sync_job,
    minimum_interval_seconds=300  # Check every 5 minutes
)
def api_data_sensor(context):
    """Sensor that monitors external API for new data.

    Uses the sensor cursor to remember the last update already synced and
    requests one run covering the window (cursor, last_update].
    """

    # Check API for new data.
    # NOTE(review): requests, API_TOKEN, datetime and timedelta are assumed to
    # be imported/defined at module level — confirm.
    try:
        api_response = requests.get(
            "https://api.example.com/v1/status",
            headers={"Authorization": f"Bearer {API_TOKEN}"},
            timeout=30
        )
        api_response.raise_for_status()

        status_data = api_response.json()
        last_update = datetime.fromisoformat(status_data["last_update"])

        # Get cursor from sensor context (persistent state)
        last_processed = context.cursor
        if last_processed:
            last_processed_time = datetime.fromisoformat(last_processed)
        else:
            # First run, start from 1 hour ago.
            # NOTE(review): datetime.now() is naive — comparing with an aware
            # API timestamp would raise; confirm timezone handling.
            last_processed_time = datetime.now() - timedelta(hours=1)

        if last_update > last_processed_time:
            context.log.info(f"New data available since {last_update}")

            # Update cursor to track progress
            context.update_cursor(last_update.isoformat())

            return RunRequest(
                run_key=f"api_sync_{int(last_update.timestamp())}",
                run_config={
                    "resources": {
                        "api_client": {
                            "config": {
                                "sync_from": last_processed_time.isoformat(),
                                "sync_to": last_update.isoformat()
                            }
                        }
                    }
                },
                tags={
                    "last_update": last_update.isoformat(),
                    "data_available": "true"
                }
            )
        else:
            return SkipReason(f"No new data since {last_processed_time}")

    except Exception as e:
        # Broad catch keeps the sensor alive; the failure surfaces as a skip.
        context.log.error(f"Failed to check API status: {str(e)}")
        return SkipReason(f"API check failed: {str(e)}")

# Database change sensor
@sensor(
    job=incremental_sync_job,
    minimum_interval_seconds=60
)
def database_change_sensor(context):
    """Sensor that monitors database changes.

    Polls a change-tracking table, groups new changes by table, and emits one
    RunRequest per changed table. The sensor cursor stores the latest change
    timestamp seen, so each change set is processed at most once.
    """

    # Query change tracking table
    query = """
    SELECT table_name, last_modified, change_count
    FROM change_tracking 
    WHERE last_modified > %s
    ORDER BY last_modified DESC
    """

    # Use cursor to track last processed time; fall back to a 5-minute
    # lookback on the first evaluation.
    last_check = context.cursor or (datetime.now() - timedelta(minutes=5)).isoformat()

    # NOTE(review): execute_query is a helper defined elsewhere — assumed to
    # return rows as dicts with datetime `last_modified` values; confirm.
    changes = execute_query(query, [last_check])

    if not changes:
        return SkipReason(f"No database changes since {last_check}")

    # Update cursor to latest change time
    latest_change = max(change["last_modified"] for change in changes)
    context.update_cursor(latest_change.isoformat())

    # Group changes by table for efficient processing
    tables_changed = {}
    for change in changes:
        table = change["table_name"]
        if table not in tables_changed:
            tables_changed[table] = []
        tables_changed[table].append(change)

    context.log.info(f"Found changes in {len(tables_changed)} tables")

    run_requests = []
    for table_name, table_changes in tables_changed.items():
        total_changes = sum(change["change_count"] for change in table_changes)

        run_requests.append(
            RunRequest(
                run_key=f"sync_{table_name}_{int(latest_change.timestamp())}",
                run_config={
                    "ops": {
                        "incremental_sync": {
                            "config": {
                                "table_name": table_name,
                                "sync_from": last_check,
                                "change_count": total_changes
                            }
                        }
                    }
                },
                tags={
                    "table": table_name,
                    "change_count": str(total_changes),
                    "trigger": "database_change"
                }
            )
        )

    return run_requests

Parameters:

  • job: Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]] - Job to execute
  • name: Optional[str] - Sensor name (defaults to function name)
  • minimum_interval_seconds: int = 30 - Minimum interval between evaluations
  • description: Optional[str] - Sensor description
  • default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED - Default sensor status

Asset Sensors

@asset_sensor { .api }

Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator

Define a sensor that monitors specific assets for changes.

from dagster import asset_sensor, AssetKey, EventLogEntry, DagsterEventType

@asset
def raw_sales_data() -> pd.DataFrame:
    """Raw sales data asset loaded from the landing CSV."""
    return pd.read_csv("/data/sales.csv")

@asset
def processed_sales_data() -> pd.DataFrame:
    """Processed sales data asset."""
    # NOTE(review): process_sales_data is defined elsewhere — confirm it
    # returns a DataFrame as the annotation promises.
    return process_sales_data()

@job
def sales_analytics_job():
    """Job that generates sales analytics (op defined elsewhere)."""
    generate_sales_analytics()

# Asset sensor monitoring single asset
@asset_sensor(
    asset_key=AssetKey("raw_sales_data"),
    job=sales_analytics_job
)
def sales_data_sensor(context, asset_event: EventLogEntry):
    """Sensor that triggers when raw sales data is materialized."""

    # Access asset event information
    asset_key = asset_event.dagster_event.asset_key
    materialization = asset_event.dagster_event.step_materialization_data.materialization

    # Extract metadata from materialization.
    # NOTE(review): metadata_entries / entry_data is the legacy metadata API of
    # older Dagster releases — confirm against the pinned dagster version.
    metadata = materialization.metadata_entries
    record_count = None
    for entry in metadata:
        if entry.label == "records":
            record_count = entry.entry_data.int_value
            break

    # Conditional execution based on data size (also skips when no "records"
    # metadata entry was found, i.e. record_count is None).
    if record_count and record_count > 1000:
        context.log.info(f"Large dataset materialized ({record_count} records), triggering analytics")

        return RunRequest(
            run_key=f"analytics_{asset_event.storage_id}_{asset_event.timestamp}",
            run_config={
                "ops": {
                    "generate_sales_analytics": {
                        "config": {
                            "input_asset_key": str(asset_key),
                            "record_count": record_count,
                            "processing_mode": "large_dataset"
                        }
                    }
                }
            },
            tags={
                "triggered_by": str(asset_key),
                "record_count": str(record_count),
                "event_id": str(asset_event.storage_id)
            }
        )
    else:
        return SkipReason(f"Dataset too small ({record_count} records), skipping analytics")

# Single-asset sensor on processed_sales_data that additionally verifies
# sibling dependencies are fresh before triggering downstream analytics
@asset_sensor(
    asset_key=AssetKey("processed_sales_data"),
    job=downstream_analytics_job
)
def processed_sales_sensor(context, asset_event):
    """Sensor for processed sales data with dependency checking."""

    # Check if all required dependencies are fresh
    required_assets = [
        AssetKey("customer_data"),
        AssetKey("product_catalog"),
        AssetKey("processed_sales_data")
    ]

    # Query materialization times for dependencies
    instance = context.instance
    fresh_assets = []

    for asset_key in required_assets:
        latest_materialization = instance.get_latest_materialization_event(asset_key)
        if latest_materialization:
            # Consider fresh if materialized within last 6 hours.
            # NOTE(review): datetime.now() is naive local time — confirm the
            # event timestamp is epoch seconds in the same frame of reference.
            materialization_time = datetime.fromtimestamp(latest_materialization.timestamp)
            if (datetime.now() - materialization_time) < timedelta(hours=6):
                fresh_assets.append(asset_key)

    if len(fresh_assets) == len(required_assets):
        context.log.info("All dependencies are fresh, triggering downstream analytics")

        return RunRequest(
            run_key=f"downstream_{int(asset_event.timestamp)}",
            tags={
                "trigger_asset": str(asset_event.dagster_event.asset_key),
                "fresh_dependencies": ",".join([str(k) for k in fresh_assets])
            }
        )
    else:
        stale_assets = set(required_assets) - set(fresh_assets)
        return SkipReason(f"Stale dependencies: {[str(k) for k in stale_assets]}")

@multi_asset_sensor { .api }

Module: dagster._core.definitions.decorators.sensor_decorator
Type: Function decorator

Define a sensor that monitors multiple assets simultaneously.

from dagster import multi_asset_sensor, MultiAssetSensorEvaluationContext

# Single source of truth for the monitored keys — the decorator and the
# freshness check below previously duplicated this list and could drift apart.
MONITORED_RAW_ASSET_KEYS = [
    AssetKey(["raw", "users"]),
    AssetKey(["raw", "orders"]),
    AssetKey(["raw", "products"])
]

@multi_asset_sensor(
    monitored_assets=MONITORED_RAW_ASSET_KEYS,
    job=complete_etl_job
)
def multi_table_sensor(context: MultiAssetSensorEvaluationContext):
    """Trigger the complete ETL only when every raw table is fresh.

    "Fresh" means materialized within the last hour. The run key derives from
    the newest materialization, so the same set of events never triggers twice.
    """

    # Latest materialization record per monitored asset (None if never
    # materialized).
    asset_events = context.latest_materialization_records_by_key()

    # NOTE(review): datetime.now() is naive — assumes event timestamps are
    # epoch seconds in the same frame of reference; confirm timezone handling.
    cutoff_time = datetime.now() - timedelta(hours=1)
    fresh_assets = []

    for asset_key, event_record in asset_events.items():
        if event_record:
            materialization_time = datetime.fromtimestamp(event_record.timestamp)
            if materialization_time > cutoff_time:
                fresh_assets.append(asset_key)
                context.log.info(f"Fresh materialization: {asset_key} at {materialization_time}")

    if set(fresh_assets) == set(MONITORED_RAW_ASSET_KEYS):
        context.log.info("All raw tables are fresh, triggering complete ETL")

        # Create run key from latest materialization timestamps
        latest_timestamp = max(
            event_record.timestamp
            for event_record in asset_events.values()
            if event_record
        )

        return RunRequest(
            run_key=f"complete_etl_{int(latest_timestamp)}",
            run_config={
                "ops": {
                    "etl_processor": {
                        "config": {
                            "mode": "complete_refresh",
                            "source_tables": ["users", "orders", "products"]
                        }
                    }
                }
            },
            tags={
                "trigger": "all_raw_tables_fresh",
                "fresh_assets": ",".join([str(k) for k in fresh_assets]),
                "latest_materialization": str(latest_timestamp)
            }
        )
    else:
        missing_assets = set(MONITORED_RAW_ASSET_KEYS) - set(fresh_assets)
        return SkipReason(f"Waiting for fresh materializations: {[str(k) for k in missing_assets]}")

# Asset sensor with partition awareness
@asset_sensor(
    asset_key=AssetKey("partitioned_sales"),
    job=partition_analytics_job
)
def partitioned_sales_sensor(context, asset_event):
    """Sensor that handles partitioned assets.

    Reacts only to materializations of current-month partitions; historical
    partitions and unpartitioned events are skipped.
    """

    # Extract partition information from event.
    # NOTE(review): `materialization.partition` is assumed to carry the
    # partition key for partitioned materializations — confirm against the
    # pinned dagster version.
    materialization = asset_event.dagster_event.step_materialization_data.materialization
    partition_key = materialization.partition if materialization else None

    if partition_key:
        # Check if this partition is part of a complete set
        # (e.g., all partitions for current month)
        current_month = datetime.now().strftime("%Y-%m")

        # Relies on partition keys being "YYYY-MM-DD" strings.
        if partition_key.startswith(current_month):
            context.log.info(f"Current month partition materialized: {partition_key}")

            return RunRequest(
                run_key=f"analytics_{partition_key}",
                partition_key=partition_key,
                tags={
                    "partition": partition_key,
                    "month": current_month,
                    "trigger": "current_month_partition"
                }
            )
        else:
            return SkipReason(f"Historical partition {partition_key}, no immediate action needed")
    else:
        return SkipReason("No partition information in materialization")

Run Status Sensors

@run_failure_sensor { .api }

Module: dagster._core.definitions.run_status_sensor_definition
Type: Function decorator

Define a sensor that triggers on job run failures.

from dagster import run_failure_sensor, RunFailureSensorContext, DefaultSensorStatus
import smtplib
from email.mime.text import MIMEText

@run_failure_sensor(
    monitored_jobs=[daily_etl_job, weekly_analytics_job],
    default_status=DefaultSensorStatus.RUNNING
)
def job_failure_alert(context: RunFailureSensorContext):
    """Send alerts when critical jobs fail.

    Best-effort fan-out: email and Slack notifications are attempted
    independently, and a failure in either channel is logged, not raised.
    """

    # Access failure information
    failed_run = context.dagster_run
    failure_event = context.failure_event

    job_name = failed_run.job_name
    run_id = failed_run.run_id
    failure_time = datetime.fromtimestamp(failure_event.timestamp)

    # Extract error information.
    # NOTE(review): reading the error off engine_event_data may not match the
    # run-failure event shape in all dagster versions — confirm.
    error_info = failure_event.dagster_event.engine_event_data
    error_message = error_info.error.message if error_info and error_info.error else "Unknown error"

    context.log.error(f"Job {job_name} failed with error: {error_message}")

    # Send email notification.
    # NOTE(review): context.instance.get_run_url is not a documented
    # DagsterInstance method — confirm where the run URL comes from.
    alert_message = f"""
    Job Failure Alert
    
    Job: {job_name}
    Run ID: {run_id}
    Failure Time: {failure_time.strftime('%Y-%m-%d %H:%M:%S')}
    
    Error: {error_message}
    
    Dagster UI: {context.instance.get_run_url(run_id)}
    """

    # NOTE(review): send_email / send_slack_message are helpers defined
    # elsewhere — confirm their signatures.
    try:
        send_email(
            subject=f"[DAGSTER ALERT] Job {job_name} Failed",
            body=alert_message,
            recipients=["data-team@company.com", "ops-team@company.com"]
        )

        context.log.info(f"Failure alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send alert email: {str(e)}")

    # Create Slack notification
    try:
        slack_message = {
            "text": f"🚨 Dagster Job Failure: {job_name}",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Job:* `{job_name}`\n*Run ID:* `{run_id}`\n*Time:* {failure_time.strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn", 
                        "text": f"*Error:*\n```{error_message[:500]}```"
                    }
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View in Dagster"},
                            "url": context.instance.get_run_url(run_id)
                        }
                    ]
                }
            ]
        }

        send_slack_message(slack_message)
        context.log.info(f"Slack alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send Slack alert: {str(e)}")

# Run status sensor for success notifications
@run_status_sensor(
    monitored_jobs=[critical_daily_job],
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING
)
def job_success_notification(context):
    """Notify on successful completion of critical jobs."""

    successful_run = context.dagster_run
    job_name = successful_run.job_name

    # Only notify for long-running jobs.
    # NOTE(review): start_time/end_time are assumed to be datetimes; if the run
    # object exposes them as epoch floats, `end_time - start_time` would be a
    # float and the timedelta comparison below would raise — confirm.
    start_time = successful_run.start_time
    end_time = successful_run.end_time

    if start_time and end_time:
        duration = end_time - start_time

        if duration > timedelta(minutes=30):  # Only for jobs > 30 minutes
            context.log.info(f"Long-running job {job_name} completed successfully in {duration}")

            # Send success notification (send_slack_message is a helper
            # defined elsewhere).
            send_slack_message({
                "text": f"✅ Long-running job `{job_name}` completed successfully",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Job:* `{job_name}`\n*Duration:* {str(duration)}\n*Status:* Success ✅"
                        }
                    }
                ]
            })

@run_status_sensor { .api }

Module: dagster._core.definitions.run_status_sensor_definition
Type: Function decorator

Define a sensor for any run status change.

from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus

@run_status_sensor(
    run_status=DagsterRunStatus.STARTED,
    monitored_jobs=[expensive_ml_job]
)
def job_start_monitor(context: RunStatusSensorContext):
    """Monitor job starts for resource scaling."""

    started_run = context.dagster_run
    job_name = started_run.job_name

    # Scale up resources for expensive jobs.
    # (This check is currently redundant — only expensive_ml_job is monitored —
    # but keeps the sensor safe if more jobs are added to monitored_jobs.)
    if job_name == "expensive_ml_job":
        context.log.info(f"Expensive job {job_name} started, scaling up resources")

        # Trigger infrastructure scaling.
        # NOTE(review): scale_compute_resources is a helper defined elsewhere.
        try:
            scale_compute_resources(
                job_name=job_name,
                run_id=started_run.run_id,
                scale_action="up"
            )

            context.log.info("Successfully scaled up resources")

        except Exception as e:
            context.log.error(f"Failed to scale resources: {str(e)}")

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[expensive_ml_job]
)
def job_completion_cleanup(context):
    """Scale compute back down once the monitored job finishes successfully."""

    run = context.dagster_run

    context.log.info(f"Job {run.job_name} completed, scaling down resources")

    # Best effort: a scaling failure is logged, never raised, so the sensor
    # keeps evaluating.
    try:
        scale_compute_resources(
            job_name=run.job_name,
            run_id=run.run_id,
            scale_action="down"
        )
    except Exception as e:
        context.log.error(f"Failed to scale down resources: {str(e)}")
    else:
        context.log.info("Successfully scaled down resources")

Automation Policies

Auto-Materialization Policies

AutoMaterializePolicy { .api }

Module: dagster._core.definitions.auto_materialize_policy
Type: Class

Policy for automatic asset materialization based on upstream changes.

from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

# Eager materialization - materialize immediately when upstream changes
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def eager_asset(upstream_data):
    """Asset that materializes immediately when upstream changes."""
    # NOTE(review): process_data is a helper defined elsewhere.
    return process_data(upstream_data)

# Lazy materialization - materialize only when downstream requests it
@asset(auto_materialize_policy=AutoMaterializePolicy.lazy())
def lazy_asset(source_data):
    """Asset that materializes only when needed."""
    # NOTE(review): expensive_computation is a helper defined elsewhere.
    return expensive_computation(source_data)

# Custom auto-materialization rules.
# NOTE(review): confirm AutoMaterializePolicy.from_rules exists in the pinned
# dagster version — rule composition is typically eager().with_rules(...).
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        AutoMaterializeRule.materialize_on_required_for_freshness(),
        AutoMaterializeRule.skip_on_parent_missing(),
        AutoMaterializeRule.skip_on_parent_outdated()
    ])
)
def custom_policy_asset(upstream_asset):
    """Asset with custom auto-materialization policy."""
    return transform_data(upstream_asset)

# Time-based auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        # Materialize daily at 2 AM
        AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
        # But skip if upstream data is too old.
        # NOTE(review): the public skip_on_parent_outdated rule takes no
        # arguments — confirm a `minutes` parameter exists in this version.
        AutoMaterializeRule.skip_on_parent_outdated(minutes=360)  # 6 hours
    ])
)
def daily_summary(daily_transactions):
    """Asset that auto-materializes daily with freshness checks."""
    return summarize_daily_transactions(daily_transactions)

# Conditional auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        # Defer processing until all parents have updated since the 6 PM cron
        # tick, effectively batching work outside business hours
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron("0 18 * * *")  # 6 PM
    ])
)
def batch_processed_asset(real_time_data):
    """Asset that processes in batches outside business hours."""
    return batch_process(real_time_data)

Declarative Automation

AutomationCondition { .api }

Module: dagster._core.definitions.declarative_automation.automation_condition
Type: Class

Declarative conditions for sophisticated automation logic.

from dagster import AutomationCondition, asset

# Complex automation condition
@asset(
    automation_condition=(
        # Materialize when parent is updated
        AutomationCondition.parent_newer() 
        # But only after the daily midnight cron tick has passed
        & AutomationCondition.cron_tick_passed("0 0 * * *")  # Midnight
        # And not if recently materialized
        & ~AutomationCondition.materialized_since_cron("0 18 * * *")  # 6 PM
        # And only if all dependencies are fresh
        & AutomationCondition.all_deps_blocking_checks_passed()
    )
)
def sophisticated_asset(upstream_data):
    """Asset with sophisticated automation logic."""
    return complex_processing(upstream_data)

# Data quality gated automation
@asset(
    automation_condition=(
        AutomationCondition.parent_newer()
        # Only materialize if upstream passes quality checks
        & AutomationCondition.all_deps_blocking_checks_passed() 
        # And if we haven't failed recently
        & ~AutomationCondition.failed_since_cron("0 0 * * *")
    )
)
def quality_gated_asset(validated_data):
    """Asset that only materializes with high-quality upstream data."""
    return process_validated_data(validated_data)

# Backfill automation
@asset(
    automation_condition=(
        # Normal condition for regular updates
        AutomationCondition.parent_newer()
        # OR backfill condition for missing partitions
        | (
            AutomationCondition.missing()
            & AutomationCondition.parent_materialized()
            # Only backfill during maintenance windows
            & AutomationCondition.cron_tick_passed("0 2 * * 6")  # Saturday 2 AM
        )
    )
)
def backfill_aware_asset(source_data):
    """Asset with automatic backfill logic."""
    return process_data_with_backfill(source_data)

Sensor and Schedule Context Builders

Context Building for Testing

from dagster import build_sensor_context, build_schedule_context, build_run_status_sensor_context

# Test sensor function
def test_file_arrival_sensor():
    """Test sensor with mock context."""
    
    # Create test files
    test_files = ["/tmp/test1.csv", "/tmp/test2.csv"]
    for file_path in test_files:
        Path(file_path).touch()
    
    # Build sensor context
    context = build_sensor_context(
        cursor="2023-01-01T00:00:00",
        instance=DagsterInstance.ephemeral()
    )
    
    # Test sensor
    result = file_arrival_sensor(context)
    
    # Verify results
    assert isinstance(result, list)
    assert len(result) == 2
    assert all(isinstance(req, RunRequest) for req in result)

# Test schedule function  
def test_business_days_schedule():
    """Test schedule with different times."""
    
    # Test Monday execution
    monday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday
        instance=DagsterInstance.ephemeral()
    )
    
    monday_result = business_days_schedule(monday_context)
    assert isinstance(monday_result, RunRequest)
    assert monday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] == True
    
    # Test Tuesday execution  
    tuesday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 17, 8, 0)  # Tuesday
    )
    
    tuesday_result = business_days_schedule(tuesday_context)
    assert tuesday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] == False

# Test failure sensor
def test_job_failure_alert():
    """Test failure sensor with mock failure."""
    
    # Create mock failure context
    context = build_run_status_sensor_context(
        dagster_run=mock_failed_run,
        dagster_event=mock_failure_event,
        instance=DagsterInstance.ephemeral()
    )
    
    # Test failure sensor (should not raise)
    job_failure_alert(context)

This sensor and schedule system enables sophisticated automation patterns: event-driven execution, time-based scheduling, failure handling, and declarative automation policies. It provides rich context information and flexible configuration options for building robust data pipeline automation.

For partitioned execution with sensors and schedules, see Partitions System. For error handling and failure management, see Error Handling.

Install with Tessl CLI

npx tessl i tessl/pypi-dagster

docs

configuration.md

core-definitions.md

error-handling.md

events-metadata.md

execution-contexts.md

index.md

partitions.md

sensors-schedules.md

storage-io.md

tile.json