FloPy is a Python package to create, run, and post-process MODFLOW-based models
66
This module provides general utilities for model checking, parameter estimation, coordinate transformations, and integration with external tools. These utilities support model development, validation, analysis, and interoperability with other software systems.
Classes for reading MODFLOW binary output files including head files, budget files, and concentration files. These are essential for post-processing model results.
Read binary head output files from MODFLOW models, providing access to 2D or 3D head arrays and time series data.
class HeadFile:
"""Head file reader for MODFLOW binary output"""
def __init__(self, filename: Union[str, os.PathLike], text: str = 'head', precision: str = 'auto', verbose: bool = False, **kwargs): ...
def get_data(self, kstpkper=None, idx=None, totim=None, **kwargs) -> np.ndarray: ...
def get_alldata(self, nodata=-999.99) -> np.ndarray: ...
def get_ts(self, idx) -> np.ndarray: ...
@property
def headers() -> np.recarray: ...
@property
def times() -> list: ...Parameters:
- filename (str | PathLike): Path to the head file
- text (str): Text string identifier (default: 'head')
- precision (str): Data precision ('auto', 'single', 'double')
- verbose (bool): Enable verbose output

Key Methods:
- get_data(): Retrieve head data for specific time steps
- get_alldata(): Get all head data in the file
- get_ts(): Get time series data for specific cells

Read binary cell budget files containing flow terms and mass balance information.
class CellBudgetFile:
"""Cell budget file reader for MODFLOW binary output"""
def __init__(self, filename: Union[str, os.PathLike], precision: str = 'single', verbose: bool = False, **kwargs): ...
def get_data(self, kstpkper=None, text=None, paknam=None, totim=None, idx=None, **kwargs) -> list | np.ndarray: ...
def get_times(self) -> list: ...
def get_kstpkper(self) -> list: ...
def list_records(self) -> list: ...
@property
def recordarray() -> np.recarray: ...Parameters:
- filename (str | PathLike): Path to the budget file
- precision (str): Data precision ('single' or 'double')
- verbose (bool): Enable verbose output

Key Methods:
- get_data(): Retrieve budget data by time step and flow component
- get_times(): Get list of simulation times
- list_records(): List all available budget records

Read binary concentration files from MT3DMS transport models.
class UcnFile:
"""Concentration file reader for MT3DMS binary output"""
def __init__(self, filename: Union[str, os.PathLike], text: str = 'concentration', precision: str = 'single', verbose: bool = False, **kwargs): ...
def get_data(self, kstpkper=None, idx=None, totim=None, **kwargs) -> np.ndarray: ...
def get_alldata(self, nodata=-999.99) -> np.ndarray: ...
@property
def headers() -> np.recarray: ...Perform zone budget analysis on MODFLOW cell budget data.
class ZoneBudget:
"""Zone budget analysis for MODFLOW models"""
def __init__(self, cbc_file, z, kstpkper=None, totim=None, aliases=None, verbose: bool = False, **kwargs): ...
def get_budget(self, names=None, zones=None, **kwargs) -> pd.DataFrame: ...
def get_dataframes(self, **kwargs) -> dict: ...
def to_csv(self, fname, **kwargs) -> None: ...
@staticmethod
def read_zone_file(fname) -> np.ndarray: ...Parameters:
- cbc_file: CellBudgetFile object or path to budget file
- z (np.ndarray): Zone array defining budget zones
- kstpkper (tuple): Time step and stress period
- totim (float): Total simulation time

Key Methods:
- get_budget(): Calculate zone budget for specified zones
- get_dataframes(): Get budget data as pandas DataFrames
- to_csv(): Export budget results to CSV

Handle binary file headers for MODFLOW output files.
class BinaryHeader:
"""Binary file header handling"""
def __init__(self, bintype=None, precision: str = 'single'): ...
def set_values(self, **kwargs): ...
@staticmethod
def create(bintype=None, precision: str = 'single', **kwargs): ...Execute MODFLOW models with subprocess management and output capture.
def run_model(
    namefile: str,
    exe_name: str,
    model_ws: str = '.',
    silent: bool = False,
    pause: bool = False,
    report: bool = False,
    normal_msg: str = 'normal termination',
    **kwargs
) -> tuple[bool, list[str]]:
    """Execute model runs with subprocess management.

    Interface stub; the implementation is elided.

    Parameters:
        namefile: MODFLOW name file.
        exe_name: Executable name or path.
        model_ws: Model workspace directory.
        silent: Suppress output during execution.
        pause: Pause for user input after execution.
        report: Print execution report.
        normal_msg: Message indicating successful completion.

    Returns:
        Success status and output lines.
    """
