CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster

A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.

Pending
Overview
Eval results
Files

docs/partitions.md

Partitions System

This document covers Dagster's comprehensive partitioning system, including partition definitions, mappings, time-based partitions, dynamic partitions, and multi-dimensional partitioning. The partitions system enables efficient processing of large datasets by breaking them into manageable, parallelizable chunks.

Partition Definitions

Partition definitions specify how data is divided into discrete, processable units with automatic dependency tracking and backfill capabilities.

Base Partition Classes

PartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: Abstract base class

Base class for all partition definitions with common partition operations.

from dagster import PartitionsDefinition, asset, job, op
from typing import Sequence
import pandas as pd

class CustomPartitionsDefinition(PartitionsDefinition):
    """A hand-rolled partition definition backed by an explicit list of keys."""

    def __init__(self, partition_keys: Sequence[str]):
        self._partition_keys = partition_keys

    @property
    def partition_keys(self) -> Sequence[str]:
        """All partition keys, in the order they were supplied."""
        return self._partition_keys

    def get_partition_keys_in_range(self, partition_key_range) -> Sequence[str]:
        """Return the contiguous slice of keys covered by *partition_key_range*.

        A missing start means "from the first key"; a missing end means
        "through the last key". Raises ValueError if a bound is not a key.
        """
        keys = self._partition_keys
        start = keys.index(partition_key_range.start) if partition_key_range.start else 0
        if partition_key_range.end:
            stop = keys.index(partition_key_range.end) + 1
        else:
            stop = len(keys)
        return keys[start:stop]

# Custom business quarter partitions
# Keys follow the "<year>-Q<n>" format that quarterly_sales_report parses.
quarters_partition_def = CustomPartitionsDefinition([
    "2023-Q1", "2023-Q2", "2023-Q3", "2023-Q4",
    "2024-Q1", "2024-Q2", "2024-Q3", "2024-Q4"
])

@asset(partitions_def=quarters_partition_def)
def quarterly_sales_report(context) -> pd.DataFrame:
    """Load the sales rows for one business quarter partition.

    The partition key has the form "<year>-Q<n>" (e.g. "2023-Q1"), so the
    year comes BEFORE the quarter when splitting on "-".
    """
    partition_key = context.partition_key
    # BUG FIX: keys look like "2023-Q1" — the first field is the year and the
    # second is the quarter. The original unpacking had them swapped, which
    # made int(quarter[1]) parse a year digit and produce a bogus month range.
    year, quarter = partition_key.split("-")
    
    context.log.info(f"Processing {quarter} {year} sales report")
    
    # Map Q1..Q4 onto calendar month ranges (Q1 -> 1-3, Q2 -> 4-6, ...).
    quarter_num = int(quarter[1])  # Extract number from Q1, Q2, etc.
    start_month = (quarter_num - 1) * 3 + 1
    end_month = quarter_num * 3
    
    # NOTE(review): values are interpolated into SQL; safe only because the
    # partition keys come from a fixed list — prefer parameterized queries.
    query = f"""
    SELECT * FROM sales 
    WHERE YEAR(date) = {year}
    AND MONTH(date) BETWEEN {start_month} AND {end_month}
    """
    
    return pd.read_sql(query, connection)

Static Partitions

StaticPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass

Partition definition with a fixed set of partition keys.

from dagster import StaticPartitionsDefinition, asset

# Geographic partitions
# Fixed set of sales regions; each key becomes one partition of the assets below.
regions_partitions = StaticPartitionsDefinition([
    "north_america", "europe", "asia_pacific", "latin_america", "africa"
])

@asset(partitions_def=regions_partitions)
def regional_sales_data(context) -> pd.DataFrame:
    """Load the sales rows belonging to one geographic region partition."""

    region = context.partition_key
    context.log.info(f"Processing sales data for region: {region}")

    # Pull only this region's rows from the warehouse.
    query = f"SELECT * FROM sales WHERE region = '{region}'"
    sales_df = pd.read_sql(query, connection)

    # Surface partition-level stats in the asset catalog.
    metadata = {
        "region": region,
        "record_count": len(sales_df),
        "processing_date": pd.Timestamp.now().isoformat(),
    }
    context.add_output_metadata(metadata)

    return sales_df

# Product category partitions
# One partition per top-level product category.
product_categories = StaticPartitionsDefinition([
    "electronics", "clothing", "books", "home_garden", "sports", "automotive"
])

@asset(partitions_def=product_categories)
def category_analytics(context, regional_sales_data: pd.DataFrame) -> dict:
    """Compute per-category sales analytics from the regional sales feed."""

    category = context.partition_key

    # Restrict the upstream frame to this partition's category.
    in_category = regional_sales_data["category"] == category
    category_data = regional_sales_data[in_category]

    amounts = category_data["amount"]
    analytics = {
        "category": category,
        "total_sales": amounts.sum(),
        "transaction_count": len(category_data),
        "avg_transaction": amounts.mean(),
        "top_products": category_data.groupby("product")["amount"].sum().nlargest(5).to_dict(),
    }

    # Echo the headline numbers into the asset metadata.
    context.add_output_metadata({
        "category": category,
        "total_sales": analytics["total_sales"],
        "transaction_count": analytics["transaction_count"],
    })

    return analytics

# Environment-based partitions
# Deployment targets; used to partition deployment_job by environment.
environments_partitions = StaticPartitionsDefinition([
    "development", "staging", "production"
])

@job(partitions_def=environments_partitions)
def deployment_job():
    """Job partitioned by deployment environment.

    NOTE(review): the @op is defined inside the job body; Dagster normally
    expects ops at module scope and only invoked here — confirm this pattern
    against the dagster version in use. run_production_checks,
    require_manual_approval, deploy_application and run_health_checks are not
    defined in this snippet.
    """
    
    @op
    def deploy_to_environment(context):
        # The op's partition key is the environment name ("development", ...).
        environment = context.partition_key
        
        context.log.info(f"Deploying to {environment} environment")
        
        # Environment-specific deployment logic
        if environment == "production":
            # Additional validation and approvals
            run_production_checks()
            require_manual_approval()
        
        deploy_application(environment)
        run_health_checks(environment)
        
        return f"Deployed to {environment}"
    
    deploy_to_environment()

Time-Based Partitions

TimeWindowPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass

Base class for time-based partitions with configurable time windows.

from dagster import TimeWindowPartitionsDefinition, asset
from datetime import datetime, timedelta

