Agent-based modeling (ABM) framework in Python with spatial grids, agent schedulers, data collection tools, and browser-based visualization capabilities
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Mesa's data collection system provides comprehensive tools for gathering, organizing, and exporting simulation data. The DataCollector class enables systematic tracking of model dynamics, agent states, and custom metrics throughout simulation runs.
from mesa import DataCollector
from typing import Any, Callable, Dict
import pandas as pd

The DataCollector class serves as the central data collection system for Mesa models, supporting multiple types of reporters and flexible data organization.
class DataCollector:
    """
    Data collection system for Mesa models with multiple reporter types.

    DataCollector enables systematic collection of model-level statistics,
    agent-level data, per-agent-type data, and custom table data throughout
    simulation runs. This is an interface summary only: every method body
    is elided with ``...``.

    Attributes:
        model_reporters: Dictionary of model-level variable reporters
        agent_reporters: Dictionary of agent-level variable reporters
        agenttype_reporters: Dictionary of agent-type-level variable reporters
        model_vars: Dictionary of collected model variables
        tables: Dictionary of custom data tables
    """

    def __init__(self,
                 model_reporters=None,
                 agent_reporters=None,
                 agenttype_reporters=None,
                 tables=None):
        """
        Initialize DataCollector with reporter configurations.

        Parameters:
            model_reporters: Dict mapping variable names to model attribute
                names or model reporter functions
            agent_reporters: Dict mapping variable names to agent attribute
                names or functions
            agenttype_reporters: Dict mapping agent classes to dicts of
                variable names to agent attribute names or functions
            tables: Dict mapping table names to lists of column names
        """
        ...

    def collect(self, model):
        """
        Collect data from the model for the current step.

        This method should be called at each simulation step to gather
        data according to the configured reporters.

        Parameters:
            model: The Mesa model instance to collect data from
        """
        ...

    def add_table_row(self, table_name, row, ignore_missing=False):
        """
        Add a row to a custom data table.

        Parameters:
            table_name: Name of the table to add data to
            row: Dictionary of column names to values
            ignore_missing: Whether to ignore missing columns in the row
        """
        ...

    def get_model_vars_dataframe(self):
        """
        Get model-level data as a pandas DataFrame.

        Returns:
            DataFrame with model variables over time, indexed by step
        """
        ...

    def get_agent_vars_dataframe(self):
        """
        Get agent-level data as a pandas DataFrame.

        Returns:
            DataFrame with agent variables over time, indexed by step and agent ID
        """
        ...

    def get_agenttype_vars_dataframe(self, agent_type):
        """
        Get the data collected for one agent type as a pandas DataFrame.

        Parameters:
            agent_type: The agent class to get data for

        Returns:
            DataFrame with one column per reporter registered for that
            agent type, indexed by step and agent ID
        """
        ...

    def get_table_dataframe(self, table_name):
        """
        Get custom table data as a pandas DataFrame.

        Parameters:
            table_name: Name of the table to retrieve

        Returns:
            DataFrame containing the custom table data
        """
        ...


# Model reporters collect model-level statistics at each simulation step. They can be functions, lambda expressions, or attribute names.
# Model reporter examples
model_reporters = {
    # Function that takes the model as its argument
    "Total Agents": lambda model: len(model.agents),
    # Model attribute access (the value is the attribute name to read)
    "Current Step": "steps",
    "Is Running": "running",
    # Complex calculations; the inner guard avoids dividing by zero when there are no agents
    "Average Wealth": lambda m: m.agents.agg(
        "wealth", lambda vals: sum(vals) / len(vals) if vals else 0
    ),
    # Custom function with multiple operations; returns a dict per step
    "Population Stats": lambda m: {
        "total": len(m.agents),
        "active": len(m.agents.select(lambda a: a.active)),
        "inactive": len(m.agents.select(lambda a: not a.active)),
    },
}

# Agent reporters collect data from individual agents at each step. They specify which agent attributes or computed values to track.
# Agent reporter examples
agent_reporters = {
    # Direct attribute access (the value is the attribute name to read)
    "Wealth": "wealth",
    "Position": "pos",
    "Energy": "energy",
    # Computed properties; agents without the method report 0
    "Neighborhood Size": lambda agent: (
        len(agent.get_neighbors()) if hasattr(agent, "get_neighbors") else 0
    ),
    # Size of an optional attribute; agents without it report 0
    "Social Network Size": lambda agent: (
        len(agent.social_network) if hasattr(agent, "social_network") else 0
    ),
}

