CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mesa

Agent-based modeling (ABM) in Python framework with spatial grids, agent schedulers, data collection tools, and browser-based visualization capabilities

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

batch-running.mddocs/

Batch Running

Mesa's batch running system enables systematic exploration of model behavior through parameter sweeps and multiple simulation runs. The batch_run function provides powerful capabilities for running models across parameter spaces, collecting data, and analyzing results with parallel processing support.

Imports

from mesa import batch_run, Model
from typing import Any, Dict, Iterable, List, Mapping, Union
import pandas as pd
import numpy as np

batch_run Function

The core batch running functionality is provided by the batch_run function, which orchestrates parameter sweeps and data collection across multiple model runs.

def batch_run(model_cls: type[Model], 
              parameters: Mapping[str, Any | Iterable[Any]],
              number_processes: int | None = 1,
              iterations: int = 1,
              data_collection_period: int = -1,
              max_steps: int = 1000,
              display_progress: bool = True) -> list[dict[str, Any]]:
    """
    Batch run a Mesa model with parameter sweeps and data collection.
    
    Systematically executes model runs across parameter combinations,
    collecting data and supporting parallel execution for efficient
    exploration of model behavior.
    
    Parameters:
        model_cls: The Mesa model class to run
        parameters: Dictionary mapping parameter names to values or ranges.
                   Single values are held constant, iterables define parameter sweeps.
        number_processes: Number of parallel processes to use (None for all available cores)
        iterations: Number of independent runs per parameter combination
        data_collection_period: How often to collect data during runs:
                               -1 = only at end of run
                               1 = every step  
                               n = every n steps
        max_steps: Maximum number of simulation steps per run
        display_progress: Whether to show progress bar during execution
    
    Returns:
        List of dictionaries containing collected data from all runs.
        Each dictionary represents one data collection point with:
        - Parameter values for that run
        - Collected model and agent data
        - Run metadata (iteration, step, etc.)
    """
    ...

Parameter Specification

Parameters can be specified as single values (constants) or iterables (ranges) to define parameter sweeps.

# Parameter specification examples
parameters = {
    # Single values (constants across all runs)
    "width": 20,
    "height": 20,
    "torus": True,
    
    # Ranges for parameter sweeps
    "n_agents": range(10, 101, 10),  # 10, 20, 30, ..., 100
    "infection_rate": [0.1, 0.2, 0.3, 0.4, 0.5],  # Explicit values
    "recovery_time": np.arange(5, 20, 2.5),  # NumPy array: 5.0, 7.5, 10.0, ...
    
    # Boolean parameters
    "vaccination": [True, False],
    
    # Complex parameter types
    "network_type": ["small_world", "scale_free", "random"],
    "initial_infected": lambda: random.randint(1, 5),  # Callable for random values
}

# Parameter combinations
# This creates: 10 n_agents × 5 infection_rates × 2 vaccination × 3 network_types
# = 300 parameter combinations
# With iterations=5, total runs = 1500

Usage Examples

Basic Parameter Sweep

from mesa import Agent, Model, DataCollector, batch_run

class SimpleAgent(Agent):
    def __init__(self, model, cooperation_rate=0.5):
        super().__init__(model)
        self.cooperation_rate = cooperation_rate
        self.cooperated_this_step = False
    
    def step(self):
        # Decide whether to cooperate
        self.cooperated_this_step = self.random.random() < self.cooperation_rate
        
        # Interact with neighbors if in spatial model
        if hasattr(self.model, 'space'):
            neighbors = self.model.space.get_neighbors(self.pos, radius=1)
            for neighbor in neighbors:
                if neighbor.cooperated_this_step and self.cooperated_this_step:
                    # Mutual cooperation bonus
                    self.cooperation_rate = min(1.0, self.cooperation_rate + 0.01)