# Custom hourly partitions with business hours only
# The cron schedule restricts windows to 9 AM-5 PM starts, Monday-Friday.
# NOTE(review): current TimeWindowPartitionsDefinition constructors also take
# a `fmt` argument — confirm this signature against the dagster version used.
business_hours_partitions = TimeWindowPartitionsDefinition(
    start=datetime(2023, 1, 1, 9),  # 9 AM
    end=datetime(2024, 1, 1, 18),   # 6 PM  
    cron_schedule="0 9-17 * * 1-5",  # Business hours, weekdays only
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_partitions)
def business_hours_metrics(context) -> pd.DataFrame:
    """Aggregate user-event metrics for one business-hour partition window."""

    window = context.partition_time_window
    start_time, end_time = window.start, window.end

    context.log.info(f"Processing metrics for {start_time} to {end_time}")

    # Pull the raw events that fall inside this window.
    query = f"""
    SELECT * FROM user_events 
    WHERE timestamp >= '{start_time}' 
    AND timestamp < '{end_time}'
    """

    events = pd.read_sql(query, connection)

    # Per-event-type rollup: distinct users/sessions plus raw event counts.
    metrics = (
        events.groupby("event_type")
        .agg({
            "user_id": "nunique",
            "session_id": "nunique",
            "timestamp": "count",
        })
        .reset_index()
    )

    context.add_output_metadata({
        "time_window_start": start_time.isoformat(),
        "time_window_end": end_time.isoformat(),
        "total_events": len(events),
        "unique_users": events["user_id"].nunique(),
    })

    return metrics

DailyPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass

Daily time-based partitions with automatic date-based partitioning.

from dagster import DailyPartitionsDefinition, asset, AssetIn
import pandas as pd

# Basic daily partitions
# One partition per UTC calendar day, keyed "YYYY-MM-DD", starting 2023-01-01.
daily_partitions = DailyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=daily_partitions)
def daily_user_events(context) -> pd.DataFrame:
    """Fetch all user events recorded on this partition's calendar date."""

    partition_date = context.partition_key  # Format: "2023-01-01"
    context.log.info(f"Processing user events for {partition_date}")

    # Select just the rows stamped with this partition's date.
    query = f"""
    SELECT user_id, event_type, timestamp, properties
    FROM user_events 
    WHERE DATE(timestamp) = '{partition_date}'
    """

    events = pd.read_sql(query, connection)

    # Report the day's volume stats in the asset metadata.
    context.add_output_metadata({
        "partition_date": partition_date,
        "event_count": len(events),
        "unique_users": events["user_id"].nunique(),
        "event_types": events["event_type"].nunique(),
    })

    return events

@asset(partitions_def=daily_partitions)
def daily_user_summary(context, daily_user_events: pd.DataFrame) -> pd.DataFrame:
    """Collapse one day of raw events into a single activity row per user."""

    partition_date = context.partition_key

    # One row per user: event count plus first/last event timestamps.
    summary = (
        daily_user_events.groupby("user_id")
        .agg({"event_type": "count", "timestamp": ["min", "max"]})
        .reset_index()
    )
    summary.columns = ["user_id", "event_count", "first_event", "last_event"]

    # Session length, measured first-to-last event, in minutes.
    first_seen = pd.to_datetime(summary["first_event"])
    last_seen = pd.to_datetime(summary["last_event"])
    summary["session_duration"] = (last_seen - first_seen).dt.total_seconds() / 60  # Minutes

    summary["date"] = partition_date

    context.add_output_metadata({
        "partition_date": partition_date,
        "active_users": len(summary),
        "avg_events_per_user": summary["event_count"].mean(),
        "avg_session_duration_minutes": summary["session_duration"].mean(),
    })

    return summary

# Daily partitions with custom start time
# Each partition window runs 6 AM to 6 AM (America/New_York) instead of
# midnight to midnight.
daily_partitions_6am = DailyPartitionsDefinition(
    start_date="2023-01-01",
    minute_offset=0,
    hour_offset=6,  # Partitions start at 6 AM
    timezone="America/New_York"
)

@asset(partitions_def=daily_partitions_6am)
def business_day_data(context) -> pd.DataFrame:
    """Load transactions for one 6 AM-to-6 AM business-day window."""

    window = context.partition_time_window
    business_day_start = window.start  # 6 AM local time
    business_day_end = window.end      # 6 AM the following day

    context.log.info(f"Processing business day from {business_day_start} to {business_day_end}")

    # Half-open window: start inclusive, end exclusive.
    query = f"""
    SELECT * FROM transactions 
    WHERE timestamp >= '{business_day_start}'
    AND timestamp < '{business_day_end}'
    """

    return pd.read_sql(query, connection)

HourlyPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass

Hourly time-based partitions for high-frequency data processing.

from dagster import HourlyPartitionsDefinition, asset

# Hourly partitions for real-time analytics
# Keys use the "YYYY-MM-DD-HH:MM" format (e.g. "2023-01-01-14:00").
hourly_partitions = HourlyPartitionsDefinition(
    start_date="2023-01-01-00:00",
    timezone="UTC"
)

@asset(partitions_def=hourly_partitions)
def hourly_api_metrics(context) -> pd.DataFrame:
    """Hourly API metrics and performance data.

    Aggregates request count and mean/median/p95 latency per
    (endpoint, status_code) pair over one hour-long partition window.
    Returns a placeholder frame when the hour has no traffic.
    """
    
    partition_key = context.partition_key  # Format: "2023-01-01-14:00"
    time_window = context.partition_time_window
    
    hour_start = time_window.start
    hour_end = time_window.end
    
    context.log.info(f"Processing API metrics for hour: {partition_key}")
    
    # Load API logs for specific hour (half-open window).
    query = f"""
    SELECT endpoint, status_code, response_time, timestamp
    FROM api_logs 
    WHERE timestamp >= '{hour_start}'
    AND timestamp < '{hour_end}'
    """
    
    df = pd.read_sql(query, connection)
    
    if len(df) > 0:
        # BUG FIX: "p95" is not a recognized pandas aggregation name and
        # raises at runtime; compute the 95th percentile with an explicit
        # quantile callable instead.
        def p95(series):
            return series.quantile(0.95)

        metrics = df.groupby(["endpoint", "status_code"]).agg({
            "response_time": ["count", "mean", "median", p95],
            "timestamp": ["min", "max"]
        }).reset_index()
        
        # Flatten the (column, aggregation) MultiIndex into plain names.
        metrics.columns = [
            "endpoint", "status_code", "request_count", 
            "avg_response_time", "median_response_time", "p95_response_time",
            "first_request", "last_request"
        ]
        
        metrics["hour"] = partition_key
        
        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": len(df),
            "unique_endpoints": df["endpoint"].nunique(),
            "error_rate": len(df[df["status_code"] >= 400]) / len(df)
        })
    else:
        # Handle empty hour: emit one placeholder row so downstream
        # consumers always receive a DataFrame.
        metrics = pd.DataFrame({
            "hour": [partition_key],
            "total_requests": [0],
            "message": ["No API requests in this hour"]
        })
        
        context.add_output_metadata({
            "hour": partition_key,
            "total_requests": 0,
            "status": "no_data"
        })
    
    return metrics

# Hourly partitions with business hours filter
# NOTE(review): despite the name, this definition still yields every hour
# between start_date and end_date; the 9 AM-6 PM check is applied inside
# business_hours_analytics below, not here.
business_hours_only = HourlyPartitionsDefinition(
    start_date="2023-01-01-09:00",
    end_date="2023-12-31-18:00", 
    timezone="America/New_York"
)

