CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mesa

Agent-based modeling (ABM) in Python framework with spatial grids, agent schedulers, data collection tools, and browser-based visualization capabilities

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

data-collection.mddocs/

Data Collection

Mesa's data collection system provides comprehensive tools for gathering, organizing, and exporting simulation data. The DataCollector class enables systematic tracking of model dynamics, agent states, and custom metrics throughout simulation runs.

Imports

from mesa import DataCollector
from typing import Any, Callable, Dict
import pandas as pd

DataCollector Class

The DataCollector class serves as the central data collection system for Mesa models, supporting multiple types of reporters and flexible data organization.

class DataCollector:
    """
    Data collection system for Mesa models with multiple reporter types.

    DataCollector gathers model-level values, per-agent values, per-agent
    values restricted to specific agent types, and rows of free-form table
    data over the course of a simulation run.

    This is an API outline; method bodies are elided with ``...``.

    Attributes:
        model_reporters: Dictionary of model-level variable reporters
        agent_reporters: Dictionary of agent-level variable reporters
        agenttype_reporters: Dictionary keyed by agent class; each value is a
            dictionary of reporters applied only to agents of that class
        model_vars: Dictionary of collected model variables
        tables: Dictionary of custom data tables
    """

    def __init__(self,
                 model_reporters=None,
                 agent_reporters=None,
                 agenttype_reporters=None,
                 tables=None):
        """
        Initialize DataCollector with reporter configurations.

        Parameters:
            model_reporters: Dict mapping variable names to a model attribute
                name, a function taking the model, or a bound method
            agent_reporters: Dict mapping variable names to an agent attribute
                name or a function taking the agent
            agenttype_reporters: Dict mapping each agent class to its own
                reporter dict (same format as ``agent_reporters``), e.g.
                ``{Wolf: {"Energy": "energy"}}``
            tables: Dict mapping each table name to the list of its column
                names, e.g. ``{"events": ["step", "kind"]}``
        """
        ...

    def collect(self, model) -> None:
        """
        Collect data from the model for the current step.

        This method should be called at each simulation step to gather
        data according to the configured reporters.

        Parameters:
            model: The Mesa model instance to collect data from
        """
        ...

    def add_table_row(self, table_name, row, ignore_missing=False) -> None:
        """
        Add a row to a custom data table.

        Parameters:
            table_name: Name of the table to add data to
            row: Dictionary of column names to values
            ignore_missing: If True, columns absent from ``row`` are filled
                with None instead of being treated as an error

        Raises:
            Exception: If the table does not exist, or if ``row`` lacks a
                column and ``ignore_missing`` is False
        """
        ...

    def get_model_vars_dataframe(self) -> "pd.DataFrame":
        """
        Get model-level data as a pandas DataFrame.

        Returns:
            DataFrame with one column per model reporter and one row per
            collection
        """
        ...

    def get_agent_vars_dataframe(self) -> "pd.DataFrame":
        """
        Get agent-level data as a pandas DataFrame.

        Returns:
            DataFrame with one column per agent reporter, indexed by the
            levels "Step" and "AgentID"
        """
        ...

    def get_agenttype_vars_dataframe(self, agent_type) -> "pd.DataFrame":
        """
        Get the data collected for one agent type as a pandas DataFrame.

        Parameters:
            agent_type: The agent class to get data for

        Returns:
            DataFrame with one column per reporter configured for
            ``agent_type``, indexed by the levels "Step" and "AgentID"
        """
        ...

    def get_table_dataframe(self, table_name) -> "pd.DataFrame":
        """
        Get custom table data as a pandas DataFrame.

        Parameters:
            table_name: Name of the table to retrieve

        Returns:
            DataFrame containing the rows added to the table
        """
        ...

Reporter Types

Model Reporters

Model reporters collect model-level statistics at each simulation step. They can be functions, lambda expressions, or attribute names.

# Model reporter examples
def _mean_or_zero(values):
    """Return the arithmetic mean of ``values``, or 0 if it is empty."""
    return sum(values) / len(values) if values else 0