class CooperationModel(Model):
    def __init__(self, n_agents=100, initial_cooperation=0.5, network_density=0.1):
        super().__init__()
        
        self.n_agents = n_agents
        self.network_density = network_density
        
        # Data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Cooperation Rate": lambda m: len([a for a in m.agents 
                                                 if a.cooperated_this_step]) / len(m.agents),
                "Average Cooperation Tendency": lambda m: sum(a.cooperation_rate for a in m.agents) / len(m.agents),
                "Highly Cooperative Agents": lambda m: len([a for a in m.agents 
                                                          if a.cooperation_rate > 0.8])
            },
            agent_reporters={
                "Cooperation Rate": "cooperation_rate",
                "Cooperated": "cooperated_this_step"
            }
        )
        
        # Create agents
        for i in range(n_agents):
            agent = SimpleAgent(self, cooperation_rate=initial_cooperation)
        
        self.running = True
    
    def step(self):
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

# Define parameter sweep
parameters = {
    "n_agents": [50, 100, 200],
    "initial_cooperation": [0.1, 0.3, 0.5, 0.7, 0.9],
    "network_density": [0.05, 0.1, 0.2, 0.3]
}

# Run batch experiment
results = batch_run(
    CooperationModel,
    parameters=parameters,
    iterations=10,  # 10 runs per parameter combination
    max_steps=200,
    number_processes=4,  # Use 4 CPU cores
    data_collection_period=10,  # Collect data every 10 steps
    display_progress=True
)

# Convert to DataFrame for analysis
df = pd.DataFrame(results)
print(f"Total runs: {len(df)}")
print(f"Parameter combinations: {len(df.groupby(['n_agents', 'initial_cooperation', 'network_density']))}")
print(f"Columns: {list(df.columns)}")

Advanced Parameter Sweep with Custom Data Collection

from mesa import Agent, Model, DataCollector, batch_run
from mesa.space import MultiGrid
import numpy as np

class EconomicAgent(Agent):
    def __init__(self, model, wealth=100, risk_tolerance=0.5):
        super().__init__(model)
        self.wealth = wealth
        self.initial_wealth = wealth
        self.risk_tolerance = risk_tolerance
        self.investments = []
        self.transactions_count = 0
    
    def step(self):
        # Investment decision
        if self.wealth > 50 and self.random.random() < self.risk_tolerance:
            investment = min(self.wealth * 0.1, 20)  # Invest up to 10% of wealth
            self.wealth -= investment
            self.investments.append(investment)
        
        # Trade with neighbors
        if hasattr(self, 'pos'):
            neighbors = [agent for agent in self.model.grid.get_neighbors(self.pos, moore=True)
                        if isinstance(agent, EconomicAgent)]
            
            if neighbors and self.wealth > 0:
                partner = self.random.choice(neighbors)
                if partner.wealth > 0:
                    # Simple trade
                    trade_amount = min(self.wealth, partner.wealth) * 0.05
                    if self.random.random() < 0.5:
                        self.wealth += trade_amount * self.random.uniform(0.9, 1.1)
                        partner.wealth -= trade_amount
                    else:
                        self.wealth -= trade_amount  
                        partner.wealth += trade_amount * self.random.uniform(0.9, 1.1)
                    
                    self.transactions_count += 1
                    partner.transactions_count += 1
        
        # Investment returns (risky)
        if self.investments:
            for i, investment in enumerate(self.investments):
                if self.random.random() < 0.1:  # 10% chance per investment per step
                    return_rate = self.random.lognormal(0, 0.5)  # Log-normal returns
                    self.wealth += investment * return_rate
                    self.investments.pop(i)
                    break