@asset(partitions_def=business_hours_only)
def business_hours_analytics(context) -> dict:
    """Run analytics only for partitions whose hour falls within 9 AM-6 PM."""

    window = context.partition_time_window
    hour = window.start.hour

    # Guard clause: anything outside the 9 AM - 6 PM window is skipped.
    if not (9 <= hour < 18):
        context.log.info(f"Skipping non-business hour: {context.partition_key}")
        return {
            "hour": context.partition_key,
            "is_business_hour": False,
            "message": "Outside business hours"
        }

    context.log.info(f"Processing business hour analytics for {context.partition_key}")

    # Business analytics logic
    return {
        "hour": context.partition_key,
        "is_business_hour": True,
        "analytics_data": "processed"
    }

WeeklyPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass

Weekly time-based partitions for weekly aggregations and reports.

from dagster import WeeklyPartitionsDefinition, asset

# Weekly partitions starting on Monday
# start_date anchors the weekly windows; 2023-01-02 is a Monday.
weekly_partitions = WeeklyPartitionsDefinition(
    start_date="2023-01-02",  # First Monday of 2023
    timezone="UTC"
)

@asset(partitions_def=weekly_partitions)
def weekly_sales_summary(context) -> pd.DataFrame:
    """Aggregate daily per-category sales into a one-week summary frame."""

    time_window = context.partition_time_window
    week_start = time_window.start
    week_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-02"

    context.log.info(f"Processing weekly sales summary for week {partition_key}")
    context.log.info(f"Week span: {week_start} to {week_end}")

    # Daily sales per category for the seven days in this window.
    query = f"""
    SELECT DATE(sale_date) as day, product_category, SUM(amount) as daily_sales
    FROM sales 
    WHERE sale_date >= '{week_start.date()}'
    AND sale_date < '{week_end.date()}'
    GROUP BY DATE(sale_date), product_category
    ORDER BY day, product_category
    """

    daily_rows = pd.read_sql(query, connection)

    # The window end is exclusive, so the last calendar day is one day earlier.
    last_day = (week_end - timedelta(days=1)).date()

    if len(daily_rows) == 0:
        # No sales this week: emit a single placeholder record.
        weekly_summary = pd.DataFrame({
            "week_start": [week_start.date()],
            "week_end": [last_day],
            "message": ["No sales data for this week"]
        })

        context.add_output_metadata({
            "week_start": week_start.isoformat(),
            "week_end": week_end.isoformat(),
            "total_sales": 0,
            "status": "no_data"
        })
        return weekly_summary

    # Per-category totals plus day-to-day variability across the week.
    weekly_summary = (
        daily_rows.groupby("product_category")
        .agg({"daily_sales": ["sum", "mean", "std", "count"]})
        .reset_index()
    )
    weekly_summary.columns = [
        "product_category", "total_weekly_sales", 
        "avg_daily_sales", "daily_sales_std", "days_with_sales"
    ]

    weekly_summary["week_start"] = week_start.date()
    weekly_summary["week_end"] = last_day

    context.add_output_metadata({
        "week_start": week_start.isoformat(),
        "week_end": week_end.isoformat(),
        "total_sales": daily_rows["daily_sales"].sum(),
        "categories_sold": daily_rows["product_category"].nunique(),
        "sales_days": daily_rows["day"].nunique()
    })

    return weekly_summary

@asset(partitions_def=weekly_partitions)
def weekly_user_cohorts(context, daily_user_summary: pd.DataFrame) -> pd.DataFrame:
    """Build a per-user cohort/engagement view across one week of daily summaries."""

    time_window = context.partition_time_window
    # Enumerate the calendar dates covered by this weekly window.
    week_dates = pd.date_range(
        start=time_window.start.date(),
        end=(time_window.end - timedelta(days=1)).date(),
        freq="D"
    ).strftime("%Y-%m-%d").tolist()

    context.log.info(f"Analyzing user cohorts for dates: {week_dates}")

    # Keep only the upstream daily rows that belong to this week.
    week_data = daily_user_summary[
        daily_user_summary["date"].isin(week_dates)
    ]

    if len(week_data) == 0:
        # No activity this week: single placeholder row.
        cohort_analysis = pd.DataFrame({
            "week_start": [time_window.start.date()],
            "message": ["No user data for this week"]
        })

        context.add_output_metadata({
            "week_start": time_window.start.isoformat(),
            "active_users": 0,
            "status": "no_data"
        })
        return cohort_analysis

    # Totals plus active-day span per user.
    cohort_analysis = (
        week_data.groupby("user_id")
        .agg({
            "event_count": "sum",
            "session_duration": "sum", 
            "date": ["count", "min", "max"]
        })
        .reset_index()
    )
    cohort_analysis.columns = [
        "user_id", "total_events", "total_session_minutes",
        "active_days", "first_seen", "last_seen"
    ]

    # Bucket users into engagement tiers by weekly event volume.
    cohort_analysis["engagement_level"] = pd.cut(
        cohort_analysis["total_events"],
        bins=[0, 5, 20, 50, float("inf")],
        labels=["low", "medium", "high", "power_user"]
    )

    cohort_analysis["week_start"] = time_window.start.date()

    context.add_output_metadata({
        "week_start": time_window.start.isoformat(),
        "active_users": len(cohort_analysis),
        "avg_events_per_user": cohort_analysis["total_events"].mean(),
        "engagement_distribution": cohort_analysis["engagement_level"].value_counts().to_dict()
    })

    return cohort_analysis

MonthlyPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: TimeWindowPartitionsDefinition subclass

Monthly time-based partitions for monthly reporting and aggregations.

from dagster import MonthlyPartitionsDefinition, asset

# Monthly partitions for financial reporting
# One partition per calendar month (UTC), keyed by the month's first day.
monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-01-01",
    timezone="UTC"
)

@asset(partitions_def=monthly_partitions)
def monthly_financial_report(context) -> pd.DataFrame:
    """Produce per-department financial metrics for one calendar month."""

    time_window = context.partition_time_window
    month_start = time_window.start
    month_end = time_window.end

    partition_key = context.partition_key  # Format: "2023-01-01"
    month_name = month_start.strftime("%B %Y")

    context.log.info(f"Processing financial report for {month_name}")

    # Per-department revenue/cost/customer aggregates for the month.
    query = f"""
    SELECT 
        department,
        SUM(revenue) as total_revenue,
        SUM(costs) as total_costs,
        COUNT(DISTINCT customer_id) as unique_customers,
        COUNT(*) as transaction_count
    FROM financial_transactions 
    WHERE transaction_date >= '{month_start.date()}'
    AND transaction_date < '{month_end.date()}'
    GROUP BY department
    """

    report = pd.read_sql(query, connection)

    if len(report) == 0:
        # No transactions this month: emit a placeholder frame.
        report = pd.DataFrame({
            "month": [month_name],
            "message": ["No financial data for this month"]
        })

        context.add_output_metadata({
            "month": month_name,
            "total_revenue": 0,
            "status": "no_data"
        })
        return report

    # Derived per-department metrics.
    report["profit"] = report["total_revenue"] - report["total_costs"]
    report["profit_margin"] = report["profit"] / report["total_revenue"]
    report["revenue_per_customer"] = report["total_revenue"] / report["unique_customers"]
    report["avg_transaction_value"] = report["total_revenue"] / report["transaction_count"]

    # Month bookkeeping (month_end is exclusive, hence the -1 day).
    report["month"] = month_name
    report["month_start"] = month_start.date()
    report["month_end"] = (month_end - timedelta(days=1)).date()

    # Company-wide totals across all departments.
    total_revenue = report["total_revenue"].sum()
    total_profit = report["profit"].sum()
    total_customers = report["unique_customers"].sum()

    context.add_output_metadata({
        "month": month_name,
        "total_revenue": total_revenue,
        "total_profit": total_profit,
        "company_profit_margin": total_profit / total_revenue if total_revenue > 0 else 0,
        "total_customers": total_customers,
        "departments": len(report)
    })

    return report

