# Partitions System

This document covers Dagster's partitioning system: partition definitions, partition mappings, time-based partitions, dynamic partitions, and multi-dimensional partitioning. Partitioning enables efficient processing of large datasets by breaking them into manageable, parallelizable chunks.

## Partition Definitions

Partition definitions specify how data is divided into discrete, processable units, with automatic dependency tracking and backfill support.
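Once an asset carries a partitions definition, individual partitions can be materialized one at a time (or backfilled as a range from the UI/CLI). The snippet below is a minimal, self-contained sketch of materializing a single partition in-process with `dagster.materialize`; the asset name and logic are illustrative only.

```python
from dagster import DailyPartitionsDefinition, asset, materialize

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def example_daily_asset(context) -> str:
    # Each run handles exactly one partition key, e.g. "2023-01-15"
    return f"processed {context.partition_key}"

if __name__ == "__main__":
    # Materialize one specific partition; a backfill is just many of these runs
    result = materialize([example_daily_asset], partition_key="2023-01-15")
    assert result.success
```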
### Base Partition Classes

#### `PartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** Abstract base class

Base class for all partition definitions with common partition operations.

```python
from typing import Sequence

import pandas as pd

from dagster import PartitionsDefinition, asset

class CustomPartitionsDefinition(PartitionsDefinition):
    """Custom partition definition implementation."""

    def __init__(self, partition_keys: Sequence[str]):
        self._partition_keys = list(partition_keys)

    def get_partition_keys(self, current_time=None, dynamic_partitions_store=None) -> Sequence[str]:
        """Return all partition keys."""
        return self._partition_keys

    def get_partition_keys_in_range(self, partition_key_range) -> Sequence[str]:
        """Get partition keys in the specified range."""
        start_idx = self._partition_keys.index(partition_key_range.start) if partition_key_range.start else 0
        end_idx = self._partition_keys.index(partition_key_range.end) + 1 if partition_key_range.end else len(self._partition_keys)
        return self._partition_keys[start_idx:end_idx]

# Custom business quarter partitions
quarters_partition_def = CustomPartitionsDefinition([
    "2023-Q1", "2023-Q2", "2023-Q3", "2023-Q4",
    "2024-Q1", "2024-Q2", "2024-Q3", "2024-Q4"
])

@asset(partitions_def=quarters_partition_def)
def quarterly_sales_report(context) -> pd.DataFrame:
    """Asset partitioned by business quarters."""
    partition_key = context.partition_key
    year, quarter = partition_key.split("-")  # e.g. "2023-Q1" -> ("2023", "Q1")

    context.log.info(f"Processing {quarter} {year} sales report")

    # Load data for the specific quarter
    quarter_num = int(quarter[1])  # Extract the number from Q1, Q2, etc.
    start_month = (quarter_num - 1) * 3 + 1
    end_month = quarter_num * 3

    query = f"""
        SELECT * FROM sales
        WHERE YEAR(date) = {year}
        AND MONTH(date) BETWEEN {start_month} AND {end_month}
    """

    # `connection` is assumed to be a database connection defined elsewhere in this project
    return pd.read_sql(query, connection)
```
### Static Partitions

#### `StaticPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition with a fixed set of partition keys.

```python
from dagster import AllPartitionMapping, AssetIn, StaticPartitionsDefinition, asset, job, op

# Geographic partitions
regions_partitions = StaticPartitionsDefinition([
    "north_america", "europe", "asia_pacific", "latin_america", "africa"
])

@asset(partitions_def=regions_partitions)
def regional_sales_data(context) -> pd.DataFrame:
    """Sales data partitioned by geographic region."""

    region = context.partition_key
    context.log.info(f"Processing sales data for region: {region}")

    # Load region-specific data
    query = f"SELECT * FROM sales WHERE region = '{region}'"
    df = pd.read_sql(query, connection)

    # Add partition metadata
    context.add_output_metadata({
        "region": region,
        "record_count": len(df),
        "processing_date": pd.Timestamp.now().isoformat()
    })

    return df

# Product category partitions
product_categories = StaticPartitionsDefinition([
    "electronics", "clothing", "books", "home_garden", "sports", "automotive"
])

@asset(
    partitions_def=product_categories,
    # The upstream asset is partitioned by region, not by category, so an explicit
    # partition mapping is required; AllPartitionMapping reads across all regions.
    ins={"regional_sales_data": AssetIn(partition_mapping=AllPartitionMapping())},
)
def category_analytics(context, regional_sales_data: pd.DataFrame) -> dict:
    """Analytics partitioned by product category."""

    category = context.partition_key

    # Filter data for the category
    category_data = regional_sales_data[
        regional_sales_data["category"] == category
    ]

    analytics = {
        "category": category,
        "total_sales": category_data["amount"].sum(),
        "transaction_count": len(category_data),
        "avg_transaction": category_data["amount"].mean(),
        "top_products": category_data.groupby("product")["amount"].sum().nlargest(5).to_dict()
    }

    context.add_output_metadata({
        "category": category,
        "total_sales": analytics["total_sales"],
        "transaction_count": analytics["transaction_count"]
    })

    return analytics

# Environment-based partitions
environments_partitions = StaticPartitionsDefinition([
    "development", "staging", "production"
])

@op
def deploy_to_environment(context):
    environment = context.partition_key

    context.log.info(f"Deploying to {environment} environment")

    # Environment-specific deployment logic (assumed helper functions)
    if environment == "production":
        # Additional validation and approvals
        run_production_checks()
        require_manual_approval()

    deploy_application(environment)
    run_health_checks(environment)

    return f"Deployed to {environment}"

@job(partitions_def=environments_partitions)
def deployment_job():
    """Job partitioned by deployment environment."""
    deploy_to_environment()
```
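Static partition keys are enumerable up front, which makes them convenient to exercise in tests. Below is a minimal, self-contained sketch (the asset and keys are illustrative, not part of the example above) showing how to list the keys and materialize a single one with `dagster.materialize`.

```python
from dagster import StaticPartitionsDefinition, asset, materialize

colors = StaticPartitionsDefinition(["red", "green", "blue"])

@asset(partitions_def=colors)
def color_report(context) -> str:
    # One run per color partition
    return f"report for {context.partition_key}"

# All keys are known statically
assert colors.get_partition_keys() == ["red", "green", "blue"]

# Materialize just the "green" partition in-process
result = materialize([color_report], partition_key="green")
assert result.success
```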
### Time-Based Partitions

#### `TimeWindowPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Base class for time-based partitions with configurable time windows.

```python
from datetime import datetime

from dagster import TimeWindowPartitionsDefinition, asset

# Custom partitions generated from a cron schedule (business hours, weekdays only)
business_hours_partitions = TimeWindowPartitionsDefinition(
    start=datetime(2023, 1, 1, 9),   # 9 AM
    end=datetime(2024, 1, 1, 18),    # 6 PM
    cron_schedule="0 9-17 * * 1-5",  # Business hours, weekdays only
    fmt="%Y-%m-%d-%H:%M",            # Format used to render partition keys
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_partitions)
def business_hours_metrics(context) -> pd.DataFrame:
    """Metrics calculated only during business hours."""

    time_window = context.partition_time_window
    start_time = time_window.start
    end_time = time_window.end

    context.log.info(f"Processing metrics for {start_time} to {end_time}")

    # Load data for the specific time window
    query = f"""
        SELECT * FROM user_events
        WHERE timestamp >= '{start_time}'
        AND timestamp < '{end_time}'
    """

    df = pd.read_sql(query, connection)

    # Calculate business metrics
    metrics = df.groupby("event_type").agg({
        "user_id": "nunique",
        "session_id": "nunique",
        "timestamp": "count"
    }).reset_index()

    context.add_output_metadata({
        "time_window_start": start_time.isoformat(),
        "time_window_end": end_time.isoformat(),
        "total_events": len(df),
        "unique_users": df["user_id"].nunique()
    })

    return metrics
```

#### `DailyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Daily time-based partitions with automatic date-based partitioning.

```python
import pandas as pd

from dagster import DailyPartitionsDefinition, asset

# Basic daily partitions
daily_partitions = DailyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=daily_partitions)
def daily_user_events(context) -> pd.DataFrame:
    """Daily user events data."""

    partition_date = context.partition_key  # Format: "2023-01-01"
    context.log.info(f"Processing user events for {partition_date}")

    # Load events for the specific date
    query = f"""
        SELECT user_id, event_type, timestamp, properties
        FROM user_events
        WHERE DATE(timestamp) = '{partition_date}'
    """

    df = pd.read_sql(query, connection)

    context.add_output_metadata({
        "partition_date": partition_date,
        "event_count": len(df),
        "unique_users": df["user_id"].nunique(),
        "event_types": df["event_type"].nunique()
    })

    return df

@asset(partitions_def=daily_partitions)
def daily_user_summary(context, daily_user_events: pd.DataFrame) -> pd.DataFrame:
    """Daily summary of user activity."""

    partition_date = context.partition_key

    # Aggregate events by user
    user_summary = daily_user_events.groupby("user_id").agg({
        "event_type": "count",
        "timestamp": ["min", "max"]
    }).reset_index()

    user_summary.columns = ["user_id", "event_count", "first_event", "last_event"]
    user_summary["session_duration"] = (
        pd.to_datetime(user_summary["last_event"]) -
        pd.to_datetime(user_summary["first_event"])
    ).dt.total_seconds() / 60  # Minutes

    user_summary["date"] = partition_date

    context.add_output_metadata({
        "partition_date": partition_date,
        "active_users": len(user_summary),
        "avg_events_per_user": user_summary["event_count"].mean(),
        "avg_session_duration_minutes": user_summary["session_duration"].mean()
    })

    return user_summary

# Daily partitions with a custom start time
daily_partitions_6am = DailyPartitionsDefinition(
    start_date="2023-01-01",
    minute_offset=0,
    hour_offset=6,  # Partitions start at 6 AM
    timezone="America/New_York"
)

@asset(partitions_def=daily_partitions_6am)
def business_day_data(context) -> pd.DataFrame:
    """Business day data starting at 6 AM."""

    time_window = context.partition_time_window
    business_day_start = time_window.start  # 6 AM
    business_day_end = time_window.end      # Next day, 6 AM

    context.log.info(f"Processing business day from {business_day_start} to {business_day_end}")

    query = f"""
        SELECT * FROM transactions
        WHERE timestamp >= '{business_day_start}'
        AND timestamp < '{business_day_end}'
    """

    return pd.read_sql(query, connection)
```
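Time-window partitioned assets pair naturally with schedules: a partitioned asset job can be turned into a schedule that targets the most recently completed partition. The sketch below is a minimal example of that pattern; the job and schedule names are illustrative.

```python
from dagster import (
    DailyPartitionsDefinition,
    Definitions,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def daily_events(context) -> str:
    return f"events for {context.partition_key}"

# Job that materializes the partitioned asset
daily_events_job = define_asset_job(
    name="daily_events_job",
    selection=[daily_events],
    partitions_def=daily,
)

# Schedule that runs once per day for the just-completed partition
daily_events_schedule = build_schedule_from_partitioned_job(daily_events_job)

defs = Definitions(
    assets=[daily_events],
    jobs=[daily_events_job],
    schedules=[daily_events_schedule],
)
```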
#### `HourlyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Hourly time-based partitions for high-frequency data processing.

```python
from dagster import HourlyPartitionsDefinition, asset

# Hourly partitions for real-time analytics
hourly_partitions = HourlyPartitionsDefinition(
    start_date="2023-01-01-00:00",
    timezone="UTC"
)

@asset(partitions_def=hourly_partitions)
def hourly_api_metrics(context) -> pd.DataFrame:
    """Hourly API metrics and performance data."""

    partition_key = context.partition_key  # Format: "2023-01-01-14:00"
    time_window = context.partition_time_window

    hour_start = time_window.start
    hour_end = time_window.end

    context.log.info(f"Processing API metrics for hour: {partition_key}")

    # Load API logs for the specific hour
    query = f"""
        SELECT endpoint, status_code, response_time, timestamp
        FROM api_logs
        WHERE timestamp >= '{hour_start}'
        AND timestamp < '{hour_end}'
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate hourly metrics ("p95" is not a built-in pandas aggregation, so use a helper)
        def p95(series):
            return series.quantile(0.95)

        metrics = df.groupby(["endpoint", "status_code"]).agg({
            "response_time": ["count", "mean", "median", p95],
            "timestamp": ["min", "max"]
        }).reset_index()

        # Flatten column names
        metrics.columns = [
            "endpoint", "status_code", "request_count",
            "avg_response_time", "median_response_time", "p95_response_time",
            "first_request", "last_request"
        ]

        metrics["hour"] = partition_key

        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": len(df),
            "unique_endpoints": df["endpoint"].nunique(),
            "error_rate": len(df[df["status_code"] >= 400]) / len(df)
        })
    else:
        # Handle an empty hour
        metrics = pd.DataFrame({
            "hour": [partition_key],
            "total_requests": [0],
            "message": ["No API requests in this hour"]
        })

        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": 0,
            "status": "no_data"
        })

    return metrics

# Hourly partitions bounded to 2023 (business-hours filtering happens in the asset below)
business_hours_only = HourlyPartitionsDefinition(
    start_date="2023-01-01-09:00",
    end_date="2023-12-31-18:00",
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_only)
def business_hours_analytics(context) -> dict:
    """Analytics only during business hours."""

    time_window = context.partition_time_window
    hour = time_window.start.hour

    # Only process during business hours (9 AM - 6 PM)
    if 9 <= hour < 18:
        context.log.info(f"Processing business hour analytics for {context.partition_key}")

        # Business analytics logic
        return {
            "hour": context.partition_key,
            "is_business_hour": True,
            "analytics_data": "processed"
        }
    else:
        context.log.info(f"Skipping non-business hour: {context.partition_key}")
        return {
            "hour": context.partition_key,
            "is_business_hour": False,
            "message": "Outside business hours"
        }
```
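By default, time-window partition definitions only include windows that have fully elapsed, so the in-progress hour or day is not yet a partition. If you need a partition for the current window, the `end_offset` parameter extends the set by that many windows; the snippet below is a small sketch under that assumption.

```python
from dagster import HourlyPartitionsDefinition

# end_offset=1 adds one extra partition beyond the last completed hour,
# i.e. the hour that is currently in progress.
hourly_including_current = HourlyPartitionsDefinition(
    start_date="2023-01-01-00:00",
    end_offset=1,
)

# The last key returned now corresponds to the current hour
print(hourly_including_current.get_partition_keys()[-1])
```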
#### `WeeklyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Weekly time-based partitions for weekly aggregations and reports.

```python
from datetime import timedelta

from dagster import WeeklyPartitionsDefinition, asset

# Weekly partitions starting on Monday
weekly_partitions = WeeklyPartitionsDefinition(
    start_date="2023-01-02",  # First Monday of 2023
    day_offset=1,             # Weeks start on Monday (the default is Sunday)
    timezone="UTC"
)

@asset(partitions_def=weekly_partitions)
def weekly_sales_summary(context) -> pd.DataFrame:
    """Weekly sales summary and trends."""

    time_window = context.partition_time_window
    week_start = time_window.start
    week_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-02"

    context.log.info(f"Processing weekly sales summary for week {partition_key}")
    context.log.info(f"Week span: {week_start} to {week_end}")

    # Load sales data for the week
    query = f"""
        SELECT DATE(sale_date) as day, product_category, SUM(amount) as daily_sales
        FROM sales
        WHERE sale_date >= '{week_start.date()}'
        AND sale_date < '{week_end.date()}'
        GROUP BY DATE(sale_date), product_category
        ORDER BY day, product_category
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate weekly aggregations
        weekly_summary = df.groupby("product_category").agg({
            "daily_sales": ["sum", "mean", "std", "count"]
        }).reset_index()

        weekly_summary.columns = [
            "product_category", "total_weekly_sales",
            "avg_daily_sales", "daily_sales_std", "days_with_sales"
        ]

        weekly_summary["week_start"] = week_start.date()
        weekly_summary["week_end"] = (week_end - timedelta(days=1)).date()

        context.add_output_metadata({
            "week_start": week_start.isoformat(),
            "week_end": week_end.isoformat(),
            "total_sales": df["daily_sales"].sum(),
            "categories_sold": df["product_category"].nunique(),
            "sales_days": df["day"].nunique()
        })
    else:
        weekly_summary = pd.DataFrame({
            "week_start": [week_start.date()],
            "week_end": [(week_end - timedelta(days=1)).date()],
            "message": ["No sales data for this week"]
        })

        context.add_output_metadata({
            "week_start": week_start.isoformat(),
            "week_end": week_end.isoformat(),
            "total_sales": 0,
            "status": "no_data"
        })

    return weekly_summary

@asset(partitions_def=weekly_partitions)
def weekly_user_cohorts(context, daily_user_summary: pd.DataFrame) -> pd.DataFrame:
    """Weekly user cohort analysis."""

    time_window = context.partition_time_window
    week_dates = pd.date_range(
        start=time_window.start.date(),
        end=(time_window.end - timedelta(days=1)).date(),
        freq="D"
    ).strftime("%Y-%m-%d").tolist()

    context.log.info(f"Analyzing user cohorts for dates: {week_dates}")

    # Filter daily summaries for this week
    week_data = daily_user_summary[
        daily_user_summary["date"].isin(week_dates)
    ]

    if len(week_data) > 0:
        # Cohort analysis
        cohort_analysis = week_data.groupby("user_id").agg({
            "event_count": "sum",
            "session_duration": "sum",
            "date": ["count", "min", "max"]
        }).reset_index()

        cohort_analysis.columns = [
            "user_id", "total_events", "total_session_minutes",
            "active_days", "first_seen", "last_seen"
        ]

        # User engagement segmentation
        cohort_analysis["engagement_level"] = pd.cut(
            cohort_analysis["total_events"],
            bins=[0, 5, 20, 50, float("inf")],
            labels=["low", "medium", "high", "power_user"]
        )

        cohort_analysis["week_start"] = time_window.start.date()

        context.add_output_metadata({
            "week_start": time_window.start.isoformat(),
            "active_users": len(cohort_analysis),
            "avg_events_per_user": cohort_analysis["total_events"].mean(),
            "engagement_distribution": cohort_analysis["engagement_level"].value_counts().to_dict()
        })
    else:
        cohort_analysis = pd.DataFrame({
            "week_start": [time_window.start.date()],
            "message": ["No user data for this week"]
        })

        context.add_output_metadata({
            "week_start": time_window.start.isoformat(),
            "active_users": 0,
            "status": "no_data"
        })

    return cohort_analysis
```
#### `MonthlyPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** TimeWindowPartitionsDefinition subclass

Monthly time-based partitions for monthly reporting and aggregations.

```python
from datetime import timedelta

from dagster import MonthlyPartitionsDefinition, asset

# Monthly partitions for financial reporting
monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=monthly_partitions)
def monthly_financial_report(context) -> pd.DataFrame:
    """Monthly financial report with comprehensive metrics."""

    time_window = context.partition_time_window
    month_start = time_window.start
    month_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-01"
    month_name = month_start.strftime("%B %Y")

    context.log.info(f"Processing financial report for {month_name}")

    # Load financial data for the month
    query = f"""
        SELECT
            department,
            SUM(revenue) as total_revenue,
            SUM(costs) as total_costs,
            COUNT(DISTINCT customer_id) as unique_customers,
            COUNT(*) as transaction_count
        FROM financial_transactions
        WHERE transaction_date >= '{month_start.date()}'
        AND transaction_date < '{month_end.date()}'
        GROUP BY department
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate financial metrics
        df["profit"] = df["total_revenue"] - df["total_costs"]
        df["profit_margin"] = df["profit"] / df["total_revenue"]
        df["revenue_per_customer"] = df["total_revenue"] / df["unique_customers"]
        df["avg_transaction_value"] = df["total_revenue"] / df["transaction_count"]

        # Add month information
        df["month"] = month_name
        df["month_start"] = month_start.date()
        df["month_end"] = (month_end - timedelta(days=1)).date()

        # Company-wide totals
        total_revenue = df["total_revenue"].sum()
        total_profit = df["profit"].sum()
        total_customers = df["unique_customers"].sum()

        context.add_output_metadata({
            "month": month_name,
            "total_revenue": total_revenue,
            "total_profit": total_profit,
            "company_profit_margin": total_profit / total_revenue if total_revenue > 0 else 0,
            "total_customers": total_customers,
            "departments": len(df)
        })
    else:
        df = pd.DataFrame({
            "month": [month_name],
            "message": ["No financial data for this month"]
        })

        context.add_output_metadata({
            "month": month_name,
            "total_revenue": 0,
            "status": "no_data"
        })

    return df

# Fiscal year partitions (monthly partitions aligned to a fiscal year start)
fiscal_monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-04-01",  # Fiscal year starts in April
    timezone="America/New_York"
)

@asset(partitions_def=fiscal_monthly_partitions)
def fiscal_monthly_budget_analysis(context) -> dict:
    """Fiscal year monthly budget analysis."""

    time_window = context.partition_time_window
    fiscal_month_start = time_window.start

    # Determine fiscal year and month
    if fiscal_month_start.month >= 4:  # April-December
        fiscal_year = fiscal_month_start.year
        fiscal_month = fiscal_month_start.month - 3  # April = Month 1
    else:  # January-March
        fiscal_year = fiscal_month_start.year - 1
        fiscal_month = fiscal_month_start.month + 9  # January = Month 10

    context.log.info(f"Processing FY{fiscal_year} Month {fiscal_month}")

    budget_analysis = {
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": fiscal_month_start.strftime("%B %Y"),
        "quarter": f"Q{((fiscal_month - 1) // 3) + 1}",
        "is_fiscal_year_end": fiscal_month == 12
    }

    context.add_output_metadata({
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": fiscal_month_start.strftime("%B %Y"),
        "quarter": budget_analysis["quarter"]
    })

    return budget_analysis
```
### Dynamic Partitions

#### `DynamicPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition where partitions can be added and removed at runtime.

```python
from dagster import (
    AddDynamicPartitionsRequest,
    DynamicPartitionsDefinition,
    SensorResult,
    SkipReason,
    asset,
    job,
    op,
    sensor,
)

# Dynamic customer partitions
customers_partitions = DynamicPartitionsDefinition(name="customers")

@asset(partitions_def=customers_partitions)
def customer_analytics(context) -> pd.DataFrame:
    """Analytics for individual customers using dynamic partitions."""

    customer_id = context.partition_key
    context.log.info(f"Processing analytics for customer: {customer_id}")

    # Load customer-specific data
    query = f"""
        SELECT
            DATE(order_date) as date,
            product_category,
            SUM(order_value) as daily_spend
        FROM orders
        WHERE customer_id = '{customer_id}'
        AND order_date >= CURRENT_DATE - INTERVAL 90 DAY
        GROUP BY DATE(order_date), product_category
        ORDER BY date DESC
    """

    df = pd.read_sql(query, connection)

    context.add_output_metadata({
        "customer_id": customer_id,
        "days_with_orders": len(df),
        "total_spend": df["daily_spend"].sum() if len(df) > 0 else 0,
        "categories_purchased": df["product_category"].nunique() if len(df) > 0 else 0
    })

    return df

# Sensor to dynamically add new customer partitions
@sensor(minimum_interval_seconds=3600)  # Check hourly
def new_customer_sensor(context):
    """Add partitions for new customers."""

    # Find customers added in the last hour (execute_query is an assumed helper)
    query = """
        SELECT DISTINCT customer_id
        FROM customers
        WHERE created_at >= NOW() - INTERVAL 1 HOUR
    """

    new_customers = execute_query(query)

    if new_customers:
        customer_ids = [row["customer_id"] for row in new_customers]

        context.log.info(f"Adding partitions for {len(customer_ids)} new customers")

        return SensorResult(
            dynamic_partitions_requests=[
                AddDynamicPartitionsRequest(
                    partitions_def_name="customers",
                    partition_keys=customer_ids,
                )
            ]
        )

    return SkipReason("No new customers found")

# Job to clean up inactive customer partitions
@op
def identify_inactive_customers(context) -> list:
    # Find customers with no activity in 6 months
    query = """
        SELECT DISTINCT c.customer_id
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
            AND o.order_date >= CURRENT_DATE - INTERVAL 6 MONTH
        WHERE o.customer_id IS NULL
        AND c.created_at < CURRENT_DATE - INTERVAL 6 MONTH
    """

    inactive_customers = execute_query(query)
    customer_ids = [row["customer_id"] for row in inactive_customers]

    context.log.info(f"Found {len(customer_ids)} inactive customers")
    return customer_ids

@op
def remove_customer_partitions(context, inactive_customers: list):
    if inactive_customers:
        context.log.info(f"Removing partitions for {len(inactive_customers)} inactive customers")

        # Inside an op, dynamic partitions are removed through the instance API;
        # DeleteDynamicPartitionsRequest objects are only meaningful when returned from sensors.
        for customer_id in inactive_customers:
            context.instance.delete_dynamic_partition("customers", customer_id)
    else:
        context.log.info("No inactive customers to remove")

@job
def cleanup_inactive_customers():
    """Remove partitions for inactive customers."""
    remove_customer_partitions(identify_inactive_customers())

# Dynamic experiment partitions for A/B testing
experiments_partitions = DynamicPartitionsDefinition(name="experiments")

@asset(partitions_def=experiments_partitions)
def experiment_results(context) -> dict:
    """Results analysis for A/B test experiments."""

    experiment_id = context.partition_key
    context.log.info(f"Analyzing results for experiment: {experiment_id}")

    # Load experiment data
    query = f"""
        SELECT
            variant,
            COUNT(*) as participants,
            SUM(converted) as conversions,
            AVG(metric_value) as avg_metric
        FROM experiment_data
        WHERE experiment_id = '{experiment_id}'
        GROUP BY variant
    """

    df = pd.read_sql(query, connection)

    if len(df) > 0:
        # Calculate experiment results
        results = {
            "experiment_id": experiment_id,
            "variants": df["variant"].tolist(),
            "total_participants": df["participants"].sum(),
            "results_by_variant": df.to_dict("records")
        }

        # Statistical significance testing
        if len(df) == 2:  # A/B test
            control = df[df["variant"] == "control"].iloc[0] if len(df[df["variant"] == "control"]) > 0 else None
            treatment = df[df["variant"] == "treatment"].iloc[0] if len(df[df["variant"] == "treatment"]) > 0 else None

            if control is not None and treatment is not None:
                control_rate = control["conversions"] / control["participants"]
                treatment_rate = treatment["conversions"] / treatment["participants"]
                lift = (treatment_rate - control_rate) / control_rate if control_rate > 0 else 0

                results["lift"] = lift
                results["control_conversion_rate"] = control_rate
                results["treatment_conversion_rate"] = treatment_rate

        context.add_output_metadata({
            "experiment_id": experiment_id,
            "total_participants": results["total_participants"],
            "variants_count": len(df),
            "lift": results.get("lift", "N/A")
        })
    else:
        results = {
            "experiment_id": experiment_id,
            "error": "No data found for experiment"
        }

        context.add_output_metadata({
            "experiment_id": experiment_id,
            "total_participants": 0,
            "status": "no_data"
        })

    return results
```
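A common pattern is to have a single sensor both register newly discovered partition keys and request runs for them in the same evaluation. The following is a minimal sketch of that pattern; the file-listing logic, directory, and job name are illustrative assumptions.

```python
import os

from dagster import (
    AddDynamicPartitionsRequest,
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

files_partitions = DynamicPartitionsDefinition(name="incoming_files")

@asset(partitions_def=files_partitions)
def parsed_file(context) -> str:
    return f"parsed {context.partition_key}"

parsed_file_job = define_asset_job(
    "parsed_file_job", selection=[parsed_file], partitions_def=files_partitions
)

@sensor(job=parsed_file_job)
def incoming_files_sensor(context):
    # Treat each file in a drop directory as a partition key (illustrative)
    current = set(os.listdir("/tmp/incoming"))
    known = set(context.instance.get_dynamic_partitions("incoming_files"))
    new_keys = sorted(current - known)

    return SensorResult(
        # Register the new partitions and launch one run per new key
        run_requests=[RunRequest(partition_key=key) for key in new_keys],
        dynamic_partitions_requests=[
            AddDynamicPartitionsRequest(
                partitions_def_name="incoming_files",
                partition_keys=new_keys,
            )
        ],
    )
```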
### Multi-Dimensional Partitions

#### `MultiPartitionsDefinition` { .api }

**Module:** `dagster._core.definitions.partitions.definition`
**Type:** PartitionsDefinition subclass

Partition definition combining multiple partition dimensions.

```python
from dagster import (
    DailyPartitionsDefinition,
    MonthlyPartitionsDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Multi-dimensional partitioning: Date x Region
multi_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia"])
})

@asset(partitions_def=multi_partitions)
def regional_daily_sales(context) -> pd.DataFrame:
    """Sales data partitioned by both date and region."""

    # Access the multi-dimensional partition key
    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
    else:
        # Handle the string representation
        date_key, region_key = str(partition_key).split("|")

    context.log.info(f"Processing sales for {region_key} on {date_key}")

    # Load region- and date-specific data
    query = f"""
        SELECT product_id, SUM(amount) as sales, COUNT(*) as transactions
        FROM sales
        WHERE DATE(sale_date) = '{date_key}'
        AND region = '{region_key}'
        GROUP BY product_id
    """

    df = pd.read_sql(query, connection)

    # Add partition information
    df["partition_date"] = date_key
    df["partition_region"] = region_key

    context.add_output_metadata({
        "partition_date": date_key,
        "partition_region": region_key,
        "total_sales": df["sales"].sum() if len(df) > 0 else 0,
        "unique_products": len(df),
        "total_transactions": df["transactions"].sum() if len(df) > 0 else 0
    })

    return df

# Three-dimensional partitioning: Date x Region x Product Category
three_dim_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia", "latam"]),
    "category": StaticPartitionsDefinition(["electronics", "clothing", "books", "home"])
})

@asset(partitions_def=three_dim_partitions)
def detailed_sales_metrics(context) -> dict:
    """Highly granular sales metrics with three-dimensional partitioning."""

    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
        category_key = partition_key.keys_by_dimension["category"]
    else:
        # Parse the string representation
        parts = str(partition_key).split("|")
        date_key, region_key, category_key = parts

    context.log.info(f"Processing {category_key} sales in {region_key} on {date_key}")

    # Highly specific query
    query = f"""
        SELECT
            AVG(amount) as avg_sale_amount,
            MIN(amount) as min_sale_amount,
            MAX(amount) as max_sale_amount,
            COUNT(*) as transaction_count,
            SUM(amount) as total_sales,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales
        WHERE DATE(sale_date) = '{date_key}'
        AND region = '{region_key}'
        AND category = '{category_key}'
    """

    result = execute_query(query)

    if result and len(result) > 0:
        metrics = result[0]
        metrics.update({
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": metrics["transaction_count"] > 0
        })
    else:
        metrics = {
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": False,
            "transaction_count": 0,
            "total_sales": 0
        }

    context.add_output_metadata({
        "date": date_key,
        "region": region_key,
        "category": category_key,
        "transaction_count": metrics["transaction_count"],
        "total_sales": metrics["total_sales"]
    })

    return metrics

# Complex multi-dimensional example with time and business dimensions
business_multi_partitions = MultiPartitionsDefinition({
    "month": MonthlyPartitionsDefinition(start_date="2023-01-01"),
    "department": StaticPartitionsDefinition(["sales", "marketing", "engineering", "support"]),
    "metric_type": StaticPartitionsDefinition(["revenue", "costs", "headcount", "productivity"])
})

@asset(partitions_def=business_multi_partitions)
def department_kpi_metrics(context) -> dict:
    """KPI metrics partitioned by month, department, and metric type."""

    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        month_key = partition_key.keys_by_dimension["month"]
        department_key = partition_key.keys_by_dimension["department"]
        metric_type_key = partition_key.keys_by_dimension["metric_type"]
    else:
        month_key, department_key, metric_type_key = str(partition_key).split("|")

    context.log.info(f"Processing {metric_type_key} for {department_key} in {month_key}")

    # Metric-specific data loading
    if metric_type_key == "revenue":
        query = f"""
            SELECT SUM(amount) as value, 'USD' as unit
            FROM revenue
            WHERE department = '{department_key}'
            AND YEAR(date) = YEAR('{month_key}')
            AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "costs":
        query = f"""
            SELECT SUM(cost) as value, 'USD' as unit
            FROM expenses
            WHERE department = '{department_key}'
            AND YEAR(date) = YEAR('{month_key}')
            AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "headcount":
        query = f"""
            SELECT COUNT(DISTINCT employee_id) as value, 'people' as unit
            FROM employees
            WHERE department = '{department_key}'
            AND start_date <= '{month_key}'
            AND (end_date IS NULL OR end_date > LAST_DAY('{month_key}'))
        """
    elif metric_type_key == "productivity":
        query = f"""
            SELECT AVG(productivity_score) as value, 'score' as unit
            FROM productivity_metrics
            WHERE department = '{department_key}'
            AND YEAR(date) = YEAR('{month_key}')
            AND MONTH(date) = MONTH('{month_key}')
        """
    else:
        raise ValueError(f"Unknown metric type: {metric_type_key}")

    result = execute_query(query)

    if result and len(result) > 0:
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            "value": result[0]["value"] or 0,
            "unit": result[0]["unit"],
            "has_data": result[0]["value"] is not None
        }
    else:
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            "value": 0,
            "unit": "unknown",
            "has_data": False
        }

    context.add_output_metadata({
        "month": month_key,
        "department": department_key,
        "metric_type": metric_type_key,
        "value": kpi_data["value"],
        "unit": kpi_data["unit"]
    })

    return kpi_data
```
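When triggering a run for a multi-dimensional asset, the partition key is the combination of one key per dimension. The following is a small sketch of addressing a single (date, region) combination with `MultiPartitionKey`; the asset itself is illustrative.

```python
from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
    materialize,
)

date_region = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia"]),
})

@asset(partitions_def=date_region)
def combo_asset(context) -> str:
    keys = context.partition_key.keys_by_dimension
    return f"{keys['region']} on {keys['date']}"

# Address one cell of the date x region grid
result = materialize(
    [combo_asset],
    partition_key=MultiPartitionKey({"date": "2023-01-15", "region": "eu"}),
)
assert result.success
```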
## Partition Mappings

Partition mappings define how partitioned assets depend on partitions of their upstream assets, enabling sophisticated dependency patterns.
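If no mapping is specified, Dagster picks a default (identity for matching definitions, time-window overlap for time-based definitions); an explicit mapping is only needed when the dependency pattern differs. As a small additional example, the sketch below uses the built-in `LastPartitionMapping` so that an unpartitioned asset always reads the latest partition of a daily upstream asset; the asset names are illustrative.

```python
from dagster import AssetIn, DailyPartitionsDefinition, LastPartitionMapping, asset

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def daily_snapshot(context) -> str:
    return f"snapshot for {context.partition_key}"

@asset(ins={"daily_snapshot": AssetIn(partition_mapping=LastPartitionMapping())})
def latest_dashboard(daily_snapshot: str) -> str:
    # Unpartitioned asset that depends only on the most recent daily partition
    return f"dashboard built from {daily_snapshot}"
```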
### Base Partition Mapping

#### `PartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** Abstract base class

Base class for partition mapping strategies.

```python
from datetime import datetime

from dagster import AssetIn, PartitionKeyRange, PartitionMapping, asset

class BusinessDayPartitionMapping(PartitionMapping):
    """Custom partition mapping for business days only.

    Simplified illustration of a custom mapping; the full PartitionMapping
    interface has additional methods and arguments.
    """

    def __init__(self, skip_weekends: bool = True):
        self.skip_weekends = skip_weekends

    def get_upstream_mapped_partitions_result_for_partition_key(
        self, downstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map a downstream partition to upstream partitions."""

        # Convert the partition key to a date
        partition_date = datetime.strptime(downstream_partition_key, "%Y-%m-%d")

        if self.skip_weekends and partition_date.weekday() >= 5:  # Weekend
            # Skip weekends - no upstream dependency
            return PartitionKeyRange(start=None, end=None)

        # For business days, map to the same day
        return PartitionKeyRange(
            start=downstream_partition_key,
            end=downstream_partition_key
        )

    def get_downstream_partitions_for_partition_key(
        self, upstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map an upstream partition to downstream partitions."""

        # Business day mapping - same logic in reverse
        partition_date = datetime.strptime(upstream_partition_key, "%Y-%m-%d")

        if self.skip_weekends and partition_date.weekday() >= 5:
            return []

        return [upstream_partition_key]

# Usage of the custom partition mapping
@asset(partitions_def=daily_partitions)
def raw_trading_data(context) -> pd.DataFrame:
    """Raw trading data (all days)."""
    partition_date = context.partition_key
    return load_trading_data(partition_date)

@asset(
    partitions_def=daily_partitions,
    ins={"raw_trading_data": AssetIn(partition_mapping=BusinessDayPartitionMapping())}
)
def business_day_analysis(context, raw_trading_data: pd.DataFrame) -> pd.DataFrame:
    """Analysis that only runs on business days."""
    return analyze_trading_data(raw_trading_data)
```
### Built-in Partition Mappings

#### `IdentityPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

One-to-one partition mapping (the default behavior).

```python
from dagster import IdentityPartitionMapping, asset, AssetIn

@asset(partitions_def=daily_partitions)
def daily_raw_data(context) -> pd.DataFrame:
    """Daily raw data."""
    return load_raw_data(context.partition_key)

@asset(
    partitions_def=daily_partitions,
    ins={"daily_raw_data": AssetIn(partition_mapping=IdentityPartitionMapping())}
)
def daily_processed_data(context, daily_raw_data: pd.DataFrame) -> pd.DataFrame:
    """Processed data with identity mapping (same partition)."""
    return process_data(daily_raw_data)
```

#### `AllPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Mapping where a downstream partition depends on all upstream partitions.

```python
from dagster import AllPartitionMapping, asset, AssetIn

@asset(
    ins={"daily_processed_data": AssetIn(partition_mapping=AllPartitionMapping())}
)
def complete_dataset_summary(context, daily_processed_data: pd.DataFrame) -> dict:
    """Summary that depends on all daily data partitions."""

    # This asset will wait for all partitions of daily_processed_data
    context.log.info(f"Creating summary from {len(daily_processed_data)} daily records")

    return {
        "total_records": len(daily_processed_data),
        "date_range": {
            "start": daily_processed_data["date"].min(),
            "end": daily_processed_data["date"].max()
        },
        "summary_stats": daily_processed_data.describe().to_dict()
    }
```
#### `TimeWindowPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Time-based partition mapping with configurable time windows and offsets.

```python
from datetime import timedelta

from dagster import TimeWindowPartitionMapping, asset, AssetIn

# Weekly summary depends on the last 7 days of daily data
@asset(
    partitions_def=weekly_partitions,
    ins={
        "daily_sales": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-6,  # 6 days before
                end_offset=0      # current day
            )
        )
    }
)
def weekly_sales_summary(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
    """Weekly summary from 7 days of daily data."""

    time_window = context.partition_time_window
    context.log.info(f"Creating weekly summary for {time_window.start} to {time_window.end}")

    # daily_sales contains data from the last 7 days
    weekly_summary = daily_sales.groupby("product_category").agg({
        "sales_amount": "sum",
        "transaction_count": "sum"
    }).reset_index()

    weekly_summary["week_start"] = time_window.start.date()
    weekly_summary["week_end"] = (time_window.end - timedelta(days=1)).date()

    return weekly_summary

# Monthly report depends on the previous month's data
@asset(
    partitions_def=monthly_partitions,
    ins={
        "weekly_sales_summary": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1,  # Previous month
                end_offset=-1
            )
        )
    }
)
def monthly_trends_analysis(context, weekly_sales_summary: pd.DataFrame) -> dict:
    """Monthly trends based on the previous month's data."""

    partition_month = context.partition_time_window.start.strftime("%B %Y")
    previous_month = (context.partition_time_window.start - timedelta(days=1)).strftime("%B %Y")

    context.log.info(f"Analyzing {partition_month} trends using {previous_month} data")

    trends = {
        "analysis_month": partition_month,
        "data_month": previous_month,
        "weekly_growth_rate": calculate_growth_rate(weekly_sales_summary),
        "category_performance": analyze_category_trends(weekly_sales_summary)
    }

    return trends
```
#### `StaticPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Static mapping between specific upstream and downstream partitions.

```python
from dagster import StaticPartitionMapping, asset, AssetIn

# Asset that always depends on specific reference partitions.
# StaticPartitionMapping maps upstream partition keys to downstream partition keys.
@asset(
    partitions_def=regions_partitions,
    ins={
        "global_config": AssetIn(
            partition_mapping=StaticPartitionMapping({
                # upstream config partition -> downstream region partition(s)
                "config_v1": ["north_america", "asia_pacific", "latin_america"],
                "config_v2": ["europe", "africa"],
            })
        )
    }
)
def regional_processing(context, global_config: dict) -> pd.DataFrame:
    """Regional processing with a static config mapping."""

    region = context.partition_key
    context.log.info(f"Processing {region} with config version {global_config.get('version')}")

    return process_regional_data(region, global_config)
```
#### `MultiPartitionMapping` { .api }

**Module:** `dagster._core.definitions.partitions.mapping`
**Type:** PartitionMapping subclass

Mapping for multi-dimensional partitions with dimension-specific mappings.

```python
from datetime import datetime, timedelta

from dagster import (
    AssetIn,
    DimensionPartitionMapping,
    IdentityPartitionMapping,
    MultiPartitionKey,
    MultiPartitionMapping,
    TimeWindowPartitionMapping,
    asset,
)

# Multi-partition mapping with a different strategy per dimension
multi_mapping = MultiPartitionMapping({
    "date": DimensionPartitionMapping(
        dimension_name="date",
        partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),  # Yesterday and today
    ),
    "region": DimensionPartitionMapping(
        dimension_name="region",
        partition_mapping=IdentityPartitionMapping(),  # Same region
    ),
})

@asset(
    partitions_def=multi_partitions,  # date x region
    ins={
        "regional_daily_sales": AssetIn(partition_mapping=multi_mapping)
    }
)
def two_day_regional_trends(context, regional_daily_sales: pd.DataFrame) -> dict:
    """Trends analysis using 2 days of data for the same region."""

    partition_key = context.partition_key
    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
    else:
        date_key, region_key = str(partition_key).split("|")

    context.log.info(f"Analyzing 2-day trends for {region_key} on {date_key}")

    # regional_daily_sales contains data from yesterday and today for this region
    if len(regional_daily_sales) >= 2:
        # Compare yesterday vs today
        yesterday = (datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
        yesterday_data = regional_daily_sales[regional_daily_sales["partition_date"] == yesterday]
        today_data = regional_daily_sales[regional_daily_sales["partition_date"] == date_key]

        trends = {
            "date": date_key,
            "region": region_key,
            "yesterday_sales": yesterday_data["sales"].sum() if len(yesterday_data) > 0 else 0,
            "today_sales": today_data["sales"].sum() if len(today_data) > 0 else 0,
        }

        if trends["yesterday_sales"] > 0:
            trends["growth_rate"] = (trends["today_sales"] - trends["yesterday_sales"]) / trends["yesterday_sales"]
        else:
            trends["growth_rate"] = None
    else:
        trends = {
            "date": date_key,
            "region": region_key,
            "error": "Insufficient data for trend analysis"
        }

    return trends
```
## Partitioned Configuration

Partitioned configuration allows partition-specific configuration values, enabling different processing logic per partition.

### `PartitionedConfig` { .api }

**Module:** `dagster._core.definitions.partitions.partitioned_config`
**Type:** Class

Configuration that varies by partition, with dynamic config generation.

```python
from datetime import datetime, timedelta

from dagster import PartitionedConfig, job, op, daily_partitioned_config

@op(config_schema={"batch_size": int, "processing_mode": str})
def process_partition_data(context):
    """Op with partition-specific configuration."""

    config = context.op_config
    partition_key = context.partition_key

    batch_size = config["batch_size"]
    processing_mode = config["processing_mode"]

    context.log.info(f"Processing partition {partition_key} with batch_size={batch_size}, mode={processing_mode}")

    # Partition-specific processing logic
    return f"Processed {partition_key}"

# Dynamic partitioned configuration based on the partition date
def partition_config_fn(start: datetime, _end: datetime) -> dict:
    """Generate configuration based on the partition date."""

    # Weekend processing uses different parameters
    is_weekend = start.weekday() >= 5

    return {
        "ops": {
            "process_partition_data": {
                "config": {
                    "batch_size": 500 if is_weekend else 1000,  # Smaller batches on weekends
                    "processing_mode": "maintenance" if is_weekend else "normal"
                }
            }
        },
        "resources": {
            "database": {
                "config": {
                    "pool_size": 5 if is_weekend else 20,  # Fewer connections on weekends
                    "timeout": 60 if is_weekend else 30
                }
            }
        }
    }

def _config_for_key(partition_key: str) -> dict:
    # PartitionedConfig's key-based callback receives the partition key string,
    # so convert it back into the (start, end) window the helper expects.
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    return partition_config_fn(start, start + timedelta(days=1))

partitioned_config = PartitionedConfig(
    partitions_def=daily_partitions,
    run_config_for_partition_key_fn=_config_for_key,
)

@job(config=partitioned_config, partitions_def=daily_partitions)
def daily_processing_job():
    """Job with partition-specific configuration."""
    process_partition_data()

# Built-in partitioned config helpers
daily_config = daily_partitioned_config(
    start_date="2023-01-01",
    timezone="UTC"
)(partition_config_fn)

@job(config=daily_config)
def daily_job_with_helper():
    process_partition_data()

# Business-calendar-based partitioned config
def business_partition_config(start: datetime, _end: datetime) -> dict:
    """Configuration based on the business calendar."""

    # Check if it's the end of the month
    next_day = start + timedelta(days=1)
    is_month_end = next_day.day == 1

    # Check if it's the end of a quarter (last day of March, June, September, or December)
    is_quarter_end = is_month_end and start.month in (3, 6, 9, 12)

    return {
        "ops": {
            "process_partition_data": {
                "config": {
                    "batch_size": 100 if is_quarter_end else 500 if is_month_end else 1000,
                    "processing_mode": "quarter_end" if is_quarter_end else
                                       "month_end" if is_month_end else "normal"
                }
            }
        }
    }

def _business_config_for_key(partition_key: str) -> dict:
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    return business_partition_config(start, start + timedelta(days=1))

business_config = PartitionedConfig(
    partitions_def=daily_partitions,
    run_config_for_partition_key_fn=_business_config_for_key,
)

@job(config=business_config, partitions_def=daily_partitions)
def business_processing_job():
    """Job with business-calendar-aware configuration."""
    process_partition_data()
```
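For non-time-based partitions there are analogous helpers. Below is a brief sketch using `static_partitioned_config`, where the run config is derived directly from the partition key; the op, keys, and batch-size rule are illustrative assumptions.

```python
from dagster import job, op, static_partitioned_config

REGIONS = ["us", "eu", "asia"]

@op(config_schema={"region": str, "batch_size": int})
def load_region(context):
    context.log.info(
        f"Loading {context.op_config['region']} with batch size {context.op_config['batch_size']}"
    )

@static_partitioned_config(partition_keys=REGIONS)
def region_config(partition_key: str) -> dict:
    # Larger batches for the higher-volume region (illustrative rule)
    return {
        "ops": {
            "load_region": {
                "config": {
                    "region": partition_key,
                    "batch_size": 2000 if partition_key == "us" else 500,
                }
            }
        }
    }

@job(config=region_config)
def regional_load_job():
    load_region()
```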
## Partition Utilities

### `PartitionKeyRange` { .api }

**Module:** `dagster._core.definitions.partitions.partition_key_range`
**Type:** Class

Represents a range of partition keys for batch operations and queries.

```python
from datetime import datetime, timedelta

from dagster import PartitionKeyRange, asset

@asset(partitions_def=daily_partitions)
def batch_processed_data(context) -> pd.DataFrame:
    """Asset that processes data in batches using partition ranges."""

    # Get the current partition
    current_partition = context.partition_key

    # Create a partition range covering the last 7 days
    end_date = datetime.strptime(current_partition, "%Y-%m-%d")
    start_date = end_date - timedelta(days=6)

    partition_range = PartitionKeyRange(
        start=start_date.strftime("%Y-%m-%d"),
        end=current_partition
    )

    # Get all partition keys in the range
    partitions_def = context.assets_def.partitions_def
    partition_keys_in_range = partitions_def.get_partition_keys_in_range(partition_range)

    context.log.info(f"Processing batch for partitions: {partition_keys_in_range}")

    # Load data for all partitions in the range (load_data_for_partition is an assumed helper)
    all_data = []
    for partition_key in partition_keys_in_range:
        partition_data = load_data_for_partition(partition_key)
        all_data.append(partition_data)

    # Combine all partition data
    combined_df = pd.concat(all_data, ignore_index=True)

    context.add_output_metadata({
        "partition_range_start": partition_range.start,
        "partition_range_end": partition_range.end,
        "partitions_processed": len(partition_keys_in_range),
        "total_records": len(combined_df)
    })

    return combined_df
```
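Outside of asset code, `PartitionKeyRange` is also handy for scripted backfills: resolve the range to concrete keys and launch one run per key. A minimal sketch under that assumption (normally the UI or CLI backfill machinery would be used instead):

```python
from dagster import DailyPartitionsDefinition, PartitionKeyRange, asset, materialize

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def daily_numbers(context) -> str:
    return f"data for {context.partition_key}"

keys = daily.get_partition_keys_in_range(PartitionKeyRange("2023-01-01", "2023-01-03"))

# One in-process run per partition key in the range
for key in keys:
    materialize([daily_numbers], partition_key=key)
```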
### `TimeWindow` { .api }

**Module:** `dagster._core.definitions.partitions.utils`
**Type:** Class

Time window utility for time-based partition operations.

```python
from dagster import TimeWindow, asset

@asset(partitions_def=hourly_partitions)
def time_window_analysis(context) -> dict:
    """Asset demonstrating TimeWindow usage."""

    # Access the partition time window
    time_window = context.partition_time_window

    # TimeWindow properties
    start_time = time_window.start
    end_time = time_window.end
    duration = end_time - start_time

    context.log.info(f"Processing time window: {start_time} to {end_time} (duration: {duration})")

    # Time window operations
    analysis = {
        "window_start": start_time.isoformat(),
        "window_end": end_time.isoformat(),
        "duration_seconds": duration.total_seconds(),
        "duration_minutes": duration.total_seconds() / 60,
        "hour_of_day": start_time.hour,
        "day_of_week": start_time.weekday(),
        "is_business_hour": 9 <= start_time.hour < 18,
        "is_weekend": start_time.weekday() >= 5
    }

    # Load data for the time window
    query = f"""
        SELECT COUNT(*) as event_count, AVG(metric_value) as avg_metric
        FROM events
        WHERE timestamp >= '{start_time}'
        AND timestamp < '{end_time}'
    """

    result = execute_query(query)
    if result:
        analysis.update({
            "event_count": result[0]["event_count"],
            "avg_metric": result[0]["avg_metric"]
        })

    return analysis
```

The partitions system enables efficient processing of large datasets through explicit data division, automatic dependency management, and flexible mapping strategies. It supports everything from simple daily partitions to multi-dimensional business partitioning with runtime partition management.

For automation with partitioned assets, see [Sensors and Schedules](./sensors-schedules.md). For execution contexts in partitioned environments, see [Execution and Contexts](./execution-contexts.md).