class EconomicModel(Model):
    def __init__(self, n_agents=100, width=20, height=20, 
                 mean_initial_wealth=100, wealth_std=20,
                 market_volatility=0.1, taxation_rate=0.0):
        super().__init__()
        
        self.n_agents = n_agents
        self.market_volatility = market_volatility
        self.taxation_rate = taxation_rate
        self.total_taxes_collected = 0
        
        # Create spatial environment
        self.grid = MultiGrid(width, height, torus=True)
        
        # Advanced data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Total Wealth": lambda m: sum(a.wealth for a in m.agents),
                "Mean Wealth": lambda m: np.mean([a.wealth for a in m.agents]),
                "Wealth Std": lambda m: np.std([a.wealth for a in m.agents]),
                "Gini Coefficient": self.calculate_gini,
                "Wealth Inequality Ratio": lambda m: (
                    np.percentile([a.wealth for a in m.agents], 90) /
                    max(1, np.percentile([a.wealth for a in m.agents], 10))
                ),
                "Active Investors": lambda m: len([a for a in m.agents if len(a.investments) > 0]),
                "Total Transactions": lambda m: sum(a.transactions_count for a in m.agents),
                "Market Activity": lambda m: sum(a.transactions_count for a in m.agents) / len(m.agents),
                "Taxes Collected": "total_taxes_collected"
            },
            agent_reporters={
                "Wealth": "wealth",
                "Initial Wealth": "initial_wealth", 
                "Risk Tolerance": "risk_tolerance",
                "Active Investments": lambda a: len(a.investments),
                "Transaction Count": "transactions_count",
                "Wealth Change": lambda a: a.wealth - a.initial_wealth,
                "ROI": lambda a: (a.wealth - a.initial_wealth) / a.initial_wealth if a.initial_wealth > 0 else 0
            }
        )
        
        # Create agents with varied initial conditions
        for i in range(n_agents):
            # Wealth follows log-normal distribution
            wealth = max(10, self.random.lognormvariate(
                np.log(mean_initial_wealth), wealth_std / mean_initial_wealth
            ))
            
            # Risk tolerance varies
            risk_tolerance = self.random.betavariate(2, 2)  # Beta distribution
            
            agent = EconomicAgent(self, wealth=wealth, risk_tolerance=risk_tolerance)
            
            # Place randomly on grid
            x = self.random.randrange(width)
            y = self.random.randrange(height)
            self.grid.place_agent(agent, (x, y))
        
        self.running = True
    
    def calculate_gini(self):
        """Calculate Gini coefficient of wealth distribution."""
        wealth_values = sorted([a.wealth for a in self.agents])
        n = len(wealth_values)
        
        if n == 0 or sum(wealth_values) == 0:
            return 0
        
        cumsum = sum((i + 1) * wealth for i, wealth in enumerate(wealth_values))
        return (2 * cumsum) / (n * sum(wealth_values)) - (n + 1) / n
    
    def step(self):
        self.datacollector.collect(self)
        
        # Market shock (random events)
        if self.random.random() < self.market_volatility:
            shock_magnitude = self.random.normalvariate(0, 0.1)
            for agent in self.agents:
                agent.wealth *= (1 + shock_magnitude)
                agent.wealth = max(0, agent.wealth)  # Prevent negative wealth
        
        # Taxation
        if self.taxation_rate > 0:
            for agent in self.agents:
                if agent.wealth > 100:  # Tax threshold
                    tax = agent.wealth * self.taxation_rate
                    agent.wealth -= tax
                    self.total_taxes_collected += tax
        
        # Agent actions
        self.agents.shuffle_do("step")

# Complex parameter sweep
parameters = {
    # Population parameters
    "n_agents": [50, 100, 200],
    "mean_initial_wealth": [100, 200, 500],
    "wealth_std": [20, 50, 100],
    
    # Policy parameters
    "taxation_rate": [0.0, 0.01, 0.02, 0.05],
    "market_volatility": [0.05, 0.1, 0.2],
    
    # Spatial parameters  
    "width": 20,  # Fixed
    "height": 20  # Fixed
}

# Run comprehensive batch experiment
results = batch_run(
    EconomicModel,
    parameters=parameters,
    iterations=15,  # 15 runs per parameter combination
    max_steps=500,
    number_processes=None,  # Use all available cores
    data_collection_period=25,  # Collect data every 25 steps
    display_progress=True
)

# Analysis
df = pd.DataFrame(results)

print(f"Experiment completed:")
print(f"- Total data points: {len(df):,}")
print(f"- Parameter combinations: {len(df.groupby(['n_agents', 'mean_initial_wealth', 'wealth_std', 'taxation_rate', 'market_volatility'])):,}")
print(f"- Average final Gini coefficient: {df[df['Step'] == df['Step'].max()]['Gini Coefficient'].mean():.3f}")

# Group by policy parameters for analysis
policy_effects = df[df['Step'] == df['Step'].max()].groupby(['taxation_rate', 'market_volatility']).agg({
    'Gini Coefficient': ['mean', 'std'],
    'Total Wealth': ['mean', 'std'],
    'Market Activity': ['mean', 'std']
}).round(3)