# Agent type reporters aggregate data across all agents of a specific type at each step.
# Agent type reporter examples
# NOTE(review): the Mesa DataCollector API reference describes
# agenttype_reporters as {AgentClass: {name: reporter}} with reporters applied
# per agent; these (model, agent_type) callables do not match that shape.
# Confirm against the installed Mesa version before passing this dict to
# DataCollector.
agenttype_reporters = {
    # Count agents by type
    "Count": lambda model, agent_type: len(model.agents.select(agent_type=agent_type)),
    # Average values by type; the inner guard avoids dividing by zero for an empty selection
    "Average Wealth": lambda model, agent_type: (
        model.agents.select(agent_type=agent_type).agg(
            "wealth", lambda vals: sum(vals) / len(vals) if vals else 0
        )
    ),
    # Type-specific statistics
    "Total Energy": lambda model, agent_type: (
        model.agents.select(agent_type=agent_type).agg("energy", sum)
    ),
}

# Custom tables allow for flexible data structures beyond the standard time-series format.
# Custom table specification: each table name maps directly to its list of
# column names (per the Mesa DataCollector API reference). Rows are added
# later with DataCollector.add_table_row.
tables = {
    "interactions": ["step", "agent_1", "agent_2", "interaction_type", "outcome"],
    "events": ["step", "event_type", "location", "participants", "data"],
}

from mesa import Agent, Model, DataCollector
class WealthAgent(Agent):
    """Agent that gives a random amount of its wealth to another agent each step."""

    def __init__(self, model, wealth=100):
        """
        Parameters:
            model: The model this agent belongs to
            wealth: Starting wealth of the agent
        """
        super().__init__(model)
        self.wealth = wealth
        self.transactions = 0

    def step(self):
        """Transfer between 1 and min(10, wealth) units to a randomly chosen other agent."""
        # An agent with nothing left cannot give: randint(1, 0) would raise ValueError.
        if self.wealth < 1:
            return
        others = [a for a in self.model.agents if a is not self]
        if not others:
            return
        # Simple wealth transfer
        other = self.random.choice(others)
        transfer = self.random.randint(1, min(10, self.wealth))
        self.wealth -= transfer
        other.wealth += transfer
        self.transactions += 1
