
# Sensors and Schedules

This document covers Dagster's automation system, including sensors, schedules, and declarative automation policies. These systems enable event-driven and time-based execution of pipelines with comprehensive monitoring and failure handling.

## Schedule System

Schedules provide time-based execution of jobs and asset materializations using cron expressions and partition-based scheduling.

### Schedule Decorator

#### `@schedule` { .api }

**Module:** `dagster._core.definitions.decorators.schedule_decorator`
**Type:** Function decorator

Define a time-based schedule for job execution.

```python
from dagster import schedule, job, op, asset, Config, RunRequest, SkipReason
from dagster import DailyPartitionsDefinition, WeeklyPartitionsDefinition
from datetime import datetime
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM daily_transactions", connection)

@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna().reset_index(drop=True)

@job
def daily_etl_job():
    """Daily ETL job."""
    transform_data(extract_data())

# Basic daily schedule
@schedule(
    job=daily_etl_job,
    cron_schedule="0 2 * * *"  # 2 AM daily
)
def daily_etl_schedule():
    """Schedule daily ETL at 2 AM."""
    return {}

# Schedule with dynamic configuration
@schedule(
    job=daily_etl_job,
    cron_schedule="0 6 * * 1"  # 6 AM every Monday
)
def weekly_full_refresh(context):
    """Weekly full refresh with dynamic config."""

    # Generate run configuration based on schedule context
    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": True,
                    "batch_size": 10000,
                    "run_date": context.scheduled_execution_time.strftime("%Y-%m-%d")
                }
            }
        },
        "resources": {
            "warehouse": {
                "config": {
                    "connection_timeout": 300  # Longer timeout for full refresh
                }
            }
        }
    }

    return RunRequest(
        run_key=f"weekly_refresh_{context.scheduled_execution_time.strftime('%Y_%m_%d')}",
        run_config=run_config,
        tags={
            "schedule": "weekly_full_refresh",
            "refresh_type": "full",
            "execution_date": context.scheduled_execution_time.isoformat()
        }
    )

# Conditional schedule execution
@schedule(
    job=daily_etl_job,
    cron_schedule="0 */4 * * *"  # Every 4 hours
)
def conditional_data_refresh(context):
    """Schedule that runs conditionally based on data availability."""

    # Check if new data is available
    last_update = get_last_data_update_time()
    last_run = get_last_successful_run_time("daily_etl_job")

    if last_update > last_run:
        context.log.info(f"New data available since {last_update}, scheduling run")

        return RunRequest(
            run_key=f"data_refresh_{int(last_update.timestamp())}",
            tags={
                "trigger": "new_data_available",
                "last_update": last_update.isoformat()
            }
        )
    else:
        return SkipReason(f"No new data since last run at {last_run}")

# Multiple run requests from single schedule
@schedule(
    job=deployment_job,  # deployment job targeted by the run requests below (defined elsewhere)
    cron_schedule="0 1 * * *"  # 1 AM daily
)
def multi_environment_deploy():
    """Schedule that deploys to multiple environments."""

    environments = ["staging", "prod", "dr"]
    run_requests = []

    for env in environments:
        run_requests.append(
            RunRequest(
                run_key=f"deploy_{env}_{datetime.now().strftime('%Y%m%d')}",
                job_name="deployment_job",
                run_config={
                    "resources": {
                        "deployment_target": {
                            "config": {"environment": env}
                        }
                    }
                },
                tags={
                    "environment": env,
                    "deployment_type": "scheduled"
                }
            )
        )

    return run_requests
```

**Parameters:**

- `job: Union[JobDefinition, UnresolvedAssetJobDefinition]` - Job to schedule
- `cron_schedule: str` - Cron expression for schedule timing
- `name: Optional[str]` - Schedule name (defaults to function name)
- `execution_timezone: Optional[str]` - Timezone for schedule execution
- `description: Optional[str]` - Schedule description
- `default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED` - Default schedule status
- `tags: Optional[Dict[str, str]]` - Schedule tags
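Several of these parameters do not appear in the example above. A minimal sketch of a fully parameterized schedule, reusing `daily_etl_job` from the example (the timezone and team tag are illustrative assumptions):

```python
from dagster import schedule, DefaultScheduleStatus

@schedule(
    job=daily_etl_job,  # defined in the example above
    cron_schedule="30 6 * * 1-5",
    name="weekday_morning_etl",
    description="Weekday ETL aligned to the New York business day",
    execution_timezone="America/New_York",
    default_status=DefaultScheduleStatus.RUNNING,  # start enabled rather than STOPPED
    tags={"team": "data-platform"},
)
def weekday_morning_etl_schedule():
    return {}
```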

### Partitioned Schedules

#### `build_schedule_from_partitioned_job` { .api }

**Module:** `dagster._core.definitions.partitions.partitioned_schedule`
**Type:** Function

Create a schedule from a partitioned job that runs for each partition.

```python
from dagster import (
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    schedule,
    DailyPartitionsDefinition,
    RunRequest,
)
from datetime import timedelta
import pandas as pd

# Partitioned asset
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_sales_data(context) -> pd.DataFrame:
    """Daily sales data asset."""
    partition_date = context.partition_key

    # Load data for specific date
    query = f"""
        SELECT * FROM sales
        WHERE date = '{partition_date}'
    """

    return pd.read_sql(query, connection)

# Partitioned asset job that materializes the asset
daily_sales_job = define_asset_job(
    "daily_sales_job",
    selection=[daily_sales_data],
    partitions_def=daily_partitions,
)

# Create schedule that runs daily for the previous day's partition
daily_sales_schedule = build_schedule_from_partitioned_job(
    job=daily_sales_job,
    name="daily_sales_schedule",
    description="Process daily sales data",
    hour_of_day=2,  # Run at 2 AM
    minute_of_hour=0,
    timezone="America/New_York"
)

# Custom partition schedule with multiple partitions
@schedule(
    job=daily_sales_job,
    cron_schedule="0 3 * * *"  # 3 AM daily
)
def backfill_schedule(context):
    """Schedule that processes multiple partitions for backfill."""

    # Determine partitions to process
    execution_date = context.scheduled_execution_time.date()

    # Process last 7 days of partitions
    partitions_to_run = []
    for i in range(7):
        partition_date = execution_date - timedelta(days=i + 1)
        partitions_to_run.append(partition_date.strftime("%Y-%m-%d"))

    run_requests = []
    for partition_key in partitions_to_run:
        run_requests.append(
            RunRequest(
                run_key=f"backfill_{partition_key}",
                partition_key=partition_key,
                tags={
                    "partition": partition_key,
                    "run_type": "backfill"
                }
            )
        )

    return run_requests
```
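The same helper works with other time-window partition definitions. A minimal sketch with weekly partitions, assuming a weekly-partitioned asset named `weekly_metrics` and reusing `define_asset_job`:

```python
from dagster import WeeklyPartitionsDefinition, define_asset_job, build_schedule_from_partitioned_job

weekly_partitions = WeeklyPartitionsDefinition(start_date="2023-01-01")

weekly_metrics_job = define_asset_job(
    "weekly_metrics_job",
    selection=["weekly_metrics"],  # assumed weekly-partitioned asset
    partitions_def=weekly_partitions,
)

# Runs once per week, after each weekly partition window closes
weekly_metrics_schedule = build_schedule_from_partitioned_job(
    job=weekly_metrics_job,
    name="weekly_metrics_schedule",
    hour_of_day=3,
    minute_of_hour=0,
)
```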

### Schedule Context

#### `ScheduleEvaluationContext` { .api }

**Module:** `dagster._core.definitions.schedule_definition`
**Type:** Class

Context provided to schedule functions with execution information.

```python
from dagster import (
    ScheduleEvaluationContext,
    build_schedule_context,
    schedule,
    DagsterInstance,
    DagsterRunStatus,
    RunsFilter,
    RunRequest,
    SkipReason,
)
from datetime import datetime, timedelta

@schedule(
    job=daily_etl_job,
    cron_schedule="0 8 * * 1-5"  # 8 AM weekdays
)
def business_days_schedule(context: ScheduleEvaluationContext):
    """Schedule with comprehensive context usage."""

    # Access schedule execution time
    scheduled_time = context.scheduled_execution_time
    execution_date = scheduled_time.date()

    # Access instance and run information
    instance = context.instance

    # Check for recent failures
    recent_runs = instance.get_runs(
        filters=RunsFilter(
            job_name="daily_etl_job",
            created_after=scheduled_time - timedelta(days=1)
        ),
        limit=5
    )

    recent_failures = [run for run in recent_runs if run.status == DagsterRunStatus.FAILURE]

    if len(recent_failures) >= 3:
        context.log.warning(f"Found {len(recent_failures)} recent failures, skipping execution")
        return SkipReason(f"Too many recent failures ({len(recent_failures)})")

    # Dynamic configuration based on day of week
    is_monday = scheduled_time.weekday() == 0

    run_config = {
        "ops": {
            "extract_data": {
                "config": {
                    "full_refresh": is_monday,  # Full refresh on Mondays
                    "batch_size": 5000 if is_monday else 1000,
                    "execution_date": execution_date.isoformat()
                }
            }
        }
    }

    # Generate unique run key
    run_key = f"etl_{execution_date.strftime('%Y%m%d')}_{scheduled_time.hour}"

    context.log.info(f"Scheduling ETL for {execution_date} (Monday: {is_monday})")

    return RunRequest(
        run_key=run_key,
        run_config=run_config,
        tags={
            "execution_date": execution_date.isoformat(),
            "is_monday": str(is_monday),
            "schedule_time": scheduled_time.isoformat()
        }
    )

# Build schedule context for testing
test_context = build_schedule_context(
    scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday 8 AM
    instance=DagsterInstance.ephemeral()
)

# Test schedule function
result = business_days_schedule(test_context)
print(f"Schedule result: {result}")
```

**Key Properties:**

- `scheduled_execution_time: datetime` - When the schedule was supposed to execute
- `instance: DagsterInstance` - Dagster instance
- `log: DagsterLogManager` - Logger for schedule evaluation

## Sensor System

Sensors enable event-driven pipeline execution based on external events, asset changes, or custom conditions.

### Sensor Decorator

#### `@sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor for event-driven execution.

```python
from dagster import sensor, job, RunRequest, SkipReason, SensorEvaluationContext
from datetime import datetime, timedelta
from pathlib import Path
import os
import glob
import requests

@job
def file_processing_job():
    process_new_files()

# File-based sensor
@sensor(
    job=file_processing_job,
    minimum_interval_seconds=30  # Check every 30 seconds
)
def file_arrival_sensor(context: SensorEvaluationContext):
    """Sensor that triggers on new file arrivals."""

    watch_directory = "/data/incoming"
    processed_directory = "/data/processed"

    # Find new files
    new_files = []
    for file_path in glob.glob(f"{watch_directory}/*.csv"):
        file_name = os.path.basename(file_path)
        processed_path = os.path.join(processed_directory, file_name)

        # Check if file hasn't been processed
        if not os.path.exists(processed_path):
            file_stats = os.stat(file_path)
            new_files.append({
                "path": file_path,
                "name": file_name,
                "size": file_stats.st_size,
                "modified": file_stats.st_mtime
            })

    if not new_files:
        return SkipReason("No new files found")

    context.log.info(f"Found {len(new_files)} new files to process")

    run_requests = []
    for file_info in new_files:
        run_requests.append(
            RunRequest(
                run_key=f"file_{file_info['name']}_{int(file_info['modified'])}",
                run_config={
                    "ops": {
                        "process_new_files": {
                            "config": {
                                "input_file": file_info["path"],
                                "output_directory": processed_directory
                            }
                        }
                    }
                },
                tags={
                    "file_name": file_info["name"],
                    "file_size": str(file_info["size"]),
                    "trigger": "file_arrival"
                }
            )
        )

    return run_requests

# API-based sensor
@sensor(
    job=api_sync_job,
    minimum_interval_seconds=300  # Check every 5 minutes
)
def api_data_sensor(context):
    """Sensor that monitors external API for new data."""

    # Check API for new data
    try:
        api_response = requests.get(
            "https://api.example.com/v1/status",
            headers={"Authorization": f"Bearer {API_TOKEN}"},
            timeout=30
        )
        api_response.raise_for_status()

        status_data = api_response.json()
        last_update = datetime.fromisoformat(status_data["last_update"])

        # Get cursor from sensor context (persistent state)
        last_processed = context.cursor
        if last_processed:
            last_processed_time = datetime.fromisoformat(last_processed)
        else:
            # First run, start from 1 hour ago
            last_processed_time = datetime.now() - timedelta(hours=1)

        if last_update > last_processed_time:
            context.log.info(f"New data available since {last_update}")

            # Update cursor to track progress
            context.update_cursor(last_update.isoformat())

            return RunRequest(
                run_key=f"api_sync_{int(last_update.timestamp())}",
                run_config={
                    "resources": {
                        "api_client": {
                            "config": {
                                "sync_from": last_processed_time.isoformat(),
                                "sync_to": last_update.isoformat()
                            }
                        }
                    }
                },
                tags={
                    "last_update": last_update.isoformat(),
                    "data_available": "true"
                }
            )
        else:
            return SkipReason(f"No new data since {last_processed_time}")

    except Exception as e:
        context.log.error(f"Failed to check API status: {str(e)}")
        return SkipReason(f"API check failed: {str(e)}")

# Database change sensor
@sensor(
    job=incremental_sync_job,
    minimum_interval_seconds=60
)
def database_change_sensor(context):
    """Sensor that monitors database changes."""

    # Query change tracking table
    query = """
        SELECT table_name, last_modified, change_count
        FROM change_tracking
        WHERE last_modified > %s
        ORDER BY last_modified DESC
    """

    # Use cursor to track last processed time
    last_check = context.cursor or (datetime.now() - timedelta(minutes=5)).isoformat()

    changes = execute_query(query, [last_check])

    if not changes:
        return SkipReason(f"No database changes since {last_check}")

    # Update cursor to latest change time
    latest_change = max(change["last_modified"] for change in changes)
    context.update_cursor(latest_change.isoformat())

    # Group changes by table for efficient processing
    tables_changed = {}
    for change in changes:
        table = change["table_name"]
        if table not in tables_changed:
            tables_changed[table] = []
        tables_changed[table].append(change)

    context.log.info(f"Found changes in {len(tables_changed)} tables")

    run_requests = []
    for table_name, table_changes in tables_changed.items():
        total_changes = sum(change["change_count"] for change in table_changes)

        run_requests.append(
            RunRequest(
                run_key=f"sync_{table_name}_{int(latest_change.timestamp())}",
                run_config={
                    "ops": {
                        "incremental_sync": {
                            "config": {
                                "table_name": table_name,
                                "sync_from": last_check,
                                "change_count": total_changes
                            }
                        }
                    }
                },
                tags={
                    "table": table_name,
                    "change_count": str(total_changes),
                    "trigger": "database_change"
                }
            )
        )

    return run_requests
```

**Parameters:**

- `job: Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]` - Job to execute
- `name: Optional[str]` - Sensor name (defaults to function name)
- `minimum_interval_seconds: int = 30` - Minimum interval between evaluations
- `description: Optional[str]` - Sensor description
- `default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED` - Default sensor status
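Cursors are the main piece of sensor state worth unit testing. A minimal, self-contained sketch showing how `build_sensor_context` and `update_cursor` interact; the `release_sensor` and its "latest release id" lookup are hypothetical stand-ins:

```python
from dagster import sensor, build_sensor_context, RunRequest, SkipReason

@sensor(job=file_processing_job)  # reuses the job defined in the example above
def release_sensor(context):
    """Hypothetical sensor that tracks the highest release id it has seen."""
    latest_release_id = 42  # stand-in for an external lookup
    seen = int(context.cursor) if context.cursor else 0

    if latest_release_id <= seen:
        return SkipReason(f"No release newer than {seen}")

    context.update_cursor(str(latest_release_id))
    return RunRequest(run_key=f"release_{latest_release_id}")

# Evaluate the sensor in a test with an explicit starting cursor
context = build_sensor_context(cursor="41")
result = release_sensor(context)

assert isinstance(result, RunRequest)
assert context.cursor == "42"  # update_cursor is reflected on the test context
```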

### Asset Sensors

#### `@asset_sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor that monitors specific assets for changes.

```python
from dagster import (
    asset_sensor,
    asset,
    job,
    AssetKey,
    EventLogEntry,
    DagsterEventType,
    RunRequest,
    SkipReason,
)
from datetime import datetime, timedelta
import pandas as pd

@asset
def raw_sales_data() -> pd.DataFrame:
    """Raw sales data asset."""
    return pd.read_csv("/data/sales.csv")

@asset
def processed_sales_data() -> pd.DataFrame:
    """Processed sales data asset."""
    return process_sales_data()

@job
def sales_analytics_job():
    generate_sales_analytics()

# Asset sensor monitoring single asset
@asset_sensor(
    asset_key=AssetKey("raw_sales_data"),
    job=sales_analytics_job
)
def sales_data_sensor(context, asset_event: EventLogEntry):
    """Sensor that triggers when raw sales data is materialized."""

    # Access asset event information
    asset_key = asset_event.dagster_event.asset_key
    materialization = asset_event.dagster_event.step_materialization_data.materialization

    # Extract metadata from materialization
    metadata = materialization.metadata_entries
    record_count = None
    for entry in metadata:
        if entry.label == "records":
            record_count = entry.entry_data.int_value
            break

    # Conditional execution based on data size
    if record_count and record_count > 1000:
        context.log.info(f"Large dataset materialized ({record_count} records), triggering analytics")

        return RunRequest(
            run_key=f"analytics_{asset_event.storage_id}_{asset_event.timestamp}",
            run_config={
                "ops": {
                    "generate_sales_analytics": {
                        "config": {
                            "input_asset_key": str(asset_key),
                            "record_count": record_count,
                            "processing_mode": "large_dataset"
                        }
                    }
                }
            },
            tags={
                "triggered_by": str(asset_key),
                "record_count": str(record_count),
                "event_id": str(asset_event.storage_id)
            }
        )
    else:
        return SkipReason(f"Dataset too small ({record_count} records), skipping analytics")

# Asset sensor with dependency freshness checking
@asset_sensor(
    asset_key=AssetKey("processed_sales_data"),
    job=downstream_analytics_job
)
def processed_sales_sensor(context, asset_event):
    """Sensor for processed sales data with dependency checking."""

    # Check if all required dependencies are fresh
    required_assets = [
        AssetKey("customer_data"),
        AssetKey("product_catalog"),
        AssetKey("processed_sales_data")
    ]

    # Query materialization times for dependencies
    instance = context.instance
    fresh_assets = []

    for asset_key in required_assets:
        latest_materialization = instance.get_latest_materialization_event(asset_key)
        if latest_materialization:
            # Consider fresh if materialized within last 6 hours
            materialization_time = datetime.fromtimestamp(latest_materialization.timestamp)
            if (datetime.now() - materialization_time) < timedelta(hours=6):
                fresh_assets.append(asset_key)

    if len(fresh_assets) == len(required_assets):
        context.log.info("All dependencies are fresh, triggering downstream analytics")

        return RunRequest(
            run_key=f"downstream_{int(asset_event.timestamp)}",
            tags={
                "trigger_asset": str(asset_event.dagster_event.asset_key),
                "fresh_dependencies": ",".join([str(k) for k in fresh_assets])
            }
        )
    else:
        stale_assets = set(required_assets) - set(fresh_assets)
        return SkipReason(f"Stale dependencies: {[str(k) for k in stale_assets]}")
```

#### `@multi_asset_sensor` { .api }

**Module:** `dagster._core.definitions.decorators.sensor_decorator`
**Type:** Function decorator

Define a sensor that monitors multiple assets simultaneously.

```python
from dagster import (
    multi_asset_sensor,
    asset_sensor,
    AssetKey,
    MultiAssetSensorEvaluationContext,
    RunRequest,
    SkipReason,
)
from datetime import datetime, timedelta

@multi_asset_sensor(
    monitored_assets=[
        AssetKey(["raw", "users"]),
        AssetKey(["raw", "orders"]),
        AssetKey(["raw", "products"])
    ],
    job=complete_etl_job
)
def multi_table_sensor(context: MultiAssetSensorEvaluationContext):
    """Sensor that waits for multiple assets before triggering."""

    # Get materialization events for all monitored assets
    asset_events = context.latest_materialization_records_by_key()

    # Check if all assets have been materialized recently (within 1 hour)
    cutoff_time = datetime.now() - timedelta(hours=1)
    fresh_assets = []

    for asset_key, event_record in asset_events.items():
        if event_record:
            materialization_time = datetime.fromtimestamp(event_record.timestamp)
            if materialization_time > cutoff_time:
                fresh_assets.append(asset_key)
                context.log.info(f"Fresh materialization: {asset_key} at {materialization_time}")

    monitored_keys = {
        AssetKey(["raw", "users"]),
        AssetKey(["raw", "orders"]),
        AssetKey(["raw", "products"])
    }

    if set(fresh_assets) == monitored_keys:
        context.log.info("All raw tables are fresh, triggering complete ETL")

        # Create run key from latest materialization timestamps
        latest_timestamp = max(
            event_record.timestamp
            for event_record in asset_events.values()
            if event_record
        )

        return RunRequest(
            run_key=f"complete_etl_{int(latest_timestamp)}",
            run_config={
                "ops": {
                    "etl_processor": {
                        "config": {
                            "mode": "complete_refresh",
                            "source_tables": ["users", "orders", "products"]
                        }
                    }
                }
            },
            tags={
                "trigger": "all_raw_tables_fresh",
                "fresh_assets": ",".join([str(k) for k in fresh_assets]),
                "latest_materialization": str(latest_timestamp)
            }
        )
    else:
        missing_assets = monitored_keys - set(fresh_assets)
        return SkipReason(f"Waiting for fresh materializations: {[str(k) for k in missing_assets]}")

# Asset sensor with partition awareness
@asset_sensor(
    asset_key=AssetKey("partitioned_sales"),
    job=partition_analytics_job
)
def partitioned_sales_sensor(context, asset_event):
    """Sensor that handles partitioned assets."""

    # Extract partition information from event
    materialization = asset_event.dagster_event.step_materialization_data.materialization
    partition_key = materialization.partition if materialization else None

    if partition_key:
        # Check if this partition is part of a complete set
        # (e.g., all partitions for current month)
        current_month = datetime.now().strftime("%Y-%m")

        if partition_key.startswith(current_month):
            context.log.info(f"Current month partition materialized: {partition_key}")

            return RunRequest(
                run_key=f"analytics_{partition_key}",
                partition_key=partition_key,
                tags={
                    "partition": partition_key,
                    "month": current_month,
                    "trigger": "current_month_partition"
                }
            )
        else:
            return SkipReason(f"Historical partition {partition_key}, no immediate action needed")
    else:
        return SkipReason("No partition information in materialization")
```

### Run Status Sensors

#### `@run_failure_sensor` { .api }

**Module:** `dagster._core.definitions.run_status_sensor_definition`
**Type:** Function decorator

Define a sensor that triggers on job run failures.

```python
from dagster import (
    run_failure_sensor,
    run_status_sensor,
    RunFailureSensorContext,
    DagsterRunStatus,
    DefaultSensorStatus,
)
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText

@run_failure_sensor(
    monitored_jobs=[daily_etl_job, weekly_analytics_job],
    default_status=DefaultSensorStatus.RUNNING
)
def job_failure_alert(context: RunFailureSensorContext):
    """Send alerts when critical jobs fail."""

    # Access failure information
    failed_run = context.dagster_run
    failure_event = context.failure_event

    job_name = failed_run.job_name
    run_id = failed_run.run_id
    failure_time = datetime.fromtimestamp(failure_event.timestamp)

    # Extract error information
    error_info = failure_event.dagster_event.engine_event_data
    error_message = error_info.error.message if error_info and error_info.error else "Unknown error"

    context.log.error(f"Job {job_name} failed with error: {error_message}")

    # Send email notification
    alert_message = f"""
    Job Failure Alert

    Job: {job_name}
    Run ID: {run_id}
    Failure Time: {failure_time.strftime('%Y-%m-%d %H:%M:%S')}

    Error: {error_message}

    Dagster UI: {context.instance.get_run_url(run_id)}
    """

    try:
        send_email(
            subject=f"[DAGSTER ALERT] Job {job_name} Failed",
            body=alert_message,
            recipients=["data-team@company.com", "ops-team@company.com"]
        )

        context.log.info(f"Failure alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send alert email: {str(e)}")

    # Create Slack notification
    try:
        slack_message = {
            "text": f"🚨 Dagster Job Failure: {job_name}",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Job:* `{job_name}`\n*Run ID:* `{run_id}`\n*Time:* {failure_time.strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Error:*\n```{error_message[:500]}```"
                    }
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "View in Dagster"},
                            "url": context.instance.get_run_url(run_id)
                        }
                    ]
                }
            ]
        }

        send_slack_message(slack_message)
        context.log.info(f"Slack alert sent for job {job_name}")

    except Exception as e:
        context.log.error(f"Failed to send Slack alert: {str(e)}")

# Run status sensor for success notifications
@run_status_sensor(
    monitored_jobs=[critical_daily_job],
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING
)
def job_success_notification(context):
    """Notify on successful completion of critical jobs."""

    successful_run = context.dagster_run
    job_name = successful_run.job_name

    # Only notify for long-running jobs
    start_time = successful_run.start_time
    end_time = successful_run.end_time

    if start_time and end_time:
        duration = end_time - start_time

        if duration > timedelta(minutes=30):  # Only for jobs > 30 minutes
            context.log.info(f"Long-running job {job_name} completed successfully in {duration}")

            # Send success notification
            send_slack_message({
                "text": f"✅ Long-running job `{job_name}` completed successfully",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Job:* `{job_name}`\n*Duration:* {str(duration)}\n*Status:* Success ✅"
                        }
                    }
                ]
            })
```

#### `@run_status_sensor` { .api }

**Module:** `dagster._core.definitions.run_status_sensor_definition`
**Type:** Function decorator

Define a sensor for any run status change.

```python
from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus

@run_status_sensor(
    run_status=DagsterRunStatus.STARTED,
    monitored_jobs=[expensive_ml_job]
)
def job_start_monitor(context: RunStatusSensorContext):
    """Monitor job starts for resource scaling."""

    started_run = context.dagster_run
    job_name = started_run.job_name

    # Scale up resources for expensive jobs
    if job_name == "expensive_ml_job":
        context.log.info(f"Expensive job {job_name} started, scaling up resources")

        # Trigger infrastructure scaling
        try:
            scale_compute_resources(
                job_name=job_name,
                run_id=started_run.run_id,
                scale_action="up"
            )

            context.log.info("Successfully scaled up resources")

        except Exception as e:
            context.log.error(f"Failed to scale resources: {str(e)}")

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[expensive_ml_job]
)
def job_completion_cleanup(context):
    """Clean up resources after job completion."""

    completed_run = context.dagster_run
    job_name = completed_run.job_name

    context.log.info(f"Job {job_name} completed, scaling down resources")

    # Scale down resources
    try:
        scale_compute_resources(
            job_name=job_name,
            run_id=completed_run.run_id,
            scale_action="down"
        )

        context.log.info("Successfully scaled down resources")

    except Exception as e:
        context.log.error(f"Failed to scale down resources: {str(e)}")
```

## Automation Policies

### Auto-Materialization Policies

#### `AutoMaterializePolicy` { .api }

**Module:** `dagster._core.definitions.auto_materialize_policy`
**Type:** Class

Policy for automatic asset materialization based on upstream changes.

```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

# Eager materialization - materialize immediately when upstream changes
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def eager_asset(upstream_data):
    """Asset that materializes immediately when upstream changes."""
    return process_data(upstream_data)

# Lazy materialization - materialize only when downstream requests it
@asset(auto_materialize_policy=AutoMaterializePolicy.lazy())
def lazy_asset(source_data):
    """Asset that materializes only when needed."""
    return expensive_computation(source_data)

# Custom auto-materialization rules
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        AutoMaterializeRule.materialize_on_required_for_freshness(),
        AutoMaterializeRule.skip_on_parent_missing(),
        AutoMaterializeRule.skip_on_parent_outdated()
    ])
)
def custom_policy_asset(upstream_asset):
    """Asset with custom auto-materialization policy."""
    return transform_data(upstream_asset)

# Time-based auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        # Materialize daily at 2 AM
        AutoMaterializeRule.materialize_on_cron("0 2 * * *"),
        # But skip if upstream data is too old
        AutoMaterializeRule.skip_on_parent_outdated(minutes=360)  # 6 hours
    ])
)
def daily_summary(daily_transactions):
    """Asset that auto-materializes daily with freshness checks."""
    return summarize_daily_transactions(daily_transactions)

# Conditional auto-materialization
@asset(
    auto_materialize_policy=AutoMaterializePolicy.from_rules([
        AutoMaterializeRule.materialize_on_parent_updated(),
        # Skip during business hours to avoid impacting production
        AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron("0 18 * * *")  # 6 PM
    ])
)
def batch_processed_asset(real_time_data):
    """Asset that processes in batches outside business hours."""
    return batch_process(real_time_data)
```
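Policies can also be derived from the built-in presets rather than assembled rule by rule. A sketch assuming the `with_rules`/`without_rules` helpers on `AutoMaterializePolicy`:

```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

# Start from the eager preset, drop the update-driven trigger,
# and add a cron-driven trigger in its place
throttled_policy = AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.materialize_on_parent_updated(),
).with_rules(
    AutoMaterializeRule.materialize_on_cron("0 */6 * * *"),  # at most every 6 hours
)

@asset(auto_materialize_policy=throttled_policy)
def throttled_asset(upstream_data):
    """Asset driven by the eager preset, throttled to a 6-hour cadence."""
    return upstream_data
```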

### Declarative Automation

#### `AutomationCondition` { .api }

**Module:** `dagster._core.definitions.declarative_automation.automation_condition`
**Type:** Class

Declarative conditions for sophisticated automation logic.

```python
from dagster import AutomationCondition, asset

# Complex automation condition
@asset(
    automation_condition=(
        # Materialize when parent is updated
        AutomationCondition.parent_newer()
        # But only during off-hours
        & AutomationCondition.cron_tick_passed("0 0 * * *")  # Midnight
        # And not if recently materialized
        & ~AutomationCondition.materialized_since_cron("0 18 * * *")  # 6 PM
        # And only if all dependencies are fresh
        & AutomationCondition.all_deps_blocking_checks_passed()
    )
)
def sophisticated_asset(upstream_data):
    """Asset with sophisticated automation logic."""
    return complex_processing(upstream_data)

# Data quality gated automation
@asset(
    automation_condition=(
        AutomationCondition.parent_newer()
        # Only materialize if upstream passes quality checks
        & AutomationCondition.all_deps_blocking_checks_passed()
        # And if we haven't failed recently
        & ~AutomationCondition.failed_since_cron("0 0 * * *")
    )
)
def quality_gated_asset(validated_data):
    """Asset that only materializes with high-quality upstream data."""
    return process_validated_data(validated_data)

# Backfill automation
@asset(
    automation_condition=(
        # Normal condition for regular updates
        AutomationCondition.parent_newer()
        # OR backfill condition for missing partitions
        | (
            AutomationCondition.missing()
            & AutomationCondition.parent_materialized()
            # Only backfill during maintenance windows
            & AutomationCondition.cron_tick_passed("0 2 * * 6")  # Saturday 2 AM
        )
    )
)
def backfill_aware_asset(source_data):
    """Asset with automatic backfill logic."""
    return process_data_with_backfill(source_data)
```
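Recent releases also expose composite shorthands that cover the common cases without manual composition. A minimal sketch, assuming a Dagster version where `AutomationCondition.eager()` and `AutomationCondition.on_cron()` are available:

```python
from dagster import AutomationCondition, asset

# Materialize whenever upstream changes, with the built-in skip conditions applied
@asset(automation_condition=AutomationCondition.eager())
def eagerly_updated_asset(upstream_data):
    return upstream_data

# Materialize once per cron tick, after parents have updated for that tick
@asset(automation_condition=AutomationCondition.on_cron("0 2 * * *"))
def nightly_asset(upstream_data):
    return upstream_data
```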

## Sensor and Schedule Context Builders

### Context Building for Testing

```python
from dagster import (
    build_sensor_context,
    build_schedule_context,
    build_run_status_sensor_context,
    DagsterInstance,
    RunRequest,
)
from datetime import datetime
from pathlib import Path

# Test sensor function
def test_file_arrival_sensor():
    """Test sensor with mock context."""

    # Create test files
    test_files = ["/tmp/test1.csv", "/tmp/test2.csv"]
    for file_path in test_files:
        Path(file_path).touch()

    # Build sensor context
    context = build_sensor_context(
        cursor="2023-01-01T00:00:00",
        instance=DagsterInstance.ephemeral()
    )

    # Test sensor
    result = file_arrival_sensor(context)

    # Verify results
    assert isinstance(result, list)
    assert len(result) == 2
    assert all(isinstance(req, RunRequest) for req in result)

# Test schedule function
def test_business_days_schedule():
    """Test schedule with different times."""

    # Test Monday execution
    monday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 16, 8, 0),  # Monday
        instance=DagsterInstance.ephemeral()
    )

    monday_result = business_days_schedule(monday_context)
    assert isinstance(monday_result, RunRequest)
    assert monday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] is True

    # Test Tuesday execution
    tuesday_context = build_schedule_context(
        scheduled_execution_time=datetime(2023, 1, 17, 8, 0)  # Tuesday
    )

    tuesday_result = business_days_schedule(tuesday_context)
    assert tuesday_result.run_config["ops"]["extract_data"]["config"]["full_refresh"] is False

# Test failure sensor
def test_job_failure_alert():
    """Test failure sensor with mock failure."""

    # Create mock failure context
    context = build_run_status_sensor_context(
        dagster_run=mock_failed_run,
        dagster_event=mock_failure_event,
        instance=DagsterInstance.ephemeral()
    )

    # Test failure sensor (should not raise)
    job_failure_alert(context)
```
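Sensors and schedules only run once they are attached to a code location. A minimal sketch of wiring the objects from this document into a `Definitions` object (names reused from the earlier examples):

```python
from dagster import Definitions

defs = Definitions(
    assets=[raw_sales_data, processed_sales_data, daily_sales_data],
    jobs=[daily_etl_job, file_processing_job, sales_analytics_job],
    schedules=[daily_etl_schedule, weekly_full_refresh, daily_sales_schedule],
    sensors=[file_arrival_sensor, sales_data_sensor, job_failure_alert],
)
```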

Together, sensors, schedules, and declarative automation policies support event-driven execution, time-based scheduling, and failure handling, with rich context objects and flexible configuration for building robust data pipeline automation.

For partitioned execution with sensors and schedules, see [Partitions System](./partitions.md). For error handling and failure management, see [Error Handling](./error-handling.md).