# Fiscal year partitions (custom monthly with fiscal year start)
# Standard monthly windows; the April start date simply aligns the first
# partition with the fiscal-year boundary.
fiscal_monthly_partitions = MonthlyPartitionsDefinition(
    start_date="2023-04-01",  # Fiscal year starts in April
    timezone="America/New_York"
)

@asset(partitions_def=fiscal_monthly_partitions)
def fiscal_monthly_budget_analysis(context) -> dict:
    """Translate a calendar-month partition into fiscal-year terms (FY starts in April)."""

    window = context.partition_time_window
    month_start = window.start

    # April..December belong to the fiscal year that began that April;
    # January..March roll back to the previous calendar year's fiscal year.
    if month_start.month >= 4:
        fiscal_year = month_start.year
        fiscal_month = month_start.month - 3  # April = Month 1
    else:
        fiscal_year = month_start.year - 1
        fiscal_month = month_start.month + 9  # January = Month 10

    context.log.info(f"Processing FY{fiscal_year} Month {fiscal_month}")

    budget_analysis = {
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": month_start.strftime("%B %Y"),
        "quarter": f"Q{((fiscal_month - 1) // 3) + 1}",
        "is_fiscal_year_end": fiscal_month == 12
    }

    context.add_output_metadata({
        "fiscal_year": fiscal_year,
        "fiscal_month": fiscal_month,
        "calendar_month": month_start.strftime("%B %Y"),
        "quarter": budget_analysis["quarter"]
    })

    return budget_analysis

Dynamic Partitions

DynamicPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass

Partition definition where partitions can be added and removed at runtime.

from dagster import DynamicPartitionsDefinition, asset
from dagster import AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest

# Dynamic customer partitions
# Keys (customer ids) are added/removed at runtime by the sensor and job below.
customers_partitions = DynamicPartitionsDefinition(name="customers")

@asset(partitions_def=customers_partitions)
def customer_analytics(context) -> pd.DataFrame:
    """Trailing-90-day order analytics for one dynamic customer partition."""

    customer_id = context.partition_key
    context.log.info(f"Processing analytics for customer: {customer_id}")

    # Daily spend by category for this customer over the last 90 days.
    query = f"""
    SELECT 
        DATE(order_date) as date,
        product_category,
        SUM(order_value) as daily_spend
    FROM orders 
    WHERE customer_id = '{customer_id}'
    AND order_date >= CURRENT_DATE - INTERVAL 90 DAY
    GROUP BY DATE(order_date), product_category
    ORDER BY date DESC
    """

    orders = pd.read_sql(query, connection)
    has_orders = len(orders) > 0

    context.add_output_metadata({
        "customer_id": customer_id,
        "days_with_orders": len(orders),
        "total_spend": orders["daily_spend"].sum() if has_orders else 0,
        "categories_purchased": orders["product_category"].nunique() if has_orders else 0
    })

    return orders

# Sensor to dynamically add new customer partitions
@sensor(minimum_interval_seconds=3600)  # Check hourly
def new_customer_sensor(context):
    """Add partitions for new customers.

    Polls the customers table and registers one dynamic partition per
    customer created in the last hour.

    NOTE(review): `sensor`, `SkipReason`, and `execute_query` are not
    imported in this snippet, and returning AddDynamicPartitionsRequest
    directly from a sensor should be confirmed against the dagster sensor
    API for the version in use.
    """
    
    # Find customers added in the last hour
    query = """
    SELECT DISTINCT customer_id 
    FROM customers 
    WHERE created_at >= NOW() - INTERVAL 1 HOUR
    """
    
    new_customers = execute_query(query)
    
    if new_customers:
        # One partition key per newly created customer id.
        customer_ids = [row["customer_id"] for row in new_customers]
        
        context.log.info(f"Adding partitions for {len(customer_ids)} new customers")
        
        return AddDynamicPartitionsRequest(
            partitions_def_name="customers",
            partition_keys=customer_ids
        )
    
    return SkipReason("No new customers found")

# Job to clean up inactive customer partitions
@job
def cleanup_inactive_customers():
    """Remove partitions for inactive customers.

    NOTE(review): the ops are defined inside the job body, and
    remove_customer_partitions returns a DeleteDynamicPartitionsRequest as an
    op output — confirm dagster actually acts on deletion requests returned
    this way. `op` and `execute_query` are not imported in this snippet.
    """
    
    @op
    def identify_inactive_customers(context) -> list:
        """Return ids of customers with no orders in 6 months and accounts older than 6 months."""
        # Find customers with no activity in 6 months
        query = """
        SELECT DISTINCT c.customer_id
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id 
            AND o.order_date >= CURRENT_DATE - INTERVAL 6 MONTH
        WHERE o.customer_id IS NULL
        AND c.created_at < CURRENT_DATE - INTERVAL 6 MONTH
        """
        
        inactive_customers = execute_query(query)
        customer_ids = [row["customer_id"] for row in inactive_customers]
        
        context.log.info(f"Found {len(customer_ids)} inactive customers")
        return customer_ids
    
    @op
    def remove_customer_partitions(context, inactive_customers: list):
        """Request deletion of the dynamic partitions for the given customer ids."""
        if inactive_customers:
            context.log.info(f"Removing partitions for {len(inactive_customers)} inactive customers")
            
            return DeleteDynamicPartitionsRequest(
                partitions_def_name="customers",
                partition_keys=inactive_customers
            )
        else:
            context.log.info("No inactive customers to remove")
            return None
    
    remove_customer_partitions(identify_inactive_customers())

# Dynamic experiment partitions for A/B testing
# One partition per experiment id, registered at runtime.
experiments_partitions = DynamicPartitionsDefinition(name="experiments")

