CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dustpy

Dust evolution in protoplanetary disks

Overview
Eval results
Files

simulation-control.mddocs/

Simulation Control Functions

Functions for managing simulation execution, time-stepping, and integration schemes. These functions provide the core control logic for DustPy simulations, handling the integration of dust and gas evolution equations.

Capabilities

Time Step Management

Functions for calculating appropriate time steps for stable evolution.

from dustpy.std import sim

def sim.dt(sim):
    """
    Calculate time step from source terms.

    Determines an appropriate time step based on the current
    source terms to ensure numerical stability and accuracy.
    Uses the fastest evolving components to set the time scale.

    Parameters:
    - sim: Simulation object

    Returns:
    float: Time step [s]
    """

def sim.dt_adaptive(sim):
    """
    Calculate adaptive time step.

    Uses error estimation and adaptive algorithms to determine
    the optimal time step for the current simulation state,
    balancing accuracy and computational efficiency.

    Parameters:
    - sim: Simulation object

    Returns:
    float: Adaptive time step [s]
    """

Explicit Integration Control

Functions for managing explicit integration schemes.

def sim.prepare_explicit_dust(sim):
    """
    Prepare explicit integration for dust evolution.

    Sets up the simulation state and calculates necessary
    quantities before performing an explicit integration
    step for the dust component.

    Parameters:
    - sim: Simulation object

    Returns:
    None

    Notes:
    - Updates dust source terms
    - Prepares collision kernels
    - Calculates transport coefficients
    """

def sim.finalize_explicit_dust(sim):
    """
    Finalize explicit integration for dust evolution.

    Completes the explicit integration step by applying
    boundary conditions, enforcing constraints, and
    updating derived quantities.

    Parameters:
    - sim: Simulation object

    Returns:
    None

    Notes:
    - Applies boundary conditions
    - Enforces floor values
    - Updates dependent variables
    """

Implicit Integration Control

Functions for managing implicit integration schemes.

def sim.prepare_implicit_dust(sim):
    """
    Prepare implicit integration for dust evolution.

    Sets up the linear system and necessary matrices
    for implicit integration of the dust evolution
    equations, including Jacobian calculation.

    Parameters:
    - sim: Simulation object

    Returns:
    None

    Notes:
    - Calculates Jacobian matrix
    - Sets up linear system
    - Prepares solver parameters
    """

def sim.finalize_implicit_dust(sim):
    """
    Finalize implicit integration for dust evolution.

    Completes the implicit integration step by solving
    the linear system, updating the simulation state,
    and applying post-integration corrections.

    Parameters:
    - sim: Simulation object

    Returns:
    None

    Notes:
    - Solves linear system
    - Updates simulation state
    - Applies corrections and constraints
    """

Usage Examples

Basic Time Step Analysis

from dustpy import Simulation
from dustpy.std import sim
import dustpy.constants as c

# Create and initialize simulation
simulation = Simulation()
simulation.makegrids()
simulation.initialize()

# Calculate time steps
fixed_dt = sim.dt(simulation)           # [s]
adaptive_dt = sim.dt_adaptive(simulation) # [s]

# Convert to useful units
fixed_years = fixed_dt / c.year
adaptive_years = adaptive_dt / c.year

print(f"Fixed time step: {fixed_years:.2e} years")
print(f"Adaptive time step: {adaptive_years:.2e} years")
print(f"Ratio: {adaptive_dt/fixed_dt:.2f}")

Custom Integration Loop

def custom_integration_step(simulation, dt):
    """Perform one integration step with custom control."""

    # Prepare for explicit dust integration
    sim.prepare_explicit_dust(simulation)

    # Get current state
    old_sigma = simulation.dust.Sigma.copy()

    # Manual explicit step (simplified)
    source_terms = simulation.dust.S.tot
    simulation.dust.Sigma += source_terms * dt

    # Finalize the step
    sim.finalize_explicit_dust(simulation)

    # Check for convergence
    relative_change = np.abs(simulation.dust.Sigma - old_sigma) / (old_sigma + 1e-100)
    max_change = np.max(relative_change)

    print(f"Maximum relative change: {max_change:.2e}")

    return max_change

# Run custom integration
simulation = Simulation()
simulation.makegrids()
simulation.initialize()

dt = sim.dt(simulation)
change = custom_integration_step(simulation, dt)

Adaptive Time Stepping Loop

def run_with_adaptive_timestepping(simulation, final_time):
    """Run simulation with adaptive time stepping control."""

    import numpy as np

    current_time = 0.0
    step_count = 0
    max_steps = 10000

    while current_time < final_time and step_count < max_steps:

        # Calculate adaptive time step
        dt = sim.dt_adaptive(simulation)

        # Don't overshoot final time
        if current_time + dt > final_time:
            dt = final_time - current_time

        # Prepare integration
        sim.prepare_explicit_dust(simulation)

        # Store old state for error estimation
        old_state = simulation.dust.Sigma.copy()

        # Take integration step
        simulation.dust.Sigma += simulation.dust.S.tot * dt

        # Finalize integration
        sim.finalize_explicit_dust(simulation)

        # Update time and step count
        current_time += dt
        step_count += 1

        # Progress reporting
        if step_count % 100 == 0:
            print(f"Step {step_count}: t = {current_time/c.year:.2e} years, dt = {dt/c.year:.2e} years")

    print(f"Completed in {step_count} steps")
    return current_time

# Run adaptive simulation
final_time = 1e5 * c.year  # 100,000 years
actual_time = run_with_adaptive_timestepping(simulation, final_time)

Integration Scheme Comparison

