Dust evolution in protoplanetary disks
Functions for managing simulation execution, time-stepping, and integration schemes. These functions provide the core control logic for DustPy simulations, handling the integration of dust and gas evolution equations.
Functions for calculating appropriate time steps for stable evolution.
from dustpy.std import sim
def sim.dt(sim):
"""
Calculate time step from source terms.
Determines an appropriate time step based on the current
source terms to ensure numerical stability and accuracy.
Uses the fastest evolving components to set the time scale.
Parameters:
- sim: Simulation object
Returns:
float: Time step [s]
"""
def sim.dt_adaptive(sim):
"""
Calculate adaptive time step.
Uses error estimation and adaptive algorithms to determine
the optimal time step for the current simulation state,
balancing accuracy and computational efficiency.
Parameters:
- sim: Simulation object
Returns:
float: Adaptive time step [s]
"""Functions for managing explicit integration schemes.
def sim.prepare_explicit_dust(sim):
"""
Prepare explicit integration for dust evolution.
Sets up the simulation state and calculates necessary
quantities before performing an explicit integration
step for the dust component.
Parameters:
- sim: Simulation object
Returns:
None
Notes:
- Updates dust source terms
- Prepares collision kernels
- Calculates transport coefficients
"""
def sim.finalize_explicit_dust(sim):
"""
Finalize explicit integration for dust evolution.
Completes the explicit integration step by applying
boundary conditions, enforcing constraints, and
updating derived quantities.
Parameters:
- sim: Simulation object
Returns:
None
Notes:
- Applies boundary conditions
- Enforces floor values
- Updates dependent variables
"""Functions for managing implicit integration schemes.
def sim.prepare_implicit_dust(sim):
"""
Prepare implicit integration for dust evolution.
Sets up the linear system and necessary matrices
for implicit integration of the dust evolution
equations, including Jacobian calculation.
Parameters:
- sim: Simulation object
Returns:
None
Notes:
- Calculates Jacobian matrix
- Sets up linear system
- Prepares solver parameters
"""
def sim.finalize_implicit_dust(sim):
"""
Finalize implicit integration for dust evolution.
Completes the implicit integration step by solving
the linear system, updating the simulation state,
and applying post-integration corrections.
Parameters:
- sim: Simulation object
Returns:
None
Notes:
- Solves linear system
- Updates simulation state
- Applies corrections and constraints
"""from dustpy import Simulation
from dustpy.std import sim
import dustpy.constants as c
# Create and initialize simulation
simulation = Simulation()
simulation.makegrids()
simulation.initialize()
# Calculate time steps
fixed_dt = sim.dt(simulation) # [s]
adaptive_dt = sim.dt_adaptive(simulation) # [s]
# Convert to useful units
fixed_years = fixed_dt / c.year
adaptive_years = adaptive_dt / c.year
print(f"Fixed time step: {fixed_years:.2e} years")
print(f"Adaptive time step: {adaptive_years:.2e} years")
print(f"Ratio: {adaptive_dt/fixed_dt:.2f}")def custom_integration_step(simulation, dt):
"""Perform one integration step with custom control."""
# Prepare for explicit dust integration
sim.prepare_explicit_dust(simulation)
# Get current state
old_sigma = simulation.dust.Sigma.copy()
# Manual explicit step (simplified)
source_terms = simulation.dust.S.tot
simulation.dust.Sigma += source_terms * dt
# Finalize the step
sim.finalize_explicit_dust(simulation)
# Check for convergence
relative_change = np.abs(simulation.dust.Sigma - old_sigma) / (old_sigma + 1e-100)
max_change = np.max(relative_change)
print(f"Maximum relative change: {max_change:.2e}")
return max_change
# Run custom integration
simulation = Simulation()
simulation.makegrids()
simulation.initialize()
dt = sim.dt(simulation)
change = custom_integration_step(simulation, dt)def run_with_adaptive_timestepping(simulation, final_time):
"""Run simulation with adaptive time stepping control."""
import numpy as np
current_time = 0.0
step_count = 0
max_steps = 10000
while current_time < final_time and step_count < max_steps:
# Calculate adaptive time step
dt = sim.dt_adaptive(simulation)
# Don't overshoot final time
if current_time + dt > final_time:
dt = final_time - current_time
# Prepare integration
sim.prepare_explicit_dust(simulation)
# Store old state for error estimation
old_state = simulation.dust.Sigma.copy()
# Take integration step
simulation.dust.Sigma += simulation.dust.S.tot * dt
# Finalize integration
sim.finalize_explicit_dust(simulation)
# Update time and step count
current_time += dt
step_count += 1
# Progress reporting
if step_count % 100 == 0:
print(f"Step {step_count}: t = {current_time/c.year:.2e} years, dt = {dt/c.year:.2e} years")
print(f"Completed in {step_count} steps")
return current_time
# Run adaptive simulation
final_time = 1e5 * c.year # 100,000 years
actual_time = run_with_adaptive_timestepping(simulation, final_time)def compare_integration_schemes(simulation):
"""Compare different integration approaches."""
# Store initial state
initial_sigma = simulation.dust.Sigma.copy()
results = {}
# Test explicit integration
simulation.dust.Sigma[:] = initial_sigma
sim.prepare_explicit_dust(simulation)
dt_explicit = sim.dt(simulation)
simulation.dust.Sigma += simulation.dust.S.tot * dt_explicit
sim.finalize_explicit_dust(simulation)
results['explicit'] = simulation.dust.Sigma.copy()
# Reset and test implicit integration
simulation.dust.Sigma[:] = initial_sigma
sim.prepare_implicit_dust(simulation)
# (Implicit step would be handled by the integrator)
sim.finalize_implicit_dust(simulation)
results['implicit'] = simulation.dust.Sigma.copy()
# Compare results
diff_explicit = np.abs(results['explicit'] - initial_sigma)
diff_implicit = np.abs(results['implicit'] - initial_sigma)
print(f"Explicit max change: {np.max(diff_explicit):.2e}")
print(f"Implicit max change: {np.max(diff_implicit):.2e}")
return results
# Compare schemes
results = compare_integration_schemes(simulation)class TimeStepMonitor:
"""Monitor and log time step evolution during simulation."""
def __init__(self):
self.steps = []
self.times = []
self.dt_values = []
def record_step(self, simulation, current_time):
"""Record time step information."""
dt = sim.dt(simulation)
dt_adaptive = sim.dt_adaptive(simulation)
self.steps.append(len(self.steps))
self.times.append(current_time)
self.dt_values.append({'fixed': dt, 'adaptive': dt_adaptive})
# Print periodic updates
if len(self.steps) % 100 == 0:
print(f"Step {len(self.steps)}: t = {current_time/c.year:.1e} yr, "
f"dt_fixed = {dt/c.year:.1e} yr, dt_adaptive = {dt_adaptive/c.year:.1e} yr")
def plot_evolution(self):
"""Plot time step evolution."""
import matplotlib.pyplot as plt
times_yr = np.array(self.times) / c.year
dt_fixed = [d['fixed'] for d in self.dt_values]
dt_adaptive = [d['adaptive'] for d in self.dt_values]
plt.figure(figsize=(10, 6))
plt.loglog(times_yr, np.array(dt_fixed)/c.year, 'b-', label='Fixed dt')
plt.loglog(times_yr, np.array(dt_adaptive)/c.year, 'r--', label='Adaptive dt')
plt.xlabel('Time [years]')
plt.ylabel('Time step [years]')
plt.legend()
plt.title('Time Step Evolution')
plt.grid(True)
plt.show()
# Use monitor
monitor = TimeStepMonitor()
# ... during simulation loop ...
# monitor.record_step(simulation, current_time)def analyze_integration_accuracy(simulation, true_solution=None):
"""Analyze integration accuracy and stability."""
# Take multiple time steps with different sizes
dt_base = sim.dt(simulation)
dt_sizes = [dt_base * f for f in [0.5, 1.0, 2.0]]
results = {}
for i, dt in enumerate(dt_sizes):
# Store initial state
initial_state = simulation.dust.Sigma.copy()
# Prepare and execute integration step
sim.prepare_explicit_dust(simulation)
simulation.dust.Sigma += simulation.dust.S.tot * dt
sim.finalize_explicit_dust(simulation)
# Store result
results[f'dt_{i}'] = simulation.dust.Sigma.copy()
# Reset for next test
simulation.dust.Sigma[:] = initial_state
# Analyze convergence
diff_01 = np.abs(results['dt_0'] - results['dt_1'])
diff_12 = np.abs(results['dt_1'] - results['dt_2'])
convergence_rate = np.log(np.max(diff_12) / np.max(diff_01)) / np.log(2.0)
print(f"Convergence rate: {convergence_rate:.2f}")
if convergence_rate > 0.8:
print("Integration appears to be converging (rate > 0.8)")
else:
print("Integration may be unstable (rate < 0.8)")
return results, convergence_rate
# Analyze accuracy
results, rate = analyze_integration_accuracy(simulation)def setup_integration_callbacks(simulation):
"""Set up callbacks for integration monitoring."""
def pre_step_callback():
"""Called before each integration step."""
# Update coagulation parameters
sim.prepare_explicit_dust(simulation)
# Check for stability
dt = sim.dt(simulation)
if dt < 1e-10: # Very small time step
print("Warning: Time step becoming very small")
def post_step_callback():
"""Called after each integration step."""
# Finalize step
sim.finalize_explicit_dust(simulation)
# Check mass conservation
total_mass = np.sum(simulation.dust.Sigma * simulation.grid.A)
if hasattr(simulation, '_initial_mass'):
mass_change = abs(total_mass - simulation._initial_mass) / simulation._initial_mass
if mass_change > 0.01: # 1% change
print(f"Warning: Mass conservation error: {mass_change:.2%}")
# Store callbacks
simulation._pre_step = pre_step_callback
simulation._post_step = post_step_callback
# Store initial mass for conservation check
simulation._initial_mass = np.sum(simulation.dust.Sigma * simulation.grid.A)
# Set up monitoring
setup_integration_callbacks(simulation)Install with Tessl CLI
npx tessl i tessl/pypi-dustpy