class WealthModel(Model):
    """Model of WealthAgent instances that records wealth statistics every step."""

    def __init__(self, n_agents=100, initial_wealth=100):
        """
        Parameters:
            n_agents: Number of WealthAgent instances to create
            initial_wealth: Starting wealth given to every agent
        """
        super().__init__()
        # Set up data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Total Wealth": lambda m: m.agents.agg("wealth", sum),
                "Gini Coefficient": self.calculate_gini,
                "Active Agents": lambda m: len(m.agents.select(lambda a: a.wealth > 0)),
            },
            agent_reporters={
                "Wealth": "wealth",
                "Transactions": "transactions",
            },
        )
        # Create agents
        for _ in range(n_agents):
            WealthAgent(self, wealth=initial_wealth)
        self.running = True

    def calculate_gini(self):
        """
        Calculate Gini coefficient of wealth distribution.

        Returns:
            The Gini coefficient, or 0 when there are no agents or the
            total wealth is zero (the ratio is undefined in both cases).
        """
        wealth_values = sorted(self.agents.get("wealth"))
        total = sum(wealth_values)
        if not wealth_values or total == 0:
            return 0
        n = len(wealth_values)
        # Rank-weighted sum over the ascending wealth values (ranks start at 1).
        weighted = sum(rank * wealth for rank, wealth in enumerate(wealth_values, start=1))
        return (2 * weighted) / (n * total) - (n + 1) / n

    def step(self):
        """Collect data for the current state, then activate every agent in random order."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")
# Run simulation and collect data
model = WealthModel(n_agents=50)
for _ in range(100):
    model.step()

# Get collected data
model_data = model.datacollector.get_model_vars_dataframe()
agent_data = model.datacollector.get_agent_vars_dataframe()
print("Model data shape:", model_data.shape)
print("Agent data shape:", agent_data.shape)
print("Final Gini coefficient:", model_data["Gini Coefficient"].iloc[-1])

from mesa import Agent, Model, DataCollector
class SocialAgent(Agent):
    """Agent that cooperates or defects in pairwise interactions and tracks a reputation."""

    def __init__(self, model, cooperation_level=0.5):
        """
        Parameters:
            model: The model this agent belongs to
            cooperation_level: Probability threshold used when deciding to cooperate
        """
        super().__init__(model)
        self.cooperation_level = cooperation_level
        self.reputation = 0.5
        # Baseline for code that compares against `old_reputation` to detect
        # changes, so the first step's changes are visible too.
        self.old_reputation = self.reputation
        self.interactions_this_step = []

    def interact(self, other):
        """
        Interact with another agent.

        Records the interaction in the model's "interactions" table and
        adjusts both reputations according to the outcome.

        Parameters:
            other: The agent to interact with
        """
        # Determine cooperation (both draws use this agent's random generator)
        cooperate = self.random.random() < self.cooperation_level
        other_cooperates = self.random.random() < other.cooperation_level

        if cooperate and other_cooperates:
            outcome = "mutual_cooperation"
        elif not cooperate and not other_cooperates:
            outcome = "mutual_defection"
        else:
            outcome = "mixed"

        # Record interaction in the model's interaction table
        self.model.datacollector.add_table_row("interactions", {
            "step": self.model.steps,
            "agent_1": self.unique_id,
            "agent_2": other.unique_id,
            "agent_1_cooperates": cooperate,
            "agent_2_cooperates": other_cooperates,
            "outcome": outcome,
        })

        # Update reputation based on outcome; the remaining cases leave both unchanged
        if cooperate and other_cooperates:
            self.reputation += 0.1
            other.reputation += 0.1
        elif cooperate and not other_cooperates:
            self.reputation -= 0.05
            other.reputation += 0.05

    def step(self):
        """Interact with between one and three randomly chosen other agents."""
        num_interactions = self.random.randint(1, 3)
        # Candidates are built once per step instead of once per interaction.
        others = [a for a in self.model.agents if a is not self]
        if not others:
            return
        for _ in range(num_interactions):
            self.interact(self.random.choice(others))
def _mean(values):
    """Return the arithmetic mean of *values*, or 0 when it is empty."""
    return sum(values) / len(values) if values else 0


class SocialModel(Model):
    """Model of SocialAgent instances with model, agent, agent-type and table data collection."""

    def __init__(self, n_agents=50):
        """
        Parameters:
            n_agents: Number of SocialAgent instances to create
        """
        super().__init__()
        # Set up comprehensive data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Average Cooperation": lambda m: m.agents.agg("cooperation_level", _mean),
                "Average Reputation": lambda m: m.agents.agg("reputation", _mean),
                "High Cooperators": lambda m: len(
                    m.agents.select(lambda a: a.cooperation_level > 0.7)
                ),
                "Low Cooperators": lambda m: len(
                    m.agents.select(lambda a: a.cooperation_level < 0.3)
                ),
            },
            agent_reporters={
                "Cooperation Level": "cooperation_level",
                "Reputation": "reputation",
            },
            # Keyed by agent class; each inner dict maps a column name to a
            # per-agent reporter (per the Mesa DataCollector API reference).
            agenttype_reporters={
                SocialAgent: {
                    "Cooperation Level": "cooperation_level",
                    "Reputation": "reputation",
                },
            },
            # Each table name maps directly to its list of column names.
            tables={
                "interactions": [
                    "step", "agent_1", "agent_2",
                    "agent_1_cooperates", "agent_2_cooperates", "outcome",
                ],
                "reputation_events": [
                    "step", "agent_id", "old_reputation", "new_reputation", "cause",
                ],
            },
        )
        # Create agents
        for _ in range(n_agents):
            cooperation = self.random.uniform(0.1, 0.9)
            SocialAgent(self, cooperation_level=cooperation)
        self.running = True

    def step(self):
        """Collect data, activate agents in random order, then log reputation changes."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")
        # Add reputation change events to custom table
        for agent in self.agents:
            previous = getattr(agent, "old_reputation", None)
            if previous is not None and agent.reputation != previous:
                self.datacollector.add_table_row("reputation_events", {
                    "step": self.steps,
                    "agent_id": agent.unique_id,
                    "old_reputation": previous,
                    "new_reputation": agent.reputation,
                    "cause": "interaction",
                })
            # Always refresh the snapshot so the next step compares against this one.
            agent.old_reputation = agent.reputation
# Run simulation
model = SocialModel(n_agents=30)
for _ in range(50):
    model.step()

# Analyze collected data
model_data = model.datacollector.get_model_vars_dataframe()
agent_data = model.datacollector.get_agent_vars_dataframe()
interactions_data = model.datacollector.get_table_dataframe("interactions")
reputation_events = model.datacollector.get_table_dataframe("reputation_events")
print("Model evolution:")
print(model_data[["Average Cooperation", "Average Reputation"]].head(10))
print("\nInteraction summary:")
print(interactions_data["outcome"].value_counts())
print("\nFinal agent states:")
# Select the last step that was actually recorded instead of deriving it from
# model.steps, so the result does not depend on how the step counter relates
# to the moment collect() was called.
recorded_steps = agent_data.index.get_level_values("Step")
print(agent_data.loc[recorded_steps == recorded_steps.max()].describe())

