Python package for solving partial differential equations with a focus on ease of use and performance.

Storage classes manage simulation data with support for various backends — including memory, files, and movie generation — with efficient data compression and retrieval.
Store simulation data in memory for fast access and analysis.
class MemoryStorage:
    """In-memory storage for simulation data (API stub).

    Holds field snapshots in RAM for fast access and analysis.

    NOTE(review): method bodies are intentionally empty — this file is an
    API specification, not an implementation.
    """

    def __init__(self, *, write_mode: str = "truncate_once"):
        """Initialize in-memory storage.

        Args:
            write_mode: How to handle multiple writes; one of
                "append", "truncate_once", or "truncate_always".
        """

    @classmethod
    def from_fields(cls, fields, times=None, *, label=None, dtype=None):
        """Create storage from field data.

        Args:
            fields: List of FieldBase instances holding the field data.
            times: Array-like time points, one per field.
            label: Optional storage label.
            dtype: Data type used for storage.

        Returns:
            MemoryStorage containing the field data.
        """

    @classmethod
    def from_collection(cls, field_collection, times=None):
        """Create storage from a field collection.

        Args:
            field_collection: FieldCollection with the fields to store.
            times: Array-like time points, one per field.

        Returns:
            MemoryStorage containing the collection data.
        """

    def append(self, data, info=None):
        """Append data to the storage.

        Args:
            data: FieldBase instance to store.
            info: Optional dict of metadata.
        """

    def __len__(self) -> int:
        """int: Number of stored data items."""

    def __getitem__(self, index):
        """Get stored data by index or slice."""

    @property
    def times(self):
        """np.ndarray: Array of time points."""

    @property
    def data(self) -> list:
        """list: Stored field data."""

    def clear(self, clear_data_shape: bool = False):
        """Clear all stored data.

        Args:
            clear_data_shape: Whether to also clear the data-shape info.
        """

    def start_writing(self, field, info=None):
        """Prepare the storage for writing field data.

        Args:
            field: FieldBase used as an example field for metadata.
            info: Optional dict with additional information.
        """
def get_memory_storage():
    """Get a shared memory storage instance (API stub).

    Returns:
        MemoryStorage: Shared storage instance.
    """
# Store simulation data to files with support for various formats and compression.
class FileStorage:
    """File-based storage for simulation data (API stub).

    Supports various file formats and compression; usable as a context
    manager so the underlying file is closed automatically.
    """

    def __init__(self, filename, *, info=None, write_mode: str = "truncate_once"):
        """Initialize file-based storage.

        Args:
            filename: Path to the storage file.
            info: Optional dict of metadata to store.
            write_mode: How to handle file writing.
        """

    @property
    def filename(self) -> str:
        """str: Path to the storage file."""

    def write(self, data, *, info=None):
        """Write data to the file.

        Args:
            data: FieldBase with the field data to write.
            info: Optional dict of metadata.
        """

    def read(self, index=None):
        """Read data from the file.

        Args:
            index: Int or slice selecting the data to read (None for all).

        Returns:
            FieldBase or list: Field data.
        """

    @property
    def items(self):
        """Generator: Iterate over all stored items."""

    def close(self):
        """Close the storage file."""

    def __enter__(self):
        """Enter the context manager, returning this storage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager, closing the underlying file."""
        self.close()
# Create movies from simulation data with automatic frame generation.
class MovieStorage:
    """Movie-file storage that renders stored fields as video frames (API stub)."""

    def __init__(self, filename, *, movie_writer: str = "auto", **kwargs):
        """Initialize movie storage.

        Args:
            filename: Output movie filename.
            movie_writer: Movie writing backend ("ffmpeg", "pillow", or
                "auto" to pick one automatically).
            **kwargs: Additional movie parameters (fps, dpi, etc.).
        """

    @property
    def filename(self) -> str:
        """str: Output movie filename."""

    def append(self, data, info=None):
        """Add a frame to the movie.

        Args:
            data: FieldBase with the field data for the frame.
            info: Optional dict of frame metadata.
        """

    def close(self):
        """Finalize and close the movie file."""

    @property
    def is_writing(self) -> bool:
        """bool: Whether the movie is currently being written."""
# Integration with modelrunner platform for cloud-based data management.
class ModelrunnerStorage:
    """Integration with the modelrunner platform for cloud-based data management (API stub).

    Note:
        Only available if modelrunner is installed and configured.
    """

    def __init__(self, *, info=None):
        """Initialize the modelrunner storage integration.

        Args:
            info: Optional dict of metadata.
        """

    def upload(self, data, *, tags=None):
        """Upload data to the modelrunner platform.

        Args:
            data: FieldBase with the field data to upload.
            tags: Optional list of tags for data organization.
        """

    def download(self, run_id, index=None):
        """Download data from the modelrunner platform.

        Args:
            run_id: Modelrunner run identifier (str).
            index: Optional int selecting a specific data index.

        Returns:
            FieldBase: Downloaded field data.
        """
