A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's comprehensive partitioning system, including partition definitions, mappings, time-based partitions, dynamic partitions, and multi-dimensional partitioning. The partitions system enables efficient processing of large datasets by breaking them into manageable, parallelizable chunks.
Partition definitions specify how data is divided into discrete, processable units with automatic dependency tracking and backfill capabilities.
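For orientation, here is a minimal, self-contained sketch (the asset name and returned data are illustrative, not part of the API reference below) showing a daily-partitioned asset and how a single partition of it can be materialized in-process; a backfill simply launches one such run per selected partition key:
from dagster import DailyPartitionsDefinition, asset, materialize

# Hypothetical daily-partitioned asset; "events" is an illustrative name
@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def events(context) -> list:
    # context.partition_key is the date string of the partition being run
    return [f"event for {context.partition_key}"]

if __name__ == "__main__":
    # Materialize just one partition of the asset
    result = materialize([events], partition_key="2023-06-01")
    assert result.success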
PartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: Abstract base class
Base class for all partition definitions with common partition operations.
from dagster import PartitionsDefinition, asset, job, op
from typing import Sequence
import pandas as pd
class CustomPartitionsDefinition(PartitionsDefinition):
"""Custom partition definition implementation."""
def __init__(self, partition_keys: Sequence[str]):
self._partition_keys = partition_keys
@property
def partition_keys(self) -> Sequence[str]:
"""Return all partition keys."""
return self._partition_keys
def get_partition_keys_in_range(self, partition_key_range) -> Sequence[str]:
"""Get partition keys in specified range."""
start_idx = self._partition_keys.index(partition_key_range.start) if partition_key_range.start else 0
end_idx = self._partition_keys.index(partition_key_range.end) + 1 if partition_key_range.end else len(self._partition_keys)
return self._partition_keys[start_idx:end_idx]
# Custom business quarter partitions
quarters_partition_def = CustomPartitionsDefinition([
"2023-Q1", "2023-Q2", "2023-Q3", "2023-Q4",
"2024-Q1", "2024-Q2", "2024-Q3", "2024-Q4"
])
@asset(partitions_def=quarters_partition_def)
def quarterly_sales_report(context) -> pd.DataFrame:
"""Asset partitioned by business quarters."""
partition_key = context.partition_key
    year, quarter = partition_key.split("-")  # Partition keys look like "2023-Q1"
context.log.info(f"Processing {quarter} {year} sales report")
# Load data for specific quarter
quarter_num = int(quarter[1]) # Extract number from Q1, Q2, etc.
start_month = (quarter_num - 1) * 3 + 1
end_month = quarter_num * 3
query = f"""
SELECT * FROM sales
WHERE YEAR(date) = {year}
AND MONTH(date) BETWEEN {start_month} AND {end_month}
"""
    return pd.read_sql(query, connection)
StaticPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass
Partition definition with a fixed set of partition keys.
from dagster import StaticPartitionsDefinition, asset
# Geographic partitions
regions_partitions = StaticPartitionsDefinition([
"north_america", "europe", "asia_pacific", "latin_america", "africa"
])
@asset(partitions_def=regions_partitions)
def regional_sales_data(context) -> pd.DataFrame:
"""Sales data partitioned by geographic region."""
region = context.partition_key
context.log.info(f"Processing sales data for region: {region}")
# Load region-specific data
query = f"SELECT * FROM sales WHERE region = '{region}'"
df = pd.read_sql(query, connection)
# Add partition metadata
context.add_output_metadata({
"region": region,
"record_count": len(df),
"processing_date": pd.Timestamp.now().isoformat()
})
return df
# Product category partitions
product_categories = StaticPartitionsDefinition([
"electronics", "clothing", "books", "home_garden", "sports", "automotive"
])
@asset(partitions_def=product_categories)
def category_analytics(context, regional_sales_data: pd.DataFrame) -> dict:
"""Analytics partitioned by product category."""
category = context.partition_key
# Filter data for category
category_data = regional_sales_data[
regional_sales_data["category"] == category
]
analytics = {
"category": category,
"total_sales": category_data["amount"].sum(),
"transaction_count": len(category_data),
"avg_transaction": category_data["amount"].mean(),
"top_products": category_data.groupby("product")["amount"].sum().nlargest(5).to_dict()
}
context.add_output_metadata({
"category": category,
"total_sales": analytics["total_sales"],
"transaction_count": analytics["transaction_count"]
})
return analytics
# Environment-based partitions
environments_partitions = StaticPartitionsDefinition([
"development", "staging", "production"
])
@job(partitions_def=environments_partitions)
def deployment_job():
"""Job partitioned by deployment environment."""
@op
def deploy_to_environment(context):
environment = context.partition_key
context.log.info(f"Deploying to {environment} environment")
# Environment-specific deployment logic
if environment == "production":
# Additional validation and approvals
run_production_checks()
require_manual_approval()
deploy_application(environment)
run_health_checks(environment)
return f"Deployed to {environment}"
    deploy_to_environment()
TimeWindowPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass
Base class for time-based partitions with configurable time windows.
from dagster import TimeWindowPartitionsDefinition, asset
from datetime import datetime, timedelta
# Custom hourly partitions with business hours only
business_hours_partitions = TimeWindowPartitionsDefinition(
start=datetime(2023, 1, 1, 9), # 9 AM
end=datetime(2024, 1, 1, 18), # 6 PM
cron_schedule="0 9-17 * * 1-5", # Business hours, weekdays only
timezone="America/New_York"
)
@asset(partitions_def=business_hours_partitions)
def business_hours_metrics(context) -> pd.DataFrame:
"""Metrics calculated only during business hours."""
time_window = context.partition_time_window
start_time = time_window.start
end_time = time_window.end
context.log.info(f"Processing metrics for {start_time} to {end_time}")
# Load data for specific time window
query = f"""
SELECT * FROM user_events
WHERE timestamp >= '{start_time}'
AND timestamp < '{end_time}'
"""
df = pd.read_sql(query, connection)
# Calculate business metrics
metrics = df.groupby("event_type").agg({
"user_id": "nunique",
"session_id": "nunique",
"timestamp": "count"
}).reset_index()
context.add_output_metadata({
"time_window_start": start_time.isoformat(),
"time_window_end": end_time.isoformat(),
"total_events": len(df),
"unique_users": df["user_id"].nunique()
})
    return metrics
DailyPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass
Daily time-based partitions with automatic date-based partitioning.
from dagster import DailyPartitionsDefinition, asset, AssetIn
import pandas as pd
# Basic daily partitions
daily_partitions = DailyPartitionsDefinition(
start_date="2023-01-01",
timezone="UTC"
)
@asset(partitions_def=daily_partitions)
def daily_user_events(context) -> pd.DataFrame:
"""Daily user events data."""
partition_date = context.partition_key # Format: "2023-01-01"
context.log.info(f"Processing user events for {partition_date}")
# Load events for specific date
query = f"""
SELECT user_id, event_type, timestamp, properties
FROM user_events
WHERE DATE(timestamp) = '{partition_date}'
"""
df = pd.read_sql(query, connection)
context.add_output_metadata({
"partition_date": partition_date,
"event_count": len(df),
"unique_users": df["user_id"].nunique(),
"event_types": df["event_type"].nunique()
})
return df
@asset(partitions_def=daily_partitions)
def daily_user_summary(context, daily_user_events: pd.DataFrame) -> pd.DataFrame:
"""Daily summary of user activity."""
partition_date = context.partition_key
# Aggregate events by user
user_summary = daily_user_events.groupby("user_id").agg({
"event_type": "count",
"timestamp": ["min", "max"]
}).reset_index()
user_summary.columns = ["user_id", "event_count", "first_event", "last_event"]
user_summary["session_duration"] = (
pd.to_datetime(user_summary["last_event"]) -
pd.to_datetime(user_summary["first_event"])
).dt.total_seconds() / 60 # Minutes
user_summary["date"] = partition_date
context.add_output_metadata({
"partition_date": partition_date,
"active_users": len(user_summary),
"avg_events_per_user": user_summary["event_count"].mean(),
"avg_session_duration_minutes": user_summary["session_duration"].mean()
})
return user_summary
# Daily partitions with custom start time
daily_partitions_6am = DailyPartitionsDefinition(
start_date="2023-01-01",
minute_offset=0,
hour_offset=6, # Partitions start at 6 AM
timezone="America/New_York"
)
@asset(partitions_def=daily_partitions_6am)
def business_day_data(context) -> pd.DataFrame:
"""Business day data starting at 6 AM."""
time_window = context.partition_time_window
business_day_start = time_window.start # 6 AM
business_day_end = time_window.end # Next day 6 AM
context.log.info(f"Processing business day from {business_day_start} to {business_day_end}")
query = f"""
SELECT * FROM transactions
WHERE timestamp >= '{business_day_start}'
AND timestamp < '{business_day_end}'
"""
    return pd.read_sql(query, connection)
HourlyPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass
Hourly time-based partitions for high-frequency data processing.
from dagster import HourlyPartitionsDefinition, asset
# Hourly partitions for real-time analytics
hourly_partitions = HourlyPartitionsDefinition(
start_date="2023-01-01-00:00",
timezone="UTC"
)
@asset(partitions_def=hourly_partitions)
def hourly_api_metrics(context) -> pd.DataFrame:
"""Hourly API metrics and performance data."""
partition_key = context.partition_key # Format: "2023-01-01-14:00"
time_window = context.partition_time_window
hour_start = time_window.start
hour_end = time_window.end
context.log.info(f"Processing API metrics for hour: {partition_key}")
# Load API logs for specific hour
query = f"""
SELECT endpoint, status_code, response_time, timestamp
FROM api_logs
WHERE timestamp >= '{hour_start}'
AND timestamp < '{hour_end}'
"""
df = pd.read_sql(query, connection)
if len(df) > 0:
# Calculate hourly metrics
metrics = df.groupby(["endpoint", "status_code"]).agg({
"response_time": ["count", "mean", "median", "p95"],
"timestamp": ["min", "max"]
}).reset_index()
# Flatten column names
metrics.columns = [
"endpoint", "status_code", "request_count",
"avg_response_time", "median_response_time", "p95_response_time",
"first_request", "last_request"
]
metrics["hour"] = partition_key
context.add_output_metadata({
"hour": partition_key,
"total_requests": len(df),
"unique_endpoints": df["endpoint"].nunique(),
"error_rate": len(df[df["status_code"] >= 400]) / len(df)
})
else:
# Handle empty hour
metrics = pd.DataFrame({
"hour": [partition_key],
"total_requests": [0],
"message": ["No API requests in this hour"]
})
context.add_output_metadata({
"hour": partition_key,
"total_requests": 0,
"status": "no_data"
})
return metrics
# Hourly partitions with business hours filter
business_hours_only = HourlyPartitionsDefinition(
start_date="2023-01-01-09:00",
end_date="2023-12-31-18:00",
timezone="America/New_York"
)
@asset(partitions_def=business_hours_only)
def business_hours_analytics(context) -> dict:
"""Analytics only during business hours."""
time_window = context.partition_time_window
hour = time_window.start.hour
# Only process during business hours (9 AM - 6 PM)
if 9 <= hour < 18:
context.log.info(f"Processing business hour analytics for {context.partition_key}")
# Business analytics logic
return {
"hour": context.partition_key,
"is_business_hour": True,
"analytics_data": "processed"
}
else:
context.log.info(f"Skipping non-business hour: {context.partition_key}")
return {
"hour": context.partition_key,
"is_business_hour": False,
"message": "Outside business hours"
        }
WeeklyPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass
Weekly time-based partitions for weekly aggregations and reports.
from dagster import WeeklyPartitionsDefinition, asset
# Weekly partitions starting on Monday
weekly_partitions = WeeklyPartitionsDefinition(
start_date="2023-01-02", # First Monday of 2023
timezone="UTC"
)
@asset(partitions_def=weekly_partitions)
def weekly_sales_summary(context) -> pd.DataFrame:
"""Weekly sales summary and trends."""
time_window = context.partition_time_window
week_start = time_window.start
week_end = time_window.end
partition_key = context.partition_key # Format: "2023-01-02"
context.log.info(f"Processing weekly sales summary for week {partition_key}")
context.log.info(f"Week span: {week_start} to {week_end}")
# Load sales data for the week
query = f"""
SELECT DATE(sale_date) as day, product_category, SUM(amount) as daily_sales
FROM sales
WHERE sale_date >= '{week_start.date()}'
AND sale_date < '{week_end.date()}'
GROUP BY DATE(sale_date), product_category
ORDER BY day, product_category
"""
df = pd.read_sql(query, connection)
if len(df) > 0:
# Calculate weekly aggregations
weekly_summary = df.groupby("product_category").agg({
"daily_sales": ["sum", "mean", "std", "count"]
}).reset_index()
weekly_summary.columns = [
"product_category", "total_weekly_sales",
"avg_daily_sales", "daily_sales_std", "days_with_sales"
]
weekly_summary["week_start"] = week_start.date()
weekly_summary["week_end"] = (week_end - timedelta(days=1)).date()
context.add_output_metadata({
"week_start": week_start.isoformat(),
"week_end": week_end.isoformat(),
"total_sales": df["daily_sales"].sum(),
"categories_sold": df["product_category"].nunique(),
"sales_days": df["day"].nunique()
})
else:
weekly_summary = pd.DataFrame({
"week_start": [week_start.date()],
"week_end": [(week_end - timedelta(days=1)).date()],
"message": ["No sales data for this week"]
})
context.add_output_metadata({
"week_start": week_start.isoformat(),
"week_end": week_end.isoformat(),
"total_sales": 0,
"status": "no_data"
})
return weekly_summary
@asset(partitions_def=weekly_partitions)
def weekly_user_cohorts(context, daily_user_summary: pd.DataFrame) -> pd.DataFrame:
"""Weekly user cohort analysis."""
time_window = context.partition_time_window
week_dates = pd.date_range(
start=time_window.start.date(),
end=(time_window.end - timedelta(days=1)).date(),
freq="D"
).strftime("%Y-%m-%d").tolist()
context.log.info(f"Analyzing user cohorts for dates: {week_dates}")
# Filter daily summaries for this week
week_data = daily_user_summary[
daily_user_summary["date"].isin(week_dates)
]
if len(week_data) > 0:
# Cohort analysis
cohort_analysis = week_data.groupby("user_id").agg({
"event_count": "sum",
"session_duration": "sum",
"date": ["count", "min", "max"]
}).reset_index()
cohort_analysis.columns = [
"user_id", "total_events", "total_session_minutes",
"active_days", "first_seen", "last_seen"
]
# User engagement segmentation
cohort_analysis["engagement_level"] = pd.cut(
cohort_analysis["total_events"],
bins=[0, 5, 20, 50, float("inf")],
labels=["low", "medium", "high", "power_user"]
)
cohort_analysis["week_start"] = time_window.start.date()
context.add_output_metadata({
"week_start": time_window.start.isoformat(),
"active_users": len(cohort_analysis),
"avg_events_per_user": cohort_analysis["total_events"].mean(),
"engagement_distribution": cohort_analysis["engagement_level"].value_counts().to_dict()
})
else:
cohort_analysis = pd.DataFrame({
"week_start": [time_window.start.date()],
"message": ["No user data for this week"]
})
context.add_output_metadata({
"week_start": time_window.start.isoformat(),
"active_users": 0,
"status": "no_data"
})
    return cohort_analysis
MonthlyPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass
Monthly time-based partitions for monthly reporting and aggregations.
from dagster import MonthlyPartitionsDefinition, asset
# Monthly partitions for financial reporting
monthly_partitions = MonthlyPartitionsDefinition(
start_date="2023-01-01",
timezone="UTC"
)
@asset(partitions_def=monthly_partitions)
def monthly_financial_report(context) -> pd.DataFrame:
"""Monthly financial report with comprehensive metrics."""
time_window = context.partition_time_window
month_start = time_window.start
month_end = time_window.end
partition_key = context.partition_key # Format: "2023-01-01"
month_name = month_start.strftime("%B %Y")
context.log.info(f"Processing financial report for {month_name}")
# Load financial data for the month
query = f"""
SELECT
department,
SUM(revenue) as total_revenue,
SUM(costs) as total_costs,
COUNT(DISTINCT customer_id) as unique_customers,
COUNT(*) as transaction_count
FROM financial_transactions
WHERE transaction_date >= '{month_start.date()}'
AND transaction_date < '{month_end.date()}'
GROUP BY department
"""
df = pd.read_sql(query, connection)
if len(df) > 0:
# Calculate financial metrics
df["profit"] = df["total_revenue"] - df["total_costs"]
df["profit_margin"] = df["profit"] / df["total_revenue"]
df["revenue_per_customer"] = df["total_revenue"] / df["unique_customers"]
df["avg_transaction_value"] = df["total_revenue"] / df["transaction_count"]
# Add month information
df["month"] = month_name
df["month_start"] = month_start.date()
df["month_end"] = (month_end - timedelta(days=1)).date()
# Company-wide totals
total_revenue = df["total_revenue"].sum()
total_profit = df["profit"].sum()
total_customers = df["unique_customers"].sum()
context.add_output_metadata({
"month": month_name,
"total_revenue": total_revenue,
"total_profit": total_profit,
"company_profit_margin": total_profit / total_revenue if total_revenue > 0 else 0,
"total_customers": total_customers,
"departments": len(df)
})
else:
df = pd.DataFrame({
"month": [month_name],
"message": ["No financial data for this month"]
})
context.add_output_metadata({
"month": month_name,
"total_revenue": 0,
"status": "no_data"
})
return df
# Fiscal year partitions (custom monthly with fiscal year start)
fiscal_monthly_partitions = MonthlyPartitionsDefinition(
start_date="2023-04-01", # Fiscal year starts in April
timezone="America/New_York"
)
@asset(partitions_def=fiscal_monthly_partitions)
def fiscal_monthly_budget_analysis(context) -> dict:
"""Fiscal year monthly budget analysis."""
time_window = context.partition_time_window
fiscal_month_start = time_window.start
# Determine fiscal year and month
if fiscal_month_start.month >= 4: # April-December
fiscal_year = fiscal_month_start.year
fiscal_month = fiscal_month_start.month - 3 # April = Month 1
else: # January-March
fiscal_year = fiscal_month_start.year - 1
fiscal_month = fiscal_month_start.month + 9 # January = Month 10
context.log.info(f"Processing FY{fiscal_year} Month {fiscal_month}")
budget_analysis = {
"fiscal_year": fiscal_year,
"fiscal_month": fiscal_month,
"calendar_month": fiscal_month_start.strftime("%B %Y"),
"quarter": f"Q{((fiscal_month - 1) // 3) + 1}",
"is_fiscal_year_end": fiscal_month == 12
}
context.add_output_metadata({
"fiscal_year": fiscal_year,
"fiscal_month": fiscal_month,
"calendar_month": fiscal_month_start.strftime("%B %Y"),
"quarter": budget_analysis["quarter"]
})
    return budget_analysis
DynamicPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass
Partition definition where partitions can be added and removed at runtime.
from dagster import DynamicPartitionsDefinition, asset, sensor, SkipReason, SensorResult
from dagster import AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest
# Dynamic customer partitions
customers_partitions = DynamicPartitionsDefinition(name="customers")
@asset(partitions_def=customers_partitions)
def customer_analytics(context) -> pd.DataFrame:
"""Analytics for individual customers using dynamic partitions."""
customer_id = context.partition_key
context.log.info(f"Processing analytics for customer: {customer_id}")
# Load customer-specific data
query = f"""
SELECT
DATE(order_date) as date,
product_category,
SUM(order_value) as daily_spend
FROM orders
WHERE customer_id = '{customer_id}'
AND order_date >= CURRENT_DATE - INTERVAL 90 DAY
GROUP BY DATE(order_date), product_category
ORDER BY date DESC
"""
df = pd.read_sql(query, connection)
context.add_output_metadata({
"customer_id": customer_id,
"days_with_orders": len(df),
"total_spend": df["daily_spend"].sum() if len(df) > 0 else 0,
"categories_purchased": df["product_category"].nunique() if len(df) > 0 else 0
})
return df
# Sensor to dynamically add new customer partitions
@sensor(minimum_interval_seconds=3600) # Check hourly
def new_customer_sensor(context):
"""Add partitions for new customers."""
# Find customers added in the last hour
query = """
SELECT DISTINCT customer_id
FROM customers
WHERE created_at >= NOW() - INTERVAL 1 HOUR
"""
new_customers = execute_query(query)
if new_customers:
customer_ids = [row["customer_id"] for row in new_customers]
context.log.info(f"Adding partitions for {len(customer_ids)} new customers")
        # Dynamic partition requests are returned to Dagster via SensorResult
        return SensorResult(
            dynamic_partitions_requests=[
                AddDynamicPartitionsRequest(
                    partitions_def_name="customers",
                    partition_keys=customer_ids
                )
            ]
        )
return SkipReason("No new customers found")
# Job to clean up inactive customer partitions
@job
def cleanup_inactive_customers():
"""Remove partitions for inactive customers."""
@op
def identify_inactive_customers(context) -> list:
# Find customers with no activity in 6 months
query = """
SELECT DISTINCT c.customer_id
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
AND o.order_date >= CURRENT_DATE - INTERVAL 6 MONTH
WHERE o.customer_id IS NULL
AND c.created_at < CURRENT_DATE - INTERVAL 6 MONTH
"""
inactive_customers = execute_query(query)
customer_ids = [row["customer_id"] for row in inactive_customers]
context.log.info(f"Found {len(customer_ids)} inactive customers")
return customer_ids
@op
def remove_customer_partitions(context, inactive_customers: list):
        if inactive_customers:
            context.log.info(f"Removing partitions for {len(inactive_customers)} inactive customers")
            # Ops cannot act on partition requests directly; delete dynamic partitions via the instance
            for customer_id in inactive_customers:
                context.instance.delete_dynamic_partition("customers", customer_id)
        else:
            context.log.info("No inactive customers to remove")
remove_customer_partitions(identify_inactive_customers())
# Dynamic experiment partitions for A/B testing
experiments_partitions = DynamicPartitionsDefinition(name="experiments")
@asset(partitions_def=experiments_partitions)
def experiment_results(context) -> dict:
"""Results analysis for A/B test experiments."""
experiment_id = context.partition_key
context.log.info(f"Analyzing results for experiment: {experiment_id}")
# Load experiment data
query = f"""
SELECT
variant,
COUNT(*) as participants,
SUM(converted) as conversions,
AVG(metric_value) as avg_metric
FROM experiment_data
WHERE experiment_id = '{experiment_id}'
GROUP BY variant
"""
df = pd.read_sql(query, connection)
if len(df) > 0:
# Calculate experiment results
results = {
"experiment_id": experiment_id,
"variants": df["variant"].tolist(),
"total_participants": df["participants"].sum(),
"results_by_variant": df.to_dict("records")
}
# Statistical significance testing
if len(df) == 2: # A/B test
control = df[df["variant"] == "control"].iloc[0] if len(df[df["variant"] == "control"]) > 0 else None
treatment = df[df["variant"] == "treatment"].iloc[0] if len(df[df["variant"] == "treatment"]) > 0 else None
if control is not None and treatment is not None:
control_rate = control["conversions"] / control["participants"]
treatment_rate = treatment["conversions"] / treatment["participants"]
lift = (treatment_rate - control_rate) / control_rate if control_rate > 0 else 0
results["lift"] = lift
results["control_conversion_rate"] = control_rate
results["treatment_conversion_rate"] = treatment_rate
context.add_output_metadata({
"experiment_id": experiment_id,
"total_participants": results["total_participants"],
"variants_count": len(df),
"lift": results.get("lift", "N/A")
})
else:
results = {
"experiment_id": experiment_id,
"error": "No data found for experiment"
}
context.add_output_metadata({
"experiment_id": experiment_id,
"total_participants": 0,
"status": "no_data"
})
    return results
MultiPartitionsDefinition { .api }
Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass
Partition definition combining multiple partition dimensions.
from dagster import MultiPartitionsDefinition, MultiPartitionKey, asset
# Multi-dimensional partitioning: Date x Region
multi_partitions = MultiPartitionsDefinition({
"date": DailyPartitionsDefinition(start_date="2023-01-01"),
"region": StaticPartitionsDefinition(["us", "eu", "asia"])
})
@asset(partitions_def=multi_partitions)
def regional_daily_sales(context) -> pd.DataFrame:
"""Sales data partitioned by both date and region."""
# Access multi-dimensional partition key
partition_key = context.partition_key
if isinstance(partition_key, MultiPartitionKey):
date_key = partition_key.keys_by_dimension["date"]
region_key = partition_key.keys_by_dimension["region"]
else:
# Handle string representation
date_key, region_key = partition_key.split("|")
context.log.info(f"Processing sales for {region_key} on {date_key}")
# Load region and date specific data
query = f"""
SELECT product_id, SUM(amount) as sales, COUNT(*) as transactions
FROM sales
WHERE DATE(sale_date) = '{date_key}'
AND region = '{region_key}'
GROUP BY product_id
"""
df = pd.read_sql(query, connection)
# Add partition information
df["partition_date"] = date_key
df["partition_region"] = region_key
context.add_output_metadata({
"partition_date": date_key,
"partition_region": region_key,
"total_sales": df["sales"].sum() if len(df) > 0 else 0,
"unique_products": len(df),
"total_transactions": df["transactions"].sum() if len(df) > 0 else 0
})
return df
# Three-dimensional partitioning: Date x Region x Product Category
three_dim_partitions = MultiPartitionsDefinition({
"date": DailyPartitionsDefinition(start_date="2023-01-01"),
"region": StaticPartitionsDefinition(["us", "eu", "asia", "latam"]),
"category": StaticPartitionsDefinition(["electronics", "clothing", "books", "home"])
})
@asset(partitions_def=three_dim_partitions)
def detailed_sales_metrics(context) -> dict:
"""Highly granular sales metrics with three-dimensional partitioning."""
partition_key = context.partition_key
if isinstance(partition_key, MultiPartitionKey):
date_key = partition_key.keys_by_dimension["date"]
region_key = partition_key.keys_by_dimension["region"]
category_key = partition_key.keys_by_dimension["category"]
else:
# Parse string representation
parts = str(partition_key).split("|")
date_key, region_key, category_key = parts
context.log.info(f"Processing {category_key} sales in {region_key} on {date_key}")
# Highly specific query
query = f"""
SELECT
AVG(amount) as avg_sale_amount,
MIN(amount) as min_sale_amount,
MAX(amount) as max_sale_amount,
COUNT(*) as transaction_count,
SUM(amount) as total_sales,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales
WHERE DATE(sale_date) = '{date_key}'
AND region = '{region_key}'
AND category = '{category_key}'
"""
result = execute_query(query)
if result and len(result) > 0:
metrics = result[0]
metrics.update({
"partition_date": date_key,
"partition_region": region_key,
"partition_category": category_key,
"has_data": metrics["transaction_count"] > 0
})
else:
metrics = {
"partition_date": date_key,
"partition_region": region_key,
"partition_category": category_key,
"has_data": False,
"transaction_count": 0,
"total_sales": 0
}
context.add_output_metadata({
"date": date_key,
"region": region_key,
"category": category_key,
"transaction_count": metrics["transaction_count"],
"total_sales": metrics["total_sales"]
})
return metrics
# Complex multi-dimensional example with time and business dimensions
business_multi_partitions = MultiPartitionsDefinition({
"month": MonthlyPartitionsDefinition(start_date="2023-01-01"),
"department": StaticPartitionsDefinition(["sales", "marketing", "engineering", "support"]),
"metric_type": StaticPartitionsDefinition(["revenue", "costs", "headcount", "productivity"])
})
@asset(partitions_def=business_multi_partitions)
def department_kpi_metrics(context) -> dict:
"""KPI metrics partitioned by month, department, and metric type."""
partition_key = context.partition_key
if isinstance(partition_key, MultiPartitionKey):
month_key = partition_key.keys_by_dimension["month"]
department_key = partition_key.keys_by_dimension["department"]
metric_type_key = partition_key.keys_by_dimension["metric_type"]
else:
month_key, department_key, metric_type_key = str(partition_key).split("|")
context.log.info(f"Processing {metric_type_key} for {department_key} in {month_key}")
# Metric-specific data loading
if metric_type_key == "revenue":
query = f"""
SELECT SUM(amount) as value, 'USD' as unit
FROM revenue
WHERE department = '{department_key}'
AND YEAR(date) = YEAR('{month_key}')
AND MONTH(date) = MONTH('{month_key}')
"""
elif metric_type_key == "costs":
query = f"""
SELECT SUM(cost) as value, 'USD' as unit
FROM expenses
WHERE department = '{department_key}'
AND YEAR(date) = YEAR('{month_key}')
AND MONTH(date) = MONTH('{month_key}')
"""
elif metric_type_key == "headcount":
query = f"""
SELECT COUNT(DISTINCT employee_id) as value, 'people' as unit
FROM employees
WHERE department = '{department_key}'
AND start_date <= '{month_key}'
AND (end_date IS NULL OR end_date > LAST_DAY('{month_key}'))
"""
elif metric_type_key == "productivity":
query = f"""
SELECT AVG(productivity_score) as value, 'score' as unit
FROM productivity_metrics
WHERE department = '{department_key}'
AND YEAR(date) = YEAR('{month_key}')
AND MONTH(date) = MONTH('{month_key}')
"""
result = execute_query(query)
if result and len(result) > 0:
kpi_data = {
"month": month_key,
"department": department_key,
"metric_type": metric_type_key,
"value": result[0]["value"] or 0,
"unit": result[0]["unit"],
"has_data": result[0]["value"] is not None
}
else:
kpi_data = {
"month": month_key,
"department": department_key,
"metric_type": metric_type_key,
"value": 0,
"unit": "unknown",
"has_data": False
}
context.add_output_metadata({
"month": month_key,
"department": department_key,
"metric_type": metric_type_key,
"value": kpi_data["value"],
"unit": kpi_data["unit"]
})
    return kpi_data
Partition mappings define how partitioned assets depend on partitions of their upstream assets, enabling sophisticated dependency patterns.
PartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: Abstract base class
Base class for partition mapping strategies.
from dagster import PartitionMapping, PartitionKeyRange
from typing import Optional, Sequence
class BusinessDayPartitionMapping(PartitionMapping):
"""Custom partition mapping for business days only."""
def __init__(self, skip_weekends: bool = True):
self.skip_weekends = skip_weekends
def get_upstream_mapped_partitions_result_for_partition_key(
self, downstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
):
"""Map downstream partition to upstream partitions."""
# Convert partition key to date
from datetime import datetime, timedelta
partition_date = datetime.strptime(downstream_partition_key, "%Y-%m-%d")
if self.skip_weekends and partition_date.weekday() >= 5: # Weekend
# Skip weekends - no upstream dependency
return PartitionKeyRange(start=None, end=None)
# For business days, map to same day
return PartitionKeyRange(
start=downstream_partition_key,
end=downstream_partition_key
)
def get_downstream_partitions_for_partition_key(
self, upstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
):
"""Map upstream partition to downstream partitions."""
# Business day mapping - same logic in reverse
partition_date = datetime.strptime(upstream_partition_key, "%Y-%m-%d")
if self.skip_weekends and partition_date.weekday() >= 5:
return []
return [upstream_partition_key]
# Usage of custom partition mapping
@asset(partitions_def=daily_partitions)
def raw_trading_data(context) -> pd.DataFrame:
"""Raw trading data (all days)."""
partition_date = context.partition_key
return load_trading_data(partition_date)
@asset(
partitions_def=daily_partitions,
ins={"raw_trading_data": AssetIn(partition_mapping=BusinessDayPartitionMapping())}
)
def business_day_analysis(context, raw_trading_data: pd.DataFrame) -> pd.DataFrame:
"""Analysis that only runs on business days."""
    return analyze_trading_data(raw_trading_data)
IdentityPartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass
One-to-one partition mapping (default behavior).
from dagster import IdentityPartitionMapping, asset, AssetIn
@asset(partitions_def=daily_partitions)
def daily_raw_data(context) -> pd.DataFrame:
"""Daily raw data."""
return load_raw_data(context.partition_key)
@asset(
partitions_def=daily_partitions,
ins={"daily_raw_data": AssetIn(partition_mapping=IdentityPartitionMapping())}
)
def daily_processed_data(context, daily_raw_data: pd.DataFrame) -> pd.DataFrame:
"""Processed data with identity mapping (same partition)."""
    return process_data(daily_raw_data)
AllPartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass
Mapping where downstream partition depends on all upstream partitions.
from dagster import AllPartitionMapping, asset, AssetIn
@asset(
ins={"daily_processed_data": AssetIn(partition_mapping=AllPartitionMapping())}
)
def complete_dataset_summary(context, daily_processed_data: pd.DataFrame) -> dict:
"""Summary that depends on all daily data partitions."""
# This asset will wait for all partitions of daily_processed_data
context.log.info(f"Creating summary from {len(daily_processed_data)} daily records")
return {
"total_records": len(daily_processed_data),
"date_range": {
"start": daily_processed_data["date"].min(),
"end": daily_processed_data["date"].max()
},
"summary_stats": daily_processed_data.describe().to_dict()
    }
TimeWindowPartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass
Time-based partition mapping with configurable time windows and offsets.
from dagster import TimeWindowPartitionMapping, asset, AssetIn
# Weekly summary depends on last 7 days of daily data
@asset(
partitions_def=weekly_partitions,
ins={
"daily_sales": AssetIn(
            # With a weekly downstream and daily upstream, the default
            # TimeWindowPartitionMapping maps each weekly window to the
            # daily partitions that fall within it
            partition_mapping=TimeWindowPartitionMapping()
)
}
)
def weekly_sales_summary(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
"""Weekly summary from 7 days of daily data."""
time_window = context.partition_time_window
context.log.info(f"Creating weekly summary for {time_window.start} to {time_window.end}")
# daily_sales contains data from last 7 days
weekly_summary = daily_sales.groupby("product_category").agg({
"sales_amount": "sum",
"transaction_count": "sum"
}).reset_index()
weekly_summary["week_start"] = time_window.start.date()
weekly_summary["week_end"] = (time_window.end - timedelta(days=1)).date()
return weekly_summary
# Monthly report depends on previous month's data
@asset(
partitions_def=monthly_partitions,
ins={
"weekly_sales_summary": AssetIn(
partition_mapping=TimeWindowPartitionMapping(
start_offset=-1, # Previous month
end_offset=-1
)
)
}
)
def monthly_trends_analysis(context, weekly_sales_summary: pd.DataFrame) -> dict:
"""Monthly trends based on previous month's data."""
partition_month = context.partition_time_window.start.strftime("%B %Y")
previous_month = (context.partition_time_window.start - timedelta(days=1)).strftime("%B %Y")
context.log.info(f"Analyzing {partition_month} trends using {previous_month} data")
trends = {
"analysis_month": partition_month,
"data_month": previous_month,
"weekly_growth_rate": calculate_growth_rate(weekly_sales_summary),
"category_performance": analyze_category_trends(weekly_sales_summary)
}
    return trends
StaticPartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass
Static mapping to specific upstream partitions.
from dagster import StaticPartitionMapping, asset, AssetIn
# Asset that always depends on specific reference partitions
@asset(
partitions_def=regions_partitions,
ins={
"global_config": AssetIn(
            partition_mapping=StaticPartitionMapping({
                # StaticPartitionMapping maps upstream partition keys (config versions)
                # to the downstream partition keys (regions) that depend on them
                "config_v1": ["north_america", "asia_pacific", "latin_america"],
                "config_v2": ["europe", "africa"]
            })
)
}
)
def regional_processing(context, global_config: dict) -> pd.DataFrame:
"""Regional processing with static config mapping."""
region = context.partition_key
context.log.info(f"Processing {region} with config version {global_config.get('version')}")
    return process_regional_data(region, global_config)
MultiPartitionMapping { .api }
Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass
Mapping for multi-dimensional partitions with dimension-specific mappings.
from dagster import MultiPartitionMapping, DimensionPartitionMapping, asset, AssetIn
# Multi-partition mapping with different strategies per dimension
multi_mapping = MultiPartitionMapping({
    # Each upstream dimension pairs with a DimensionPartitionMapping naming the
    # corresponding downstream dimension and its per-dimension mapping
    "date": DimensionPartitionMapping(
        dimension_name="date",
        partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)  # Yesterday and today
    ),
    "region": DimensionPartitionMapping(
        dimension_name="region",
        partition_mapping=IdentityPartitionMapping()  # Same region
    )
})
@asset(
partitions_def=multi_partitions, # date x region
ins={
"regional_daily_sales": AssetIn(partition_mapping=multi_mapping)
}
)
def two_day_regional_trends(context, regional_daily_sales: pd.DataFrame) -> dict:
"""Trends analysis using 2 days of data for same region."""
partition_key = context.partition_key
if isinstance(partition_key, MultiPartitionKey):
date_key = partition_key.keys_by_dimension["date"]
region_key = partition_key.keys_by_dimension["region"]
context.log.info(f"Analyzing 2-day trends for {region_key} on {date_key}")
# regional_daily_sales contains data from yesterday and today for this region
if len(regional_daily_sales) >= 2:
# Compare yesterday vs today
yesterday_data = regional_daily_sales[regional_daily_sales["partition_date"] ==
(datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")]
today_data = regional_daily_sales[regional_daily_sales["partition_date"] == date_key]
trends = {
"date": date_key,
"region": region_key,
"yesterday_sales": yesterday_data["sales"].sum() if len(yesterday_data) > 0 else 0,
"today_sales": today_data["sales"].sum() if len(today_data) > 0 else 0,
}
if trends["yesterday_sales"] > 0:
trends["growth_rate"] = (trends["today_sales"] - trends["yesterday_sales"]) / trends["yesterday_sales"]
else:
trends["growth_rate"] = None
else:
trends = {
"date": date_key,
"region": region_key,
"error": "Insufficient data for trend analysis"
}
    return trends
Partitioned configuration allows partition-specific configuration values, enabling different processing logic per partition.
PartitionedConfig { .api }
Module: dagster._core.definitions.partitions.partitioned_config
Type: Class
Configuration that varies by partition with dynamic config generation.
from dagster import PartitionedConfig, job, op, daily_partitioned_config
@op(config_schema={"batch_size": int, "processing_mode": str})
def process_partition_data(context):
"""Op with partition-specific configuration."""
config = context.op_config
partition_key = context.partition_key
batch_size = config["batch_size"]
processing_mode = config["processing_mode"]
context.log.info(f"Processing partition {partition_key} with batch_size={batch_size}, mode={processing_mode}")
# Partition-specific processing logic
return f"Processed {partition_key}"
# Dynamic partitioned configuration based on partition date
def partition_config_fn(start: datetime, _end: datetime) -> dict:
"""Generate configuration based on partition date."""
# Weekend processing uses different parameters
is_weekend = start.weekday() >= 5
return {
"ops": {
"process_partition_data": {
"config": {
"batch_size": 500 if is_weekend else 1000, # Smaller batches on weekends
"processing_mode": "maintenance" if is_weekend else "normal"
}
}
},
"resources": {
"database": {
"config": {
"pool_size": 5 if is_weekend else 20, # Fewer connections on weekends
"timeout": 60 if is_weekend else 30
}
}
}
}
partitioned_config = PartitionedConfig(
    partitions_def=daily_partitions,
    # PartitionedConfig passes a partition key string to its config function,
    # so adapt the (start, end) helper by parsing the daily key
    run_config_for_partition_key_fn=lambda key: partition_config_fn(
        datetime.strptime(key, "%Y-%m-%d"), None
    )
)
@job(config=partitioned_config, partitions_def=daily_partitions)
def daily_processing_job():
"""Job with partition-specific configuration."""
process_partition_data()
# Built-in partitioned config helpers
daily_config = daily_partitioned_config(
start_date="2023-01-01",
timezone="UTC"
)(partition_config_fn)
@job(config=daily_config)
def daily_job_with_helper():
process_partition_data()
# Business logic based partitioned config
def business_partition_config(start: datetime, _end: datetime) -> dict:
"""Configuration based on business calendar."""
# Check if it's end of month
next_day = start + timedelta(days=1)
is_month_end = next_day.day == 1
# Check if it's end of quarter
is_quarter_end = (start.month in [3, 6, 9, 12] and
start == (start.replace(day=1) + timedelta(days=32)).replace(day=1) - timedelta(days=1))
return {
"ops": {
"process_partition_data": {
"config": {
"batch_size": 100 if is_quarter_end else 500 if is_month_end else 1000,
"processing_mode": "quarter_end" if is_quarter_end else
"month_end" if is_month_end else "normal"
}
}
}
}
business_config = PartitionedConfig(
    partitions_def=daily_partitions,
    run_config_for_partition_key_fn=lambda key: business_partition_config(
        datetime.strptime(key, "%Y-%m-%d"), None
    )
)
@job(config=business_config, partitions_def=daily_partitions)
def business_processing_job():
"""Job with business calendar aware configuration."""
    process_partition_data()
PartitionKeyRange { .api }
Module: dagster._core.definitions.partitions.partition_key_range
Type: Class
Represents a range of partition keys for batch operations and queries.
from dagster import PartitionKeyRange, asset
@asset(partitions_def=daily_partitions)
def batch_processed_data(context) -> pd.DataFrame:
"""Asset that processes data in batches using partition ranges."""
# Get current partition
current_partition = context.partition_key
# Create partition range for last 7 days
end_date = datetime.strptime(current_partition, "%Y-%m-%d")
start_date = end_date - timedelta(days=6)
partition_range = PartitionKeyRange(
start=start_date.strftime("%Y-%m-%d"),
end=current_partition
)
# Get all partition keys in range
partitions_def = context.assets_def.partitions_def
partition_keys_in_range = partitions_def.get_partition_keys_in_range(partition_range)
context.log.info(f"Processing batch for partitions: {partition_keys_in_range}")
# Load data for all partitions in range
all_data = []
for partition_key in partition_keys_in_range:
partition_data = load_data_for_partition(partition_key)
all_data.append(partition_data)
# Combine all partition data
combined_df = pd.concat(all_data, ignore_index=True)
context.add_output_metadata({
"partition_range_start": partition_range.start,
"partition_range_end": partition_range.end,
"partitions_processed": len(partition_keys_in_range),
"total_records": len(combined_df)
})
    return combined_df
TimeWindow { .api }
Module: dagster._core.definitions.partitions.utils
Type: Class
Time window utility for time-based partition operations.
from dagster import TimeWindow, asset
@asset(partitions_def=hourly_partitions)
def time_window_analysis(context) -> dict:
"""Asset demonstrating TimeWindow usage."""
# Access partition time window
time_window = context.partition_time_window
# TimeWindow properties
start_time = time_window.start
end_time = time_window.end
duration = end_time - start_time
context.log.info(f"Processing time window: {start_time} to {end_time} (duration: {duration})")
# Time window operations
analysis = {
"window_start": start_time.isoformat(),
"window_end": end_time.isoformat(),
"duration_seconds": duration.total_seconds(),
"duration_minutes": duration.total_seconds() / 60,
"hour_of_day": start_time.hour,
"day_of_week": start_time.weekday(),
"is_business_hour": 9 <= start_time.hour < 18,
"is_weekend": start_time.weekday() >= 5
}
# Load data for time window
query = f"""
SELECT COUNT(*) as event_count, AVG(metric_value) as avg_metric
FROM events
WHERE timestamp >= '{start_time}'
AND timestamp < '{end_time}'
"""
result = execute_query(query)
if result:
analysis.update({
"event_count": result[0]["event_count"],
"avg_metric": result[0]["avg_metric"]
})
    return analysis
This comprehensive partitions system enables efficient processing of large datasets through intelligent data division, automatic dependency management, and sophisticated mapping strategies. The system supports everything from simple daily partitions to complex multi-dimensional business logic partitioning with dynamic runtime partition management.
For automation with partitioned assets, see Sensors and Schedules. For execution contexts in partitioned environments, see Execution and Contexts.
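As a starting point for that automation, the sketch below (names are illustrative; it reuses daily_partitions, daily_user_events, and daily_user_summary from the daily partition examples above) shows one way to wire partitioned assets into a job and derive a schedule whose cadence comes from the partitions definition:
from dagster import Definitions, build_schedule_from_partitioned_job, define_asset_job

# Select the daily-partitioned assets defined earlier into a partitioned job
daily_events_job = define_asset_job(
    name="daily_events_job",
    selection=["daily_user_events", "daily_user_summary"],
    partitions_def=daily_partitions,
)

# The schedule's cron cadence is derived from the partitions definition,
# so each tick targets the most recently completed daily partition
daily_events_schedule = build_schedule_from_partitioned_job(daily_events_job)

defs = Definitions(
    assets=[daily_user_events, daily_user_summary],
    jobs=[daily_events_job],
    schedules=[daily_events_schedule],
)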
Install with Tessl CLI
npx tessl i tessl/pypi-dagster