from mesa import Agent, Model, DataCollector
class Predator(Agent):
    """Agent that loses 2 energy per step and removes itself when energy runs out."""

    def __init__(self, model, energy=100):
        """
        Parameters:
            model: The model this agent belongs to
            energy: Starting energy of the predator
        """
        super().__init__(model)
        self.energy = energy
        self.hunting_success = 0

    def step(self):
        """Spend energy for this step and leave the model once it is exhausted."""
        self.energy -= 2
        if self.energy <= 0:
            self.remove()
class Prey(Agent):
    """Agent that loses 1 energy per step and removes itself when energy runs out."""

    def __init__(self, model, energy=50):
        """
        Parameters:
            model: The model this agent belongs to
            energy: Starting energy of the prey
        """
        super().__init__(model)
        self.energy = energy
        self.escapes = 0

    def step(self):
        """Spend energy for this step and leave the model once it is exhausted."""
        self.energy -= 1
        if self.energy <= 0:
            self.remove()
class EcosystemModel(Model):
    """Predator/prey model that records model-level, agent-level and per-type data."""

    def __init__(self, n_predators=10, n_prey=50):
        """
        Parameters:
            n_predators: Number of Predator agents to create
            n_prey: Number of Prey agents to create
        """
        super().__init__()
        # Multi-type data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Total Population": lambda m: len(m.agents),
                # max(1, ...) keeps the ratio defined when no prey remain
                "Predator-Prey Ratio": lambda m: (
                    len(m.agents.select(agent_type=Predator))
                    / max(1, len(m.agents.select(agent_type=Prey)))
                ),
                "Ecosystem Stability": self.calculate_stability,
            },
            agent_reporters={
                "Energy": "energy",
                "Type": lambda a: type(a).__name__,
            },
            # Keyed by agent class; each inner dict maps a column name to a
            # per-agent reporter (per the Mesa DataCollector API reference).
            # Per-step counts and averages are derived from these rows below.
            agenttype_reporters={
                Predator: {"Energy": "energy", "Hunting Success": "hunting_success"},
                Prey: {"Energy": "energy", "Escapes": "escapes"},
            },
        )
        # Create ecosystem
        for _ in range(n_predators):
            Predator(self, energy=100)
        for _ in range(n_prey):
            Prey(self, energy=50)
        self.running = True

    def calculate_stability(self):
        """
        Calculate ecosystem stability metric.

        Returns:
            A value in [0.0, 1.0]: 1.0 when predators make up exactly 20% of
            the population, decreasing linearly with the distance from that
            share, and 0.0 when either population is extinct.
        """
        predators = len(self.agents.select(agent_type=Predator))
        prey = len(self.agents.select(agent_type=Prey))
        if predators == 0 or prey == 0:
            return 0.0
        # Stability based on balanced populations
        predator_ratio = predators / (predators + prey)
        # Optimal ratio around 0.2 (20% predators)
        optimal_ratio = 0.2
        stability = 1.0 - abs(predator_ratio - optimal_ratio) / optimal_ratio
        return max(0.0, stability)

    def step(self):
        """Collect data, activate agents in random order, and stop on extinction."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")
        # Check for extinction
        predators = len(self.agents.select(agent_type=Predator))
        prey = len(self.agents.select(agent_type=Prey))
        if predators == 0 or prey == 0:
            self.running = False


def _summarize_by_step(type_data):
    """Reduce per-agent rows of one agent type to a per-step count and mean energy."""
    return (
        type_data.groupby(level="Step")["Energy"]
        .agg(["count", "mean"])
        .rename(columns={"count": "Count", "mean": "Average Energy"})
    )


# Run ecosystem simulation
model = EcosystemModel(n_predators=8, n_prey=40)
for _ in range(200):
    model.step()
    if not model.running:
        break

# Analyze multi-type data
model_data = model.datacollector.get_model_vars_dataframe()
predator_data = _summarize_by_step(model.datacollector.get_agenttype_vars_dataframe(Predator))
prey_data = _summarize_by_step(model.datacollector.get_agenttype_vars_dataframe(Prey))
print("Ecosystem dynamics:")
print(model_data[["Total Population", "Predator-Prey Ratio", "Ecosystem Stability"]].tail(10))
print("\nPredator population:")
print(predator_data[["Count", "Average Energy"]].tail(5))
print("\nPrey population:")
print(prey_data[["Count", "Average Energy"]].tail(5))

# Export data to files
model_df = model.datacollector.get_model_vars_dataframe()
agent_df = model.datacollector.get_agent_vars_dataframe()

# Save to CSV
model_df.to_csv("model_data.csv")
agent_df.to_csv("agent_data.csv")

# Save to other formats
# NOTE(review): DataFrame.to_parquet needs a parquet engine (e.g. pyarrow)
# to be installed — confirm it is available in the target environment.
model_df.to_parquet("model_data.parquet")
agent_df.to_json("agent_data.json")

# Custom table data (only exported when the model defines this table)
if "interactions" in model.datacollector.tables:
    interactions_df = model.datacollector.get_table_dataframe("interactions")
    interactions_df.to_csv("interactions.csv")

import pandas as pd
import numpy as np
# Get data for analysis
model_data = model.datacollector.get_model_vars_dataframe()
agent_data = model.datacollector.get_agent_vars_dataframe()

# Time series analysis: step-over-step change and relative growth of the population
population = model_data["Total Agents"]
previous_population = population.shift(1)
model_data["Population_Change"] = population - previous_population
model_data["Population_Growth_Rate"] = model_data["Population_Change"] / previous_population

# Agent-level analysis
# Calculate summary statistics by agent
per_agent_statistics = {
    "Wealth": ["mean", "std", "min", "max"],
    "Transactions": "sum",
}
agent_summary = agent_data.groupby(level="AgentID").agg(per_agent_statistics)

# Time-based agent analysis
# Get agent wealth over time: one row per step, one column per agent
flat_agent_data = agent_data.reset_index()
agent_wealth_over_time = flat_agent_data.pivot(
    index="Step",
    columns="AgentID",
    values="Wealth",
)

# Calculate correlation between agents
wealth_correlations = agent_wealth_over_time.corr()

print("Model summary statistics:")
print(model_data.describe())
print("\nAgent summary statistics:")
print(agent_summary.head())

# Efficient reporter functions
def efficient_model_reporter(model):
    """
    Example of efficient model reporter.

    Reads wealth, energy and active state from all agents with a single
    ``model.agents.get`` call and derives every statistic from that result.

    Parameters:
        model: Model whose ``agents.get`` returns one (wealth, energy, active)
            row per agent

    Returns:
        Dict with "total_wealth", "avg_wealth", "total_energy" and
        "active_agents"; all values are 0 when there are no agents.
    """
    # Collect all needed data in one pass
    rows = model.agents.get(["wealth", "energy", "active"])
    if not rows:
        return {
            "total_wealth": 0,
            "avg_wealth": 0,
            "total_energy": 0,
            "active_agents": 0,
        }
    # Transpose the per-agent rows into one sequence per attribute.
    wealth_values, energy_values, active_flags = zip(*rows)
    total_wealth = sum(wealth_values)
    return {
        "total_wealth": total_wealth,
        "avg_wealth": total_wealth / len(wealth_values),
        "total_energy": sum(energy_values),
        "active_agents": sum(1 for flag in active_flags if flag),
    }
# Conditional data collection
def conditional_collector(model, interval=10):
    """
    Only collect data when needed.

    Parameters:
        model: Model exposing ``steps`` and ``calculate_complex_metric()``
        interval: Compute the metric only on steps divisible by this value

    Returns:
        The result of ``model.calculate_complex_metric()`` on matching
        steps, otherwise None.
    """
    if model.steps % interval == 0:  # Collect every `interval` steps
        return model.calculate_complex_metric()
    return None


# For long-running simulations, periodically export and clear data
class LongRunningModel(Model):
    """Model that writes collected data to CSV in batches and starts a fresh collector."""

    def __init__(self):
        super().__init__()
        self.datacollector = self._new_datacollector()
        self.export_interval = 1000

    @staticmethod
    def _new_datacollector():
        """Build an empty collector with this model's reporter configuration."""
        return DataCollector(
            model_reporters={"Population": lambda m: len(m.agents)}
        )

    def step(self):
        """Collect data, activate agents, and export a batch every `export_interval` steps."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")
        # Export and clear data periodically
        if self.steps % self.export_interval == 0:
            self.export_data()
            # Replacing the collector drops the rows that were just exported.
            self.datacollector = self._new_datacollector()

    def export_data(self):
        """
        Export the collected model data to a CSV file.

        The file name carries the batch number (steps // export_interval).
        This method only writes the file; the collector is replaced by the
        caller in `step`.
        """
        df = self.datacollector.get_model_vars_dataframe()
        filename = f"data_batch_{self.steps // self.export_interval}.csv"
        df.to_csv(filename)