...

Parameters:
- namefile (str): MODFLOW name file
- exe_name (str): Executable name or path
- model_ws (str): Model workspace directory
- silent (bool): Suppress output during execution
- pause (bool): Pause for user input after execution
- report (bool): Print execution report
- normal_msg (str): Message indicating successful completion

Returns:
- tuple[bool, list[str]]: Success status and output lines

Find executable paths in system PATH.
def which(program: str) -> str | None:
    """Find executable paths (re-exported from shutil).

    Interface stub; the implementation is elided. ``shutil.which`` yields
    ``None`` when no executable matches, so the return annotation admits it.
    """
...

Comprehensive model checking function for validation and error detection.
def check(
    model: object,
    f: str | None = None,
    verbose: bool = True,
    level: int = 1,
    **kwargs
) -> object:
    """Model checking function for validation and diagnostics.

    Interface stub; the implementation is elided.

    Parameters:
        model: FloPy model object to check.
        f: Output file for check results.
        verbose: Print detailed check results.
        level: Check level (0=basic, 1=standard, 2=detailed).

    Returns:
        Check results object with summary and details.
    """
...

Parameters:
- model (object): FloPy model object to check
- f (str): Output file for check results
- verbose (bool): Print detailed check results
- level (int): Check level (0=basic, 1=standard, 2=detailed)

Returns:
- object: Check results object with summary and details