def _population_stats(model):
    """Return total, active and inactive agent counts as a dictionary."""
    return {
        "total": len(model.agents),
        "active": len(model.agents.select(lambda agent: agent.active)),
        "inactive": len(model.agents.select(lambda agent: not agent.active)),
    }


model_reporters = {
    # Callable that receives the model
    "Total Agents": lambda model: len(model.agents),

    # Name of a model attribute to read
    "Current Step": "steps",
    "Is Running": "running",

    # Aggregation over an agent attribute
    "Average Wealth": lambda model: model.agents.agg("wealth", _mean_or_zero),

    # Reporter that returns several values at once
    "Population Stats": _population_stats,
}

Agent Reporters

Agent reporters collect data from individual agents at each step. They specify which agent attributes or computed values to track.

# Agent reporter examples
def _neighborhood_size(agent):
    """Return the number of neighbors, or 0 if the agent has no get_neighbors."""
    if not hasattr(agent, "get_neighbors"):
        return 0
    return len(agent.get_neighbors())


def _social_network_size(agent):
    """Return the size of agent.social_network, or 0 if it is absent."""
    if not hasattr(agent, "social_network"):
        return 0
    return len(agent.social_network)


agent_reporters = {
    # Name of an agent attribute to read
    "Wealth": "wealth",
    "Position": "pos",
    "Energy": "energy",

    # Value computed by calling an agent method
    "Neighborhood Size": _neighborhood_size,

    # Value computed from an agent attribute
    "Social Network Size": _social_network_size,
}

Agent Type Reporters

Agent type reporters collect data separately for each agent type at each step, so different agent classes can report different variables.

# Agent type reporter examples
# The outer dictionary is keyed by agent class. Each value is a reporter
# dictionary in the same format as agent_reporters (an attribute name or a
# callable that receives the agent) and is applied only to agents of that
# class. Predator and Prey stand for agent classes defined by your model.
agenttype_reporters = {
    Predator: {
        # Direct attribute access
        "Energy": "energy",
        "Hunting Success": "hunting_success",
    },
    Prey: {
        "Energy": "energy",
        "Escapes": "escapes",

        # Computed per-agent value
        "Low Energy": lambda agent: agent.energy < 10,
    },
}
# Per-type aggregates (counts, averages, totals) are not reported here:
# compute them in model_reporters, or derive them afterwards from
# get_agenttype_vars_dataframe(agent_type) with a groupby on the "Step" level.

Custom Tables

Custom tables allow for flexible data structures beyond the standard time-series format.

# Custom table specification
# Each table name maps directly to the list of its column names. A row passed
# to add_table_row must provide every listed column unless ignore_missing=True.
tables = {
    "interactions": ["step", "agent_1", "agent_2", "interaction_type", "outcome"],
    "events": ["step", "event_type", "location", "participants", "data"],
}

Usage Examples

Basic Data Collection Setup

from mesa import Agent, Model, DataCollector

class WealthAgent(Agent):
    """Agent that holds wealth and hands random amounts of it to other agents.

    Attributes:
        wealth: Current wealth of the agent.
        transactions: Number of transfers this agent has made so far.
    """

    def __init__(self, model, wealth=100):
        """Register the agent with ``model`` and set its starting ``wealth``."""
        super().__init__(model)
        self.wealth = wealth
        self.transactions = 0

    def step(self):
        """Give between 1 and min(10, wealth) units to a random other agent.

        Does nothing when the agent cannot afford a transfer of at least 1
        unit or when there is no other agent in the model.
        """
        # randint(1, upper) raises ValueError for upper < 1, so a broke agent
        # must skip its turn instead of attempting a transfer.
        if self.wealth < 1:
            return

        others = [a for a in self.model.agents if a is not self]
        if not others:
            return

        other = self.random.choice(others)
        transfer = self.random.randint(1, min(10, self.wealth))
        self.wealth -= transfer
        other.wealth += transfer
        self.transactions += 1