@asset(partitions_def=experiments_partitions)
def experiment_results(context) -> dict:
    """Summarize conversion results (and A/B lift) for one experiment partition."""

    experiment_id = context.partition_key
    context.log.info(f"Analyzing results for experiment: {experiment_id}")

    # Per-variant participation and conversion aggregates.
    query = f"""
    SELECT 
        variant,
        COUNT(*) as participants,
        SUM(converted) as conversions,
        AVG(metric_value) as avg_metric
    FROM experiment_data 
    WHERE experiment_id = '{experiment_id}'
    GROUP BY variant
    """

    df = pd.read_sql(query, connection)

    if len(df) == 0:
        # No rows at all for this experiment.
        results = {
            "experiment_id": experiment_id,
            "error": "No data found for experiment"
        }

        context.add_output_metadata({
            "experiment_id": experiment_id,
            "total_participants": 0,
            "status": "no_data"
        })
        return results

    results = {
        "experiment_id": experiment_id,
        "variants": df["variant"].tolist(),
        "total_participants": df["participants"].sum(),
        "results_by_variant": df.to_dict("records")
    }

    # For a two-variant (A/B) run, compute the treatment's relative lift.
    if len(df) == 2:
        control_rows = df[df["variant"] == "control"]
        treatment_rows = df[df["variant"] == "treatment"]
        control = control_rows.iloc[0] if len(control_rows) > 0 else None
        treatment = treatment_rows.iloc[0] if len(treatment_rows) > 0 else None

        if control is not None and treatment is not None:
            control_rate = control["conversions"] / control["participants"]
            treatment_rate = treatment["conversions"] / treatment["participants"]
            lift = (treatment_rate - control_rate) / control_rate if control_rate > 0 else 0

            results["lift"] = lift
            results["control_conversion_rate"] = control_rate
            results["treatment_conversion_rate"] = treatment_rate

    context.add_output_metadata({
        "experiment_id": experiment_id,
        "total_participants": results["total_participants"],
        "variants_count": len(df),
        "lift": results.get("lift", "N/A")
    })

    return results

Multi-Dimensional Partitions

MultiPartitionsDefinition { .api }

Module: dagster._core.definitions.partitions.definition
Type: PartitionsDefinition subclass

Partition definition combining multiple partition dimensions.

from dagster import MultiPartitionsDefinition, MultiPartitionKey, asset

# Multi-dimensional partitioning: Date x Region
# Every (day, region) pair is one partition; the string form of a key joins
# the dimensions with "|" (see the split below in regional_daily_sales).
multi_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia"])
})

@asset(partitions_def=multi_partitions)
def regional_daily_sales(context) -> pd.DataFrame:
    """Per-product sales rollup for one (date, region) partition cell."""

    # The key may arrive as a MultiPartitionKey or as its string form.
    partition_key = context.partition_key

    if isinstance(partition_key, MultiPartitionKey):
        dims = partition_key.keys_by_dimension
        date_key = dims["date"]
        region_key = dims["region"]
    else:
        # String form joins the dimensions with "|".
        date_key, region_key = partition_key.split("|")

    context.log.info(f"Processing sales for {region_key} on {date_key}")

    # Per-product totals restricted to this date and region.
    query = f"""
    SELECT product_id, SUM(amount) as sales, COUNT(*) as transactions
    FROM sales 
    WHERE DATE(sale_date) = '{date_key}'
    AND region = '{region_key}'
    GROUP BY product_id
    """

    sales_df = pd.read_sql(query, connection)

    # Stamp each row with its partition coordinates.
    sales_df["partition_date"] = date_key
    sales_df["partition_region"] = region_key

    has_rows = len(sales_df) > 0
    context.add_output_metadata({
        "partition_date": date_key,
        "partition_region": region_key,
        "total_sales": sales_df["sales"].sum() if has_rows else 0,
        "unique_products": len(sales_df),
        "total_transactions": sales_df["transactions"].sum() if has_rows else 0
    })

    return sales_df

# Three-dimensional partitioning: Date x Region x Product Category
# The partition space is the cross product of all three dimensions.
three_dim_partitions = MultiPartitionsDefinition({
    "date": DailyPartitionsDefinition(start_date="2023-01-01"),
    "region": StaticPartitionsDefinition(["us", "eu", "asia", "latam"]),
    "category": StaticPartitionsDefinition(["electronics", "clothing", "books", "home"])
})

@asset(partitions_def=three_dim_partitions)
def detailed_sales_metrics(context) -> dict:
    """Sales statistics for one (date, region, category) partition cell."""

    partition_key = context.partition_key

    # The key may arrive as a MultiPartitionKey or as its "|"-joined string.
    if isinstance(partition_key, MultiPartitionKey):
        dims = partition_key.keys_by_dimension
        date_key = dims["date"]
        region_key = dims["region"]
        category_key = dims["category"]
    else:
        date_key, region_key, category_key = str(partition_key).split("|")

    context.log.info(f"Processing {category_key} sales in {region_key} on {date_key}")

    # Single-row aggregate restricted to this exact partition cell.
    query = f"""
    SELECT 
        AVG(amount) as avg_sale_amount,
        MIN(amount) as min_sale_amount,
        MAX(amount) as max_sale_amount,
        COUNT(*) as transaction_count,
        SUM(amount) as total_sales,
        COUNT(DISTINCT customer_id) as unique_customers
    FROM sales 
    WHERE DATE(sale_date) = '{date_key}'
    AND region = '{region_key}'
    AND category = '{category_key}'
    """

    result = execute_query(query)

    if result and len(result) > 0:
        metrics = result[0]
        metrics.update({
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": metrics["transaction_count"] > 0
        })
    else:
        # Query returned nothing: synthesize an empty-metrics record.
        metrics = {
            "partition_date": date_key,
            "partition_region": region_key,
            "partition_category": category_key,
            "has_data": False,
            "transaction_count": 0,
            "total_sales": 0
        }

    context.add_output_metadata({
        "date": date_key,
        "region": region_key,
        "category": category_key,
        "transaction_count": metrics["transaction_count"],
        "total_sales": metrics["total_sales"]
    })

    return metrics

# Business-reporting partition space: (month, department, metric_type) —
# combines a monthly time dimension with two static business dimensions.
_business_dims = {
    "month": MonthlyPartitionsDefinition(start_date="2023-01-01"),
    "department": StaticPartitionsDefinition(["sales", "marketing", "engineering", "support"]),
    "metric_type": StaticPartitionsDefinition(["revenue", "costs", "headcount", "productivity"]),
}
business_multi_partitions = MultiPartitionsDefinition(_business_dims)