Calculate specific discharge from flow fields.
def get_specific_discharge(
    frf: np.ndarray,
    fff: np.ndarray,
    flf: np.ndarray,
    grid: object,
    head: np.ndarray | None = None,
    **kwargs
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Calculate specific discharge from flow fields.

    Interface stub; the implementation is elided.

    Parameters:
        frf: Flow right face.
        fff: Flow front face.
        flf: Flow lower face.
        grid: Model grid object.
        head: Head array for calculations.

    Returns:
        Specific discharge components (qx, qy, qz).
    """
...

Parameters:
- frf (np.ndarray): Flow right face
- fff (np.ndarray): Flow front face
- flf (np.ndarray): Flow lower face
- grid (object): Model grid object
- head (np.ndarray): Head array for calculations

Returns:
- tuple[np.ndarray, np.ndarray, np.ndarray]: Specific discharge components (qx, qy, qz)

Calculate transmissivities from hydraulic parameters.
def get_transmissivities(
    heads: np.ndarray,
    m: object,
    r: int | None = None,
    c: int | None = None,
    x: float | None = None,
    y: float | None = None,
    sctop: np.ndarray | None = None,
    scbot: np.ndarray | None = None,
    **kwargs
) -> np.ndarray:
    """Calculate transmissivities from model parameters.

    Interface stub; the implementation is elided.

    Parameters:
        heads: Head array.
        m: MODFLOW model object.
        r: Row index for specific location.
        c: Column index for specific location.
        x: X coordinate for interpolation.
        y: Y coordinate for interpolation.
        sctop: Screen top elevations.
        scbot: Screen bottom elevations.
    """
...

Parameters:
- heads (np.ndarray): Head array
- m (object): MODFLOW model object
- r (int): Row index for specific location
- c (int): Column index for specific location
- x (float): X coordinate for interpolation
- y (float): Y coordinate for interpolation
- sctop (np.ndarray): Screen top elevations
- scbot (np.ndarray): Screen bottom elevations

Convert model time to datetime objects.
def totim_to_datetime(
    totim: ArrayData,
    start: str = '1-1-1970',
    timeunit: str = 'days',
    **kwargs
) -> list:
    """Convert model time to datetime objects.

    Interface stub; the implementation is elided.

    Parameters:
        totim: Total elapsed times.
        start: Starting date string.
        timeunit: Time units ('days', 'hours', 'minutes', 'seconds', 'years').

    Returns:
        List of datetime objects.
    """
...

Parameters:
- totim (ArrayData): Total elapsed times
- start (str): Starting date string
- timeunit (str): Time units ('days', 'hours', 'minutes', 'seconds', 'years')

Returns:
- list: List of datetime objects

Parse MODFLOW name files and extract package information.
def parsenamefile(
    namefile: str,
    packages: dict | None = None,
    **kwargs
) -> tuple[list, dict]:
    """Parse MODFLOW name files.

    Interface stub; the implementation is elided.

    Parameters:
        namefile: Path to MODFLOW name file.
        packages: Package type definitions.

    Returns:
        Package list and file dictionary.
    """
...

Parameters:
- namefile (str): Path to MODFLOW name file
- packages (dict): Package type definitions

Returns:
- tuple[list, dict]: Package list and file dictionary

Create empty record arrays with specified dtypes.
def create_empty_recarray(
    length: int,
    dtype: np.dtype,
    default_value: float = 0.0,
    **kwargs
) -> np.recarray:
    """Create empty record arrays.

    Interface stub; the implementation is elided.
    """
...

Create and manipulate record arrays for MODFLOW data.
def recarray(
    data: ArrayData,
    dtype: np.dtype | None = None,
    **kwargs
) -> np.recarray:
    """Create record arrays from data.

    Interface stub; the implementation is elided.
    """
...

Slice record arrays with advanced indexing.
def ra_slice(
    ra: np.recarray,
    columns: list[str],
    **kwargs
) -> np.recarray:
    """Slice record arrays by columns.

    Interface stub; the implementation is elided.
    """
...

Read fixed format variables from files.
def read_fixed_var(
    line: str,
    dtype: type = float,
    **kwargs
) -> object:
    """Read fixed format variables.

    Interface stub; the implementation is elided.
    """
...

Write variables in fixed format.
def write_fixed_var(
    value: object,
    dtype: type = float,
    width: int = 10,
    **kwargs
) -> str:
    """Write fixed format variables.

    Interface stub; the implementation is elided.
    """
...

Read 1D arrays from various sources.
def read1d(
    f: object,
    array: np.ndarray,
    **kwargs
) -> np.ndarray:
    """Read 1D arrays from files or other sources.

    Interface stub; the implementation is elided.
    """
...

Dynamic import utility for optional dependencies.
def import_optional_dependency(
    name: str,
    extra: str | None = None,
    **kwargs
) -> object:
    """Dynamic import utility for optional packages.

    Interface stub; the implementation is elided.

    Parameters:
        name: Package name to import.
        extra: Additional information for error messages.

    Returns:
        Imported module or raises ImportError.
    """
...

Parameters:
- name (str): Package name to import
- extra (str): Additional information for error messages

Returns:
- object: Imported module or raises ImportError

2D utility arrays for MODFLOW input data management.
class Util2d:
    """2D utility arrays for MODFLOW data.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        model: object,
        shape: tuple[int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'util2d',
        fmtin: str | None = None,
        **kwargs
    ): ...

    @property
    def array(self) -> np.ndarray:
        """Get array data"""
        ...

    def get_file_entry(self) -> str:
        """Get file entry string for MODFLOW input"""
        ...

    def write_file(
        self,
        f_name: str | None = None,
        **kwargs
    ) -> str:
        """Write array to file"""
        ...

    # ``load`` takes ``cls`` as its first parameter; without the decorator a
    # call such as ``Util2d.load(f, model, shape, dtype)`` would bind ``f``
    # to ``cls`` and shift every other argument.
    @classmethod
    def load(
        cls,
        f: object,
        model: object,
        shape: tuple[int, int],
        dtype: type,
        **kwargs
    ) -> 'Util2d':
        """Load Util2d from file"""
...

3D utility arrays for layered model data.
class Util3d:
    """3D utility arrays for MODFLOW data.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        model: object,
        shape: tuple[int, int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'util3d',
        fmtin: str | None = None,
        **kwargs
    ): ...

    @property
    def array(self) -> np.ndarray:
        """Get 3D array data"""
        ...

    def get_file_entry(self) -> list[str]:
        """Get file entries for each layer"""
        ...

    def write_file(
        self,
        f_name: str | None = None,
        **kwargs
    ) -> list[str]:
        """Write 3D array to file(s)"""
...

Transient 2D arrays for time-varying data.
class Transient2d:
    """Transient 2D arrays for time-varying MODFLOW data.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        model: object,
        shape: tuple[int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'transient2d',
        **kwargs
    ): ...

    def get_data_array(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.ndarray:
        """Get array for specific stress period"""
        ...

    def add_record(
        self,
        kper: int,
        array: np.ndarray,
        **kwargs
    ) -> None:
        """Add array for stress period"""
...

Transient 3D arrays for time-varying layered data.
class Transient3d:
    """Transient 3D arrays for time-varying MODFLOW data.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        model: object,
        shape: tuple[int, int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'transient3d',
        **kwargs
    ): ...

    def get_data_array(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.ndarray:
        """Get 3D array for specific stress period"""
...

List-based data structures for boundary conditions and other tabular data.
class MfList:
    """List-based data structures for MODFLOW.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        model: object,
        dtype: np.dtype | None = None,
        data: ArrayData | None = None,
        **kwargs
    ): ...

    @property
    def array(self) -> np.recarray:
        """Get record array data"""
        ...

    def add_record(
        self,
        kper: int,
        values: list,
        **kwargs
    ) -> None:
        """Add record for stress period"""
        ...

    def get_data(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.recarray:
        """Get data for specific stress period"""
...

Grid intersection utilities for spatial analysis.
class GridIntersect:
    """Grid intersection utilities.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        grid: object,
        method: str = 'vertex',
        **kwargs
    ): ...

    def intersect_point(
        self,
        point: tuple[float, float],
        **kwargs
    ) -> int:
        """Find cell containing point"""
        ...

    def intersect_linestring(
        self,
        linestring: list[tuple[float, float]],
        **kwargs
    ) -> list[dict]:
        """Intersect linestring with grid"""
        ...

    def intersect_polygon(
        self,
        polygon: list[tuple[float, float]],
        **kwargs
    ) -> list[dict]:
        """Intersect polygon with grid"""
...

Grid indexing utilities for coordinate transformations.
class ModflowGridIndices:
    """Grid indexing utilities.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        grid: object,
        **kwargs
    ): ...

    def get_lrc_from_node(
        self,
        nodes: list[int],
        **kwargs
    ) -> list[tuple[int, int, int]]:
        """Convert node numbers to layer-row-column"""
        ...

    def get_node_from_lrc(
        self,
        lrc: list[tuple[int, int, int]],
        **kwargs
    ) -> list[int]:
        """Convert layer-row-column to node numbers"""
...

Raster data handling and manipulation.
class Raster:
    """Raster data handling.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        array: np.ndarray,
        bands: list | None = None,
        crs: object = None,
        transform: object = None,
        **kwargs
    ): ...

    def sample_polygon(
        self,
        polygon: list[tuple[float, float]],
        **kwargs
    ) -> dict:
        """Sample raster values within polygon"""
        ...

    def sample_points(
        self,
        points: list[tuple[float, float]],
        **kwargs
    ) -> list[float]:
        """Sample raster values at points"""
        ...

    def resample_to_grid(
        self,
        modelgrid: object,
        **kwargs
    ) -> np.ndarray:
        """Resample raster to model grid"""
...

Download and manage MODFLOW executables.
def get_modflow(
    bindir: str | None = None,
    **kwargs
) -> str:
    """Download and manage MODFLOW executables.

    Interface stub; the implementation is elided.
    """
...

Command-line interface for get-modflow script.
def cli_main() -> None:
    """Command-line interface for get-modflow script.

    Interface stub; the implementation is elided.
    """
...

Parse and manage MODFLOW option blocks.
class OptionBlock:
    """Options block parsing and management.

    Interface stub: method bodies are elided.
    """

    def __init__(
        self,
        options_line: str,
        package: object,
        **kwargs
    ): ...

    def parse_options(
        self,
        options_line: str,
        **kwargs
    ) -> dict:
        """Parse options from input line"""
        ...

    def write_options(
        self,
        f: object,
        **kwargs
    ) -> None:
        """Write options to file"""
...

import flopy
import flopy.utils as fpu
import numpy as np
# Example: build a minimal model, check it, and run it only if the check passes.

# Create or load a model
mf = flopy.modflow.Modflow(modelname='test_model')

# Add basic packages
dis = flopy.modflow.ModflowDis(mf, nlay=1, nrow=10, ncol=10)
bas = flopy.modflow.ModflowBas(mf, ibound=1, strt=1.0)
lpf = flopy.modflow.ModflowLpf(mf, hk=1.0, sy=0.1, ss=1e-5)
pcg = flopy.modflow.ModflowPcg(mf)
oc = flopy.modflow.ModflowOc(mf)

# Write input files
mf.write_input()

# Check model before running
print("Checking model...")
chk = fpu.check(mf, verbose=True, level=2)

if chk.summary_array.passed.all():
    print("All checks passed!")

    # Run model using utility function
    success, output = fpu.run_model(
        namefile='test_model.nam',
        exe_name='mf2005',
        model_ws=mf.model_ws,
        report=True
    )

    if success:
        print("Model run completed successfully")
        print("Output:")
        for line in output[-10:]:  # Last 10 lines
            print(f" {line}")
    else:
        # On failure the full output is printed, not just the tail.
        print("Model run failed")
        for line in output:
            print(f" {line}")
else:
    print("Model check failed:")
    failed_checks = chk.summary_array[~chk.summary_array.passed]
    for check in failed_checks:
        print(f" {check.desc}: {check.summary}")
import flopy.utils as fpu
# ``import flopy.utils as fpu`` binds only ``fpu``; the bare ``flopy`` name
# used by ``flopy.modflow.Modflow.load`` below needs its own import.
import flopy
import numpy as np
import matplotlib.pyplot as plt

# Example: derive specific discharge and transmissivity from model output.

# Load model and results
mf = flopy.modflow.Modflow.load('model.nam')

# Read flow data
cbb = fpu.CellBudgetFile('model.cbb')
frf = cbb.get_data(text='FLOW RIGHT FACE')[0]
fff = cbb.get_data(text='FLOW FRONT FACE')[0]
flf = cbb.get_data(text='FLOW LOWER FACE')[0]

# Read head data
hds = fpu.HeadFile('model.hds')
head = hds.get_data(kstpkper=(0, 0))

# Calculate specific discharge
qx, qy, qz = fpu.get_specific_discharge(
    frf, fff, flf,
    grid=mf.modelgrid,
    head=head
)
print(f"Specific discharge statistics:")
print(f" qx range: {qx.min():.2e} to {qx.max():.2e}")
print(f" qy range: {qy.min():.2e} to {qy.max():.2e}")
print(f" qz range: {qz.min():.2e} to {qz.max():.2e}")

# Calculate magnitude
q_magnitude = np.sqrt(qx**2 + qy**2 + qz**2)
print(f" q magnitude range: {q_magnitude.min():.2e} to {q_magnitude.max():.2e}")

# Calculate transmissivities at specific locations
well_locations = [(0, 5, 5), (0, 3, 7), (0, 8, 2)]
for i, (lay, row, col) in enumerate(well_locations):
    trans = fpu.get_transmissivities(
        heads=head,
        m=mf,
        r=row,
        c=col
    )
    # ``trans`` is indexed by layer to report the value for the well's layer.
    print(f"Transmissivity at well {i+1} (L{lay},R{row},C{col}): {trans[lay]:.2f}")

# Close files
cbb.close()
hds.close()
import flopy
import flopy.utils as fpu
import numpy as np
# Example: Util2d, Util3d, Transient2d and MfList containers.

# Create model
mf = flopy.modflow.Modflow(modelname='array_example')

# Create 2D array with Util2d
nrow, ncol = 20, 30
k_array = fpu.Util2d(
    model=mf,
    shape=(nrow, ncol),
    dtype=np.float32,
    value=1.0,
    name='hydraulic_conductivity'
)

# Set different K values by zone
# (later assignments overwrite earlier ones where the slices overlap)
k_data = np.ones((nrow, ncol))
k_data[:10, :] = 5.0  # High K zone (top half)
k_data[10:, :] = 0.1  # Low K zone (bottom half)
k_data[:, :5] = 50.0  # Very high K zone (left edge)
k_array.array = k_data
print(f"K array statistics:")
print(f" Shape: {k_array.array.shape}")
print(f" Min: {k_array.array.min()}")
print(f" Max: {k_array.array.max()}")
print(f" Mean: {k_array.array.mean():.2f}")

# Create 3D array with Util3d
nlay = 3
porosity_3d = fpu.Util3d(
    model=mf,
    shape=(nlay, nrow, ncol),
    dtype=np.float32,
    value=0.25,
    name='porosity'
)

# Set layer-specific porosity
por_data = np.full((nlay, nrow, ncol), 0.25)
por_data[0, :, :] = 0.35  # Higher porosity in top layer
por_data[1, :, :] = 0.20  # Lower porosity in middle layer
por_data[2, :, :] = 0.15  # Lowest porosity in bottom layer
porosity_3d.array = por_data
print(f"\nPorosity 3D array statistics:")
print(f" Shape: {porosity_3d.array.shape}")
for lay in range(nlay):
    layer_data = porosity_3d.array[lay, :, :]
    print(f" Layer {lay+1}: mean = {layer_data.mean():.3f}")

# Create transient 2D array for recharge
recharge_transient = fpu.Transient2d(
    model=mf,
    shape=(nrow, ncol),
    dtype=np.float32,
    value=0.001,
    name='recharge'
)

# Add seasonal recharge patterns
base_recharge = np.full((nrow, ncol), 0.001)

# Winter (high recharge)
winter_recharge = base_recharge * 3.0
recharge_transient.add_record(0, winter_recharge)

# Spring (medium recharge)
spring_recharge = base_recharge * 1.5
recharge_transient.add_record(1, spring_recharge)

# Summer (low recharge)
summer_recharge = base_recharge * 0.5
recharge_transient.add_record(2, summer_recharge)

# Fall (medium recharge)
fall_recharge = base_recharge * 1.2
recharge_transient.add_record(3, fall_recharge)

print(f"\nTransient recharge statistics:")
# Stress periods 0-3 correspond to the four seasons added above.
for kper, season in enumerate(['Winter', 'Spring', 'Summer', 'Fall']):
    rch_data = recharge_transient.get_data_array(kper)
    print(f" {season}: mean = {rch_data.mean():.4f}")

# Create MfList for well data
well_dtype = np.dtype([
    ('k', int), ('i', int), ('j', int), ('flux', float)
])
well_data = fpu.MfList(
    model=mf,
    dtype=well_dtype
)

# Add wells for different stress periods
# Stress period 0: Base pumping
wells_sp0 = [
    (0, 10, 15, -1000.0),  # Production well
    (0, 5, 20, -500.0),  # Smaller well
    (0, 15, 10, 200.0)  # Injection well
]
for well in wells_sp0:
    well_data.add_record(0, well)

# Stress period 1: Increased pumping
wells_sp1 = [
    (0, 10, 15, -1500.0),  # Increased pumping
    (0, 5, 20, -750.0),  # Increased pumping
    (0, 15, 10, 300.0),  # Increased injection
    (0, 8, 25, -800.0)  # Additional well
]
for well in wells_sp1:
    well_data.add_record(1, well)

print(f"\nWell data statistics:")
for kper in range(2):
    wells = well_data.get_data(kper)
    # Negative flux is summed as pumping, positive flux as injection.
    total_pumping = np.sum(wells['flux'][wells['flux'] < 0])
    total_injection = np.sum(wells['flux'][wells['flux'] > 0])
    print(f" Period {kper}: {len(wells)} wells, pumping = {total_pumping:.0f}, injection = {total_injection:.0f}")
import flopy.utils as fpu
import numpy as np
from datetime import datetime
# Example: time conversion, name-file parsing and record-array helpers.

# Model simulation times (in days)
totim = np.array([0, 30, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 365])

# Convert to datetime objects
start_date = '2023-01-01'
datetimes = fpu.totim_to_datetime(
    totim=totim,
    start=start_date,
    timeunit='days'
)
print("Time conversion:")
print(f"Start date: {start_date}")
for time, dt in zip(totim, datetimes):
    print(f" Day {time:3.0f}: {dt.strftime('%Y-%m-%d')}")

# Different time units
totim_hours = np.array([0, 6, 12, 24, 48, 72])
datetimes_hours = fpu.totim_to_datetime(
    totim=totim_hours,
    start='2023-06-15 08:00:00',
    timeunit='hours'
)
print(f"\nHourly simulation:")
for time, dt in zip(totim_hours, datetimes_hours):
    print(f" Hour {time:2.0f}: {dt.strftime('%Y-%m-%d %H:%M:%S')}")

# Parse name file
try:
    packages, files = fpu.parsenamefile('model.nam')
    print(f"\nName file contents:")
    print(f" Found {len(packages)} packages:")
    for pkg in packages:
        # Each entry unpacks as (ftype, unit, fname, status).
        ftype, unit, fname, status = pkg
        print(f" {ftype:8s} (unit {unit:2d}): {fname}")
    print(f"\n File dictionary:")
    for key, value in files.items():
        print(f" {key}: {value}")
except FileNotFoundError:
    print("Name file not found")

# Create and manipulate record arrays
dtype = np.dtype([
    ('layer', int),
    ('row', int),
    ('col', int),
    ('head', float),
    ('drawdown', float)
])

# Create empty record array
obs_data = fpu.create_empty_recarray(10, dtype, default_value=-999.0)

# Fill with synthetic observation data
for i in range(10):
    obs_data[i] = (
        1,  # layer
        np.random.randint(0, 20),  # row
        np.random.randint(0, 30),  # col
        100.0 + np.random.normal(0, 5),  # head
        np.random.exponential(2.0)  # drawdown
    )
print(f"\nObservation data:")
print(f" Array shape: {obs_data.shape}")
print(f" Data type: {obs_data.dtype}")
print(f" First 5 records:")
for i in range(5):
    print(f" {obs_data[i]}")

# Slice record array
head_data = fpu.ra_slice(obs_data, ['layer', 'row', 'col', 'head'])
print(f"\nSliced array (head data only):")
print(f" New shape: {head_data.shape}")
print(f" New dtype: {head_data.dtype}")

# Statistics
print(f"\nObservation statistics:")
print(f" Mean head: {obs_data['head'].mean():.2f}")
print(f" Mean drawdown: {obs_data['drawdown'].mean():.2f}")
print(f" Head range: {obs_data['head'].min():.2f} to {obs_data['head'].max():.2f}")
import flopy
import flopy.utils as fpu
import numpy as np
# Example: point, line and polygon intersection plus node/LRC conversion.

# Create model with grid
mf = flopy.modflow.Modflow(modelname='grid_example')
dis = flopy.modflow.ModflowDis(
    mf,
    nlay=1, nrow=20, ncol=30,
    delr=100.0, delc=100.0,
    top=50.0, botm=0.0,
    xul=0.0, yul=2000.0  # Upper left coordinates
)

# Create grid intersect object
grid_intersect = fpu.GridIntersect(mf.modelgrid, method='vertex')

# Point intersections
test_points = [
    (500.0, 1500.0),  # Should be in grid
    (1500.0, 1000.0),  # Should be in grid
    (5000.0, 500.0),  # May be outside grid
]
print("Point intersections:")
for i, point in enumerate(test_points):
    try:
        cell_id = grid_intersect.intersect_point(point)
        if cell_id >= 0:
            # Convert to row, col
            row, col = divmod(cell_id, mf.dis.ncol)
            print(f" Point {i+1} ({point[0]}, {point[1]}): Cell {cell_id} (Row {row+1}, Col {col+1})")
        else:
            print(f" Point {i+1} ({point[0]}, {point[1]}): Outside grid")
    # A bare ``except:`` would also trap KeyboardInterrupt and SystemExit;
    # catch Exception like the line and polygon handlers below.
    except Exception:
        print(f" Point {i+1} ({point[0]}, {point[1]}): Error in intersection")

# Line intersection (river or stream)
river_line = [
    (200.0, 1800.0),  # Start point
    (800.0, 1600.0),  # Bend point
    (1500.0, 1200.0),  # Another bend
    (2500.0, 800.0)  # End point
]
print(f"\nRiver line intersection:")
try:
    intersections = grid_intersect.intersect_linestring(river_line)
    print(f" Line intersects {len(intersections)} cells:")
    for intersection in intersections[:10]:  # First 10 intersections
        cell_id = intersection.get('cellid', -1)
        if cell_id >= 0:
            row, col = divmod(cell_id, mf.dis.ncol)
            length = intersection.get('length', 0.0)
            print(f" Cell {cell_id} (R{row+1},C{col+1}): Length = {length:.1f}")
except Exception as e:
    print(f" Error in line intersection: {e}")

# Polygon intersection (capture zone or contamination area)
contamination_area = [
    (1000.0, 1500.0),  # Bottom left
    (2000.0, 1500.0),  # Bottom right
    (2000.0, 1000.0),  # Top right
    (1000.0, 1000.0),  # Top left
    (1000.0, 1500.0)  # Close polygon
]
print(f"\nContamination area intersection:")
try:
    poly_intersections = grid_intersect.intersect_polygon(contamination_area)
    print(f" Polygon intersects {len(poly_intersections)} cells:")
    total_area = 0.0
    for intersection in poly_intersections:
        cell_id = intersection.get('cellid', -1)
        if cell_id >= 0:
            row, col = divmod(cell_id, mf.dis.ncol)
            area = intersection.get('area', 0.0)
            total_area += area
            print(f" Cell {cell_id} (R{row+1},C{col+1}): Area = {area:.0f}")
    print(f" Total intersection area: {total_area:.0f}")
except Exception as e:
    print(f" Error in polygon intersection: {e}")

# Grid indexing utilities
grid_indices = fpu.ModflowGridIndices(mf.modelgrid)

# Convert between different indexing schemes
test_nodes = [0, 150, 299, 450, 599]  # Some node numbers
lrc_coords = grid_indices.get_lrc_from_node(test_nodes)
print(f"\nNode to LRC conversion:")
for node, lrc in zip(test_nodes, lrc_coords):
    print(f" Node {node:3d} -> Layer {lrc[0]+1}, Row {lrc[1]+1}, Col {lrc[2]+1}")

# Convert back
nodes_back = grid_indices.get_node_from_lrc(lrc_coords)
print(f"\nLRC to Node conversion (verification):")
for lrc, node in zip(lrc_coords, nodes_back):
    print(f" Layer {lrc[0]+1}, Row {lrc[1]+1}, Col {lrc[2]+1} -> Node {node}")

# Grid statistics
print(f"\nGrid information:")
print(f" Total nodes: {mf.dis.nlay * mf.dis.nrow * mf.dis.ncol}")
print(f" Grid extent: {mf.modelgrid.extent}")
print(f" Cell size: {mf.dis.delr.array[0]} x {mf.dis.delc.array[0]}")
print(f" Grid area: {(mf.dis.ncol * mf.dis.delr.array[0]) * (mf.dis.nrow * mf.dis.delc.array[0]):,.0f} m²")
# Utility function types
# --- General utility aliases ---
ArrayData = Union[int, float, np.ndarray, list]
ExecutionResult = tuple[bool, list[str]]
CheckResult = object

# --- Time and date ---
TimeValue = Union[int, float]
TimeUnit = Literal['days', 'hours', 'minutes', 'seconds', 'years']
DateString = str
DateTimeList = list[datetime]

# --- Files and paths ---
FilePath = Union[str, os.PathLike]
NameFileEntry = tuple[str, int, str, str]  # (ftype, unit, fname, status)
FileDict = dict[str, str]

# --- Arrays and data ---
DataType = Union[type, np.dtype]
RecordArray = np.recarray
ArrayShape = tuple[int, ...]
DefaultValue = Union[int, float, str]

# --- Grid and spatial ---
GridMethod = Literal['vertex', 'centroid', 'structured']
Coordinates = tuple[float, float]
# Line strings and polygons are both sequences of (x, y) pairs.
LineString = list[Coordinates]
Polygon = list[Coordinates]
CellID = int
IntersectionResult = dict[str, Union[int, float]]

# --- Model validation ---
CheckLevel = Literal[0, 1, 2]  # Basic, Standard, Detailed
ValidationResult = object
CheckSummary = np.ndarray

# --- Optional dependencies ---
ModuleName = str
ImportResult = object
OptionalExtra = str

# --- Flow analysis ---
FlowArray = np.ndarray
DischargeComponents = tuple[np.ndarray, np.ndarray, np.ndarray]
TransmissivityArray = np.ndarray
# Same three-array shape as DischargeComponents.
VelocityField = tuple[np.ndarray, np.ndarray, np.ndarray]

# --- Formatting and I/O ---
FormatString = str
FieldWidth = int
Precision = int
IOMode = Literal['r', 'w', 'a', 'rb', 'wb']

This comprehensive documentation covers the complete utilities API for FloPy including model execution, validation, flow analysis, array utilities, grid operations, and various helper functions. The examples demonstrate basic to advanced utility usage scenarios including model checking, spatial analysis, time conversions, and data management operations.
Install with Tessl CLI
npx tessl i tessl/pypi-flopy
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10