# Common functionality for all storage backends.
class StorageBase:
    """Common functionality shared by all storage backends (API stub).

    NOTE(review): the original spec listed ``has_collection``,
    ``start_writing``, ``end_writing`` and ``extract_field`` twice; in
    Python only the last definition of each would survive, so the
    duplicates are merged here (last-wins signatures kept, docstrings
    merged).
    """

    @property
    def data_shape(self) -> tuple:
        """tuple: Shape of the stored data."""

    @property
    def dtype(self):
        """np.dtype: Data type of the stored fields."""

    @property
    def shape(self) -> tuple:
        """tuple: Shape of the data including the time dimension."""

    @property
    def has_collection(self) -> bool:
        """bool: Whether the storage contains FieldCollection data."""

    @property
    def grid(self):
        """GridBase: Grid associated with the stored fields."""

    def append(self, field, time=None):
        """Append field data to the storage.

        Args:
            field: FieldBase to store.
            time: Optional float time point for the field.
        """

    def clear(self, clear_data_shape: bool = False):
        """Clear the stored data.

        Args:
            clear_data_shape: Whether to also clear the data-shape info.
        """

    def items(self):
        """Iterate over the stored time-field pairs.

        Yields:
            tuple: ``(time, field)`` pairs.
        """

    # NOTE(review): the string default "0.1" looks like it should be the
    # float 0.1 — confirm against the implementation before changing.
    def tracker(self, interrupts="0.1", transformation=None, filename=None):
        """Create a tracker that writes into this storage.

        Args:
            interrupts: Interrupt specification.
            transformation: Optional transformation applied to each field.
            filename: Optional filename for output.

        Returns:
            StorageTracker: Tracker instance.
        """

    def start_writing(self, field, info=None):
        """Initialize the storage for writing.

        Args:
            field: FieldBase used as an example field for initialization
                and metadata.
            info: Optional dict of metadata.
        """

    def end_writing(self):
        """Finalize the writing process."""

    def view_field(self, field_id):
        """Create a view of a specific field.

        Args:
            field_id: Field identifier (int or str).

        Returns:
            StorageView: View of the field data.
        """

    def extract_field(self, *indices, **kwargs):
        """Extract a specific field from the stored data.

        Args:
            *indices: Field indices/identifiers to extract (the shadowed
                duplicate documented this as a single ``field_id``).
            **kwargs: Extraction options (e.g. ``copy=True``).

        Returns:
            FieldBase: Extracted field.
        """

    def extract_time_range(self, t_start=None, t_end=None, *, copy=True):
        """Extract the data within a time range.

        Args:
            t_start: Start time (None for the beginning).
            t_end: End time (None for the end).
            copy: Whether to copy the data.

        Returns:
            StorageBase: Storage with the extracted data.
        """

    def apply(self, func, progress=None, parallel="auto", **kwargs):
        """Apply a function to all stored fields.

        Args:
            func: Callable applied to each stored field.
            progress: Whether to show a progress bar.
            parallel: Parallelization mode (str or bool).
            **kwargs: Additional arguments forwarded to ``func``.

        Returns:
            Any: Result of the function application.
        """

    def copy(self, filename=None):
        """Create a copy of the storage.

        Args:
            filename: Optional filename for file-based storage.

        Returns:
            StorageBase: Copy of the storage.
        """

    @property
    def write_mode(self) -> str:
        """str: Current write mode."""
import pde
import pde
import matplotlib.pyplot as plt

# --- Example: in-memory storage during a simulation -------------------------

# Create storage
storage = pde.MemoryStorage()

# Set up the simulation
grid = pde.UnitGrid([32], periodic=True)
eq = pde.DiffusionPDE(diffusivity=0.1)
state = pde.ScalarField.random_uniform(grid)

# Use a DataTracker to fill the storage while solving
tracker = pde.DataTracker(storage=storage, interrupts=0.5)
result = eq.solve(state, t_range=5.0, tracker=tracker)

# Access stored data
print(f"Stored {len(storage)} time points")
print(f"Times: {storage.times}")

# Plot the time evolution, showing every other frame
for i, field in enumerate(storage):
    if i % 2 == 0:
        plt.plot(field.data, alpha=0.7, label=f't={storage.times[i]:.1f}')
plt.legend()
plt.show()

import pde
import pde

# --- Example: file-based storage for large datasets --------------------------

# Set up a large simulation
grid = pde.CartesianGrid([[0, 10], [0, 10]], [128, 128])
eq = pde.AllenCahnPDE()
state = eq.get_initial_condition(grid)