@asset(partitions_def=business_multi_partitions)
def department_kpi_metrics(context) -> dict:
    """KPI metrics partitioned by month, department, and metric type.

    Each (month, department, metric_type) partition runs one
    metric-specific SQL query and returns a small dict of results.

    Raises:
        ValueError: if the metric_type dimension carries an unknown value.
    """
    
    partition_key = context.partition_key
    
    if isinstance(partition_key, MultiPartitionKey):
        month_key = partition_key.keys_by_dimension["month"]
        department_key = partition_key.keys_by_dimension["department"]
        metric_type_key = partition_key.keys_by_dimension["metric_type"]
    else:
        # Fall back to the "dim1|dim2|dim3" string form of a multi-partition key
        month_key, department_key, metric_type_key = str(partition_key).split("|")
    
    context.log.info(f"Processing {metric_type_key} for {department_key} in {month_key}")
    
    # Metric-specific data loading. Keys come from the partition definition,
    # not user input, so f-string interpolation is tolerable here; prefer
    # parameterized queries if these values ever come from elsewhere.
    if metric_type_key == "revenue":
        query = f"""
        SELECT SUM(amount) as value, 'USD' as unit
        FROM revenue 
        WHERE department = '{department_key}'
        AND YEAR(date) = YEAR('{month_key}')
        AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "costs":
        query = f"""
        SELECT SUM(cost) as value, 'USD' as unit
        FROM expenses 
        WHERE department = '{department_key}'
        AND YEAR(date) = YEAR('{month_key}')
        AND MONTH(date) = MONTH('{month_key}')
        """
    elif metric_type_key == "headcount":
        query = f"""
        SELECT COUNT(DISTINCT employee_id) as value, 'people' as unit
        FROM employees 
        WHERE department = '{department_key}'
        AND start_date <= '{month_key}'
        AND (end_date IS NULL OR end_date > LAST_DAY('{month_key}'))
        """
    elif metric_type_key == "productivity":
        query = f"""
        SELECT AVG(productivity_score) as value, 'score' as unit
        FROM productivity_metrics 
        WHERE department = '{department_key}'
        AND YEAR(date) = YEAR('{month_key}')
        AND MONTH(date) = MONTH('{month_key}')
        """
    else:
        # Bug fix: previously `query` was left unbound for an unrecognized
        # metric type, which surfaced as a confusing NameError below.
        raise ValueError(f"Unknown metric_type partition: {metric_type_key}")
    
    result = execute_query(query)
    
    if result and len(result) > 0:
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            # SUM/AVG over zero rows yields NULL -> None; coerce to 0
            "value": result[0]["value"] or 0,
            "unit": result[0]["unit"],
            "has_data": result[0]["value"] is not None
        }
    else:
        # No result rows: emit an explicit empty record
        kpi_data = {
            "month": month_key,
            "department": department_key,
            "metric_type": metric_type_key,
            "value": 0,
            "unit": "unknown",
            "has_data": False
        }
    
    context.add_output_metadata({
        "month": month_key,
        "department": department_key,
        "metric_type": metric_type_key,
        "value": kpi_data["value"],
        "unit": kpi_data["unit"]
    })
    
    return kpi_data

Partition Mappings

Partition mappings define how partitioned assets depend on partitions of their upstream assets, enabling sophisticated dependency patterns.

Base Partition Mapping

PartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: Abstract base class

Base class for partition mapping strategies.

from dagster import PartitionMapping, PartitionKeyRange
from typing import Optional, Sequence

class BusinessDayPartitionMapping(PartitionMapping):
    """Custom partition mapping that creates dependencies only on business days.

    Weekend partition keys (Saturday/Sunday) map to no partitions in either
    direction; weekday keys map one-to-one. Partition keys are expected in
    "YYYY-MM-DD" form (daily partitions).
    """
    
    def __init__(self, skip_weekends: bool = True):
        # When False, behaves like an identity mapping for all days.
        self.skip_weekends = skip_weekends
    
    def get_upstream_mapped_partitions_result_for_partition_key(
        self, downstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map a downstream partition key to its upstream partition range."""
        from datetime import datetime
        
        partition_date = datetime.strptime(downstream_partition_key, "%Y-%m-%d")
        
        if self.skip_weekends and partition_date.weekday() >= 5:  # Sat=5, Sun=6
            # Skip weekends - no upstream dependency
            return PartitionKeyRange(start=None, end=None)
        
        # For business days, map to the same day's upstream partition
        return PartitionKeyRange(
            start=downstream_partition_key,
            end=downstream_partition_key
        )
    
    def get_downstream_partitions_for_partition_key(
        self, upstream_partition_key: str, downstream_partitions_def, upstream_partitions_def
    ):
        """Map an upstream partition key to its downstream partition keys."""
        # Bug fix: `datetime` was previously imported only inside the
        # upstream-mapping method, so this method raised NameError at runtime.
        from datetime import datetime
        
        # Business day mapping - same logic in reverse
        partition_date = datetime.strptime(upstream_partition_key, "%Y-%m-%d")
        
        if self.skip_weekends and partition_date.weekday() >= 5:
            return []
        
        return [upstream_partition_key]

# Usage of the custom partition mapping defined above
@asset(partitions_def=daily_partitions)
def raw_trading_data(context) -> pd.DataFrame:
    """Raw trading data (all days)."""
    # One partition per calendar day, weekends included; downstream assets
    # can narrow to business days via a partition mapping.
    return load_trading_data(context.partition_key)

@asset(
    partitions_def=daily_partitions,
    ins={"raw_trading_data": AssetIn(partition_mapping=BusinessDayPartitionMapping())}
)
def business_day_analysis(context, raw_trading_data: pd.DataFrame) -> pd.DataFrame:
    """Analysis that only runs on business days."""
    # The custom mapping yields no upstream partitions on weekends, so
    # weekend partitions of this asset have nothing to consume.
    analyzed = analyze_trading_data(raw_trading_data)
    return analyzed

Built-in Partition Mappings

IdentityPartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass

One-to-one partition mapping (default behavior).

from dagster import IdentityPartitionMapping, asset, AssetIn

@asset(partitions_def=daily_partitions)
def daily_raw_data(context) -> pd.DataFrame:
    """Daily raw data."""
    # Load exactly the day identified by this run's partition key.
    key = context.partition_key
    return load_raw_data(key)

@asset(
    partitions_def=daily_partitions,
    ins={"daily_raw_data": AssetIn(partition_mapping=IdentityPartitionMapping())}
)
def daily_processed_data(context, daily_raw_data: pd.DataFrame) -> pd.DataFrame:
    """Processed data with identity mapping (same partition)."""
    # Identity mapping: the 2023-01-05 partition here reads the 2023-01-05
    # partition of daily_raw_data, and so on for every key.
    processed = process_data(daily_raw_data)
    return processed

AllPartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass

Mapping in which each downstream partition depends on every upstream partition.

from dagster import AllPartitionMapping, asset, AssetIn

@asset(
    ins={"daily_processed_data": AssetIn(partition_mapping=AllPartitionMapping())}
)
def complete_dataset_summary(context, daily_processed_data: pd.DataFrame) -> dict:
    """Summary that depends on all daily data partitions."""
    
    # AllPartitionMapping means this unpartitioned asset consumes every
    # partition of daily_processed_data at once.
    context.log.info(f"Creating summary from {len(daily_processed_data)} daily records")
    
    date_range = {
        "start": daily_processed_data["date"].min(),
        "end": daily_processed_data["date"].max()
    }
    summary = {
        "total_records": len(daily_processed_data),
        "date_range": date_range,
        "summary_stats": daily_processed_data.describe().to_dict()
    }
    return summary

TimeWindowPartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass

Time-based partition mapping with configurable time windows and offsets.

from dagster import TimeWindowPartitionMapping, asset, AssetIn