class WealthModel(Model):
    """Model in which WealthAgent instances exchange wealth at random.

    Each step first collects data and then activates every agent once in
    random order.
    """

    def __init__(self, n_agents=100, initial_wealth=100):
        """Create the data collector and ``n_agents`` agents.

        Parameters:
            n_agents: Number of WealthAgent instances to create
            initial_wealth: Starting wealth given to every agent
        """
        super().__init__()

        # Set up data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Total Wealth": lambda m: m.agents.agg("wealth", sum),
                # Bound method: called without arguments by the collector
                "Gini Coefficient": self.calculate_gini,
                "Active Agents": lambda m: len(m.agents.select(lambda a: a.wealth > 0)),
            },
            agent_reporters={
                "Wealth": "wealth",
                "Transactions": "transactions",
            },
        )

        # Create agents; constructing an agent registers it with the model
        for _ in range(n_agents):
            WealthAgent(self, wealth=initial_wealth)

        self.running = True

    def calculate_gini(self):
        """Calculate the Gini coefficient of the wealth distribution.

        Returns:
            The coefficient, or 0 when there are no agents or when the total
            wealth is zero (nobody owns anything, so there is no inequality
            to measure and the formula would divide by zero).
        """
        wealth_values = sorted(self.agents.get("wealth"))
        total_wealth = sum(wealth_values)
        if not wealth_values or total_wealth == 0:
            return 0

        n = len(wealth_values)
        # Rank-weighted sum over the ascending-sorted values
        cumsum = sum((i + 1) * wealth for i, wealth in enumerate(wealth_values))
        return (2 * cumsum) / (n * total_wealth) - (n + 1) / n

    def step(self):
        """Collect data, then let every agent act once in random order."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

# Build a 50-agent model and advance it 100 steps
model = WealthModel(n_agents=50)
for _ in range(100):
    model.step()

# Retrieve everything the collector recorded as DataFrames
collector = model.datacollector
model_data = collector.get_model_vars_dataframe()
agent_data = collector.get_agent_vars_dataframe()

print("Model data shape:", model_data.shape)
print("Agent data shape:", agent_data.shape)
final_gini = model_data["Gini Coefficient"].iloc[-1]
print("Final Gini coefficient:", final_gini)

Advanced Data Collection with Custom Tables

from mesa import Agent, Model, DataCollector

class SocialAgent(Agent):
    """Agent that cooperates or defects in pairwise interactions.

    Attributes:
        cooperation_level: Probability of cooperating in an interaction.
        reputation: Running score adjusted by interaction outcomes.
        interactions_this_step: List initialised empty for per-step bookkeeping.
    """

    def __init__(self, model, cooperation_level=0.5):
        super().__init__(model)
        self.interactions_this_step = []
        self.reputation = 0.5
        self.cooperation_level = cooperation_level

    def interact(self, other):
        """Play one interaction with ``other``, log it, and update reputations."""
        rng = self.random
        # Draw this agent's decision first, then the partner's
        cooperate = rng.random() < self.cooperation_level
        other_cooperates = rng.random() < other.cooperation_level

        # Classify the outcome
        if cooperate and other_cooperates:
            outcome = "mutual_cooperation"
        elif not (cooperate or other_cooperates):
            outcome = "mutual_defection"
        else:
            outcome = "mixed"

        # Record the interaction in the model's "interactions" table
        self.model.datacollector.add_table_row(
            "interactions",
            {
                "step": self.model.steps,
                "agent_1": self.unique_id,
                "agent_2": other.unique_id,
                "agent_1_cooperates": cooperate,
                "agent_2_cooperates": other_cooperates,
                "outcome": outcome,
            },
        )

        # Reputations only change when this agent cooperated
        if not cooperate:
            return
        if other_cooperates:
            self.reputation += 0.1
            other.reputation += 0.1
        else:
            self.reputation -= 0.05
            other.reputation += 0.05

    def step(self):
        """Interact with one to three randomly chosen other agents."""
        for _ in range(self.random.randint(1, 3)):
            if len(self.model.agents) > 1:
                candidates = [a for a in self.model.agents if a != self]
                self.interact(self.random.choice(candidates))

class SocialModel(Model):
    """Model of SocialAgent instances that repeatedly interact in pairs.

    Collects model-level averages, per-agent values, per-agent values for the
    SocialAgent type, and two custom tables: "interactions" (filled by the
    agents) and "reputation_events" (filled by the model after each step).
    """

    def __init__(self, n_agents=50):
        """Create the data collector and ``n_agents`` agents.

        Parameters:
            n_agents: Number of SocialAgent instances to create
        """
        super().__init__()

        def mean(vals):
            # Guard against an empty population (e.g. n_agents=0)
            return sum(vals) / len(vals) if vals else 0

        # Set up comprehensive data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Average Cooperation": lambda m: m.agents.agg("cooperation_level", mean),
                "Average Reputation": lambda m: m.agents.agg("reputation", mean),
                "High Cooperators": lambda m: len(m.agents.select(lambda a: a.cooperation_level > 0.7)),
                "Low Cooperators": lambda m: len(m.agents.select(lambda a: a.cooperation_level < 0.3)),
            },
            agent_reporters={
                "Cooperation Level": "cooperation_level",
                "Reputation": "reputation",
            },
            # Keyed by agent class; the inner dict has the same format as
            # agent_reporters and applies only to agents of that class.
            agenttype_reporters={
                SocialAgent: {
                    "Cooperation Level": "cooperation_level",
                    "Reputation": "reputation",
                },
            },
            # Each table maps directly to its list of column names.
            tables={
                "interactions": ["step", "agent_1", "agent_2", "agent_1_cooperates",
                                 "agent_2_cooperates", "outcome"],
                "reputation_events": ["step", "agent_id", "old_reputation",
                                      "new_reputation", "cause"],
            },
        )

        # Create agents; constructing an agent registers it with the model
        for _ in range(n_agents):
            cooperation = self.random.uniform(0.1, 0.9)
            SocialAgent(self, cooperation_level=cooperation)

        self.running = True

    def step(self):
        """Collect data, activate all agents, then log reputation changes."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

        # Add reputation change events to custom table. An agent has no
        # old_reputation until its first pass through this loop, so nothing
        # is logged for it on the first step.
        for agent in self.agents:
            if hasattr(agent, 'old_reputation') and agent.reputation != agent.old_reputation:
                self.datacollector.add_table_row("reputation_events", {
                    "step": self.steps,
                    "agent_id": agent.unique_id,
                    "old_reputation": agent.old_reputation,
                    "new_reputation": agent.reputation,
                    "cause": "interaction"
                })
            agent.old_reputation = agent.reputation

