# Sensors and Schedules

This document covers Dagster's automation system, including sensors, schedules, and declarative automation policies. These systems enable event-driven and time-based execution of pipelines with comprehensive monitoring and failure handling.

## Schedule System

Schedules provide time-based execution of jobs and asset materializations using cron expressions and partition-based scheduling.

### Schedule Decorator

#### `@schedule` { .api }

**Module:** `dagster._core.definitions.decorators.schedule_decorator`
**Type:** Function decorator

Define a time-based schedule for job execution.

```python
from datetime import datetime

from dagster import RunRequest, SkipReason, job, op, schedule
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    # `connection` is assumed to be provided elsewhere (e.g. via a resource)
    return pd.read_sql("SELECT * FROM daily_transactions", connection)

@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna().reset_index(drop=True)

@job
def daily_etl_job():
    """Daily ETL job."""
    transform_data(extract_data())

# Basic daily schedule
@schedule(
    job=daily_etl_job,
    cron_schedule="0 2 * * *"  # 2 AM daily
)
def daily_etl_schedule():
    """Schedule daily ETL at 2 AM."""
    return {}

# Schedule with dynamic configuration
@schedule(
    job=daily_etl_job,
    cron_schedule="0 6 * * 1"  # 6 AM every Monday
)
def weekly_full_refresh(context):
    """Weekly full refresh with dynamic config."""

    # Generate run configuration based on the schedule context
    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": True,
                    "batch_size": 10000,
                    "run_date": context.scheduled_execution_time.strftime("%Y-%m-%d")
                }
            }
        },
        "resources": {
            "warehouse": {
                "config": {
                    "connection_timeout": 300  # Longer timeout for full refresh
                }
            }
        }
    }

    return RunRequest(
        run_key=f"weekly_refresh_{context.scheduled_execution_time.strftime('%Y_%m_%d')}",
        run_config=run_config,
        tags={
            "schedule": "weekly_full_refresh",
            "refresh_type": "full",
            "execution_date": context.scheduled_execution_time.isoformat()
        }
    )

# Conditional schedule execution
@schedule(
    job=daily_etl_job,
    cron_schedule="0 */4 * * *"  # Every 4 hours
)
def conditional_data_refresh(context):
    """Schedule that runs conditionally based on data availability."""

    # Check if new data is available (helpers assumed to be defined elsewhere)
    last_update = get_last_data_update_time()
    last_run = get_last_successful_run_time("daily_etl_job")

    if last_update > last_run:
        context.log.info(f"New data available since {last_update}, scheduling run")

        return RunRequest(
            run_key=f"data_refresh_{int(last_update.timestamp())}",
            tags={
                "trigger": "new_data_available",
                "last_update": last_update.isoformat()
            }
        )
    else:
        return SkipReason(f"No new data since last run at {last_run}")

# Multiple run requests from a single schedule
@schedule(
    job=deployment_job,  # `deployment_job` is assumed to be defined elsewhere
    cron_schedule="0 1 * * *"  # 1 AM daily
)
def multi_environment_deploy():
    """Schedule that deploys to multiple environments."""

    environments = ["staging", "prod", "dr"]
    run_requests = []

    for env in environments:
        run_requests.append(
            RunRequest(
                run_key=f"deploy_{env}_{datetime.now().strftime('%Y%m%d')}",
                job_name="deployment_job",
                run_config={
                    "resources": {
                        "deployment_target": {
                            "config": {"environment": env}
                        }
                    }
                },
                tags={
                    "environment": env,
                    "deployment_type": "scheduled"
                }
            )
        )

    return run_requests
```

**Parameters:**
- `job: Union[JobDefinition, UnresolvedAssetJobDefinition]` - Job to schedule
- `cron_schedule: str` - Cron expression for schedule timing
- `name: Optional[str]` - Schedule name (defaults to function name)
- `execution_timezone: Optional[str]` - Timezone for schedule execution
- `description: Optional[str]` - Schedule description
- `default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED` - Default schedule status
- `tags: Optional[Dict[str, str]]` - Schedule tags

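The optional parameters control naming, description, timezone handling, and whether the schedule starts enabled. A minimal sketch combining them (it reuses `daily_etl_job` from the example above; the specific name, timezone, and tag values are illustrative):

```python
from dagster import DefaultScheduleStatus, schedule

@schedule(
    job=daily_etl_job,
    cron_schedule="0 2 * * *",
    name="daily_etl_eastern",
    description="Kick off the daily ETL at 2 AM Eastern time",
    execution_timezone="America/New_York",
    default_status=DefaultScheduleStatus.RUNNING,  # start enabled without a manual toggle in the UI
    tags={"team": "data-platform"},
)
def configured_daily_etl_schedule():
    return {}
```
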
### Partitioned Schedules

#### `build_schedule_from_partitioned_job` { .api }

**Module:** `dagster._core.definitions.partitions.partitioned_schedule`
**Type:** Function

Create a schedule from a partitioned job; each schedule tick requests a run for the most recently completed partition.

```python
from datetime import timedelta

from dagster import (
    AssetSelection,
    DailyPartitionsDefinition,
    RunRequest,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    schedule,
)
import pandas as pd

# Partitioned asset and job
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_sales_data(context) -> pd.DataFrame:
    """Daily sales data asset."""
    partition_date = context.partition_key

    # Load data for the specific date (`connection` assumed to be defined elsewhere)
    query = f"""
        SELECT * FROM sales
        WHERE date = '{partition_date}'
    """

    return pd.read_sql(query, connection)

daily_sales_job = define_asset_job(
    name="daily_sales_job",
    selection=AssetSelection.assets(daily_sales_data),
    partitions_def=daily_partitions,
)

# Create a schedule that runs daily for the previous day's partition
daily_sales_schedule = build_schedule_from_partitioned_job(
    job=daily_sales_job,
    name="daily_sales_schedule",
    description="Process daily sales data",
    hour_of_day=2,  # Run at 2 AM
    minute_of_hour=0,
    timezone="America/New_York"
)

# Custom partition schedule that targets multiple partitions
@schedule(
    job=daily_sales_job,
    cron_schedule="0 3 * * *"  # 3 AM daily
)
def backfill_schedule(context):
    """Schedule that processes multiple partitions for backfill."""

    # Determine partitions to process
    execution_date = context.scheduled_execution_time.date()

    # Process the last 7 days of partitions
    partitions_to_run = []
    for i in range(7):
        partition_date = execution_date - timedelta(days=i + 1)
        partitions_to_run.append(partition_date.strftime("%Y-%m-%d"))

    run_requests = []
    for partition_key in partitions_to_run:
        run_requests.append(
            RunRequest(
                run_key=f"backfill_{partition_key}",
                partition_key=partition_key,
                tags={
                    "partition": partition_key,
                    "run_type": "backfill"
                }
            )
        )

    return run_requests
```

### Schedule Context

#### `ScheduleEvaluationContext` { .api }

**Module:** `dagster._core.definitions.schedule_definition`
**Type:** Class

Context provided to schedule functions with execution information.

```python
from datetime import datetime, timedelta

from dagster import (
    DagsterInstance,
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    ScheduleEvaluationContext,
    SkipReason,
    build_schedule_context,
    schedule,
)

@schedule(
    job=daily_etl_job,  # defined in the earlier example
    cron_schedule="0 8 * * 1-5"  # 8 AM weekdays
)
def business_days_schedule(context: ScheduleEvaluationContext):
    """Schedule with comprehensive context usage."""

    # Access the schedule execution time
    scheduled_time = context.scheduled_execution_time
    execution_date = scheduled_time.date()

    # Access instance and run information
    instance = context.instance

    # Check for recent failures
    recent_runs = instance.get_runs(
        filters=RunsFilter(
            job_name="daily_etl_job",
            created_after=scheduled_time - timedelta(days=1)
        ),
        limit=5
    )

    recent_failures = [run for run in recent_runs if run.status == DagsterRunStatus.FAILURE]

    if len(recent_failures) >= 3:
        context.log.warning(f"Found {len(recent_failures)} recent failures, skipping execution")
        return SkipReason(f"Too many recent failures ({len(recent_failures)})")

    # Dynamic configuration based on the day of week
    is_monday = scheduled_time.weekday() == 0

    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": is_monday,  # Full refresh on Mondays
                    "batch_size": 5000 if is_monday else 1000,
                    "execution_date": execution_date.isoformat()
                }
            }
        }
    }

    # Generate a unique run key
    run_key = f"etl_{execution_date.strftime('%Y%m%d')}_{scheduled_time.hour}"

    context.log.info(f"Scheduling ETL for {execution_date} (Monday: {is_monday})")

    return RunRequest(
        run_key=run_key,
        run_config=run_config,
        tags={
            "execution_date": execution_date.isoformat(),
            "is_monday": str(is_monday),
            "schedule_time": scheduled_time.isoformat()
        }
    )

# Build a schedule context for testing
test_context = build_schedule_context(
    scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday 8 AM
    instance=DagsterInstance.ephemeral()
)

# Evaluate the schedule function directly
result = business_days_schedule(test_context)
print(f"Schedule result: {result}")
```

**Key Properties:**
- `scheduled_execution_time: datetime` - When schedule was supposed to execute
- `instance: DagsterInstance` - Dagster instance
- `log: DagsterLogManager` - Logger for schedule evaluation

## Sensor System

Sensors enable event-driven pipeline execution based on external events, asset changes, or custom conditions.

### Sensor Decorator

#### `@sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor for event-driven execution.

```python
import glob
import os
from datetime import datetime, timedelta

import requests

from dagster import RunRequest, SensorEvaluationContext, SkipReason, job, sensor

@job
def file_processing_job():
    # `process_new_files` is an op assumed to be defined elsewhere
    process_new_files()

# File-based sensor
@sensor(
    job=file_processing_job,
    minimum_interval_seconds=30  # Check every 30 seconds
)
def file_arrival_sensor(context: SensorEvaluationContext):
    """Sensor that triggers on new file arrivals."""

    watch_directory = "/data/incoming"
    processed_directory = "/data/processed"

    # Find new files
    new_files = []
    for file_path in glob.glob(f"{watch_directory}/*.csv"):
        file_name = os.path.basename(file_path)
        processed_path = os.path.join(processed_directory, file_name)

        # Check if the file hasn't been processed yet
        if not os.path.exists(processed_path):
            file_stats = os.stat(file_path)
            new_files.append({
                "path": file_path,
                "name": file_name,
                "size": file_stats.st_size,
                "modified": file_stats.st_mtime
            })

    if not new_files:
        return SkipReason("No new files found")

    context.log.info(f"Found {len(new_files)} new files to process")

    run_requests = []
    for file_info in new_files:
        run_requests.append(
            RunRequest(
                run_key=f"file_{file_info['name']}_{int(file_info['modified'])}",
                run_config={
                    "ops": {
                        "process_new_files": {
                            "config": {
                                "input_file": file_info["path"],
                                "output_directory": processed_directory
                            }
                        }
                    }
                },
                tags={
                    "file_name": file_info["name"],
                    "file_size": str(file_info["size"]),
                    "trigger": "file_arrival"
                }
            )
        )

    return run_requests

# API-based sensor (`api_sync_job` and `API_TOKEN` are assumed to be defined elsewhere)
@sensor(
    job=api_sync_job,
    minimum_interval_seconds=300  # Check every 5 minutes
)
def api_data_sensor(context):
    """Sensor that monitors an external API for new data."""

    # Check the API for new data
    try:
        api_response = requests.get(
            "https://api.example.com/v1/status",
            headers={"Authorization": f"Bearer {API_TOKEN}"},
            timeout=30
        )
        api_response.raise_for_status()

        status_data = api_response.json()
        last_update = datetime.fromisoformat(status_data["last_update"])

        # Get the cursor from the sensor context (persistent state)
        last_processed = context.cursor
        if last_processed:
            last_processed_time = datetime.fromisoformat(last_processed)
        else:
            # First run, start from 1 hour ago
            last_processed_time = datetime.now() - timedelta(hours=1)

        if last_update > last_processed_time:
            context.log.info(f"New data available since {last_update}")

            # Update the cursor to track progress
            context.update_cursor(last_update.isoformat())

            return RunRequest(
                run_key=f"api_sync_{int(last_update.timestamp())}",
                run_config={
                    "resources": {
                        "api_client": {
                            "config": {
                                "sync_from": last_processed_time.isoformat(),
                                "sync_to": last_update.isoformat()
                            }
                        }
                    }
                },
                tags={
                    "last_update": last_update.isoformat(),
                    "data_available": "true"
                }
            )
        else:
            return SkipReason(f"No new data since {last_processed_time}")

    except Exception as e:
        context.log.error(f"Failed to check API status: {str(e)}")
        return SkipReason(f"API check failed: {str(e)}")

# Database change sensor (`incremental_sync_job` and `execute_query` assumed defined elsewhere)
@sensor(
    job=incremental_sync_job,
    minimum_interval_seconds=60
)
def database_change_sensor(context):
    """Sensor that monitors database changes."""

    # Query the change tracking table
    query = """
        SELECT table_name, last_modified, change_count
        FROM change_tracking
        WHERE last_modified > %s
        ORDER BY last_modified DESC
    """

    # Use the cursor to track the last processed time
    last_check = context.cursor or (datetime.now() - timedelta(minutes=5)).isoformat()

    changes = execute_query(query, [last_check])

    if not changes:
        return SkipReason(f"No database changes since {last_check}")

    # Update the cursor to the latest change time
    latest_change = max(change["last_modified"] for change in changes)
    context.update_cursor(latest_change.isoformat())

    # Group changes by table for efficient processing
    tables_changed = {}
    for change in changes:
        table = change["table_name"]
        if table not in tables_changed:
            tables_changed[table] = []
        tables_changed[table].append(change)

    context.log.info(f"Found changes in {len(tables_changed)} tables")

    run_requests = []
    for table_name, table_changes in tables_changed.items():
        total_changes = sum(change["change_count"] for change in table_changes)

        run_requests.append(
            RunRequest(
                run_key=f"sync_{table_name}_{int(latest_change.timestamp())}",
                run_config={
                    "ops": {
                        "incremental_sync": {
                            "config": {
                                "table_name": table_name,
                                "sync_from": last_check,
                                "change_count": total_changes
                            }
                        }
                    }
                },
                tags={
                    "table": table_name,
                    "change_count": str(total_changes),
                    "trigger": "database_change"
                }
            )
        )

    return run_requests
```

**Parameters:**
- `job: Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]` - Job to execute
- `name: Optional[str]` - Sensor name (defaults to function name)
- `minimum_interval_seconds: int = 30` - Minimum interval between evaluations
- `description: Optional[str]` - Sensor description
- `default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED` - Default sensor status

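The optional parameters are most useful for controlling evaluation frequency and whether the sensor starts enabled. A minimal sketch (it reuses `file_processing_job` from the example above; the name, description, and interval are illustrative):

```python
from dagster import DefaultSensorStatus, SkipReason, sensor

@sensor(
    job=file_processing_job,
    name="incoming_files_poller",
    description="Poll the landing directory every 10 minutes",
    minimum_interval_seconds=600,
    default_status=DefaultSensorStatus.RUNNING,  # start enabled without a manual toggle in the UI
)
def incoming_files_poller(context):
    # Real logic would inspect external state; this sketch always skips.
    return SkipReason("Nothing to do")
```
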
### Asset Sensors

#### `@asset_sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor that monitors specific assets for changes.

```python
from datetime import datetime, timedelta

from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SkipReason,
    asset,
    asset_sensor,
    job,
)
import pandas as pd

@asset
def raw_sales_data() -> pd.DataFrame:
    """Raw sales data asset."""
    return pd.read_csv("/data/sales.csv")

@asset
def processed_sales_data() -> pd.DataFrame:
    """Processed sales data asset."""
    # `process_sales_data` is a helper assumed to be defined elsewhere
    return process_sales_data()

@job
def sales_analytics_job():
    # `generate_sales_analytics` is an op assumed to be defined elsewhere
    generate_sales_analytics()

# Asset sensor monitoring a single asset
@asset_sensor(
    asset_key=AssetKey("raw_sales_data"),
    job=sales_analytics_job
)
def sales_data_sensor(context, asset_event: EventLogEntry):
    """Sensor that triggers when raw sales data is materialized."""

    # Access asset event information
    asset_key = asset_event.dagster_event.asset_key
    materialization = asset_event.dagster_event.step_materialization_data.materialization

    # Extract metadata from the materialization (expects a "records" metadata entry)
    records_entry = materialization.metadata.get("records")
    record_count = records_entry.value if records_entry else None

    # Conditional execution based on data size
    if record_count and record_count > 1000:
        context.log.info(f"Large dataset materialized ({record_count} records), triggering analytics")

        return RunRequest(
            run_key=f"analytics_{asset_event.storage_id}_{asset_event.timestamp}",
            run_config={
                "ops": {
                    "generate_sales_analytics": {
                        "config": {
                            "input_asset_key": str(asset_key),
                            "record_count": record_count,
                            "processing_mode": "large_dataset"
                        }
                    }
                }
            },
            tags={
                "triggered_by": str(asset_key),
                "record_count": str(record_count),
                "event_id": str(asset_event.storage_id)
            }
        )
    else:
        return SkipReason(f"Dataset too small ({record_count} records), skipping analytics")

# Asset sensor with dependency checking (`downstream_analytics_job` assumed defined elsewhere)
@asset_sensor(
    asset_key=AssetKey("processed_sales_data"),
    job=downstream_analytics_job
)
def processed_sales_sensor(context, asset_event):
    """Sensor for processed sales data with dependency checking."""

    # Check if all required dependencies are fresh
    required_assets = [
        AssetKey("customer_data"),
        AssetKey("product_catalog"),
        AssetKey("processed_sales_data")
    ]

    # Query materialization times for dependencies
    instance = context.instance
    fresh_assets = []

    for asset_key in required_assets:
        latest_materialization = instance.get_latest_materialization_event(asset_key)
        if latest_materialization:
            # Consider fresh if materialized within the last 6 hours
            materialization_time = datetime.fromtimestamp(latest_materialization.timestamp)
            if (datetime.now() - materialization_time) < timedelta(hours=6):
                fresh_assets.append(asset_key)

    if len(fresh_assets) == len(required_assets):
        context.log.info("All dependencies are fresh, triggering downstream analytics")

        return RunRequest(
            run_key=f"downstream_{int(asset_event.timestamp)}",
            tags={
                "trigger_asset": str(asset_event.dagster_event.asset_key),
                "fresh_dependencies": ",".join([str(k) for k in fresh_assets])
            }
        )
    else:
        stale_assets = set(required_assets) - set(fresh_assets)
        return SkipReason(f"Stale dependencies: {[str(k) for k in stale_assets]}")
```

#### `@multi_asset_sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor that monitors multiple assets simultaneously.

```python
from datetime import datetime, timedelta

from dagster import (
    AssetKey,
    MultiAssetSensorEvaluationContext,
    RunRequest,
    SkipReason,
    asset_sensor,
    multi_asset_sensor,
)

# `complete_etl_job` and `partition_analytics_job` are assumed to be defined elsewhere
@multi_asset_sensor(
    monitored_assets=[
        AssetKey(["raw", "users"]),
        AssetKey(["raw", "orders"]),
        AssetKey(["raw", "products"])
    ],
    job=complete_etl_job
)
def multi_table_sensor(context: MultiAssetSensorEvaluationContext):
    """Sensor that waits for multiple assets before triggering."""

    # Get the latest materialization record for each monitored asset
    asset_events = context.latest_materialization_records_by_key()

    # Check if all assets have been materialized recently (within 1 hour)
    cutoff_time = datetime.now() - timedelta(hours=1)
    fresh_assets = []

    for asset_key, event_record in asset_events.items():
        if event_record:
            materialization_time = datetime.fromtimestamp(event_record.timestamp)
            if materialization_time > cutoff_time:
                fresh_assets.append(asset_key)
                context.log.info(f"Fresh materialization: {asset_key} at {materialization_time}")

    monitored_keys = {
        AssetKey(["raw", "users"]),
        AssetKey(["raw", "orders"]),
        AssetKey(["raw", "products"])
    }

    if set(fresh_assets) == monitored_keys:
        context.log.info("All raw tables are fresh, triggering complete ETL")

        # Create a run key from the latest materialization timestamps
        latest_timestamp = max(
            event_record.timestamp
            for event_record in asset_events.values()
            if event_record
        )

        # Advance the cursors so these materializations are not re-processed
        context.advance_all_cursors()

        return RunRequest(
            run_key=f"complete_etl_{int(latest_timestamp)}",
            run_config={
                "ops": {
                    "etl_processor": {
                        "config": {
                            "mode": "complete_refresh",
                            "source_tables": ["users", "orders", "products"]
                        }
                    }
                }
            },
            tags={
                "trigger": "all_raw_tables_fresh",
                "fresh_assets": ",".join([str(k) for k in fresh_assets]),
                "latest_materialization": str(latest_timestamp)
            }
        )
    else:
        missing_assets = monitored_keys - set(fresh_assets)
        return SkipReason(f"Waiting for fresh materializations: {[str(k) for k in missing_assets]}")

# Asset sensor with partition awareness
@asset_sensor(
    asset_key=AssetKey("partitioned_sales"),
    job=partition_analytics_job
)
def partitioned_sales_sensor(context, asset_event):
    """Sensor that handles partitioned assets."""

    # Extract partition information from the event
    materialization = asset_event.dagster_event.step_materialization_data.materialization
    partition_key = materialization.partition if materialization else None

    if partition_key:
        # Check if this partition is part of a complete set
        # (e.g., all partitions for the current month)
        current_month = datetime.now().strftime("%Y-%m")

        if partition_key.startswith(current_month):
            context.log.info(f"Current month partition materialized: {partition_key}")

            return RunRequest(
                run_key=f"analytics_{partition_key}",
                partition_key=partition_key,
                tags={
                    "partition": partition_key,
                    "month": current_month,
                    "trigger": "current_month_partition"
                }
            )
        else:
            return SkipReason(f"Historical partition {partition_key}, no immediate action needed")
    else:
        return SkipReason("No partition information in materialization")
```

### Run Status Sensors

#### `@run_failure_sensor` { .api }

**Module:** `dagster._core.definitions.run_status_sensor_definition`
**Type:** Function decorator

Define a sensor that triggers on job run failures.

```python
from datetime import datetime, timedelta

from dagster import (
    DagsterRunStatus,
    DefaultSensorStatus,
    RunFailureSensorContext,
    RunsFilter,
    run_failure_sensor,
    run_status_sensor,
)

# `send_email`, `send_slack_message`, `DAGSTER_UI_BASE_URL`, and the monitored
# jobs referenced below are assumed to be defined elsewhere.
@run_failure_sensor(
    monitored_jobs=[daily_etl_job, weekly_analytics_job],
    default_status=DefaultSensorStatus.RUNNING
)
def job_failure_alert(context: RunFailureSensorContext):
    """Send alerts when critical jobs fail."""

    # Access failure information
    failed_run = context.dagster_run
    failure_event = context.failure_event

    job_name = failed_run.job_name
    run_id = failed_run.run_id
    failure_time = datetime.now()  # time the sensor observed the failure

    # Extract error information from the failure event
    error_message = failure_event.message or "Unknown error"

    context.log.error(f"Job {job_name} failed with error: {error_message}")

    run_url = f"{DAGSTER_UI_BASE_URL}/runs/{run_id}"

    # Send email notification
    alert_message = f"""
    Job Failure Alert

    Job: {job_name}
    Run ID: {run_id}
    Failure Time: {failure_time.strftime('%Y-%m-%d %H:%M:%S')}

    Error: {error_message}

    Dagster UI: {run_url}
    """

    try:
        send_email(
            subject=f"[DAGSTER ALERT] Job {job_name} Failed",
            body=alert_message,
            recipients=["data-team@company.com", "ops-team@company.com"]
        )

        context.log.info(f"Failure alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send alert email: {str(e)}")

    # Create Slack notification
    try:
        slack_message = {
            "text": f"🚨 Dagster Job Failure: {job_name}",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Job:* `{job_name}`\n*Run ID:* `{run_id}`\n*Time:* {failure_time.strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Error:*\n```{error_message[:500]}```"
                    }
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View in Dagster"},
                            "url": run_url
                        }
                    ]
                }
            ]
        }

        send_slack_message(slack_message)
        context.log.info(f"Slack alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send Slack alert: {str(e)}")

# Run status sensor for success notifications
@run_status_sensor(
    monitored_jobs=[critical_daily_job],
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING
)
def job_success_notification(context):
    """Notify on successful completion of critical jobs."""

    successful_run = context.dagster_run
    job_name = successful_run.job_name

    # Only notify for long-running jobs; look up timing from the run record
    # (start/end timestamps are epoch seconds)
    run_records = context.instance.get_run_records(RunsFilter(run_ids=[successful_run.run_id]))
    start_time = run_records[0].start_time if run_records else None
    end_time = run_records[0].end_time if run_records else None

    if start_time and end_time:
        duration = timedelta(seconds=end_time - start_time)

        if duration > timedelta(minutes=30):  # Only for jobs > 30 minutes
            context.log.info(f"Long-running job {job_name} completed successfully in {duration}")

            # Send success notification
            send_slack_message({
                "text": f"✅ Long-running job `{job_name}` completed successfully",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Job:* `{job_name}`\n*Duration:* {str(duration)}\n*Status:* Success ✅"
                        }
                    }
                ]
            })
```

#### `@run_status_sensor` { .api }

**Module:** `dagster._core.definitions.run_status_sensor_definition`
**Type:** Function decorator

Define a sensor for any run status change.

```python
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

# `expensive_ml_job` and `scale_compute_resources` are assumed to be defined elsewhere
@run_status_sensor(
    run_status=DagsterRunStatus.STARTED,
    monitored_jobs=[expensive_ml_job]
)
def job_start_monitor(context: RunStatusSensorContext):
    """Monitor job starts for resource scaling."""

    started_run = context.dagster_run
    job_name = started_run.job_name

    # Scale up resources for expensive jobs
    if job_name == "expensive_ml_job":
        context.log.info(f"Expensive job {job_name} started, scaling up resources")

        # Trigger infrastructure scaling
        try:
            scale_compute_resources(
                job_name=job_name,
                run_id=started_run.run_id,
                scale_action="up"
            )

            context.log.info("Successfully scaled up resources")

        except Exception as e:
            context.log.error(f"Failed to scale resources: {str(e)}")

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[expensive_ml_job]
)
def job_completion_cleanup(context):
    """Clean up resources after job completion."""

    completed_run = context.dagster_run
    job_name = completed_run.job_name

    context.log.info(f"Job {job_name} completed, scaling down resources")

    # Scale down resources
    try:
        scale_compute_resources(
            job_name=job_name,
            run_id=completed_run.run_id,
            scale_action="down"
        )

        context.log.info("Successfully scaled down resources")

    except Exception as e:
        context.log.error(f"Failed to scale down resources: {str(e)}")
```

## Automation Policies

### Auto-Materialization Policies

#### `AutoMaterializePolicy` { .api }

**Module:** `dagster._core.definitions.auto_materialize_policy`
**Type:** Class

Policy for automatic asset materialization based on upstream changes.

```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

# The upstream assets and processing helpers referenced below are assumed to be
# defined elsewhere.

# Eager materialization - materialize immediately when upstream changes
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def eager_asset(upstream_data):
    """Asset that materializes immediately when upstream changes."""
    return process_data(upstream_data)

# Lazy materialization - materialize only when downstream requests it
@asset(auto_materialize_policy=AutoMaterializePolicy.lazy())
def lazy_asset(source_data):
    """Asset that materializes only when needed."""
    return expensive_computation(source_data)

# Custom auto-materialization rules
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        AutoMaterializeRule.materialize_on_required_for_freshness(),
        AutoMaterializeRule.skip_on_parent_missing(),
        AutoMaterializeRule.skip_on_parent_outdated()
    ])
)
def custom_policy_asset(upstream_asset):
    """Asset with custom auto-materialization policy."""
    return transform_data(upstream_asset)

# Time-based auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        # Materialize daily at 2 AM
        AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
        # But skip if upstream data is outdated
        AutoMaterializeRule.skip_on_parent_outdated()
    ])
)
def daily_summary(daily_transactions):
    """Asset that auto-materializes daily with freshness checks."""
    return summarize_daily_transactions(daily_transactions)

# Conditional auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        # Wait until all parents have updated since the evening cron tick
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron("0 18 * * *")  # 6 PM
    ])
)
def batch_processed_asset(real_time_data):
    """Asset that processes in batches outside business hours."""
    return batch_process(real_time_data)
```

### Declarative Automation

#### `AutomationCondition` { .api }

**Module:** `dagster._core.definitions.declarative_automation.automation_condition`
**Type:** Class

Declarative conditions for sophisticated automation logic.

```python
from dagster import AutomationCondition, asset

# The processing helpers referenced below are assumed to be defined elsewhere.

# Complex automation condition
@asset(
    automation_condition=(
        # Materialize when a parent is updated
        AutomationCondition.parent_newer()
        # But only during off-hours
        & AutomationCondition.cron_tick_passed("0 0 * * *")  # Midnight
        # And not if recently materialized
        & ~AutomationCondition.materialized_since_cron("0 18 * * *")  # 6 PM
        # And only if all upstream blocking checks have passed
        & AutomationCondition.all_deps_blocking_checks_passed()
    )
)
def sophisticated_asset(upstream_data):
    """Asset with sophisticated automation logic."""
    return complex_processing(upstream_data)

# Data quality gated automation
@asset(
    automation_condition=(
        AutomationCondition.parent_newer()
        # Only materialize if upstream passes quality checks
        & AutomationCondition.all_deps_blocking_checks_passed()
        # And if we haven't failed recently
        & ~AutomationCondition.failed_since_cron("0 0 * * *")
    )
)
def quality_gated_asset(validated_data):
    """Asset that only materializes with high-quality upstream data."""
    return process_validated_data(validated_data)

# Backfill automation
@asset(
    automation_condition=(
        # Normal condition for regular updates
        AutomationCondition.parent_newer()
        # OR backfill condition for missing partitions
        | (
            AutomationCondition.missing()
            & AutomationCondition.parent_materialized()
            # Only backfill during maintenance windows
            & AutomationCondition.cron_tick_passed("0 2 * * 6")  # Saturday 2 AM
        )
    )
)
def backfill_aware_asset(source_data):
    """Asset with automatic backfill logic."""
    return process_data_with_backfill(source_data)
```

## Sensor and Schedule Context Builders

### Context Building for Testing

```python
from datetime import datetime
from pathlib import Path

from dagster import (
    DagsterInstance,
    RunRequest,
    build_run_status_sensor_context,
    build_schedule_context,
    build_sensor_context,
)

# Test a sensor function
def test_file_arrival_sensor():
    """Test the sensor with a mock context."""

    # Create test files in the directory the sensor watches
    test_files = ["/data/incoming/test1.csv", "/data/incoming/test2.csv"]
    for file_path in test_files:
        Path(file_path).touch()

    # Build a sensor context
    context = build_sensor_context(
        cursor="2023-01-01T00:00:00",
        instance=DagsterInstance.ephemeral()
    )

    # Evaluate the sensor
    result = file_arrival_sensor(context)

    # Verify results
    assert isinstance(result, list)
    assert len(result) == 2
    assert all(isinstance(req, RunRequest) for req in result)

# Test a schedule function
def test_business_days_schedule():
    """Test the schedule with different times."""

    # Test Monday execution
    monday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday
        instance=DagsterInstance.ephemeral()
    )

    monday_result = business_days_schedule(monday_context)
    assert isinstance(monday_result, RunRequest)
    assert monday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] is True

    # Test Tuesday execution
    tuesday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 17, 8, 0)  # Tuesday
    )

    tuesday_result = business_days_schedule(tuesday_context)
    assert tuesday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] is False

# Test a failure sensor
def test_job_failure_alert():
    """Test the failure sensor with a mock failure."""

    # Create a mock failure context (`mock_failed_run` and `mock_failure_event`
    # are placeholder objects constructed by the test suite)
    context = build_run_status_sensor_context(
        sensor_name="job_failure_alert",
        dagster_run=mock_failed_run,
        dagster_event=mock_failure_event,
        dagster_instance=DagsterInstance.ephemeral()
    )

    # Evaluating the failure sensor should not raise
    job_failure_alert(context)
```
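
Schedules and sensors only take effect once they are included in a code location's `Definitions` object, where Dagster's scheduler and sensor daemon can discover them. A minimal wiring sketch using a few of the objects defined above (the exact set you register will differ):

```python
from dagster import Definitions

defs = Definitions(
    assets=[daily_sales_data],
    jobs=[daily_etl_job, file_processing_job],
    schedules=[daily_etl_schedule, business_days_schedule, daily_sales_schedule],
    sensors=[file_arrival_sensor, sales_data_sensor, job_failure_alert],
)
```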

Together, these sensors, schedules, and declarative automation policies support sophisticated automation patterns: event-driven execution, time-based scheduling, and failure handling, backed by rich context information and flexible configuration options for building robust data pipeline automation.

For partitioned execution with sensors and schedules, see [Partitions System](./partitions.md). For error handling and failure management, see [Error Handling](./error-handling.md).