# Weekly summary depends on last 7 days of daily data
@asset(
    partitions_def=weekly_partitions,
    ins={
        "daily_sales": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-6,  # 6 days before
                end_offset=0      # current day
            )
        )
    }
)
def weekly_sales_summary(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
    """Weekly summary from 7 days of daily data.

    The TimeWindowPartitionMapping above widens the upstream dependency so
    `daily_sales` arrives as seven daily partitions' worth of rows.
    """
    
    time_window = context.partition_time_window
    context.log.info(f"Creating weekly summary for {time_window.start} to {time_window.end}")
    
    # daily_sales contains data from last 7 days; collapse to per-category totals
    weekly_summary = daily_sales.groupby("product_category").agg({
        "sales_amount": "sum",
        "transaction_count": "sum"
    }).reset_index()
    
    # Label rows with inclusive week boundaries; the window end is treated
    # as exclusive, hence the one-day subtraction for week_end.
    weekly_summary["week_start"] = time_window.start.date()
    weekly_summary["week_end"] = (time_window.end - timedelta(days=1)).date()
    
    return weekly_summary

# Monthly report depends on previous month's data
@asset(
    partitions_def=monthly_partitions,
    ins={
        "weekly_sales_summary": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1,  # Previous month
                end_offset=-1
            )
        )
    }
)
def monthly_trends_analysis(context, weekly_sales_summary: pd.DataFrame) -> dict:
    """Monthly trends based on previous month's data.

    Both offsets are -1, so this month's partition consumes exactly the
    previous month's upstream partition (a lagged dependency).
    """
    
    partition_month = context.partition_time_window.start.strftime("%B %Y")
    # Subtracting one day from the window start always lands in the prior month
    previous_month = (context.partition_time_window.start - timedelta(days=1)).strftime("%B %Y")
    
    context.log.info(f"Analyzing {partition_month} trends using {previous_month} data")
    
    trends = {
        "analysis_month": partition_month,
        "data_month": previous_month,
        "weekly_growth_rate": calculate_growth_rate(weekly_sales_summary),
        "category_performance": analyze_category_trends(weekly_sales_summary)
    }
    
    return trends

StaticPartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass

Static mapping to specific upstream partitions.

from dagster import StaticPartitionMapping, asset, AssetIn

# Asset that always depends on specific reference partitions
@asset(
    partitions_def=regions_partitions,
    ins={
        "global_config": AssetIn(
            partition_mapping=StaticPartitionMapping({
                "north_america": ["config_v1"],
                "europe": ["config_v2"], 
                "asia_pacific": ["config_v1"],
                "latin_america": ["config_v1"],
                "africa": ["config_v2"]
            })
        )
    }
)
def regional_processing(context, global_config: dict) -> pd.DataFrame:
    """Regional processing with static config mapping.

    The StaticPartitionMapping pins each region partition to a fixed config
    partition, so e.g. the "europe" run always reads "config_v2".
    """
    
    region = context.partition_key
    context.log.info(f"Processing {region} with config version {global_config.get('version')}")
    
    return process_regional_data(region, global_config)

MultiPartitionMapping { .api }

Module: dagster._core.definitions.partitions.mapping
Type: PartitionMapping subclass

Mapping for multi-dimensional partitions with dimension-specific mappings.

from dagster import MultiPartitionMapping, DimensionPartitionMapping, asset, AssetIn

# Multi-partition mapping with a different strategy per dimension: the date
# dimension pulls a two-day window (yesterday + today) while the region
# dimension maps one-to-one.
# NOTE(review): recent Dagster versions expect DimensionPartitionMapping
# wrappers as the dict values here — confirm against the pinned version.
_two_day_window = TimeWindowPartitionMapping(start_offset=-1, end_offset=0)
multi_mapping = MultiPartitionMapping(
    {"date": _two_day_window, "region": IdentityPartitionMapping()}
)