# Run simulation
model = SocialModel(n_agents=30)
for _ in range(50):
    model.step()

# Analyze collected data
model_data = model.datacollector.get_model_vars_dataframe()
agent_data = model.datacollector.get_agent_vars_dataframe()
interactions_data = model.datacollector.get_table_dataframe("interactions")
reputation_events = model.datacollector.get_table_dataframe("reputation_events")

print("Model evolution:")
print(model_data[["Average Cooperation", "Average Reputation"]].head(10))

print("\nInteraction summary:")
print(interactions_data["outcome"].value_counts())

print("\nFinal agent states:")
# Use the last step that was actually recorded. Deriving it from model.steps
# (e.g. model.steps - 1) can select the step before the last one, because
# the step counter is already advanced when collect() runs inside step().
recorded_steps = agent_data.index.get_level_values("Step")
print(agent_data.loc[recorded_steps == recorded_steps.max()].describe())

Multi-Agent Type Data Collection

from mesa import Agent, Model, DataCollector

class Predator(Agent):
    """Predator that burns 2 energy per step and is removed once it runs out."""

    def __init__(self, model, energy=100):
        super().__init__(model)
        self.hunting_success = 0
        self.energy = energy

    def step(self):
        """Spend energy; remove the agent when none is left."""
        remaining = self.energy - 2
        self.energy = remaining
        if remaining > 0:
            return
        self.remove()

class Prey(Agent):
    """Prey that burns 1 energy per step and is removed once it runs out."""

    def __init__(self, model, energy=50):
        super().__init__(model)
        self.escapes = 0
        self.energy = energy

    def step(self):
        """Spend energy; remove the agent when none is left."""
        remaining = self.energy - 1
        self.energy = remaining
        if remaining > 0:
            return
        self.remove()

