# Partitions System

This document covers Dagster's partitioning system: partition definitions, partition mappings, time-based partitions, dynamic partitions, and multi-dimensional partitioning. Partitioning enables efficient processing of large datasets by breaking them into manageable, parallelizable chunks.

## Partition Definitions

Partition definitions specify how data is divided into discrete, processable units with automatic dependency tracking and backfill capabilities.

### Base Partition Classes

#### `PartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** Abstract base class

Base class for all partition definitions with common partition operations.

```python
from typing import Sequence

import pandas as pd

from dagster import PartitionsDefinition, asset

class CustomPartitionsDefinition(PartitionsDefinition):
    """Custom partition definition implementation."""

    def __init__(self, partition_keys: Sequence[str]):
        self._partition_keys = partition_keys

    @property
    def partition_keys(self) -> Sequence[str]:
        """Return all partition keys."""
        return self._partition_keys

    def get_partition_keys_in_range(self, partition_key_range) -> Sequence[str]:
        """Get partition keys in the specified range."""
        start_idx = self._partition_keys.index(partition_key_range.start) if partition_key_range.start else 0
        end_idx = self._partition_keys.index(partition_key_range.end) + 1 if partition_key_range.end else len(self._partition_keys)
        return self._partition_keys[start_idx:end_idx]

# Custom business quarter partitions
quarters_partition_def = CustomPartitionsDefinition([
    "2023-Q1", "2023-Q2", "2023-Q3", "2023-Q4",
    "2024-Q1", "2024-Q2", "2024-Q3", "2024-Q4"
])

@asset(partitions_def=quarters_partition_def)
def quarterly_sales_report(context) -> pd.DataFrame:
    """Asset partitioned by business quarters."""
    partition_key = context.partition_key
    year, quarter = partition_key.split("-")  # e.g. "2023-Q1" -> ("2023", "Q1")

    context.log.info(f"Processing {quarter} {year} sales report")

    # Load data for the specific quarter
    quarter_num = int(quarter[1])  # Extract the number from Q1, Q2, etc.
    start_month = (quarter_num - 1) * 3 + 1
    end_month = quarter_num * 3

    query = f"""
        SELECT * FROM sales
        WHERE YEAR(date) = {year}
          AND MONTH(date) BETWEEN {start_month} AND {end_month}
    """

    # `connection` is assumed to be a database connection available in scope
    return pd.read_sql(query, connection)
```
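
Partitioned assets can be materialized one partition at a time, and ranges of partitions can be backfilled from the Dagster UI or CLI. A minimal sketch of materializing a single partition in Python, assuming the `quarterly_sales_report` asset defined above:

```python
from dagster import materialize

# Materialize one partition of the asset defined above.
# The partition key selects which quarter is processed.
result = materialize(
    [quarterly_sales_report],
    partition_key="2024-Q1",
)
assert result.success
```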

### Static Partitions

#### `StaticPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition with a fixed set of partition keys.

```python
import pandas as pd

from dagster import StaticPartitionsDefinition, asset, job, op

# Geographic partitions
regions_partitions = StaticPartitionsDefinition([
    "north_america", "europe", "asia_pacific", "latin_america", "africa"
])

@asset(partitions_def=regions_partitions)
def regional_sales_data(context) -> pd.DataFrame:
    """Sales data partitioned by geographic region."""

    region = context.partition_key
    context.log.info(f"Processing sales data for region: {region}")

    # Load region-specific data
    query = f"SELECT * FROM sales WHERE region = '{region}'"
    df = pd.read_sql(query, connection)

    # Add partition metadata
    context.add_output_metadata({
        "region": region,
        "record_count": len(df),
        "processing_date": pd.Timestamp.now().isoformat()
    })

    return df

# Product category partitions
product_categories = StaticPartitionsDefinition([
    "electronics", "clothing", "books", "home_garden", "sports", "automotive"
])

@asset(partitions_def=product_categories)
def category_analytics(context, regional_sales_data: pd.DataFrame) -> dict:
    """Analytics partitioned by product category."""

    category = context.partition_key

    # Filter data for the category
    category_data = regional_sales_data[
        regional_sales_data["category"] == category
    ]

    analytics = {
        "category": category,
        "total_sales": category_data["amount"].sum(),
        "transaction_count": len(category_data),
        "avg_transaction": category_data["amount"].mean(),
        "top_products": category_data.groupby("product")["amount"].sum().nlargest(5).to_dict()
    }

    context.add_output_metadata({
        "category": category,
        "total_sales": analytics["total_sales"],
        "transaction_count": analytics["transaction_count"]
    })

    return analytics

# Environment-based partitions
environments_partitions = StaticPartitionsDefinition([
    "development", "staging", "production"
])

@job(partitions_def=environments_partitions)
def deployment_job():
    """Job partitioned by deployment environment."""

    @op
    def deploy_to_environment(context):
        environment = context.partition_key

        context.log.info(f"Deploying to {environment} environment")

        # Environment-specific deployment logic
        if environment == "production":
            # Additional validation and approvals
            run_production_checks()
            require_manual_approval()

        deploy_application(environment)
        run_health_checks(environment)

        return f"Deployed to {environment}"

    deploy_to_environment()
```

### Time-Based Partitions

#### `TimeWindowPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Base class for time-based partitions with configurable time windows.

```python
from datetime import datetime

import pandas as pd

from dagster import TimeWindowPartitionsDefinition, asset

# Custom hourly partitions covering business hours only
business_hours_partitions = TimeWindowPartitionsDefinition(
    start=datetime(2023, 1, 1, 9),   # 9 AM
    end=datetime(2024, 1, 1, 18),    # 6 PM
    cron_schedule="0 9-17 * * 1-5",  # Business hours, weekdays only
    fmt="%Y-%m-%d-%H:%M",            # Partition key format
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_partitions)
def business_hours_metrics(context) -> pd.DataFrame:
    """Metrics calculated only during business hours."""

    time_window = context.partition_time_window
    start_time = time_window.start
    end_time = time_window.end

    context.log.info(f"Processing metrics for {start_time} to {end_time}")

    # Load data for the specific time window
    query = f"""
        SELECT * FROM user_events
        WHERE timestamp >= '{start_time}'
          AND timestamp < '{end_time}'
    """

    df = pd.read_sql(query, connection)

    # Calculate business metrics
    metrics = df.groupby("event_type").agg({
        "user_id": "nunique",
        "session_id": "nunique",
        "timestamp": "count"
    }).reset_index()

    context.add_output_metadata({
        "time_window_start": start_time.isoformat(),
        "time_window_end": end_time.isoformat(),
        "total_events": len(df),
        "unique_users": df["user_id"].nunique()
    })

    return metrics
```

#### `DailyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Daily time-based partitions with automatic date-based partitioning.

```python
import pandas as pd

from dagster import DailyPartitionsDefinition, asset

# Basic daily partitions
daily_partitions = DailyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=daily_partitions)
def daily_user_events(context) -> pd.DataFrame:
    """Daily user events data."""

    partition_date = context.partition_key  # Format: "2023-01-01"
    context.log.info(f"Processing user events for {partition_date}")

    # Load events for the specific date
    query = f"""
        SELECT user_id, event_type, timestamp, properties
        FROM user_events
        WHERE DATE(timestamp) = '{partition_date}'
    """

    df = pd.read_sql(query, connection)

    context.add_output_metadata({
        "partition_date": partition_date,
        "event_count": len(df),
        "unique_users": df["user_id"].nunique(),
        "event_types": df["event_type"].nunique()
    })

    return df

@asset(partitions_def=daily_partitions)
def daily_user_summary(context, daily_user_events: pd.DataFrame) -> pd.DataFrame:
    """Daily summary of user activity."""

    partition_date = context.partition_key

    # Aggregate events by user
    user_summary = daily_user_events.groupby("user_id").agg({
        "event_type": "count",
        "timestamp": ["min", "max"]
    }).reset_index()

    user_summary.columns = ["user_id", "event_count", "first_event", "last_event"]
    user_summary["session_duration"] = (
        pd.to_datetime(user_summary["last_event"]) -
        pd.to_datetime(user_summary["first_event"])
    ).dt.total_seconds() / 60  # Minutes

    user_summary["date"] = partition_date

    context.add_output_metadata({
        "partition_date": partition_date,
        "active_users": len(user_summary),
        "avg_events_per_user": user_summary["event_count"].mean(),
        "avg_session_duration_minutes": user_summary["session_duration"].mean()
    })

    return user_summary

# Daily partitions with a custom start time
daily_partitions_6am = DailyPartitionsDefinition(
    start_date="2023-01-01",
    minute_offset=0,
    hour_offset=6,  # Partitions start at 6 AM
    timezone="America/New_York"
)

@asset(partitions_def=daily_partitions_6am)
def business_day_data(context) -> pd.DataFrame:
    """Business day data starting at 6 AM."""

    time_window = context.partition_time_window
    business_day_start = time_window.start  # 6 AM
    business_day_end = time_window.end      # Next day, 6 AM

    context.log.info(f"Processing business day from {business_day_start} to {business_day_end}")

    query = f"""
        SELECT * FROM transactions
        WHERE timestamp >= '{business_day_start}'
          AND timestamp < '{business_day_end}'
    """

    return pd.read_sql(query, connection)
```

#### `HourlyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Hourly time-based partitions for high-frequency data processing.

```python
import pandas as pd

from dagster import HourlyPartitionsDefinition, asset

# Hourly partitions for real-time analytics
hourly_partitions = HourlyPartitionsDefinition(
    start_date="2023-01-01-00:00",
    timezone="UTC"
)

@asset(partitions_def=hourly_partitions)
def hourly_api_metrics(context) -> pd.DataFrame:
    """Hourly API metrics and performance data."""

    partition_key = context.partition_key  # Format: "2023-01-01-14:00"
    time_window = context.partition_time_window

    hour_start = time_window.start
    hour_end = time_window.end

    context.log.info(f"Processing API metrics for hour: {partition_key}")

    # Load API logs for the specific hour
    query = f"""
        SELECT endpoint, status_code, response_time, timestamp
        FROM api_logs
        WHERE timestamp >= '{hour_start}'
          AND timestamp < '{hour_end}'
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate hourly metrics ("p95" is not a built-in aggregation, so use a quantile lambda)
        metrics = df.groupby(["endpoint", "status_code"]).agg({
            "response_time": ["count", "mean", "median", lambda s: s.quantile(0.95)],
            "timestamp": ["min", "max"]
        }).reset_index()

        # Flatten column names
        metrics.columns = [
            "endpoint", "status_code", "request_count",
            "avg_response_time", "median_response_time", "p95_response_time",
            "first_request", "last_request"
        ]

        metrics["hour"] = partition_key

        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": len(df),
            "unique_endpoints": df["endpoint"].nunique(),
            "error_rate": len(df[df["status_code"] >= 400]) / len(df)
        })
    else:
        # Handle an empty hour
        metrics = pd.DataFrame({
            "hour": [partition_key],
            "total_requests": [0],
            "message": ["No API requests in this hour"]
        })

        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": 0,
            "status": "no_data"
        })

    return metrics

# Hourly partitions restricted to a fixed date range
business_hours_only = HourlyPartitionsDefinition(
    start_date="2023-01-01-09:00",
    end_date="2023-12-31-18:00",
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_only)
def business_hours_analytics(context) -> dict:
    """Analytics only during business hours."""

    time_window = context.partition_time_window
    hour = time_window.start.hour

    # Only process during business hours (9 AM - 6 PM)
    if 9 <= hour < 18:
        context.log.info(f"Processing business hour analytics for {context.partition_key}")

        # Business analytics logic
        return {
            "hour": context.partition_key,
            "is_business_hour": True,
            "analytics_data": "processed"
        }
    else:
        context.log.info(f"Skipping non-business hour: {context.partition_key}")
        return {
            "hour": context.partition_key,
            "is_business_hour": False,
            "message": "Outside business hours"
        }
```

#### `WeeklyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Weekly time-based partitions for weekly aggregations and reports.

```python
from datetime import timedelta

import pandas as pd

from dagster import WeeklyPartitionsDefinition, asset

# Weekly partitions starting on Monday
weekly_partitions = WeeklyPartitionsDefinition(
    start_date="2023-01-02",  # First Monday of 2023
    timezone="UTC"
)

@asset(partitions_def=weekly_partitions)
def weekly_sales_summary(context) -> pd.DataFrame:
    """Weekly sales summary and trends."""

    time_window = context.partition_time_window
    week_start = time_window.start
    week_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-02"

    context.log.info(f"Processing weekly sales summary for week {partition_key}")
    context.log.info(f"Week span: {week_start} to {week_end}")

    # Load sales data for the week
    query = f"""
        SELECT DATE(sale_date) as day, product_category, SUM(amount) as daily_sales
        FROM sales
        WHERE sale_date >= '{week_start.date()}'
          AND sale_date < '{week_end.date()}'
        GROUP BY DATE(sale_date), product_category
        ORDER BY day, product_category
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate weekly aggregations
        weekly_summary = df.groupby("product_category").agg({
            "daily_sales": ["sum", "mean", "std", "count"]
        }).reset_index()

        weekly_summary.columns = [
            "product_category", "total_weekly_sales",
            "avg_daily_sales", "daily_sales_std", "days_with_sales"
        ]

        weekly_summary["week_start"] = week_start.date()
        weekly_summary["week_end"] = (week_end - timedelta(days=1)).date()

        context.add_output_metadata({
            "week_start": week_start.isoformat(),
            "week_end": week_end.isoformat(),
            "total_sales": df["daily_sales"].sum(),
            "categories_sold": df["product_category"].nunique(),
            "sales_days": df["day"].nunique()
        })
    else:
        weekly_summary = pd.DataFrame({
            "week_start": [week_start.date()],
            "week_end": [(week_end - timedelta(days=1)).date()],
            "message": ["No sales data for this week"]
        })

        context.add_output_metadata({
            "week_start": week_start.isoformat(),
            "week_end": week_end.isoformat(),
            "total_sales": 0,
            "status": "no_data"
        })

    return weekly_summary

@asset(partitions_def=weekly_partitions)
def weekly_user_cohorts(context, daily_user_summary: pd.DataFrame) -> pd.DataFrame:
    """Weekly user cohort analysis."""

    time_window = context.partition_time_window
    week_dates = pd.date_range(
        start=time_window.start.date(),
        end=(time_window.end - timedelta(days=1)).date(),
        freq="D"
    ).strftime("%Y-%m-%d").tolist()

    context.log.info(f"Analyzing user cohorts for dates: {week_dates}")

    # Filter daily summaries for this week
    week_data = daily_user_summary[
        daily_user_summary["date"].isin(week_dates)
    ]

    if len(week_data) > 0:
        # Cohort analysis
        cohort_analysis = week_data.groupby("user_id").agg({
            "event_count": "sum",
            "session_duration": "sum",
            "date": ["count", "min", "max"]
        }).reset_index()

        cohort_analysis.columns = [
            "user_id", "total_events", "total_session_minutes",
            "active_days", "first_seen", "last_seen"
        ]

        # User engagement segmentation
        cohort_analysis["engagement_level"] = pd.cut(
            cohort_analysis["total_events"],
            bins=[0, 5, 20, 50, float("inf")],
            labels=["low", "medium", "high", "power_user"]
        )

        cohort_analysis["week_start"] = time_window.start.date()

        context.add_output_metadata({
            "week_start": time_window.start.isoformat(),
            "active_users": len(cohort_analysis),
            "avg_events_per_user": cohort_analysis["total_events"].mean(),
            "engagement_distribution": cohort_analysis["engagement_level"].value_counts().to_dict()
        })
    else:
        cohort_analysis = pd.DataFrame({
            "week_start": [time_window.start.date()],
            "message": ["No user data for this week"]
        })

        context.add_output_metadata({
            "week_start": time_window.start.isoformat(),
            "active_users": 0,
            "status": "no_data"
        })

    return cohort_analysis
```

#### `MonthlyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Monthly time-based partitions for monthly reporting and aggregations.

```python
from datetime import timedelta

import pandas as pd

from dagster import MonthlyPartitionsDefinition, asset

# Monthly partitions for financial reporting
monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=monthly_partitions)
def monthly_financial_report(context) -> pd.DataFrame:
    """Monthly financial report with comprehensive metrics."""

    time_window = context.partition_time_window
    month_start = time_window.start
    month_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-01"
    month_name = month_start.strftime("%B %Y")

    context.log.info(f"Processing financial report for {month_name}")

    # Load financial data for the month
    query = f"""
        SELECT
            department,
            SUM(revenue) as total_revenue,
            SUM(costs) as total_costs,
            COUNT(DISTINCT customer_id) as unique_customers,
            COUNT(*) as transaction_count
        FROM financial_transactions
        WHERE transaction_date >= '{month_start.date()}'
          AND transaction_date < '{month_end.date()}'
        GROUP BY department
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate financial metrics
        df["profit"] = df["total_revenue"] - df["total_costs"]
        df["profit_margin"] = df["profit"] / df["total_revenue"]
        df["revenue_per_customer"] = df["total_revenue"] / df["unique_customers"]
        df["avg_transaction_value"] = df["total_revenue"] / df["transaction_count"]

        # Add month information
        df["month"] = month_name
        df["month_start"] = month_start.date()
        df["month_end"] = (month_end - timedelta(days=1)).date()

        # Company-wide totals
        total_revenue = df["total_revenue"].sum()
        total_profit = df["profit"].sum()
        total_customers = df["unique_customers"].sum()

        context.add_output_metadata({
            "month": month_name,
            "total_revenue": total_revenue,
            "total_profit": total_profit,
            "company_profit_margin": total_profit / total_revenue if total_revenue > 0 else 0,
            "total_customers": total_customers,
            "departments": len(df)
        })
    else:
        df = pd.DataFrame({
            "month": [month_name],
            "message": ["No financial data for this month"]
        })

        context.add_output_metadata({
            "month": month_name,
            "total_revenue": 0,
            "status": "no_data"
        })

    return df

# Fiscal year partitions (monthly partitions aligned to a fiscal year start)
fiscal_monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-04-01",  # Fiscal year starts in April
    timezone="America/New_York"
)

@asset(partitions_def=fiscal_monthly_partitions)
def fiscal_monthly_budget_analysis(context) -> dict:
    """Fiscal year monthly budget analysis."""

    time_window = context.partition_time_window
    fiscal_month_start = time_window.start

    # Determine fiscal year and month
    if fiscal_month_start.month >= 4:  # April-December
        fiscal_year = fiscal_month_start.year
        fiscal_month = fiscal_month_start.month - 3  # April = Month 1
    else:  # January-March
        fiscal_year = fiscal_month_start.year - 1
        fiscal_month = fiscal_month_start.month + 9  # January = Month 10

    context.log.info(f"Processing FY{fiscal_year} Month {fiscal_month}")

    budget_analysis = {
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": fiscal_month_start.strftime("%B %Y"),
        "quarter": f"Q{((fiscal_month - 1) // 3) + 1}",
        "is_fiscal_year_end": fiscal_month == 12
    }

    context.add_output_metadata({
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": fiscal_month_start.strftime("%B %Y"),
        "quarter": budget_analysis["quarter"]
    })

    return budget_analysis
```

### Dynamic Partitions

#### `DynamicPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition where partitions can be added and removed at runtime.

```python
import pandas as pd

from dagster import DynamicPartitionsDefinition, asset
from dagster import AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest
from dagster import SkipReason, job, op, sensor

# Dynamic customer partitions
customers_partitions = DynamicPartitionsDefinition(name="customers")

@asset(partitions_def=customers_partitions)
def customer_analytics(context) -> pd.DataFrame:
    """Analytics for individual customers using dynamic partitions."""

    customer_id = context.partition_key
    context.log.info(f"Processing analytics for customer: {customer_id}")

    # Load customer-specific data
    query = f"""
        SELECT
            DATE(order_date) as date,
            product_category,
            SUM(order_value) as daily_spend
        FROM orders
        WHERE customer_id = '{customer_id}'
          AND order_date >= CURRENT_DATE - INTERVAL 90 DAY
        GROUP BY DATE(order_date), product_category
        ORDER BY date DESC
    """

    df = pd.read_sql(query, connection)

    context.add_output_metadata({
        "customer_id": customer_id,
        "days_with_orders": len(df),
        "total_spend": df["daily_spend"].sum() if len(df) > 0 else 0,
        "categories_purchased": df["product_category"].nunique() if len(df) > 0 else 0
    })

    return df

# Sensor to dynamically add new customer partitions
@sensor(minimum_interval_seconds=3600)  # Check hourly
def new_customer_sensor(context):
    """Add partitions for new customers."""

    # Find customers added in the last hour
    query = """
        SELECT DISTINCT customer_id
        FROM customers
        WHERE created_at >= NOW() - INTERVAL 1 HOUR
    """

    new_customers = execute_query(query)

    if new_customers:
        customer_ids = [row["customer_id"] for row in new_customers]

        context.log.info(f"Adding partitions for {len(customer_ids)} new customers")

        return AddDynamicPartitionsRequest(
            partitions_def_name="customers",
            partition_keys=customer_ids
        )

    return SkipReason("No new customers found")

# Job to clean up inactive customer partitions
@job
def cleanup_inactive_customers():
    """Remove partitions for inactive customers."""

    @op
    def identify_inactive_customers(context) -> list:
        # Find customers with no activity in 6 months
        query = """
            SELECT DISTINCT c.customer_id
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
                AND o.order_date >= CURRENT_DATE - INTERVAL 6 MONTH
            WHERE o.customer_id IS NULL
              AND c.created_at < CURRENT_DATE - INTERVAL 6 MONTH
        """

        inactive_customers = execute_query(query)
        customer_ids = [row["customer_id"] for row in inactive_customers]

        context.log.info(f"Found {len(customer_ids)} inactive customers")
        return customer_ids

    @op
    def remove_customer_partitions(context, inactive_customers: list):
        if inactive_customers:
            context.log.info(f"Removing partitions for {len(inactive_customers)} inactive customers")

            return DeleteDynamicPartitionsRequest(
                partitions_def_name="customers",
                partition_keys=inactive_customers
            )
        else:
            context.log.info("No inactive customers to remove")
            return None

    remove_customer_partitions(identify_inactive_customers())

# Dynamic experiment partitions for A/B testing
experiments_partitions = DynamicPartitionsDefinition(name="experiments")

@asset(partitions_def=experiments_partitions)
def experiment_results(context) -> dict:
    """Results analysis for A/B test experiments."""

    experiment_id = context.partition_key
    context.log.info(f"Analyzing results for experiment: {experiment_id}")

    # Load experiment data
    query = f"""
        SELECT
            variant,
            COUNT(*) as participants,
            SUM(converted) as conversions,
            AVG(metric_value) as avg_metric
        FROM experiment_data
        WHERE experiment_id = '{experiment_id}'
        GROUP BY variant
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate experiment results
        results = {
            "experiment_id": experiment_id,
            "variants": df["variant"].tolist(),
            "total_participants": df["participants"].sum(),
            "results_by_variant": df.to_dict("records")
        }

        # Statistical significance testing
        if len(df) == 2:  # A/B test
            control = df[df["variant"] == "control"].iloc[0] if len(df[df["variant"] == "control"]) > 0 else None
            treatment = df[df["variant"] == "treatment"].iloc[0] if len(df[df["variant"] == "treatment"]) > 0 else None

            if control is not None and treatment is not None:
                control_rate = control["conversions"] / control["participants"]
                treatment_rate = treatment["conversions"] / treatment["participants"]
                lift = (treatment_rate - control_rate) / control_rate if control_rate > 0 else 0

                results["lift"] = lift
                results["control_conversion_rate"] = control_rate
                results["treatment_conversion_rate"] = treatment_rate

        context.add_output_metadata({
            "experiment_id": experiment_id,
            "total_participants": results["total_participants"],
            "variants_count": len(df),
            "lift": results.get("lift", "N/A")
        })
    else:
        results = {
            "experiment_id": experiment_id,
            "error": "No data found for experiment"
        }

        context.add_output_metadata({
            "experiment_id": experiment_id,
            "total_participants": 0,
            "status": "no_data"
        })

    return results
```
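
Dynamic partition keys can also be registered outside of sensors, for example from a backfill script or a test, through the Dagster instance. A brief sketch, assuming the `customers_partitions` definition above and an instance configured via `DAGSTER_HOME`:

```python
from dagster import DagsterInstance

instance = DagsterInstance.get()

# Register new partition keys for the "customers" dynamic partitions definition
instance.add_dynamic_partitions("customers", ["customer_123", "customer_456"])

# Inspect the currently registered keys
print(instance.get_dynamic_partitions("customers"))
```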

### Multi-Dimensional Partitions

#### `MultiPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition combining multiple partition dimensions.

```python
import pandas as pd

from dagster import (
    DailyPartitionsDefinition,
    MonthlyPartitionsDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Multi-dimensional partitioning: Date x Region
multi_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia"])
})

@asset(partitions_def=multi_partitions)
def regional_daily_sales(context) -> pd.DataFrame:
    """Sales data partitioned by both date and region."""

    # Access the multi-dimensional partition key
    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
    else:
        # Handle the string representation
        date_key, region_key = partition_key.split("|")

    context.log.info(f"Processing sales for {region_key} on {date_key}")

    # Load region- and date-specific data
    query = f"""
        SELECT product_id, SUM(amount) as sales, COUNT(*) as transactions
        FROM sales
        WHERE DATE(sale_date) = '{date_key}'
          AND region = '{region_key}'
        GROUP BY product_id
    """

    df = pd.read_sql(query, connection)

    # Add partition information
    df["partition_date"] = date_key
    df["partition_region"] = region_key

    context.add_output_metadata({
        "partition_date": date_key,
        "partition_region": region_key,
        "total_sales": df["sales"].sum() if len(df) > 0 else 0,
        "unique_products": len(df),
        "total_transactions": df["transactions"].sum() if len(df) > 0 else 0
    })

    return df

# Three-dimensional partitioning: Date x Region x Product Category
three_dim_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia", "latam"]),
    "category": StaticPartitionsDefinition(["electronics", "clothing", "books", "home"])
})

@asset(partitions_def=three_dim_partitions)
def detailed_sales_metrics(context) -> dict:
    """Highly granular sales metrics with three-dimensional partitioning."""

    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
        category_key = partition_key.keys_by_dimension["category"]
    else:
        # Parse the string representation
        parts = str(partition_key).split("|")
        date_key, region_key, category_key = parts

    context.log.info(f"Processing {category_key} sales in {region_key} on {date_key}")

    # Highly specific query
    query = f"""
        SELECT
            AVG(amount) as avg_sale_amount,
            MIN(amount) as min_sale_amount,
            MAX(amount) as max_sale_amount,
            COUNT(*) as transaction_count,
            SUM(amount) as total_sales,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales
        WHERE DATE(sale_date) = '{date_key}'
          AND region = '{region_key}'
          AND category = '{category_key}'
    """

    result = execute_query(query)

    if result and len(result) > 0:
        metrics = result[0]
        metrics.update({
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": metrics["transaction_count"] > 0
        })
    else:
        metrics = {
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": False,
            "transaction_count": 0,
            "total_sales": 0
        }

    context.add_output_metadata({
        "date": date_key,
        "region": region_key,
        "category": category_key,
        "transaction_count": metrics["transaction_count"],
        "total_sales": metrics["total_sales"]
    })

    return metrics

# Complex multi-dimensional example with time and business dimensions
business_multi_partitions = MultiPartitionsDefinition({
    "month": MonthlyPartitionsDefinition(start_date="2023-01-01"),
    "department": StaticPartitionsDefinition(["sales", "marketing", "engineering", "support"]),
    "metric_type": StaticPartitionsDefinition(["revenue", "costs", "headcount", "productivity"])
})

@asset(partitions_def=business_multi_partitions)
def department_kpi_metrics(context) -> dict:
    """KPI metrics partitioned by month, department, and metric type."""

    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        month_key = partition_key.keys_by_dimension["month"]
        department_key = partition_key.keys_by_dimension["department"]
        metric_type_key = partition_key.keys_by_dimension["metric_type"]
    else:
        month_key, department_key, metric_type_key = str(partition_key).split("|")

    context.log.info(f"Processing {metric_type_key} for {department_key} in {month_key}")

    # Metric-specific data loading
    if metric_type_key == "revenue":
        query = f"""
            SELECT SUM(amount) as value, 'USD' as unit
            FROM revenue
            WHERE department = '{department_key}'
              AND YEAR(date) = YEAR('{month_key}')
              AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "costs":
        query = f"""
            SELECT SUM(cost) as value, 'USD' as unit
            FROM expenses
            WHERE department = '{department_key}'
              AND YEAR(date) = YEAR('{month_key}')
              AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "headcount":
        query = f"""
            SELECT COUNT(DISTINCT employee_id) as value, 'people' as unit
            FROM employees
            WHERE department = '{department_key}'
              AND start_date <= '{month_key}'
              AND (end_date IS NULL OR end_date > LAST_DAY('{month_key}'))
        """
    else:  # productivity
        query = f"""
            SELECT AVG(productivity_score) as value, 'score' as unit
            FROM productivity_metrics
            WHERE department = '{department_key}'
              AND YEAR(date) = YEAR('{month_key}')
              AND MONTH(date) = MONTH('{month_key}')
        """

    result = execute_query(query)

    if result and len(result) > 0:
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            "value": result[0]["value"] or 0,
            "unit": result[0]["unit"],
            "has_data": result[0]["value"] is not None
        }
    else:
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            "value": 0,
            "unit": "unknown",
            "has_data": False
        }

    context.add_output_metadata({
        "month": month_key,
        "department": department_key,
        "metric_type": metric_type_key,
        "value": kpi_data["value"],
        "unit": kpi_data["unit"]
    })

    return kpi_data
```

## Partition Mappings

Partition mappings define how partitioned assets depend on partitions of their upstream assets, enabling sophisticated dependency patterns.

### Base Partition Mapping

#### `PartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** Abstract base class

Base class for partition mapping strategies.

```python
from datetime import datetime

import pandas as pd

from dagster import AssetIn, PartitionKeyRange, PartitionMapping, asset

class BusinessDayPartitionMapping(PartitionMapping):
    """Custom partition mapping for business days only."""

    def __init__(self, skip_weekends: bool = True):
        self.skip_weekends = skip_weekends

    def get_upstream_mapped_partitions_result_for_partition_key(
        self, downstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map a downstream partition to its upstream partitions."""

        # Convert the partition key to a date
        partition_date = datetime.strptime(downstream_partition_key, "%Y-%m-%d")

        if self.skip_weekends and partition_date.weekday() >= 5:  # Weekend
            # Skip weekends - no upstream dependency
            return PartitionKeyRange(start=None, end=None)

        # For business days, map to the same day
        return PartitionKeyRange(
            start=downstream_partition_key,
            end=downstream_partition_key
        )

    def get_downstream_partitions_for_partition_key(
        self, upstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map an upstream partition to its downstream partitions."""

        # Business day mapping - the same logic in reverse
        partition_date = datetime.strptime(upstream_partition_key, "%Y-%m-%d")

        if self.skip_weekends and partition_date.weekday() >= 5:
            return []

        return [upstream_partition_key]

# Usage of the custom partition mapping
@asset(partitions_def=daily_partitions)
def raw_trading_data(context) -> pd.DataFrame:
    """Raw trading data (all days)."""
    partition_date = context.partition_key
    return load_trading_data(partition_date)

@asset(
    partitions_def=daily_partitions,
    ins={"raw_trading_data": AssetIn(partition_mapping=BusinessDayPartitionMapping())}
)
def business_day_analysis(context, raw_trading_data: pd.DataFrame) -> pd.DataFrame:
    """Analysis that only runs on business days."""
    return analyze_trading_data(raw_trading_data)
```

### Built-in Partition Mappings

#### `IdentityPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

One-to-one partition mapping (default behavior).

```python
from dagster import IdentityPartitionMapping, asset, AssetIn

@asset(partitions_def=daily_partitions)
def daily_raw_data(context) -> pd.DataFrame:
    """Daily raw data."""
    return load_raw_data(context.partition_key)

@asset(
    partitions_def=daily_partitions,
    ins={"daily_raw_data": AssetIn(partition_mapping=IdentityPartitionMapping())}
)
def daily_processed_data(context, daily_raw_data: pd.DataFrame) -> pd.DataFrame:
    """Processed data with identity mapping (same partition)."""
    return process_data(daily_raw_data)
```
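
Because identity mapping is the default between assets that share a partitions definition, the explicit `AssetIn` above is optional. A minimal equivalent sketch (the asset name `daily_processed_data_default` is illustrative only):

```python
import pandas as pd

from dagster import asset

# With no partition mapping specified, Dagster applies same-partition (identity)
# mapping between assets on the same partitions definition.
@asset(partitions_def=daily_partitions)
def daily_processed_data_default(context, daily_raw_data: pd.DataFrame) -> pd.DataFrame:
    return process_data(daily_raw_data)
```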

#### `AllPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Mapping where a downstream partition depends on all upstream partitions.

```python
from dagster import AllPartitionMapping, asset, AssetIn

@asset(
    ins={"daily_processed_data": AssetIn(partition_mapping=AllPartitionMapping())}
)
def complete_dataset_summary(context, daily_processed_data: pd.DataFrame) -> dict:
    """Summary that depends on all daily data partitions."""

    # This asset depends on all partitions of daily_processed_data
    context.log.info(f"Creating summary from {len(daily_processed_data)} daily records")

    return {
        "total_records": len(daily_processed_data),
        "date_range": {
            "start": daily_processed_data["date"].min(),
            "end": daily_processed_data["date"].max()
        },
        "summary_stats": daily_processed_data.describe().to_dict()
    }
```
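
A related built-in, not shown above, is `LastPartitionMapping`, which makes a downstream asset depend only on the most recent upstream partition. A brief sketch, assuming the `daily_processed_data` asset from the previous example (the asset name `latest_daily_snapshot` is illustrative only):

```python
import pandas as pd

from dagster import AssetIn, LastPartitionMapping, asset

@asset(
    ins={"daily_processed_data": AssetIn(partition_mapping=LastPartitionMapping())}
)
def latest_daily_snapshot(context, daily_processed_data: pd.DataFrame) -> dict:
    """Summary of only the most recent daily partition."""
    return {"records_in_latest_partition": len(daily_processed_data)}
```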

#### `TimeWindowPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Time-based partition mapping with configurable time windows and offsets.

```python
from datetime import timedelta

import pandas as pd

from dagster import TimeWindowPartitionMapping, asset, AssetIn

# Weekly summary depends on the last 7 days of daily data
@asset(
    partitions_def=weekly_partitions,
    ins={
        "daily_sales": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-6,  # 6 days before
                end_offset=0      # current day
            )
        )
    }
)
def weekly_sales_summary(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
    """Weekly summary from 7 days of daily data."""

    time_window = context.partition_time_window
    context.log.info(f"Creating weekly summary for {time_window.start} to {time_window.end}")

    # daily_sales contains data from the last 7 days
    weekly_summary = daily_sales.groupby("product_category").agg({
        "sales_amount": "sum",
        "transaction_count": "sum"
    }).reset_index()

    weekly_summary["week_start"] = time_window.start.date()
    weekly_summary["week_end"] = (time_window.end - timedelta(days=1)).date()

    return weekly_summary

# Monthly report depends on the previous month's data
@asset(
    partitions_def=monthly_partitions,
    ins={
        "weekly_sales_summary": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1,  # Previous month
                end_offset=-1
            )
        )
    }
)
def monthly_trends_analysis(context, weekly_sales_summary: pd.DataFrame) -> dict:
    """Monthly trends based on the previous month's data."""

    partition_month = context.partition_time_window.start.strftime("%B %Y")
    previous_month = (context.partition_time_window.start - timedelta(days=1)).strftime("%B %Y")

    context.log.info(f"Analyzing {partition_month} trends using {previous_month} data")

    trends = {
        "analysis_month": partition_month,
        "data_month": previous_month,
        "weekly_growth_rate": calculate_growth_rate(weekly_sales_summary),
        "category_performance": analyze_category_trends(weekly_sales_summary)
    }

    return trends
```

#### `StaticPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Static mapping to specific upstream partitions.

```python
from dagster import StaticPartitionMapping, asset, AssetIn

# Asset that always depends on specific reference partitions.
# StaticPartitionMapping maps upstream partition keys to downstream partition keys,
# so the keys below are config versions and the values are the regions that use them.
@asset(
    partitions_def=regions_partitions,
    ins={
        "global_config": AssetIn(
            partition_mapping=StaticPartitionMapping({
                "config_v1": ["north_america", "asia_pacific", "latin_america"],
                "config_v2": ["europe", "africa"]
            })
        )
    }
)
def regional_processing(context, global_config: dict) -> pd.DataFrame:
    """Regional processing with a static config mapping."""

    region = context.partition_key
    context.log.info(f"Processing {region} with config version {global_config.get('version')}")

    return process_regional_data(region, global_config)
```

#### `MultiPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Mapping for multi-dimensional partitions with dimension-specific mappings.

```python
from datetime import datetime, timedelta

import pandas as pd

from dagster import (
    AssetIn,
    DimensionPartitionMapping,
    IdentityPartitionMapping,
    MultiPartitionKey,
    MultiPartitionMapping,
    TimeWindowPartitionMapping,
    asset,
)

# Multi-partition mapping with a different strategy per dimension
multi_mapping = MultiPartitionMapping({
    "date": DimensionPartitionMapping(
        dimension_name="date",
        partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)  # Yesterday and today
    ),
    "region": DimensionPartitionMapping(
        dimension_name="region",
        partition_mapping=IdentityPartitionMapping()  # Same region
    )
})

@asset(
    partitions_def=multi_partitions,  # date x region
    ins={
        "regional_daily_sales": AssetIn(partition_mapping=multi_mapping)
    }
)
def two_day_regional_trends(context, regional_daily_sales: pd.DataFrame) -> dict:
    """Trends analysis using 2 days of data for the same region."""

    partition_key = context.partition_key
    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
    else:
        date_key, region_key = str(partition_key).split("|")

    context.log.info(f"Analyzing 2-day trends for {region_key} on {date_key}")

    # regional_daily_sales contains data from yesterday and today for this region
    if len(regional_daily_sales) >= 2:
        # Compare yesterday vs today
        yesterday = (datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
        yesterday_data = regional_daily_sales[regional_daily_sales["partition_date"] == yesterday]
        today_data = regional_daily_sales[regional_daily_sales["partition_date"] == date_key]

        trends = {
            "date": date_key,
            "region": region_key,
            "yesterday_sales": yesterday_data["sales"].sum() if len(yesterday_data) > 0 else 0,
            "today_sales": today_data["sales"].sum() if len(today_data) > 0 else 0,
        }

        if trends["yesterday_sales"] > 0:
            trends["growth_rate"] = (trends["today_sales"] - trends["yesterday_sales"]) / trends["yesterday_sales"]
        else:
            trends["growth_rate"] = None
    else:
        trends = {
            "date": date_key,
            "region": region_key,
            "error": "Insufficient data for trend analysis"
        }

    return trends
```

## Partitioned Configuration

Partitioned configuration allows partition-specific configuration values, enabling different processing logic per partition.

### `PartitionedConfig` { .api }

**Module:** `dagster._core.definitions.partitions.partitioned_config`
**Type:** Class

Configuration that varies by partition with dynamic config generation.

```python
from datetime import datetime, timedelta

from dagster import PartitionedConfig, job, op, daily_partitioned_config

@op(config_schema={"batch_size": int, "processing_mode": str})
def process_partition_data(context):
    """Op with partition-specific configuration."""

    config = context.op_config
    partition_key = context.partition_key

    batch_size = config["batch_size"]
    processing_mode = config["processing_mode"]

    context.log.info(f"Processing partition {partition_key} with batch_size={batch_size}, mode={processing_mode}")

    # Partition-specific processing logic
    return f"Processed {partition_key}"

# Dynamic partitioned configuration based on the partition date
def partition_config_fn(start: datetime, _end: datetime) -> dict:
    """Generate configuration based on the partition date."""

    # Weekend processing uses different parameters
    is_weekend = start.weekday() >= 5

    return {
        "ops": {
            "process_partition_data": {
                "config": {
                    "batch_size": 500 if is_weekend else 1000,  # Smaller batches on weekends
                    "processing_mode": "maintenance" if is_weekend else "normal"
                }
            }
        },
        "resources": {
            "database": {
                "config": {
                    "pool_size": 5 if is_weekend else 20,  # Fewer connections on weekends
                    "timeout": 60 if is_weekend else 30
                }
            }
        }
    }

partitioned_config = PartitionedConfig(
    run_config_for_partition_fn=partition_config_fn,
    partitions_def=daily_partitions
)

@job(config=partitioned_config, partitions_def=daily_partitions)
def daily_processing_job():
    """Job with partition-specific configuration."""
    process_partition_data()

# Built-in partitioned config helpers
daily_config = daily_partitioned_config(
    start_date="2023-01-01",
    timezone="UTC"
)(partition_config_fn)

@job(config=daily_config)
def daily_job_with_helper():
    process_partition_data()

# Business-logic-based partitioned config
def business_partition_config(start: datetime, _end: datetime) -> dict:
    """Configuration based on the business calendar."""

    # Check whether this is the last day of the month
    next_day = start + timedelta(days=1)
    is_month_end = next_day.day == 1

    # Check whether this is the last day of the quarter
    is_quarter_end = (
        start.month in [3, 6, 9, 12]
        and start == (start.replace(day=1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    )

    return {
        "ops": {
            "process_partition_data": {
                "config": {
                    "batch_size": 100 if is_quarter_end else 500 if is_month_end else 1000,
                    "processing_mode": "quarter_end" if is_quarter_end else
                                       "month_end" if is_month_end else "normal"
                }
            }
        }
    }

business_config = PartitionedConfig(
    run_config_for_partition_fn=business_partition_config,
    partitions_def=daily_partitions
)

@job(config=business_config, partitions_def=daily_partitions)
def business_processing_job():
    """Job with business-calendar-aware configuration."""
    process_partition_data()
```
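
For local testing, a partitioned job like this can be executed for a single partition in process, with the partitioned config generating the run config for the selected partition key. A minimal sketch, assuming the `daily_processing_job` defined above and that any resources it requires (such as the `database` resource referenced in the config) are provided:

```python
# Execute one partition of the job locally; the partition key determines
# which run config the PartitionedConfig generates.
result = daily_processing_job.execute_in_process(partition_key="2023-06-15")
assert result.success
```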

## Partition Utilities

### `PartitionKeyRange` { .api }

**Module:** `dagster._core.definitions.partitions.partition_key_range`
**Type:** Class

Represents a range of partition keys for batch operations and queries.

```python
from datetime import datetime, timedelta

import pandas as pd

from dagster import PartitionKeyRange, asset

@asset(partitions_def=daily_partitions)
def batch_processed_data(context) -> pd.DataFrame:
    """Asset that processes data in batches using partition ranges."""

    # Get the current partition
    current_partition = context.partition_key

    # Create a partition range for the last 7 days
    end_date = datetime.strptime(current_partition, "%Y-%m-%d")
    start_date = end_date - timedelta(days=6)

    partition_range = PartitionKeyRange(
        start=start_date.strftime("%Y-%m-%d"),
        end=current_partition
    )

    # Get all partition keys in the range
    partitions_def = context.assets_def.partitions_def
    partition_keys_in_range = partitions_def.get_partition_keys_in_range(partition_range)

    context.log.info(f"Processing batch for partitions: {partition_keys_in_range}")

    # Load data for all partitions in the range
    all_data = []
    for partition_key in partition_keys_in_range:
        partition_data = load_data_for_partition(partition_key)
        all_data.append(partition_data)

    # Combine all partition data
    combined_df = pd.concat(all_data, ignore_index=True)

    context.add_output_metadata({
        "partition_range_start": partition_range.start,
        "partition_range_end": partition_range.end,
        "partitions_processed": len(partition_keys_in_range),
        "total_records": len(combined_df)
    })

    return combined_df
```

### `TimeWindow` { .api }

**Module:** `dagster._core.definitions.partitions.utils`
**Type:** Class

Time window utility for time-based partition operations.

```python
from dagster import TimeWindow, asset

@asset(partitions_def=hourly_partitions)
def time_window_analysis(context) -> dict:
    """Asset demonstrating TimeWindow usage."""

    # Access the partition time window
    time_window = context.partition_time_window

    # TimeWindow properties
    start_time = time_window.start
    end_time = time_window.end
    duration = end_time - start_time

    context.log.info(f"Processing time window: {start_time} to {end_time} (duration: {duration})")

    # Time window operations
    analysis = {
        "window_start": start_time.isoformat(),
        "window_end": end_time.isoformat(),
        "duration_seconds": duration.total_seconds(),
        "duration_minutes": duration.total_seconds() / 60,
        "hour_of_day": start_time.hour,
        "day_of_week": start_time.weekday(),
        "is_business_hour": 9 <= start_time.hour < 18,
        "is_weekend": start_time.weekday() >= 5
    }

    # Load data for the time window
    query = f"""
        SELECT COUNT(*) as event_count, AVG(metric_value) as avg_metric
        FROM events
        WHERE timestamp >= '{start_time}'
          AND timestamp < '{end_time}'
    """

    result = execute_query(query)
    if result:
        analysis.update({
            "event_count": result[0]["event_count"],
            "avg_metric": result[0]["avg_metric"]
        })

    return analysis
```

1593

1594

This comprehensive partitions system enables efficient processing of large datasets through intelligent data division, automatic dependency management, and sophisticated mapping strategies. The system supports everything from simple daily partitions to complex multi-dimensional business logic partitioning with dynamic runtime partition management.

1595

1596

For automation with partitioned assets, see [Sensors and Schedules](./sensors-schedules.md). For execution contexts in partitioned environments, see [Execution and Contexts](./execution-contexts.md).