def compare_integration_schemes(simulation):
    """Compare different integration approaches."""

    # Store initial state
    initial_sigma = simulation.dust.Sigma.copy()

    results = {}

    # Test explicit integration
    simulation.dust.Sigma[:] = initial_sigma
    sim.prepare_explicit_dust(simulation)
    dt_explicit = sim.dt(simulation)
    simulation.dust.Sigma += simulation.dust.S.tot * dt_explicit
    sim.finalize_explicit_dust(simulation)
    results['explicit'] = simulation.dust.Sigma.copy()

    # Reset and test implicit integration
    simulation.dust.Sigma[:] = initial_sigma
    sim.prepare_implicit_dust(simulation)
    # (Implicit step would be handled by the integrator)
    sim.finalize_implicit_dust(simulation)
    results['implicit'] = simulation.dust.Sigma.copy()

    # Compare results
    diff_explicit = np.abs(results['explicit'] - initial_sigma)
    diff_implicit = np.abs(results['implicit'] - initial_sigma)

    print(f"Explicit max change: {np.max(diff_explicit):.2e}")
    print(f"Implicit max change: {np.max(diff_implicit):.2e}")

    return results

# Compare schemes
results = compare_integration_schemes(simulation)

Time Step Monitoring

class TimeStepMonitor:
    """Monitor and log time step evolution during simulation."""

    def __init__(self):
        self.steps = []
        self.times = []
        self.dt_values = []

    def record_step(self, simulation, current_time):
        """Record time step information."""

        dt = sim.dt(simulation)
        dt_adaptive = sim.dt_adaptive(simulation)

        self.steps.append(len(self.steps))
        self.times.append(current_time)
        self.dt_values.append({'fixed': dt, 'adaptive': dt_adaptive})

        # Print periodic updates
        if len(self.steps) % 100 == 0:
            print(f"Step {len(self.steps)}: t = {current_time/c.year:.1e} yr, "
                  f"dt_fixed = {dt/c.year:.1e} yr, dt_adaptive = {dt_adaptive/c.year:.1e} yr")

    def plot_evolution(self):
        """Plot time step evolution."""
        import matplotlib.pyplot as plt

        times_yr = np.array(self.times) / c.year
        dt_fixed = [d['fixed'] for d in self.dt_values]
        dt_adaptive = [d['adaptive'] for d in self.dt_values]

        plt.figure(figsize=(10, 6))
        plt.loglog(times_yr, np.array(dt_fixed)/c.year, 'b-', label='Fixed dt')
        plt.loglog(times_yr, np.array(dt_adaptive)/c.year, 'r--', label='Adaptive dt')
        plt.xlabel('Time [years]')
        plt.ylabel('Time step [years]')
        plt.legend()
        plt.title('Time Step Evolution')
        plt.grid(True)
        plt.show()

# Use monitor
monitor = TimeStepMonitor()
# ... during simulation loop ...
# monitor.record_step(simulation, current_time)

Integration Error Analysis

def analyze_integration_accuracy(simulation, true_solution=None):
    """Analyze integration accuracy and stability."""

    # Take multiple time steps with different sizes
    dt_base = sim.dt(simulation)
    dt_sizes = [dt_base * f for f in [0.5, 1.0, 2.0]]

    results = {}

    for i, dt in enumerate(dt_sizes):
        # Store initial state
        initial_state = simulation.dust.Sigma.copy()

        # Prepare and execute integration step
        sim.prepare_explicit_dust(simulation)
        simulation.dust.Sigma += simulation.dust.S.tot * dt
        sim.finalize_explicit_dust(simulation)

        # Store result
        results[f'dt_{i}'] = simulation.dust.Sigma.copy()

        # Reset for next test
        simulation.dust.Sigma[:] = initial_state

    # Analyze convergence
    diff_01 = np.abs(results['dt_0'] - results['dt_1'])
    diff_12 = np.abs(results['dt_1'] - results['dt_2'])

    convergence_rate = np.log(np.max(diff_12) / np.max(diff_01)) / np.log(2.0)

    print(f"Convergence rate: {convergence_rate:.2f}")
    if convergence_rate > 0.8:
        print("Integration appears to be converging (rate > 0.8)")
    else:
        print("Integration may be unstable (rate < 0.8)")

    return results, convergence_rate

# Analyze accuracy
results, rate = analyze_integration_accuracy(simulation)

Custom Integration Callbacks

def setup_integration_callbacks(simulation):
    """Set up callbacks for integration monitoring."""

    def pre_step_callback():
        """Called before each integration step."""
        # Update coagulation parameters
        sim.prepare_explicit_dust(simulation)

        # Check for stability
        dt = sim.dt(simulation)
        if dt < 1e-10:  # Very small time step
            print("Warning: Time step becoming very small")

    def post_step_callback():
        """Called after each integration step."""
        # Finalize step
        sim.finalize_explicit_dust(simulation)

        # Check mass conservation
        total_mass = np.sum(simulation.dust.Sigma * simulation.grid.A)
        if hasattr(simulation, '_initial_mass'):
            mass_change = abs(total_mass - simulation._initial_mass) / simulation._initial_mass
            if mass_change > 0.01:  # 1% change
                print(f"Warning: Mass conservation error: {mass_change:.2%}")

    # Store callbacks
    simulation._pre_step = pre_step_callback
    simulation._post_step = post_step_callback

    # Store initial mass for conservation check
    simulation._initial_mass = np.sum(simulation.dust.Sigma * simulation.grid.A)

# Set up monitoring
setup_integration_callbacks(simulation)

Install with Tessl CLI

npx tessl i tessl/pypi-dustpy

docs

constants.md

dust-physics.md

gas-physics.md

grid-stellar.md

index.md

plotting.md

simulation-control.md

simulation.md

utilities.md

tile.json