class EcosystemModel(Model):
    """Model with Predator and Prey agents and per-type data collection.

    The simulation stops (``running`` becomes False) as soon as either
    population is extinct.
    """

    def __init__(self, n_predators=10, n_prey=50):
        """Create the data collector and the initial populations.

        Parameters:
            n_predators: Number of Predator agents to create
            n_prey: Number of Prey agents to create
        """
        super().__init__()

        # Multi-type data collection
        self.datacollector = DataCollector(
            model_reporters={
                "Total Population": lambda m: len(m.agents),
                "Predator-Prey Ratio": lambda m: (
                    len(m.agents.select(agent_type=Predator)) /
                    max(1, len(m.agents.select(agent_type=Prey)))
                ),
                # Bound method: called without arguments by the collector
                "Ecosystem Stability": self.calculate_stability,
            },
            agent_reporters={
                "Energy": "energy",
                "Type": lambda a: type(a).__name__,
            },
            # Keyed by agent class; each inner dict has the same format as
            # agent_reporters and is applied only to agents of that class.
            agenttype_reporters={
                Predator: {
                    "Energy": "energy",
                    "Hunting Success": "hunting_success",
                },
                Prey: {
                    "Energy": "energy",
                    "Escapes": "escapes",
                },
            },
        )

        # Create ecosystem; constructing an agent registers it with the model
        for _ in range(n_predators):
            Predator(self, energy=100)

        for _ in range(n_prey):
            Prey(self, energy=50)

        self.running = True

    def calculate_stability(self):
        """Calculate ecosystem stability metric.

        Returns:
            A value in [0.0, 1.0]: 1.0 when predators make up exactly 20% of
            the population, decreasing linearly with the distance from that
            share, and 0.0 when either population is extinct.
        """
        predators = len(self.agents.select(agent_type=Predator))
        prey = len(self.agents.select(agent_type=Prey))

        if predators == 0 or prey == 0:
            return 0.0

        # Stability based on balanced populations
        total = predators + prey
        predator_ratio = predators / total

        # Optimal ratio around 0.2 (20% predators)
        optimal_ratio = 0.2
        stability = 1.0 - abs(predator_ratio - optimal_ratio) / optimal_ratio
        return max(0.0, stability)

    def step(self):
        """Collect data, activate all agents, and stop on extinction."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

        # Check for extinction
        predators = len(self.agents.select(agent_type=Predator))
        prey = len(self.agents.select(agent_type=Prey))

        if predators == 0 or prey == 0:
            self.running = False


def summarize_by_step(type_data):
    """Reduce per-agent type data to a count and mean energy for each step."""
    summary = type_data.groupby(level="Step")["Energy"].agg(["count", "mean"])
    return summary.rename(columns={"count": "Count", "mean": "Average Energy"})


# Run ecosystem simulation
model = EcosystemModel(n_predators=8, n_prey=40)
for _ in range(200):
    model.step()
    if not model.running:
        break

# Analyze multi-type data. The per-type DataFrames hold one row per
# (Step, AgentID), so population-level figures are derived with a groupby.
model_data = model.datacollector.get_model_vars_dataframe()
predator_data = model.datacollector.get_agenttype_vars_dataframe(Predator)
prey_data = model.datacollector.get_agenttype_vars_dataframe(Prey)

print("Ecosystem dynamics:")
print(model_data[["Total Population", "Predator-Prey Ratio", "Ecosystem Stability"]].tail(10))

print("\nPredator population:")
print(summarize_by_step(predator_data).tail(5))

print("\nPrey population:")
print(summarize_by_step(prey_data).tail(5))

Data Export and Analysis

Exporting Data

# Fetch the DataFrames to export
collector = model.datacollector
model_df = collector.get_model_vars_dataframe()
agent_df = collector.get_agent_vars_dataframe()

# CSV output
model_df.to_csv("model_data.csv")
agent_df.to_csv("agent_data.csv")

# Other formats (to_parquet needs a parquet engine such as pyarrow installed)
model_df.to_parquet("model_data.parquet")
agent_df.to_json("agent_data.json")

# Custom table data, exported only if that table was configured
if "interactions" in collector.tables:
    interactions_df = collector.get_table_dataframe("interactions")
    interactions_df.to_csv("interactions.csv")

Data Analysis Integration

import pandas as pd
import numpy as np

# Pull the collected data into DataFrames
collector = model.datacollector
model_data = collector.get_model_vars_dataframe()
agent_data = collector.get_agent_vars_dataframe()

# Time series analysis: step-to-step change and growth relative to the
# previous step (expects a "Total Agents" model reporter)
total_agents = model_data["Total Agents"]
model_data["Population_Change"] = total_agents.diff()
model_data["Population_Growth_Rate"] = (
    model_data["Population_Change"] / total_agents.shift(1)
)

# Agent-level analysis: summary statistics per agent
# (expects "Wealth" and "Transactions" agent reporters)
per_agent = agent_data.groupby(level="AgentID")
agent_summary = per_agent.agg({
    "Wealth": ["mean", "std", "min", "max"],
    "Transactions": "sum"
})

# Time-based agent analysis: one row per step, one column per agent
flat_agent_data = agent_data.reset_index()
agent_wealth_over_time = flat_agent_data.pivot(
    index="Step", columns="AgentID", values="Wealth"
)

# Pairwise correlation between agents' wealth trajectories
wealth_correlations = agent_wealth_over_time.corr()

print("Model summary statistics:")
print(model_data.describe())

print("\nAgent summary statistics:")
print(agent_summary.head())

Best Practices

Performance Optimization

# Efficient reporter functions
def efficient_model_reporter(model):
    """Compute several wealth/energy statistics from a single agent query.

    Parameters:
        model: Model whose ``agents.get`` returns, for each agent, the
            values of "wealth", "energy" and "active" in that order

    Returns:
        Dict with the keys "total_wealth", "avg_wealth" (0 when there are no
        agents), "total_energy" and "active_agents"
    """
    rows = model.agents.get(["wealth", "energy", "active"])

    agent_count = 0
    total_wealth = 0
    total_energy = 0
    active_agents = 0
    # Accumulate every statistic while walking the rows once
    for row in rows:
        agent_count += 1
        total_wealth += row[0]
        total_energy += row[1]
        if row[2]:
            active_agents += 1

    if agent_count:
        avg_wealth = total_wealth / agent_count
    else:
        avg_wealth = 0

    return {
        "total_wealth": total_wealth,
        "avg_wealth": avg_wealth,
        "total_energy": total_energy,
        "active_agents": active_agents,
    }

# Conditional data collection
def conditional_collector(model):
    """Report the complex metric on every 10th step and None otherwise.

    Parameters:
        model: Model providing ``steps`` and ``calculate_complex_metric()``

    Returns:
        The metric when ``model.steps`` is a multiple of 10, else None
    """
    on_schedule = model.steps % 10 == 0
    if not on_schedule:
        return None
    return model.calculate_complex_metric()

Memory Management

# For long-running simulations, periodically export and clear data
class LongRunningModel(Model):
    """Model that flushes collected data to disk every ``export_interval`` steps.

    After each export the data collector is replaced by a fresh one so the
    rows already written to disk no longer occupy memory.
    """

    def __init__(self):
        super().__init__()
        self.export_interval = 1000
        self.datacollector = self._new_datacollector()

    @staticmethod
    def _new_datacollector():
        """Build an empty collector; the single place reporters are configured."""
        return DataCollector(
            model_reporters={"Population": lambda m: len(m.agents)}
        )

    def step(self):
        """Collect data, activate all agents, and export when a batch is full."""
        self.datacollector.collect(self)
        self.agents.shuffle_do("step")

        # Export and clear data periodically
        if self.steps % self.export_interval == 0:
            self.export_data()
            # Swap in an empty collector so exported rows can be released
            self.datacollector = self._new_datacollector()

    def export_data(self):
        """Write the currently collected model data to a numbered CSV file.

        This method only exports; the collector is reset by ``step`` after
        the export succeeds.
        """
        df = self.datacollector.get_model_vars_dataframe()
        filename = f"data_batch_{self.steps // self.export_interval}.csv"
        df.to_csv(filename)

docs

batch-running.md

core.md

data-collection.md

experimental.md

index.md

spatial.md

tile.json