print("\nPolicy effects on final outcomes:")
print(policy_effects)

Time Series Data Collection

from mesa import Agent, Model, DataCollector, batch_run

class PopulationAgent(Agent):
    def __init__(self, model, age=0, max_age=100):
        super().__init__(model)
        self.age = age
        self.max_age = max_age
        self.reproduced_this_step = False
    
    def step(self):
        self.age += 1
        self.reproduced_this_step = False
        
        # Death by old age
        if self.age >= self.max_age:
            self.remove()
            return
        
        # Death by random events
        death_probability = 0.001 + (self.age / self.max_age) * 0.02
        if self.random.random() < death_probability:
            self.remove()
            return
        
        # Reproduction
        if 18 <= self.age <= 50:  # Reproductive age range
            reproduction_rate = self.model.base_reproduction_rate * (
                1 - len(self.model.agents) / self.model.carrying_capacity
            )  # Logistic growth
            
            if self.random.random() < reproduction_rate:
                # Create offspring
                offspring = PopulationAgent(self.model, age=0, max_age=self.max_age)
                self.reproduced_this_step = True

class PopulationModel(Model):
    def __init__(self, initial_population=100, carrying_capacity=1000,
                 base_reproduction_rate=0.05, max_lifespan=100):
        super().__init__()
        
        self.carrying_capacity = carrying_capacity
        self.base_reproduction_rate = base_reproduction_rate
        
        # Detailed time series data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Population": lambda m: len(m.agents),
                "Births": lambda m: len([a for a in m.agents if a.reproduced_this_step]),
                "Deaths": lambda m: m.deaths_this_step,
                "Average Age": lambda m: sum(a.age for a in m.agents) / len(m.agents) if m.agents else 0,
                "Age Distribution 0-18": lambda m: len([a for a in m.agents if 0 <= a.age < 18]),
                "Age Distribution 18-65": lambda m: len([a for a in m.agents if 18 <= a.age < 65]),
                "Age Distribution 65+": lambda m: len([a for a in m.agents if a.age >= 65]),
                "Growth Rate": lambda m: (len(m.agents) - m.previous_population) / max(1, m.previous_population),
                "Carrying Capacity Utilization": lambda m: len(m.agents) / m.carrying_capacity
            }
        )
        
        # Create initial population with age distribution
        for i in range(initial_population):
            age = self.random.randint(0, max_lifespan // 2)  # Younger initial population
            agent = PopulationAgent(self, age=age, max_age=max_lifespan)
        
        self.previous_population = initial_population
        self.deaths_this_step = 0
        self.running = True
    
    def step(self):
        self.datacollector.collect(self)
        
        initial_count = len(self.agents)
        self.agents.shuffle_do("step")
        final_count = len(self.agents)
        
        self.deaths_this_step = initial_count - final_count + len([a for a in self.agents if a.reproduced_this_step])
        self.previous_population = initial_count
        
        # Stop if population extinct or simulation very long
        if len(self.agents) == 0 or self.steps > 1000:
            self.running = False

# Time series parameter sweep
parameters = {
    "initial_population": [50, 100, 200],
    "carrying_capacity": [500, 1000, 2000],
    "base_reproduction_rate": [0.02, 0.05, 0.08, 0.1],
    "max_lifespan": [80, 100, 120]
}

# Run with frequent data collection for time series
results = batch_run(
    PopulationModel,
    parameters=parameters,
    iterations=20,
    max_steps=500,
    number_processes=6,
    data_collection_period=1,  # Collect data every step for time series
    display_progress=True
)

# Time series analysis
df = pd.DataFrame(results)

# Calculate population dynamics metrics
def analyze_population_dynamics(group):
    """Analyze population dynamics for a parameter combination."""
    population = group['Population'].values
    steps = group['Step'].values
    
    # Find equilibrium (if reached)
    if len(population) > 100:
        recent_pop = population[-50:]  # Last 50 steps
        equilibrium = recent_pop.mean()
        equilibrium_stability = recent_pop.std() / equilibrium if equilibrium > 0 else float('inf')
    else:
        equilibrium = population[-1] if len(population) > 0 else 0
        equilibrium_stability = float('inf')
    
    # Growth phase analysis
    max_pop = population.max()
    max_pop_step = steps[population.argmax()]
    
    # Extinction check
    went_extinct = population[-1] == 0 if len(population) > 0 else True
    
    return pd.Series({
        'max_population': max_pop,
        'max_population_step': max_pop_step,
        'equilibrium_population': equilibrium,
        'equilibrium_stability': equilibrium_stability,
        'went_extinct': went_extinct,
        'final_population': population[-1] if len(population) > 0 else 0
    })

# Group by parameter combinations and analyze
param_cols = ['initial_population', 'carrying_capacity', 'base_reproduction_rate', 'max_lifespan', 'iteration']
dynamics_analysis = df.groupby(param_cols).apply(analyze_population_dynamics).reset_index()

# Summary statistics
print("Population Dynamics Summary:")
print(dynamics_analysis.groupby(['carrying_capacity', 'base_reproduction_rate']).agg({
    'max_population': 'mean',
    'equilibrium_population': 'mean', 
    'went_extinct': 'sum',
    'equilibrium_stability': 'mean'
}).round(2))

# Find optimal parameters (high equilibrium, low extinction rate)
optimal_params = dynamics_analysis.groupby(['carrying_capacity', 'base_reproduction_rate']).agg({
    'equilibrium_population': 'mean',
    'went_extinct': 'mean',
    'equilibrium_stability': 'mean'
}).reset_index()

print("\nOptimal parameter combinations (low extinction, high stability):")
stable_params = optimal_params[
    (optimal_params['went_extinct'] < 0.1) & 
    (optimal_params['equilibrium_stability'] < 50)
].sort_values('equilibrium_population', ascending=False)
print(stable_params.head())

Performance Optimization

Parallel Processing

# Optimal number of processes
import multiprocessing

# Use all available cores
results = batch_run(
    MyModel,
    parameters=params,
    number_processes=None  # Uses all available cores
)

# Use specific number of cores (recommended for shared systems)
results = batch_run(
    MyModel, 
    parameters=params,
    number_processes=multiprocessing.cpu_count() - 1  # Leave one core free
)

# For large parameter sweeps, monitor memory usage
results = batch_run(
    MyModel,
    parameters=params,
    number_processes=4,  # Conservative for memory-intensive models
    data_collection_period=50  # Reduce data collection frequency
)

Memory Management

# For very large experiments, process in batches
def large_batch_experiment(model_cls, all_parameters, batch_size=100):
    """Run large experiments in batches to manage memory."""
    all_results = []
    
    # Split parameter combinations into batches
    param_combinations = list(param_combinations_generator(all_parameters))
    
    for i in range(0, len(param_combinations), batch_size):
        batch_params = param_combinations[i:i + batch_size]
        
        print(f"Running batch {i//batch_size + 1}/{len(param_combinations)//batch_size + 1}")
        
        batch_results = batch_run(
            model_cls,
            parameters=batch_params,
            iterations=10,
            max_steps=200,
            number_processes=4
        )
        
        all_results.extend(batch_results)
        
        # Save intermediate results
        pd.DataFrame(batch_results).to_parquet(f"batch_{i//batch_size}.parquet")
    
    return all_results

# Efficient data collection for long runs
efficient_params = {
    "data_collection_period": 20,  # Collect every 20 steps instead of every step
    "max_steps": 1000
}

Data Analysis Patterns

Parameter Sensitivity Analysis

# Convert results to DataFrame
df = pd.DataFrame(results)

# Parameter sensitivity analysis
def parameter_sensitivity_analysis(df, output_var='Total Wealth', step_filter=None):
    """Analyze how sensitive outputs are to parameter changes."""
    
    if step_filter:
        analysis_df = df[df['Step'] == step_filter].copy()
    else:
        analysis_df = df[df['Step'] == df['Step'].max()].copy()  # Final step only
    
    # Get parameter columns (exclude data columns)
    param_cols = [col for col in analysis_df.columns 
                  if col not in ['Step', 'iteration', output_var] and 
                  not col.startswith('Agent')]
    
    sensitivity_results = {}
    
    for param in param_cols:
        # Calculate correlation between parameter and output
        correlation = analysis_df[param].corr(analysis_df[output_var])
        
        # Calculate variance explained
        param_groups = analysis_df.groupby(param)[output_var].agg(['mean', 'std', 'count'])
        between_var = param_groups['mean'].var()
        within_var = (param_groups['std'] ** 2 * param_groups['count']).sum() / param_groups['count'].sum()
        variance_explained = between_var / (between_var + within_var)
        
        sensitivity_results[param] = {
            'correlation': correlation,
            'variance_explained': variance_explained,
            'range_effect': param_groups['mean'].max() - param_groups['mean'].min()
        }
    
    return pd.DataFrame(sensitivity_results).T

# Run sensitivity analysis
sensitivity = parameter_sensitivity_analysis(df, 'Gini Coefficient')
print("Parameter sensitivity for Gini Coefficient:")
print(sensitivity.sort_values('variance_explained', ascending=False))

Statistical Analysis

import scipy.stats as stats

# Statistical testing of parameter effects
def compare_parameter_effects(df, param_name, output_var, param_values=None):
    """Compare output distributions across parameter values."""
    
    final_data = df[df['Step'] == df['Step'].max()].copy()
    
    if param_values is None:
        param_values = sorted(final_data[param_name].unique())
    
    # Extract data for each parameter value
    groups = [final_data[final_data[param_name] == val][output_var].values 
              for val in param_values]
    
    # ANOVA test
    f_stat, p_value = stats.f_oneway(*groups)
    
    # Pairwise t-tests
    pairwise_results = {}
    for i, val1 in enumerate(param_values):
        for j, val2 in enumerate(param_values[i+1:], i+1):
            t_stat, p_val = stats.ttest_ind(groups[i], groups[j])
            pairwise_results[f"{val1}_vs_{val2}"] = {'t_stat': t_stat, 'p_value': p_val}
    
    return {
        'anova': {'f_stat': f_stat, 'p_value': p_value},
        'pairwise': pairwise_results,
        'group_stats': {val: {'mean': groups[i].mean(), 'std': groups[i].std()}
                       for i, val in enumerate(param_values)}
    }

# Statistical comparison
tax_effects = compare_parameter_effects(df, 'taxation_rate', 'Gini Coefficient')
print("Statistical analysis of taxation effects:")
print(f"ANOVA p-value: {tax_effects['anova']['p_value']:.4f}")
for comparison, result in tax_effects['pairwise'].items():
    print(f"{comparison}: p={result['p_value']:.4f}")

Best Practices

Model Design for Batch Running

class BatchFriendlyModel(Model):
    """Example model designed for efficient batch running."""
    
    def __init__(self, **kwargs):
        super().__init__()
        
        # Store all parameters as model attributes
        for key, value in kwargs.items():
            setattr(self, key, value)
        
        # Efficient data collection setup
        self.datacollector = DataCollector(
            model_reporters={
                # Use lambda functions for efficient computation
                "Population": lambda m: len(m.agents),
                "Parameter_Summary": lambda m: {
                    param: getattr(m, param) for param in 
                    ['param1', 'param2', 'param3'] if hasattr(m, param)
                }
            }
        )
        
        # Model initialization based on parameters
        self._initialize_model()
    
    def _initialize_model(self):
        """Initialize model components based on parameters."""
        # Create agents, space, etc. based on stored parameters
        pass
    
    def step(self):
        """Efficient step implementation."""
        # Collect data first (for consistency)
        self.datacollector.collect(self)
        
        # Agent actions
        self.agents.shuffle_do("step")
        
        # Termination conditions
        if len(self.agents) == 0 or self.steps >= getattr(self, 'max_internal_steps', 1000):
            self.running = False

# Use with batch_run
results = batch_run(
    BatchFriendlyModel,
    parameters={
        'param1': [1, 2, 3],
        'param2': [0.1, 0.2],
        'max_internal_steps': 200
    },
    iterations=50,
    max_steps=200,
    data_collection_period=-1  # Only collect at end for efficiency
)

docs

batch-running.md

core.md

data-collection.md

experimental.md

index.md

spatial.md

tile.json