Mesa: an agent-based modeling (ABM) framework in Python with spatial grids, agent schedulers, data collection tools, and browser-based visualization.
Mesa's batch running system enables systematic exploration of model behavior through parameter sweeps and repeated simulation runs. The batch_run function executes a model across a parameter space, collects data from every run, and can parallelize the work across processes.
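In the simplest case, you define a model, hand batch_run a parameter mapping, and get back a list of result records. A minimal, self-contained sketch (CoinAgent and CoinModel are illustrative names, not part of Mesa; assumes the Mesa 3 API used throughout this page):

from mesa import Agent, DataCollector, Model, batch_run

class CoinAgent(Agent):
    """Toy agent that flips a biased coin each step."""
    def __init__(self, model, p_heads):
        super().__init__(model)
        self.p_heads = p_heads
        self.heads = 0

    def step(self):
        if self.random.random() < self.p_heads:
            self.heads += 1

class CoinModel(Model):
    def __init__(self, n_agents=10, p_heads=0.5):
        super().__init__()
        self.datacollector = DataCollector(
            model_reporters={"Total Heads": lambda m: sum(a.heads for a in m.agents)}
        )
        for _ in range(n_agents):
            CoinAgent(self, p_heads)
        self.running = True

    def step(self):
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

# Sweep p_heads over three values, two runs per value: 3 × 2 = 6 runs
results = batch_run(
    CoinModel,
    parameters={"n_agents": 10, "p_heads": [0.25, 0.5, 0.75]},
    iterations=2,
    max_steps=20,
    display_progress=False,
)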
from mesa import batch_run, Model
from typing import Any, Iterable, Mapping
import pandas as pd
import numpy as np

The core batch running functionality is provided by the batch_run function, which orchestrates parameter sweeps and data collection across multiple model runs.
def batch_run(model_cls: type[Model],
parameters: Mapping[str, Any | Iterable[Any]],
number_processes: int | None = 1,
iterations: int = 1,
data_collection_period: int = -1,
max_steps: int = 1000,
display_progress: bool = True) -> list[dict[str, Any]]:
"""
Batch run a Mesa model with parameter sweeps and data collection.
Systematically executes model runs across parameter combinations,
collecting data and supporting parallel execution for efficient
exploration of model behavior.
Parameters:
model_cls: The Mesa model class to run
parameters: Dictionary mapping parameter names to values or ranges.
Single values are held constant, iterables define parameter sweeps.
number_processes: Number of parallel processes to use (None for all available cores)
iterations: Number of independent runs per parameter combination
data_collection_period: How often to collect data during runs:
-1 = only at end of run
1 = every step
n = every n steps
max_steps: Maximum number of simulation steps per run
display_progress: Whether to show progress bar during execution
Returns:
List of dictionaries containing collected data from all runs.
Each dictionary represents one data collection point with:
- Parameter values for that run
- Collected model and agent data
- Run metadata (iteration, step, etc.)
"""
...

Parameters can be specified as single values (constants) or iterables (ranges) to define parameter sweeps.
# Parameter specification examples
parameters = {
# Single values (constants across all runs)
"width": 20,
"height": 20,
"torus": True,
# Ranges for parameter sweeps
"n_agents": range(10, 101, 10), # 10, 20, 30, ..., 100
"infection_rate": [0.1, 0.2, 0.3, 0.4, 0.5], # Explicit values
"recovery_time": np.arange(5, 20, 2.5), # NumPy array: 5.0, 7.5, 10.0, ...
# Boolean parameters
"vaccination": [True, False],
# Complex parameter types
"network_type": ["small_world", "scale_free", "random"],
"initial_infected": lambda: random.randint(1, 5), # Callable for random values
}
# Parameter combinations
# This creates: 10 n_agents × 5 infection_rates × 6 recovery_times
#               × 2 vaccination × 3 network_types = 1800 combinations
# With iterations=5, total runs = 9000
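Before launching a large sweep, it can help to compute its size programmatically. A small sketch (count_combinations is our helper, not a Mesa function): each non-string iterable contributes a factor equal to its length, while constants and callables contribute nothing.

import math

def count_combinations(parameters):
    """Number of parameter combinations batch_run will enumerate."""
    sizes = [
        len(list(v)) for v in parameters.values()
        if hasattr(v, "__iter__") and not isinstance(v, str)
    ]
    return math.prod(sizes) if sizes else 1

print(count_combinations(parameters))  # 1800 for the sweep above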
from mesa import Agent, Model, DataCollector, batch_run

class SimpleAgent(Agent):
def __init__(self, model, cooperation_rate=0.5):
super().__init__(model)
self.cooperation_rate = cooperation_rate
self.cooperated_this_step = False
def step(self):
# Decide whether to cooperate
self.cooperated_this_step = self.random.random() < self.cooperation_rate
# Interact with neighbors if in spatial model
if hasattr(self.model, 'space'):
neighbors = self.model.space.get_neighbors(self.pos, radius=1)
for neighbor in neighbors:
if neighbor.cooperated_this_step and self.cooperated_this_step:
# Mutual cooperation bonus
self.cooperation_rate = min(1.0, self.cooperation_rate + 0.01)
class CooperationModel(Model):
def __init__(self, n_agents=100, initial_cooperation=0.5, network_density=0.1):
super().__init__()
self.n_agents = n_agents
self.network_density = network_density
# Data collection
self.datacollector = DataCollector(
model_reporters={
"Cooperation Rate": lambda m: len([a for a in m.agents
if a.cooperated_this_step]) / len(m.agents),
"Average Cooperation Tendency": lambda m: sum(a.cooperation_rate for a in m.agents) / len(m.agents),
"Highly Cooperative Agents": lambda m: len([a for a in m.agents
if a.cooperation_rate > 0.8])
},
agent_reporters={
"Cooperation Rate": "cooperation_rate",
"Cooperated": "cooperated_this_step"
}
)
# Create agents
for i in range(n_agents):
agent = SimpleAgent(self, cooperation_rate=initial_cooperation)
self.running = True
def step(self):
self.datacollector.collect(self)
self.agents.shuffle_do("step")
# Define parameter sweep
parameters = {
"n_agents": [50, 100, 200],
"initial_cooperation": [0.1, 0.3, 0.5, 0.7, 0.9],
"network_density": [0.05, 0.1, 0.2, 0.3]
}
# Run batch experiment
results = batch_run(
CooperationModel,
parameters=parameters,
iterations=10, # 10 runs per parameter combination
max_steps=200,
number_processes=4, # Use 4 CPU cores
data_collection_period=10, # Collect data every 10 steps
display_progress=True
)
# Convert to DataFrame for analysis
df = pd.DataFrame(results)
print(f"Total runs: {len(df)}")
print(f"Parameter combinations: {len(df.groupby(['n_agents', 'initial_cooperation', 'network_density']))}")
print(f"Columns: {list(df.columns)}")from mesa import Agent, Model, DataCollector, batch_run
from mesa import Agent, Model, DataCollector, batch_run
from mesa.space import MultiGrid
import numpy as np
class EconomicAgent(Agent):
def __init__(self, model, wealth=100, risk_tolerance=0.5):
super().__init__(model)
self.wealth = wealth
self.initial_wealth = wealth
self.risk_tolerance = risk_tolerance
self.investments = []
self.transactions_count = 0
def step(self):
# Investment decision
if self.wealth > 50 and self.random.random() < self.risk_tolerance:
investment = min(self.wealth * 0.1, 20) # Invest 10% of wealth, capped at 20
self.wealth -= investment
self.investments.append(investment)
# Trade with neighbors
if getattr(self, "pos", None) is not None: # only agents placed on the grid
neighbors = [agent for agent in self.model.grid.get_neighbors(self.pos, moore=True)
if isinstance(agent, EconomicAgent)]
if neighbors and self.wealth > 0:
partner = self.random.choice(neighbors)
if partner.wealth > 0:
# Simple trade
trade_amount = min(self.wealth, partner.wealth) * 0.05
if self.random.random() < 0.5:
self.wealth += trade_amount * self.random.uniform(0.9, 1.1)
partner.wealth -= trade_amount
else:
self.wealth -= trade_amount
partner.wealth += trade_amount * self.random.uniform(0.9, 1.1)
self.transactions_count += 1
partner.transactions_count += 1
# Investment returns (risky)
if self.investments:
for i, investment in enumerate(self.investments):
if self.random.random() < 0.1: # 10% chance per investment per step
return_rate = self.random.lognormvariate(0, 0.5) # Log-normal returns
self.wealth += investment * return_rate
self.investments.pop(i)
break
class EconomicModel(Model):
def __init__(self, n_agents=100, width=20, height=20,
mean_initial_wealth=100, wealth_std=20,
market_volatility=0.1, taxation_rate=0.0):
super().__init__()
self.n_agents = n_agents
self.market_volatility = market_volatility
self.taxation_rate = taxation_rate
self.total_taxes_collected = 0
# Create spatial environment
self.grid = MultiGrid(width, height, torus=True)
# Advanced data collection
self.datacollector = DataCollector(
model_reporters={
"Total Wealth": lambda m: sum(a.wealth for a in m.agents),
"Mean Wealth": lambda m: np.mean([a.wealth for a in m.agents]),
"Wealth Std": lambda m: np.std([a.wealth for a in m.agents]),
"Gini Coefficient": self.calculate_gini,
"Wealth Inequality Ratio": lambda m: (
np.percentile([a.wealth for a in m.agents], 90) /
max(1, np.percentile([a.wealth for a in m.agents], 10))
),
"Active Investors": lambda m: len([a for a in m.agents if len(a.investments) > 0]),
"Total Transactions": lambda m: sum(a.transactions_count for a in m.agents),
"Market Activity": lambda m: sum(a.transactions_count for a in m.agents) / len(m.agents),
"Taxes Collected": "total_taxes_collected"
},
agent_reporters={
"Wealth": "wealth",
"Initial Wealth": "initial_wealth",
"Risk Tolerance": "risk_tolerance",
"Active Investments": lambda a: len(a.investments),
"Transaction Count": "transactions_count",
"Wealth Change": lambda a: a.wealth - a.initial_wealth,
"ROI": lambda a: (a.wealth - a.initial_wealth) / a.initial_wealth if a.initial_wealth > 0 else 0
}
)
# Create agents with varied initial conditions
for i in range(n_agents):
# Wealth follows log-normal distribution
wealth = max(10, self.random.lognormvariate(
np.log(mean_initial_wealth), wealth_std / mean_initial_wealth
))
# Risk tolerance varies
risk_tolerance = self.random.betavariate(2, 2) # Beta distribution
agent = EconomicAgent(self, wealth=wealth, risk_tolerance=risk_tolerance)
# Place randomly on grid
x = self.random.randrange(width)
y = self.random.randrange(height)
self.grid.place_agent(agent, (x, y))
self.running = True
def calculate_gini(self):
"""Calculate Gini coefficient of wealth distribution."""
wealth_values = sorted([a.wealth for a in self.agents])
n = len(wealth_values)
if n == 0 or sum(wealth_values) == 0:
return 0
cumsum = sum((i + 1) * wealth for i, wealth in enumerate(wealth_values))
return (2 * cumsum) / (n * sum(wealth_values)) - (n + 1) / n
def step(self):
self.datacollector.collect(self)
# Market shock (random events)
if self.random.random() < self.market_volatility:
shock_magnitude = self.random.normalvariate(0, 0.1)
for agent in self.agents:
agent.wealth *= (1 + shock_magnitude)
agent.wealth = max(0, agent.wealth) # Prevent negative wealth
# Taxation
if self.taxation_rate > 0:
for agent in self.agents:
if agent.wealth > 100: # Tax threshold
tax = agent.wealth * self.taxation_rate
agent.wealth -= tax
self.total_taxes_collected += tax
# Agent actions
self.agents.shuffle_do("step")
# Complex parameter sweep
parameters = {
# Population parameters
"n_agents": [50, 100, 200],
"mean_initial_wealth": [100, 200, 500],
"wealth_std": [20, 50, 100],
# Policy parameters
"taxation_rate": [0.0, 0.01, 0.02, 0.05],
"market_volatility": [0.05, 0.1, 0.2],
# Spatial parameters
"width": 20, # Fixed
"height": 20 # Fixed
}
# Run comprehensive batch experiment
results = batch_run(
EconomicModel,
parameters=parameters,
iterations=15, # 15 runs per parameter combination
max_steps=500,
number_processes=None, # Use all available cores
data_collection_period=25, # Collect data every 25 steps
display_progress=True
)
# Analysis
df = pd.DataFrame(results)
print(f"Experiment completed:")
print(f"- Total data points: {len(df):,}")
print(f"- Parameter combinations: {len(df.groupby(['n_agents', 'mean_initial_wealth', 'wealth_std', 'taxation_rate', 'market_volatility'])):,}")
print(f"- Average final Gini coefficient: {df[df['Step'] == df['Step'].max()]['Gini Coefficient'].mean():.3f}")
# Group by policy parameters for analysis
policy_effects = df[df['Step'] == df['Step'].max()].groupby(['taxation_rate', 'market_volatility']).agg({
'Gini Coefficient': ['mean', 'std'],
'Total Wealth': ['mean', 'std'],
'Market Activity': ['mean', 'std']
}).round(3)
print("\nPolicy effects on final outcomes:")
print(policy_effects)
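For a compact view of the policy effects (a sketch using the columns defined above), a pivot table of final-step Gini coefficients works well:

final = df[df["Step"] == df["Step"].max()]
gini_pivot = final.pivot_table(
    index="taxation_rate",
    columns="market_volatility",
    values="Gini Coefficient",
    aggfunc="mean",
)
print(gini_pivot.round(3))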
from mesa import Agent, Model, DataCollector, batch_run

class PopulationAgent(Agent):
def __init__(self, model, age=0, max_age=100):
super().__init__(model)
self.age = age
self.max_age = max_age
self.reproduced_this_step = False
def step(self):
self.age += 1
self.reproduced_this_step = False
# Death by old age
if self.age >= self.max_age:
self.remove()
return
# Death by random events
death_probability = 0.001 + (self.age / self.max_age) * 0.02
if self.random.random() < death_probability:
self.remove()
return
# Reproduction
if 18 <= self.age <= 50: # Reproductive age range
reproduction_rate = self.model.base_reproduction_rate * (
1 - len(self.model.agents) / self.model.carrying_capacity
) # Logistic growth
if self.random.random() < reproduction_rate:
# Create offspring
offspring = PopulationAgent(self.model, age=0, max_age=self.max_age)
self.reproduced_this_step = True
class PopulationModel(Model):
def __init__(self, initial_population=100, carrying_capacity=1000,
base_reproduction_rate=0.05, max_lifespan=100):
super().__init__()
self.carrying_capacity = carrying_capacity
self.base_reproduction_rate = base_reproduction_rate
# Detailed time series data collection
self.datacollector = DataCollector(
model_reporters={
"Population": lambda m: len(m.agents),
"Births": lambda m: len([a for a in m.agents if a.reproduced_this_step]),
"Deaths": lambda m: m.deaths_this_step,
"Average Age": lambda m: sum(a.age for a in m.agents) / len(m.agents) if m.agents else 0,
"Age Distribution 0-18": lambda m: len([a for a in m.agents if 0 <= a.age < 18]),
"Age Distribution 18-65": lambda m: len([a for a in m.agents if 18 <= a.age < 65]),
"Age Distribution 65+": lambda m: len([a for a in m.agents if a.age >= 65]),
"Growth Rate": lambda m: (len(m.agents) - m.previous_population) / max(1, m.previous_population),
"Carrying Capacity Utilization": lambda m: len(m.agents) / m.carrying_capacity
}
)
# Create initial population with age distribution
for i in range(initial_population):
age = self.random.randint(0, max_lifespan // 2) # Younger initial population
agent = PopulationAgent(self, age=age, max_age=max_lifespan)
self.previous_population = initial_population
self.deaths_this_step = 0
self.running = True
def step(self):
self.datacollector.collect(self)
initial_count = len(self.agents)
self.agents.shuffle_do("step")
final_count = len(self.agents)
self.deaths_this_step = initial_count - final_count + len([a for a in self.agents if a.reproduced_this_step])
self.previous_population = initial_count
# Stop if population extinct or simulation very long
if len(self.agents) == 0 or self.steps > 1000:
self.running = False
# Time series parameter sweep
parameters = {
"initial_population": [50, 100, 200],
"carrying_capacity": [500, 1000, 2000],
"base_reproduction_rate": [0.02, 0.05, 0.08, 0.1],
"max_lifespan": [80, 100, 120]
}
# Run with frequent data collection for time series
results = batch_run(
PopulationModel,
parameters=parameters,
iterations=20,
max_steps=500,
number_processes=6,
data_collection_period=1, # Collect data every step for time series
display_progress=True
)
# Time series analysis
df = pd.DataFrame(results)
# Calculate population dynamics metrics
def analyze_population_dynamics(group):
"""Analyze population dynamics for a parameter combination."""
population = group['Population'].values
steps = group['Step'].values
# Find equilibrium (if reached)
if len(population) > 100:
recent_pop = population[-50:] # Last 50 steps
equilibrium = recent_pop.mean()
equilibrium_stability = recent_pop.std() / equilibrium if equilibrium > 0 else float('inf')
else:
equilibrium = population[-1] if len(population) > 0 else 0
equilibrium_stability = float('inf')
# Growth phase analysis
max_pop = population.max()
max_pop_step = steps[population.argmax()]
# Extinction check
went_extinct = population[-1] == 0 if len(population) > 0 else True
return pd.Series({
'max_population': max_pop,
'max_population_step': max_pop_step,
'equilibrium_population': equilibrium,
'equilibrium_stability': equilibrium_stability,
'went_extinct': went_extinct,
'final_population': population[-1] if len(population) > 0 else 0
})
# Group by parameter combinations and analyze
param_cols = ['initial_population', 'carrying_capacity', 'base_reproduction_rate', 'max_lifespan', 'iteration']
dynamics_analysis = df.groupby(param_cols).apply(analyze_population_dynamics).reset_index()
# Summary statistics
print("Population Dynamics Summary:")
print(dynamics_analysis.groupby(['carrying_capacity', 'base_reproduction_rate']).agg({
'max_population': 'mean',
'equilibrium_population': 'mean',
'went_extinct': 'sum',
'equilibrium_stability': 'mean'
}).round(2))
# Find optimal parameters (high equilibrium, low extinction rate)
optimal_params = dynamics_analysis.groupby(['carrying_capacity', 'base_reproduction_rate']).agg({
'equilibrium_population': 'mean',
'went_extinct': 'mean',
'equilibrium_stability': 'mean'
}).reset_index()
print("\nOptimal parameter combinations (low extinction, high stability):")
stable_params = optimal_params[
(optimal_params['went_extinct'] < 0.1) &
(optimal_params['equilibrium_stability'] < 0.1) # coefficient of variation below 10%
].sort_values('equilibrium_population', ascending=False)
print(stable_params.head())
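To eyeball a single trajectory from the time series data (a sketch; assumes matplotlib is installed and that iteration numbering starts at 0):

import matplotlib.pyplot as plt

# Filter down to one run of one parameter combination
one_run = df[
    (df["initial_population"] == 100)
    & (df["carrying_capacity"] == 1000)
    & (df["base_reproduction_rate"] == 0.05)
    & (df["max_lifespan"] == 100)
    & (df["iteration"] == 0)
]
plt.plot(one_run["Step"], one_run["Population"])
plt.xlabel("Step")
plt.ylabel("Population")
plt.show()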
# Optimal number of processes
import multiprocessing
# Use all available cores
results = batch_run(
MyModel,
parameters=params,
number_processes=None # Uses all available cores
)
# Use specific number of cores (recommended for shared systems)
results = batch_run(
MyModel,
parameters=params,
number_processes=multiprocessing.cpu_count() - 1 # Leave one core free
)
# For large parameter sweeps, monitor memory usage
results = batch_run(
MyModel,
parameters=params,
number_processes=4, # Conservative for memory-intensive models
data_collection_period=50 # Reduce data collection frequency
)
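One lightweight way to watch memory while tuning these settings (a sketch; psutil is a third-party package, and this reports only the parent process, not the workers):

import psutil

rss_mb = psutil.Process().memory_info().rss / 1e6
print(f"Parent process memory: {rss_mb:.0f} MB")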
# For very large experiments, process in batches
import itertools

def param_combinations_generator(all_parameters):
    """Yield one dict per combination of the swept parameter values."""
    names = list(all_parameters)
    value_lists = [
        list(v) if hasattr(v, "__iter__") and not isinstance(v, str) else [v]
        for v in all_parameters.values()
    ]
    for combo in itertools.product(*value_lists):
        yield dict(zip(names, combo))

def large_batch_experiment(model_cls, all_parameters, batch_size=100):
    """Run large experiments in batches to manage memory."""
    all_results = []
    param_combinations = list(param_combinations_generator(all_parameters))
    n_batches = -(-len(param_combinations) // batch_size)  # ceiling division
    for i in range(0, len(param_combinations), batch_size):
        print(f"Running batch {i // batch_size + 1}/{n_batches}")
        batch_results = []
        for combo in param_combinations[i:i + batch_size]:
            # Each combo maps every parameter to a single value, so this
            # batch_run call executes exactly one combination
            batch_results.extend(batch_run(
                model_cls,
                parameters=combo,
                iterations=10,
                max_steps=200,
                number_processes=4
            ))
        all_results.extend(batch_results)
        # Save intermediate results (parquet export needs pyarrow or fastparquet)
        pd.DataFrame(batch_results).to_parquet(f"batch_{i // batch_size}.parquet")
    return all_results
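Usage sketch (the sweep dict is illustrative; the parquet export inside the function needs pyarrow or fastparquet installed):

sweep = {
    "n_agents": [50, 100],
    "taxation_rate": [0.0, 0.02],
    "width": 20,
    "height": 20,
}
all_results = large_batch_experiment(EconomicModel, sweep, batch_size=2)
df = pd.DataFrame(all_results)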
# Efficient data collection for long runs. Note these are batch_run keyword
# arguments, not model parameters:
efficient_params = {
"data_collection_period": 20, # Collect every 20 steps instead of every step
"max_steps": 1000
}
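Because these are batch_run keyword arguments, they can be splatted straight into the call (MyModel and params stand in for your own model and sweep, as in the snippets above):

results = batch_run(MyModel, parameters=params, **efficient_params)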
# Convert results to DataFrame
df = pd.DataFrame(results)
# Parameter sensitivity analysis
def parameter_sensitivity_analysis(df, output_var='Total Wealth', step_filter=None, param_cols=None):
"""Analyze how sensitive outputs are to parameter changes."""
if step_filter:
analysis_df = df[df['Step'] == step_filter].copy()
else:
analysis_df = df[df['Step'] == df['Step'].max()].copy() # Final step only
# Parameter columns: pass them explicitly via param_cols, or fall back to
# excluding known run-metadata columns (other reporter columns must still
# be excluded by hand)
if param_cols is None:
param_cols = [col for col in analysis_df.columns
if col not in ['RunId', 'iteration', 'Step', output_var] and
not col.startswith('Agent')]
sensitivity_results = {}
for param in param_cols:
# Calculate correlation between parameter and output
correlation = analysis_df[param].corr(analysis_df[output_var])
# Calculate variance explained
param_groups = analysis_df.groupby(param)[output_var].agg(['mean', 'std', 'count'])
between_var = param_groups['mean'].var()
within_var = (param_groups['std'] ** 2 * param_groups['count']).sum() / param_groups['count'].sum()
variance_explained = between_var / (between_var + within_var)
sensitivity_results[param] = {
'correlation': correlation,
'variance_explained': variance_explained,
'range_effect': param_groups['mean'].max() - param_groups['mean'].min()
}
return pd.DataFrame(sensitivity_results).T
# Run sensitivity analysis
sensitivity = parameter_sensitivity_analysis(df, 'Gini Coefficient')
print("Parameter sensitivity for Gini Coefficient:")
print(sensitivity.sort_values('variance_explained', ascending=False))
import scipy.stats as stats

# Statistical testing of parameter effects
def compare_parameter_effects(df, param_name, output_var, param_values=None):
"""Compare output distributions across parameter values."""
final_data = df[df['Step'] == df['Step'].max()].copy()
if param_values is None:
param_values = sorted(final_data[param_name].unique())
# Extract data for each parameter value
groups = [final_data[final_data[param_name] == val][output_var].values
for val in param_values]
# ANOVA test
f_stat, p_value = stats.f_oneway(*groups)
# Pairwise t-tests
pairwise_results = {}
for i, val1 in enumerate(param_values):
for j, val2 in enumerate(param_values[i+1:], i+1):
t_stat, p_val = stats.ttest_ind(groups[i], groups[j])
pairwise_results[f"{val1}_vs_{val2}"] = {'t_stat': t_stat, 'p_value': p_val}
return {
'anova': {'f_stat': f_stat, 'p_value': p_value},
'pairwise': pairwise_results,
'group_stats': {val: {'mean': groups[i].mean(), 'std': groups[i].std()}
for i, val in enumerate(param_values)}
}
# Statistical comparison
tax_effects = compare_parameter_effects(df, 'taxation_rate', 'Gini Coefficient')
print("Statistical analysis of taxation effects:")
print(f"ANOVA p-value: {tax_effects['anova']['p_value']:.4f}")
for comparison, result in tax_effects['pairwise'].items():
print(f"{comparison}: p={result['p_value']:.4f}")class BatchFriendlyModel(Model):
"""Example model designed for efficient batch running."""
def __init__(self, **kwargs):
super().__init__()
# Store all parameters as model attributes
for key, value in kwargs.items():
setattr(self, key, value)
# Efficient data collection setup
self.datacollector = DataCollector(
model_reporters={
# Lambdas keep reporters self-contained
"Population": lambda m: len(m.agents),
"Parameter_Summary": lambda m: {
param: getattr(m, param) for param in
['param1', 'param2', 'param3'] if hasattr(m, param)
}
}
)
# Model initialization based on parameters
self._initialize_model()
def _initialize_model(self):
"""Initialize model components based on parameters."""
# Create agents, space, etc. based on stored parameters
pass
def step(self):
"""Efficient step implementation."""
# Collect data first (for consistency)
self.datacollector.collect(self)
# Agent actions
self.agents.shuffle_do("step")
# Termination conditions
if len(self.agents) == 0 or self.steps >= getattr(self, 'max_internal_steps', 1000):
self.running = False
# Use with batch_run
results = batch_run(
BatchFriendlyModel,
parameters={
'param1': [1, 2, 3],
'param2': [0.1, 0.2],
'max_internal_steps': 200
},
iterations=50,
max_steps=200,
data_collection_period=-1 # Only collect at end for efficiency
)
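As with the earlier experiments, the returned list converts directly into a DataFrame ('Population' is the model reporter defined in BatchFriendlyModel above):

df = pd.DataFrame(results)
print(df.groupby(["param1", "param2"])["Population"].mean())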