@asset(
    partitions_def=multi_partitions,  # date x region
    ins={
        "regional_daily_sales": AssetIn(partition_mapping=multi_mapping)
    }
)
def two_day_regional_trends(context, regional_daily_sales: pd.DataFrame) -> dict:
    """Trends analysis using 2 days of data for same region.

    Compares today's sales against yesterday's for a single region and
    reports both totals plus a relative growth rate.
    """
    
    partition_key = context.partition_key
    if isinstance(partition_key, MultiPartitionKey):
        date_key = partition_key.keys_by_dimension["date"]
        region_key = partition_key.keys_by_dimension["region"]
    else:
        # Bug fix: date_key/region_key were left unbound when the key arrived
        # as a plain "date|region" string; parse it like the sibling assets do.
        date_key, region_key = str(partition_key).split("|")
    
    context.log.info(f"Analyzing 2-day trends for {region_key} on {date_key}")
    
    # regional_daily_sales contains data from yesterday and today for this region
    if len(regional_daily_sales) >= 2:
        # Compare yesterday vs today
        yesterday_str = (datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
        yesterday_data = regional_daily_sales[regional_daily_sales["partition_date"] == yesterday_str]
        today_data = regional_daily_sales[regional_daily_sales["partition_date"] == date_key]
        
        trends = {
            "date": date_key,
            "region": region_key,
            "yesterday_sales": yesterday_data["sales"].sum() if len(yesterday_data) > 0 else 0,
            "today_sales": today_data["sales"].sum() if len(today_data) > 0 else 0,
        }
        
        if trends["yesterday_sales"] > 0:
            trends["growth_rate"] = (trends["today_sales"] - trends["yesterday_sales"]) / trends["yesterday_sales"]
        else:
            # Avoid division by zero when there were no sales yesterday
            trends["growth_rate"] = None
    else:
        trends = {
            "date": date_key,
            "region": region_key,
            "error": "Insufficient data for trend analysis"
        }
    
    return trends

Partitioned Configuration

Partitioned configuration allows partition-specific configuration values, enabling different processing logic per partition.

PartitionedConfig { .api }

Module: dagster._core.definitions.partitions.partitioned_config
Type: Class

Configuration that varies by partition with dynamic config generation.

from dagster import PartitionedConfig, job, op, daily_partitioned_config

@op(config_schema={"batch_size": int, "processing_mode": str})
def process_partition_data(context):
    """Op with partition-specific configuration."""
    
    # Read the per-partition values injected by the PartitionedConfig
    partition_key = context.partition_key
    batch_size = context.op_config["batch_size"]
    processing_mode = context.op_config["processing_mode"]
    
    context.log.info(f"Processing partition {partition_key} with batch_size={batch_size}, mode={processing_mode}")
    
    # Partition-specific processing logic would go here
    return f"Processed {partition_key}"

# Dynamic partitioned configuration derived from the partition's start date
def partition_config_fn(start: datetime, _end: datetime) -> dict:
    """Generate run config from the partition's start date.

    Weekend partitions (Saturday/Sunday) get smaller batches, a maintenance
    processing mode, and a reduced database footprint.
    """
    is_weekend = start.weekday() >= 5  # Mon=0 .. Sun=6
    
    op_config = {
        "batch_size": 500 if is_weekend else 1000,  # Smaller batches on weekends
        "processing_mode": "maintenance" if is_weekend else "normal",
    }
    db_config = {
        "pool_size": 5 if is_weekend else 20,  # Fewer connections on weekends
        "timeout": 60 if is_weekend else 30,
    }
    return {
        "ops": {"process_partition_data": {"config": op_config}},
        "resources": {"database": {"config": db_config}},
    }

partitioned_config = PartitionedConfig(
    # NOTE(review): run_config_for_partition_fn is deprecated in newer
    # Dagster releases in favor of run_config_for_partition_key_fn —
    # confirm the expected signature against the pinned dagster version.
    run_config_for_partition_fn=partition_config_fn,
    partitions_def=daily_partitions
)

@job(config=partitioned_config, partitions_def=daily_partitions)
def daily_processing_job():
    """Job with partition-specific configuration.

    Each partition run receives the run config produced by
    partition_config_fn for that partition's date.
    """
    process_partition_data()

# Built-in partitioned config helper: daily_partitioned_config builds a
# partitioned config whose function receives each day's (start, end) window.
daily_config = daily_partitioned_config(
    start_date="2023-01-01",
    timezone="UTC"
)(partition_config_fn)

@job(config=daily_config)
def daily_job_with_helper():
    """Daily-partitioned job configured via the decorator helper."""
    process_partition_data()

# Business logic based partitioned config
def business_partition_config(start: datetime, _end: datetime) -> dict:
    """Configuration based on business calendar.

    Month-end partitions get smaller batches; quarter-end partitions
    (the last day of March, June, September, or December) get the
    smallest batches and a dedicated processing mode.
    """
    
    # `start` is the last day of its month exactly when the next day rolls
    # over to day 1 — equivalent to, and simpler than, computing the month's
    # last day explicitly.
    is_month_end = (start + timedelta(days=1)).day == 1
    
    # Quarter end = month end falling in the final month of a quarter
    is_quarter_end = is_month_end and start.month in (3, 6, 9, 12)
    
    return {
        "ops": {
            "process_partition_data": {
                "config": {
                    "batch_size": 100 if is_quarter_end else 500 if is_month_end else 1000,
                    "processing_mode": "quarter_end" if is_quarter_end else 
                                     "month_end" if is_month_end else "normal"
                }
            }
        }
    }

business_config = PartitionedConfig(
    # NOTE(review): run_config_for_partition_fn is deprecated in newer
    # Dagster releases in favor of run_config_for_partition_key_fn —
    # confirm against the pinned dagster version.
    run_config_for_partition_fn=business_partition_config,
    partitions_def=daily_partitions
)

@job(config=business_config, partitions_def=daily_partitions)
def business_processing_job():
    """Job with business calendar aware configuration."""
    process_partition_data()

Partition Utilities

PartitionKeyRange { .api }

Module: dagster._core.definitions.partitions.partition_key_range
Type: Class

Represents a range of partition keys for batch operations and queries.

from dagster import PartitionKeyRange, asset

@asset(partitions_def=daily_partitions)
def batch_processed_data(context) -> pd.DataFrame:
    """Asset that processes data in batches using partition ranges.

    Builds a PartitionKeyRange covering the trailing 7 days (inclusive of
    the current partition), loads each day's data, and concatenates them.
    """
    
    # Get current partition
    current_partition = context.partition_key
    
    # Create partition range for last 7 days (current day plus six prior)
    end_date = datetime.strptime(current_partition, "%Y-%m-%d")
    start_date = end_date - timedelta(days=6)
    
    partition_range = PartitionKeyRange(
        start=start_date.strftime("%Y-%m-%d"),
        end=current_partition
    )
    
    # Get all partition keys in range
    # NOTE(review): for the first six days after the definition's start_date
    # the computed range start predates the definition — confirm
    # get_partition_keys_in_range tolerates that.
    partitions_def = context.assets_def.partitions_def
    partition_keys_in_range = partitions_def.get_partition_keys_in_range(partition_range)
    
    context.log.info(f"Processing batch for partitions: {partition_keys_in_range}")
    
    # Load data for all partitions in range
    all_data = []
    for partition_key in partition_keys_in_range:
        partition_data = load_data_for_partition(partition_key)
        all_data.append(partition_data)
    
    # Combine all partition data into one frame with a fresh index
    combined_df = pd.concat(all_data, ignore_index=True)
    
    context.add_output_metadata({
        "partition_range_start": partition_range.start,
        "partition_range_end": partition_range.end,
        "partitions_processed": len(partition_keys_in_range),
        "total_records": len(combined_df)
    })
    
    return combined_df

TimeWindow { .api }

Module: dagster._core.definitions.partitions.utils
Type: Class

Time window utility for time-based partition operations.

from dagster import TimeWindow, asset

@asset(partitions_def=hourly_partitions)
def time_window_analysis(context) -> dict:
    """Asset demonstrating TimeWindow usage.

    Derives calendar features from the partition's time window, then
    aggregates event counts for that window from the events table.
    """
    
    # Access partition time window
    time_window = context.partition_time_window
    
    # TimeWindow properties (the half-open query below and the weekly
    # example's one-day subtraction suggest `end` is exclusive — confirm)
    start_time = time_window.start
    end_time = time_window.end
    duration = end_time - start_time
    
    context.log.info(f"Processing time window: {start_time} to {end_time} (duration: {duration})")
    
    # Time window operations / derived calendar features
    analysis = {
        "window_start": start_time.isoformat(),
        "window_end": end_time.isoformat(), 
        "duration_seconds": duration.total_seconds(),
        "duration_minutes": duration.total_seconds() / 60,
        "hour_of_day": start_time.hour,
        "day_of_week": start_time.weekday(),
        # Business hours defined here as 09:00-17:59
        "is_business_hour": 9 <= start_time.hour < 18,
        "is_weekend": start_time.weekday() >= 5
    }
    
    # Load data for time window; [start, end) comparison matches the window
    query = f"""
    SELECT COUNT(*) as event_count, AVG(metric_value) as avg_metric
    FROM events 
    WHERE timestamp >= '{start_time}'
    AND timestamp < '{end_time}'
    """
    
    result = execute_query(query)
    if result:
        analysis.update({
            "event_count": result[0]["event_count"],
            "avg_metric": result[0]["avg_metric"]
        })
    
    return analysis

This comprehensive partitions system enables efficient processing of large datasets through intelligent data division, automatic dependency management, and sophisticated mapping strategies. It supports everything from simple daily partitions to complex multi-dimensional, business-logic-driven partitioning with dynamic runtime partition management.

For automation with partitioned assets, see Sensors and Schedules. For execution contexts in partitioned environments, see Execution and Contexts.

Install with Tessl CLI

npx tessl i tessl/pypi-dagster

docs

configuration.md

core-definitions.md

error-handling.md

events-metadata.md

execution-contexts.md

index.md

partitions.md

sensors-schedules.md

storage-io.md

tile.json