# Use file storage to handle large datasets; the context manager closes the file
with pde.FileStorage("simulation_data.h5") as storage:
    tracker = pde.DataTracker(storage=storage, interrupts=0.1)
    result = eq.solve(state, t_range=10.0, tracker=tracker)

# Read the data back for analysis
with pde.FileStorage("simulation_data.h5") as storage:
    # Read specific time points
    initial = storage.read(0)
    final = storage.read(-1)
    print(f"Initial average: {initial.average:.3f}")
    print(f"Final average: {final.average:.3f}")
    # Read the full time series
    all_data = storage.read()
    print(f"Loaded {len(all_data)} time points")

import pde
import pde

# --- Example: writing a movie during the simulation ---------------------------

# 2D simulation for the movie
grid = pde.CartesianGrid([[0, 10], [0, 10]], [64, 64])
eq = pde.SwiftHohenbergPDE()
state = eq.get_initial_condition(grid)

# Create the movie while the simulation runs
movie_storage = pde.MovieStorage(
    "pattern_formation.mp4",
    fps=10,
    dpi=100,
)
tracker = pde.DataTracker(storage=movie_storage, interrupts=0.1)
result = eq.solve(state, t_range=20.0, tracker=tracker)
print("Movie saved as pattern_formation.mp4")

import pde
import pde
import pickle


class PickleStorage(pde.StorageBase):
    """Custom storage backend that persists simulation data via pickle."""

    def __init__(self, filename):
        super().__init__(write_mode="truncate_once")
        self.filename = filename
        self.data_list = []
        self.time_list = []

    def append(self, data, info=None):
        # Store a copy so later in-place changes don't affect stored frames
        self.data_list.append(data.copy())
        self.time_list.append(info.get('t', 0) if info else 0)

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, index):
        return self.data_list[index]

    def save(self):
        """Dump all stored data and times to the pickle file."""
        with open(self.filename, 'wb') as f:
            pickle.dump({
                'data': self.data_list,
                'times': self.time_list
            }, f)

    @classmethod
    def load(cls, filename):
        """Reconstruct a PickleStorage from a previously saved file."""
        storage = cls(filename)
        # SECURITY: pickle.load executes arbitrary code — only load files
        # produced by a trusted source.
        with open(filename, 'rb') as f:
            saved = pickle.load(f)
        storage.data_list = saved['data']
        storage.time_list = saved['times']
        return storage


# Use the custom storage in a simulation
grid = pde.UnitGrid([32], periodic=True)
eq = pde.DiffusionPDE()
state = pde.ScalarField.random_uniform(grid)
custom_storage = PickleStorage("custom_data.pkl")
tracker = pde.DataTracker(storage=custom_storage, interrupts=1.0)
result = eq.solve(state, t_range=5.0, tracker=tracker)

# Save and reload
custom_storage.save()
loaded_storage = PickleStorage.load("custom_data.pkl")
print(f"Loaded {len(loaded_storage)} time points")

import pde
import pde

# --- Example: compressed HDF5 storage for large simulations -------------------

# Large dataset with compression
grid = pde.CartesianGrid([[0, 20], [0, 20]], [200, 200])
eq = pde.CahnHilliardPDE()
state = eq.get_initial_condition(grid)

# Configure file storage with gzip compression
storage_config = {
    "compression": "gzip",
    "compression_opts": 9,  # maximum compression level
    "shuffle": True,  # byte-shuffle filter improves compression of numeric data
}

with pde.FileStorage("large_simulation.h5", **storage_config) as storage:
    tracker = pde.DataTracker(storage=storage, interrupts=0.2)
    result = eq.solve(state, t_range=10.0, tracker=tracker)
print("Large simulation saved with compression")

import pde
import pde

# --- Example: writing to several storage backends at once ---------------------

# Set up the simulation
grid = pde.UnitGrid([64, 64], periodic=True)
eq = pde.AllenCahnPDE()
state = eq.get_initial_condition(grid)

# Multiple storage backends, each serving a different purpose
storages = [
    pde.MemoryStorage(),                    # for immediate analysis
    pde.FileStorage("backup.h5"),           # for archival
    pde.MovieStorage("visualization.mp4"),  # for presentation
]

# One tracker per storage, each with its own interrupt interval
trackers = [
    pde.DataTracker(storage=storages[0], interrupts=1.0),  # memory
    pde.DataTracker(storage=storages[1], interrupts=0.5),  # file
    pde.DataTracker(storage=storages[2], interrupts=0.1),  # movie
]
result = eq.solve(state, t_range=10.0, tracker=trackers)
print(f"Memory storage: {len(storages[0])} points")
print("File and movie outputs saved")

# Install with Tessl CLI
Install with the Tessl CLI: `npx tessl i tessl/pypi-py-pde`