tessl/pypi-flopy

FloPy is a Python package to create, run, and post-process MODFLOW-based models


Utilities and Helper Functions

This module provides general utilities for model checking, parameter estimation, coordinate transformations, and integration with external tools. These utilities support model development, validation, analysis, and interoperability with other software systems.

Binary File Readers

Classes for reading MODFLOW binary output files including head files, budget files, and concentration files. These are essential for post-processing model results.

HeadFile

Read binary head output files from MODFLOW models, providing access to 2D or 3D head arrays and time series data.

class HeadFile:
    """Head file reader for MODFLOW binary output"""
    def __init__(self, filename: Union[str, os.PathLike], text: str = 'head', precision: str = 'auto', verbose: bool = False, **kwargs): ...
    def get_data(self, kstpkper=None, idx=None, totim=None, **kwargs) -> np.ndarray: ...
    def get_alldata(self, nodata=-999.99) -> np.ndarray: ...
    def get_ts(self, idx) -> np.ndarray: ...
    @property
    def headers(self) -> np.recarray: ...
    @property
    def times(self) -> list: ...

Parameters:

  • filename (str | PathLike): Path to the head file
  • text (str): Text string identifier (default: 'head')
  • precision (str): Data precision ('auto', 'single', 'double')
  • verbose (bool): Enable verbose output

Key Methods:

  • get_data(): Retrieve head data for specific time steps
  • get_alldata(): Get all head data in the file
  • get_ts(): Get time series data for specific cells

CellBudgetFile

Read binary cell budget files containing flow terms and mass balance information.

class CellBudgetFile:
    """Cell budget file reader for MODFLOW binary output"""
    def __init__(self, filename: Union[str, os.PathLike], precision: str = 'single', verbose: bool = False, **kwargs): ...
    def get_data(self, kstpkper=None, text=None, paknam=None, totim=None, idx=None, **kwargs) -> list | np.ndarray: ...
    def get_times(self) -> list: ...
    def get_kstpkper(self) -> list: ...
    def list_records(self) -> list: ...
    @property
    def recordarray(self) -> np.recarray: ...

Parameters:

  • filename (str | PathLike): Path to the budget file
  • precision (str): Data precision ('single' or 'double')
  • verbose (bool): Enable verbose output

Key Methods:

  • get_data(): Retrieve budget data by time step and flow component
  • get_times(): Get list of simulation times
  • list_records(): List all available budget records

UcnFile

Read binary concentration files from MT3DMS transport models.

class UcnFile:
    """Concentration file reader for MT3DMS binary output"""
    def __init__(self, filename: Union[str, os.PathLike], text: str = 'concentration', precision: str = 'single', verbose: bool = False, **kwargs): ...
    def get_data(self, kstpkper=None, idx=None, totim=None, **kwargs) -> np.ndarray: ...
    def get_alldata(self, nodata=-999.99) -> np.ndarray: ...
    @property
    def headers(self) -> np.recarray: ...

ZoneBudget

Perform zone budget analysis on MODFLOW cell budget data.

class ZoneBudget:
    """Zone budget analysis for MODFLOW models"""
    def __init__(self, cbc_file, z, kstpkper=None, totim=None, aliases=None, verbose: bool = False, **kwargs): ...
    def get_budget(self, names=None, zones=None, **kwargs) -> pd.DataFrame: ...
    def get_dataframes(self, **kwargs) -> dict: ...
    def to_csv(self, fname, **kwargs) -> None: ...
    @staticmethod
    def read_zone_file(fname) -> np.ndarray: ...

Parameters:

  • cbc_file: CellBudgetFile object or path to budget file
  • z (np.ndarray): Zone array defining budget zones
  • kstpkper (tuple): Time step and stress period
  • totim (float): Total simulation time

Key Methods:

  • get_budget(): Calculate zone budget for specified zones
  • get_dataframes(): Get budget data as pandas DataFrames
  • to_csv(): Export budget results to CSV
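
The core idea can be sketched with plain NumPy: a zone budget aggregates a flow term over the cells belonging to each zone. This is a conceptual illustration with synthetic arrays, not ZoneBudget's actual implementation (which reads its flow terms from a CellBudgetFile):

```python
import numpy as np

# Conceptual sketch of a zone budget: sum a flow term over the cells
# belonging to each zone. (Synthetic arrays for illustration only.)
zones = np.array([[1, 1, 2],
                  [1, 2, 2],
                  [3, 3, 3]])
recharge = np.array([[0.5, 0.5, 0.2],
                     [0.5, 0.2, 0.2],
                     [0.1, 0.1, 0.1]])

# Total recharge per zone, keyed by zone number
budget = {int(z): float(recharge[zones == z].sum()) for z in np.unique(zones)}
print(budget)
```

ZoneBudget performs this aggregation for every budget record (and both in- and out-flow directions), returning the results as a structured table.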

BinaryHeader

Handle binary file headers for MODFLOW output files.

class BinaryHeader:
    """Binary file header handling"""
    def __init__(self, bintype=None, precision: str = 'single'): ...
    def set_values(self, **kwargs): ...
    @staticmethod
    def create(bintype=None, precision: str = 'single', **kwargs): ...
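
For orientation, the single-precision head-record header that BinaryHeader manages follows a fixed binary layout: `kstp`, `kper`, `pertim`, `totim`, a 16-character text label, then `ncol`, `nrow`, `ilay`. The round trip below is an illustrative sketch of that layout using the standard `struct` module, not BinaryHeader's own code:

```python
import struct

# Single-precision MODFLOW head-record header layout (illustrative):
# two ints, two floats, a 16-char label, then three ints.
fmt = "<2i2f16s3i"
raw = struct.pack(fmt, 1, 2, 10.0, 10.0, b"HEAD".rjust(16), 30, 20, 1)

# Parse the 44-byte header back into its fields
kstp, kper, pertim, totim, text, ncol, nrow, ilay = struct.unpack(fmt, raw)
print(text.decode().strip(), kstp, kper, ncol, nrow, ilay)
```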

Model Execution

run_model

Execute MODFLOW models with subprocess management and output capture.

def run_model(
    exe_name: str,
    namefile: str,
    model_ws: str = '.',
    silent: bool = False,
    pause: bool = False,
    report: bool = False,
    normal_msg: str = 'normal termination',
    **kwargs
) -> tuple[bool, list[str]]:
    """Execute model runs with subprocess management"""
    ...

Parameters:

  • exe_name (str): Executable name or path
  • namefile (str): MODFLOW name file
  • model_ws (str): Model workspace directory
  • silent (bool): Suppress output during execution
  • pause (bool): Pause for user input after execution
  • report (bool): Print execution report
  • normal_msg (str): Message indicating successful completion

Returns:

  • tuple[bool, list[str]]: Success status and output lines
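
The pattern run_model implements can be sketched with the standard library: spawn the executable, capture its output, and report success when the normal-termination message appears. This is a minimal stand-alone sketch (run_model itself streams output and handles more cases); the stand-in "model" here is just a Python one-liner:

```python
import subprocess
import sys

def run_model_sketch(cmd, normal_msg="normal termination"):
    # Spawn the process, capture stdout, and flag success when the
    # expected termination message appears in the output.
    proc = subprocess.run(cmd, capture_output=True, text=True)
    lines = proc.stdout.splitlines()
    success = any(normal_msg in line.lower() for line in lines)
    return success, lines

# Stand-in "model" that simply prints the success message
ok, out = run_model_sketch(
    [sys.executable, "-c", "print('Normal termination of simulation')"]
)
print(ok)
```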

which

Locate an executable on the system PATH, returning its full path, or None if it is not found.

def which(program: str) -> str | None:
    """Find executable paths (re-exported from shutil)"""
    ...

Model Validation

check

Comprehensive model checking function for validation and error detection.

def check(
    model: object,
    f: str = None,
    verbose: bool = True,
    level: int = 1,
    **kwargs
) -> object:
    """Model checking function for validation and diagnostics"""
    ...

Parameters:

  • model (object): FloPy model object to check
  • f (str): Output file for check results
  • verbose (bool): Print detailed check results
  • level (int): Check level (0=basic, 1=standard, 2=detailed)

Returns:

  • object: Check results object with summary and details

Flow Analysis

get_specific_discharge

Calculate specific discharge from flow fields.

def get_specific_discharge(
    frf: np.ndarray,
    fff: np.ndarray, 
    flf: np.ndarray,
    grid: object,
    head: np.ndarray = None,
    **kwargs
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Calculate specific discharge from flow fields"""
    ...

Parameters:

  • frf (np.ndarray): Flow right face
  • fff (np.ndarray): Flow front face
  • flf (np.ndarray): Flow lower face
  • grid (object): Model grid object
  • head (np.ndarray): Head array for calculations

Returns:

  • tuple[np.ndarray, np.ndarray, np.ndarray]: Specific discharge components (qx, qy, qz)

get_transmissivities

Calculate transmissivities from hydraulic parameters.

def get_transmissivities(
    heads: np.ndarray,
    m: object,
    r: int = None,
    c: int = None,
    x: float = None,
    y: float = None,
    sctop: np.ndarray = None,
    scbot: np.ndarray = None,
    **kwargs
) -> np.ndarray:
    """Calculate transmissivities from model parameters"""
    ...

Parameters:

  • heads (np.ndarray): Head array
  • m (object): MODFLOW model object
  • r (int): Row index for specific location
  • c (int): Column index for specific location
  • x (float): X coordinate for interpolation
  • y (float): Y coordinate for interpolation
  • sctop (np.ndarray): Screen top elevations
  • scbot (np.ndarray): Screen bottom elevations

Time Utilities

totim_to_datetime

Convert model time to datetime objects.

def totim_to_datetime(
    totim: ArrayData,
    start: str = '1-1-1970',
    timeunit: str = 'days',
    **kwargs
) -> list:
    """Convert model time to datetime objects"""
    ...

Parameters:

  • totim (ArrayData): Total elapsed times
  • start (str): Starting date string
  • timeunit (str): Time units ('days', 'hours', 'minutes', 'seconds', 'years')

Returns:

  • list: List of datetime objects

File Parsing

parsenamefile

Parse MODFLOW name files and extract package information.

def parsenamefile(
    namefile: str,
    packages: dict = None,
    **kwargs
) -> tuple[list, dict]:
    """Parse MODFLOW name files"""
    ...

Parameters:

  • namefile (str): Path to MODFLOW name file
  • packages (dict): Package type definitions

Returns:

  • tuple[list, dict]: Package list and file dictionary
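
The name-file format itself is simple: each non-comment line holds a file type, unit number, file name, and an optional status keyword. The sketch below is an illustrative parser of that convention, not flopy's implementation:

```python
# Minimal name-file parsing sketch: "FTYPE NUNIT FNAME [STATUS]"
# per non-comment line. (Illustrative only.)
text = """\
# example name file
LIST  2  model.list
DIS  11  model.dis
HDS  51  model.hds  REPLACE
"""

entries = []
for line in text.splitlines():
    if not line.strip() or line.startswith("#"):
        continue  # skip blanks and comments
    ftype, unit, fname, *rest = line.split()
    entries.append((ftype, int(unit), fname, rest[0] if rest else ""))
print(entries)
```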

Array Utilities

create_empty_recarray

Create empty record arrays with specified dtypes.

def create_empty_recarray(
    length: int,
    dtype: np.dtype,
    default_value: float = 0.0,
    **kwargs
) -> np.recarray:
    """Create empty record arrays"""
    ...

recarray

Create and manipulate record arrays for MODFLOW data.

def recarray(
    data: ArrayData,
    dtype: np.dtype = None,
    **kwargs
) -> np.recarray:
    """Create record arrays from data"""
    ...

ra_slice

Slice record arrays with advanced indexing.

def ra_slice(
    ra: np.recarray,
    columns: list[str],
    **kwargs
) -> np.recarray:
    """Slice record arrays by columns"""
    ...

File I/O Utilities

read_fixed_var

Read fixed format variables from files.

def read_fixed_var(
    line: str,
    dtype: type = float,
    **kwargs
) -> object:
    """Read fixed format variables"""
    ...

write_fixed_var

Write variables in fixed format.

def write_fixed_var(
    value: object,
    dtype: type = float,
    width: int = 10,
    **kwargs
) -> str:
    """Write fixed format variables"""
    ...
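
A round trip shows the fixed-width convention these helpers implement: values are written right-justified in fields of a set width, then recovered by slicing the line at field boundaries. This is a plain-Python sketch of the convention, not flopy's code:

```python
# Fixed-format round trip: write values in 10-character fields,
# then parse them back by slicing at field boundaries.
width = 10
values = [1.0, 25.5, 300.0]

# Write: one right-justified field per value
line = "".join(f"{v:{width}.3f}" for v in values)

# Read back: slice the line into fixed-width fields
parsed = [float(line[i * width:(i + 1) * width]) for i in range(len(values))]
print(repr(line))
print(parsed)
```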

read1d

Read 1D arrays from various sources.

def read1d(
    f: object,
    array: np.ndarray,
    **kwargs
) -> np.ndarray:
    """Read 1D arrays from files or other sources"""
    ...
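
The read1d pattern fills a preallocated 1-D array from free-format text, consuming as many input lines as needed. A self-contained sketch of that behavior (read1d also handles other sources):

```python
import io
import numpy as np

# Values for a 5-element array split across two free-format lines
f = io.StringIO("1.0 2.0 3.0\n4.0 5.0\n")
arr = np.empty(5)

# Keep reading lines until enough values have been collected
vals = []
while len(vals) < arr.size:
    vals.extend(float(t) for t in f.readline().split())
arr[:] = vals[:arr.size]
print(arr)
```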

Dependency Management

import_optional_dependency

Dynamic import utility for optional dependencies.

def import_optional_dependency(
    name: str,
    extra: str = None,
    **kwargs
) -> object:
    """Dynamic import utility for optional packages"""
    ...

Parameters:

  • name (str): Package name to import
  • extra (str): Additional information for error messages

Returns:

  • object: Imported module; raises ImportError if the package is unavailable
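
The optional-import pattern in miniature: return the module if it is installed, otherwise raise ImportError with a helpful hint. This is a sketch of the idea, not flopy's implementation:

```python
import importlib

def import_optional(name: str, extra: str = ""):
    # Return the module if importable; otherwise raise with a hint.
    try:
        return importlib.import_module(name)
    except ImportError as err:
        msg = f"Missing optional dependency '{name}'. {extra}".strip()
        raise ImportError(msg) from err

# A stdlib module imports fine...
mod = import_optional("math")
print(mod.sqrt(9.0))

# ...while a missing package raises ImportError
try:
    import_optional("definitely_not_a_real_module_xyz")
    raised = False
except ImportError:
    raised = True
print(raised)
```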

Array Classes

Util2d

2D utility arrays for MODFLOW input data management.

class Util2d:
    """2D utility arrays for MODFLOW data"""
    def __init__(
        self,
        model: object,
        shape: tuple[int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'util2d',
        fmtin: str = None,
        **kwargs
    ): ...
    
    @property
    def array(self) -> np.ndarray:
        """Get array data"""
        ...
    
    def get_file_entry(self) -> str:
        """Get file entry string for MODFLOW input"""
        ...
    
    def write_file(
        self,
        f_name: str = None,
        **kwargs
    ) -> str:
        """Write array to file"""
        ...
    
    @classmethod
    def load(
        cls,
        f: object,
        model: object,
        shape: tuple[int, int],
        dtype: type,
        **kwargs
    ) -> 'Util2d':
        """Load Util2d from file"""
        ...

Util3d

3D utility arrays for layered model data.

class Util3d:
    """3D utility arrays for MODFLOW data"""
    def __init__(
        self,
        model: object,
        shape: tuple[int, int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'util3d',
        fmtin: str = None,
        **kwargs
    ): ...
    
    @property
    def array(self) -> np.ndarray:
        """Get 3D array data"""
        ...
    
    def get_file_entry(self) -> list[str]:
        """Get file entries for each layer"""
        ...
    
    def write_file(
        self,
        f_name: str = None,
        **kwargs
    ) -> list[str]:
        """Write 3D array to file(s)"""
        ...

Transient2d

Transient 2D arrays for time-varying data.

class Transient2d:
    """Transient 2D arrays for time-varying MODFLOW data"""
    def __init__(
        self,
        model: object,
        shape: tuple[int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'transient2d',
        **kwargs
    ): ...
    
    def get_data_array(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.ndarray:
        """Get array for specific stress period"""
        ...
    
    def add_record(
        self,
        kper: int,
        array: np.ndarray,
        **kwargs
    ) -> None:
        """Add array for stress period"""
        ...

Transient3d

Transient 3D arrays for time-varying layered data.

class Transient3d:
    """Transient 3D arrays for time-varying MODFLOW data"""
    def __init__(
        self,
        model: object,
        shape: tuple[int, int, int],
        dtype: type = np.float32,
        value: ArrayData = 0.0,
        name: str = 'transient3d',
        **kwargs
    ): ...
    
    def get_data_array(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.ndarray:
        """Get 3D array for specific stress period"""
        ...

MfList

List-based data structures for boundary conditions and other tabular data.

class MfList:
    """List-based data structures for MODFLOW"""
    def __init__(
        self,
        model: object,
        dtype: np.dtype = None,
        data: ArrayData = None,
        **kwargs
    ): ...
    
    @property
    def array(self) -> np.recarray:
        """Get record array data"""
        ...
    
    def add_record(
        self,
        kper: int,
        values: list,
        **kwargs
    ) -> None:
        """Add record for stress period"""
        ...
    
    def get_data(
        self,
        kper: int = 0,
        **kwargs
    ) -> np.recarray:
        """Get data for specific stress period"""
        ...

Grid Utilities

GridIntersect

Grid intersection utilities for spatial analysis.

class GridIntersect:
    """Grid intersection utilities"""
    def __init__(
        self,
        grid: object,
        method: str = 'vertex',
        **kwargs
    ): ...
    
    def intersect_point(
        self,
        point: tuple[float, float],
        **kwargs
    ) -> int:
        """Find cell containing point"""
        ...
    
    def intersect_linestring(
        self,
        linestring: list[tuple[float, float]],
        **kwargs
    ) -> list[dict]:
        """Intersect linestring with grid"""
        ...
    
    def intersect_polygon(
        self,
        polygon: list[tuple[float, float]],
        **kwargs
    ) -> list[dict]:
        """Intersect polygon with grid"""
        ...

ModflowGridIndices

Grid indexing utilities for coordinate transformations.

class ModflowGridIndices:
    """Grid indexing utilities"""
    def __init__(
        self,
        grid: object,
        **kwargs
    ): ...
    
    def get_lrc_from_node(
        self,
        nodes: list[int],
        **kwargs
    ) -> list[tuple[int, int, int]]:
        """Convert node numbers to layer-row-column"""
        ...
    
    def get_node_from_lrc(
        self,
        lrc: list[tuple[int, int, int]],
        **kwargs
    ) -> list[int]:
        """Convert layer-row-column to node numbers"""
        ...
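
For a structured grid, node-to-LRC conversion is simple index arithmetic. The sketch below shows the zero-based convention (MODFLOW input itself is one-based):

```python
# Structured-grid index arithmetic (zero-based):
# node = lay * nrow * ncol + row * ncol + col
nlay, nrow, ncol = 2, 20, 30

def node_from_lrc(lay, row, col):
    return lay * nrow * ncol + row * ncol + col

def lrc_from_node(node):
    # Invert the flattening: peel off the layer, then row and column
    lay, rem = divmod(node, nrow * ncol)
    row, col = divmod(rem, ncol)
    return lay, row, col

node = node_from_lrc(1, 5, 7)
print(node, lrc_from_node(node))  # round trip
```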

Raster

Raster data handling and manipulation.

class Raster:
    """Raster data handling"""
    def __init__(
        self,
        array: np.ndarray,
        bands: list = None,
        crs: object = None,
        transform: object = None,
        **kwargs
    ): ...
    
    def sample_polygon(
        self,
        polygon: list[tuple[float, float]],
        **kwargs
    ) -> dict:
        """Sample raster values within polygon"""
        ...
    
    def sample_points(
        self,
        points: list[tuple[float, float]],
        **kwargs
    ) -> list[float]:
        """Sample raster values at points"""
        ...
    
    def resample_to_grid(
        self,
        modelgrid: object,
        **kwargs
    ) -> np.ndarray:
        """Resample raster to model grid"""
        ...

MODFLOW Executable Management

get_modflow

Download and manage MODFLOW executables.

def get_modflow(
    bindir: str = None,
    **kwargs
) -> str:
    """Download and manage MODFLOW executables"""
    ...

cli_main

Command-line interface for get-modflow script.

def cli_main() -> None:
    """Command-line interface for get-modflow script"""
    ...

Option Handling

OptionBlock

Parse and manage MODFLOW option blocks.

class OptionBlock:
    """Options block parsing and management"""
    def __init__(
        self,
        options_line: str,
        package: object,
        **kwargs
    ): ...
    
    def parse_options(
        self,
        options_line: str,
        **kwargs
    ) -> dict:
        """Parse options from input line"""
        ...
    
    def write_options(
        self,
        f: object,
        **kwargs
    ) -> None:
        """Write options to file"""
        ...

Usage Examples

Model Execution and Validation

import flopy
import flopy.utils as fpu
import numpy as np

# Create or load a model
mf = flopy.modflow.Modflow(modelname='test_model')

# Add basic packages
dis = flopy.modflow.ModflowDis(mf, nlay=1, nrow=10, ncol=10)
bas = flopy.modflow.ModflowBas(mf, ibound=1, strt=1.0)
lpf = flopy.modflow.ModflowLpf(mf, hk=1.0, sy=0.1, ss=1e-5)
pcg = flopy.modflow.ModflowPcg(mf)
oc = flopy.modflow.ModflowOc(mf)

# Write input files
mf.write_input()

# Check model before running
print("Checking model...")
chk = fpu.check(mf, verbose=True, level=2)

if chk.summary_array.passed.all():
    print("All checks passed!")
    
    # Run model using utility function
    success, output = fpu.run_model(
        namefile='test_model.nam',
        exe_name='mf2005',
        model_ws=mf.model_ws,
        report=True
    )
    
    if success:
        print("Model run completed successfully")
        print("Output:")
        for line in output[-10:]:  # Last 10 lines
            print(f"  {line}")
    else:
        print("Model run failed")
        for line in output:
            print(f"  {line}")
else:
    print("Model check failed:")
    failed_checks = chk.summary_array[~chk.summary_array.passed]
    for check in failed_checks:
        print(f"  {check.desc}: {check.summary}")

Flow Analysis and Post-Processing

import flopy.utils as fpu
import numpy as np
import matplotlib.pyplot as plt

# Load model and results
mf = flopy.modflow.Modflow.load('model.nam')

# Read flow data
cbb = fpu.CellBudgetFile('model.cbb')
frf = cbb.get_data(text='FLOW RIGHT FACE')[0]
fff = cbb.get_data(text='FLOW FRONT FACE')[0]
flf = cbb.get_data(text='FLOW LOWER FACE')[0]

# Read head data
hds = fpu.HeadFile('model.hds')
head = hds.get_data(kstpkper=(0, 0))

# Calculate specific discharge
qx, qy, qz = fpu.get_specific_discharge(
    frf, fff, flf,
    grid=mf.modelgrid,
    head=head
)

print(f"Specific discharge statistics:")
print(f"  qx range: {qx.min():.2e} to {qx.max():.2e}")
print(f"  qy range: {qy.min():.2e} to {qy.max():.2e}")
print(f"  qz range: {qz.min():.2e} to {qz.max():.2e}")

# Calculate magnitude
q_magnitude = np.sqrt(qx**2 + qy**2 + qz**2)
print(f"  q magnitude range: {q_magnitude.min():.2e} to {q_magnitude.max():.2e}")

# Calculate transmissivities at specific locations
well_locations = [(0, 5, 5), (0, 3, 7), (0, 8, 2)]

for i, (lay, row, col) in enumerate(well_locations):
    trans = fpu.get_transmissivities(
        heads=head,
        m=mf,
        r=row,
        c=col
    )
    print(f"Transmissivity at well {i+1} (L{lay},R{row},C{col}): {trans[lay]:.2f}")

# Close files
cbb.close()
hds.close()

Array Utilities and Data Management

import flopy
import flopy.utils as fpu
import numpy as np

# Create model
mf = flopy.modflow.Modflow(modelname='array_example')

# Create 2D array with Util2d
nrow, ncol = 20, 30
k_array = fpu.Util2d(
    model=mf,
    shape=(nrow, ncol),
    dtype=np.float32,
    value=1.0,
    name='hydraulic_conductivity'
)

# Set different K values by zone
k_data = np.ones((nrow, ncol))
k_data[:10, :] = 5.0     # High K zone (top half)
k_data[10:, :] = 0.1     # Low K zone (bottom half)
k_data[:, :5] = 50.0     # Very high K zone (left edge)

k_array.array = k_data

print(f"K array statistics:")
print(f"  Shape: {k_array.array.shape}")
print(f"  Min: {k_array.array.min()}")
print(f"  Max: {k_array.array.max()}")
print(f"  Mean: {k_array.array.mean():.2f}")

# Create 3D array with Util3d
nlay = 3
porosity_3d = fpu.Util3d(
    model=mf,
    shape=(nlay, nrow, ncol),
    dtype=np.float32,
    value=0.25,
    name='porosity'
)

# Set layer-specific porosity
por_data = np.full((nlay, nrow, ncol), 0.25)
por_data[0, :, :] = 0.35  # Higher porosity in top layer
por_data[1, :, :] = 0.20  # Lower porosity in middle layer
por_data[2, :, :] = 0.15  # Lowest porosity in bottom layer

porosity_3d.array = por_data

print(f"\nPorosity 3D array statistics:")
print(f"  Shape: {porosity_3d.array.shape}")
for lay in range(nlay):
    layer_data = porosity_3d.array[lay, :, :]
    print(f"  Layer {lay+1}: mean = {layer_data.mean():.3f}")

# Create transient 2D array for recharge
recharge_transient = fpu.Transient2d(
    model=mf,
    shape=(nrow, ncol),
    dtype=np.float32,
    value=0.001,
    name='recharge'
)

# Add seasonal recharge patterns
base_recharge = np.full((nrow, ncol), 0.001)

# Winter (high recharge)
winter_recharge = base_recharge * 3.0
recharge_transient.add_record(0, winter_recharge)

# Spring (medium recharge) 
spring_recharge = base_recharge * 1.5
recharge_transient.add_record(1, spring_recharge)

# Summer (low recharge)
summer_recharge = base_recharge * 0.5
recharge_transient.add_record(2, summer_recharge)

# Fall (medium recharge)
fall_recharge = base_recharge * 1.2
recharge_transient.add_record(3, fall_recharge)

print(f"\nTransient recharge statistics:")
for kper in range(4):
    rch_data = recharge_transient.get_data_array(kper)
    season = ['Winter', 'Spring', 'Summer', 'Fall'][kper]
    print(f"  {season}: mean = {rch_data.mean():.4f}")

# Create MfList for well data
well_dtype = np.dtype([
    ('k', int), ('i', int), ('j', int), ('flux', float)
])

well_data = fpu.MfList(
    model=mf,
    dtype=well_dtype
)

# Add wells for different stress periods
# Stress period 0: Base pumping
wells_sp0 = [
    (0, 10, 15, -1000.0),  # Production well
    (0, 5, 20, -500.0),    # Smaller well
    (0, 15, 10, 200.0)     # Injection well
]

for well in wells_sp0:
    well_data.add_record(0, well)

# Stress period 1: Increased pumping
wells_sp1 = [
    (0, 10, 15, -1500.0),  # Increased pumping
    (0, 5, 20, -750.0),    # Increased pumping
    (0, 15, 10, 300.0),    # Increased injection
    (0, 8, 25, -800.0)     # Additional well
]

for well in wells_sp1:
    well_data.add_record(1, well)

print(f"\nWell data statistics:")
for kper in range(2):
    wells = well_data.get_data(kper)
    total_pumping = np.sum(wells['flux'][wells['flux'] < 0])
    total_injection = np.sum(wells['flux'][wells['flux'] > 0])
    print(f"  Period {kper}: {len(wells)} wells, pumping = {total_pumping:.0f}, injection = {total_injection:.0f}")

Time Conversion and Utilities

import flopy.utils as fpu
import numpy as np
from datetime import datetime

# Model simulation times (in days)
totim = np.array([0, 30, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 365])

# Convert to datetime objects
start_date = '2023-01-01'
datetimes = fpu.totim_to_datetime(
    totim=totim,
    start=start_date,
    timeunit='days'
)

print("Time conversion:")
print(f"Start date: {start_date}")
for time, dt in zip(totim, datetimes):
    print(f"  Day {time:3.0f}: {dt.strftime('%Y-%m-%d')}")

# Different time units
totim_hours = np.array([0, 6, 12, 24, 48, 72])
datetimes_hours = fpu.totim_to_datetime(
    totim=totim_hours,
    start='2023-06-15 08:00:00',
    timeunit='hours'
)

print(f"\nHourly simulation:")
for time, dt in zip(totim_hours, datetimes_hours):
    print(f"  Hour {time:2.0f}: {dt.strftime('%Y-%m-%d %H:%M:%S')}")

# Parse name file
try:
    packages, files = fpu.parsenamefile('model.nam')
    
    print(f"\nName file contents:")
    print(f"  Found {len(packages)} packages:")
    for pkg in packages:
        ftype, unit, fname, status = pkg
        print(f"    {ftype:8s} (unit {unit:2d}): {fname}")
    
    print(f"\n  File dictionary:")
    for key, value in files.items():
        print(f"    {key}: {value}")
        
except FileNotFoundError:
    print("Name file not found")

# Create and manipulate record arrays
dtype = np.dtype([
    ('layer', int),
    ('row', int), 
    ('col', int),
    ('head', float),
    ('drawdown', float)
])

# Create empty record array
obs_data = fpu.create_empty_recarray(10, dtype, default_value=-999.0)

# Fill with synthetic observation data
for i in range(10):
    obs_data[i] = (
        1,                           # layer
        np.random.randint(0, 20),    # row
        np.random.randint(0, 30),    # col
        100.0 + np.random.normal(0, 5),  # head
        np.random.exponential(2.0)   # drawdown
    )

print(f"\nObservation data:")
print(f"  Array shape: {obs_data.shape}")
print(f"  Data type: {obs_data.dtype}")
print(f"  First 5 records:")
for i in range(5):
    print(f"    {obs_data[i]}")

# Slice record array
head_data = fpu.ra_slice(obs_data, ['layer', 'row', 'col', 'head'])
print(f"\nSliced array (head data only):")
print(f"  New shape: {head_data.shape}")
print(f"  New dtype: {head_data.dtype}")

# Statistics
print(f"\nObservation statistics:")
print(f"  Mean head: {obs_data['head'].mean():.2f}")
print(f"  Mean drawdown: {obs_data['drawdown'].mean():.2f}")
print(f"  Head range: {obs_data['head'].min():.2f} to {obs_data['head'].max():.2f}")

Grid Intersection and Spatial Analysis

import flopy
import flopy.utils as fpu
import numpy as np

# Create model with grid
mf = flopy.modflow.Modflow(modelname='grid_example')
dis = flopy.modflow.ModflowDis(
    mf,
    nlay=1, nrow=20, ncol=30,
    delr=100.0, delc=100.0,
    top=50.0, botm=0.0,
    xul=0.0, yul=2000.0  # Upper left coordinates
)

# Create grid intersect object
grid_intersect = fpu.GridIntersect(mf.modelgrid, method='vertex')

# Point intersections
test_points = [
    (500.0, 1500.0),   # Should be in grid
    (1500.0, 1000.0),  # Should be in grid
    (5000.0, 500.0),   # May be outside grid
]

print("Point intersections:")
for i, point in enumerate(test_points):
    try:
        cell_id = grid_intersect.intersect_point(point)
        if cell_id >= 0:
            # Convert to row, col
            row, col = divmod(cell_id, mf.dis.ncol)
            print(f"  Point {i+1} ({point[0]}, {point[1]}): Cell {cell_id} (Row {row+1}, Col {col+1})")
        else:
            print(f"  Point {i+1} ({point[0]}, {point[1]}): Outside grid")
    except Exception:
        print(f"  Point {i+1} ({point[0]}, {point[1]}): Error in intersection")

# Line intersection (river or stream)
river_line = [
    (200.0, 1800.0),   # Start point
    (800.0, 1600.0),   # Bend point
    (1500.0, 1200.0),  # Another bend
    (2500.0, 800.0)    # End point
]

print(f"\nRiver line intersection:")
try:
    intersections = grid_intersect.intersect_linestring(river_line)
    print(f"  Line intersects {len(intersections)} cells:")
    
    for intersection in intersections[:10]:  # First 10 intersections
        cell_id = intersection.get('cellid', -1)
        if cell_id >= 0:
            row, col = divmod(cell_id, mf.dis.ncol)
            length = intersection.get('length', 0.0)
            print(f"    Cell {cell_id} (R{row+1},C{col+1}): Length = {length:.1f}")
            
except Exception as e:
    print(f"  Error in line intersection: {e}")

# Polygon intersection (capture zone or contamination area)
contamination_area = [
    (1000.0, 1500.0),  # Bottom left
    (2000.0, 1500.0),  # Bottom right
    (2000.0, 1000.0),  # Top right
    (1000.0, 1000.0),  # Top left
    (1000.0, 1500.0)   # Close polygon
]

print(f"\nContamination area intersection:")
try:
    poly_intersections = grid_intersect.intersect_polygon(contamination_area)
    print(f"  Polygon intersects {len(poly_intersections)} cells:")
    
    total_area = 0.0
    for intersection in poly_intersections:
        cell_id = intersection.get('cellid', -1)
        if cell_id >= 0:
            row, col = divmod(cell_id, mf.dis.ncol)
            area = intersection.get('area', 0.0)
            total_area += area
            print(f"    Cell {cell_id} (R{row+1},C{col+1}): Area = {area:.0f}")
    
    print(f"  Total intersection area: {total_area:.0f}")
    
except Exception as e:
    print(f"  Error in polygon intersection: {e}")

# Grid indexing utilities
grid_indices = fpu.ModflowGridIndices(mf.modelgrid)

# Convert between different indexing schemes
test_nodes = [0, 150, 299, 450, 599]  # Some node numbers
lrc_coords = grid_indices.get_lrc_from_node(test_nodes)

print(f"\nNode to LRC conversion:")
for node, lrc in zip(test_nodes, lrc_coords):
    print(f"  Node {node:3d} -> Layer {lrc[0]+1}, Row {lrc[1]+1}, Col {lrc[2]+1}")

# Convert back
nodes_back = grid_indices.get_node_from_lrc(lrc_coords)
print(f"\nLRC to Node conversion (verification):")
for lrc, node in zip(lrc_coords, nodes_back):
    print(f"  Layer {lrc[0]+1}, Row {lrc[1]+1}, Col {lrc[2]+1} -> Node {node}")

# Grid statistics
print(f"\nGrid information:")
print(f"  Total nodes: {mf.dis.nlay * mf.dis.nrow * mf.dis.ncol}")
print(f"  Grid extent: {mf.modelgrid.extent}")
print(f"  Cell size: {mf.dis.delr.array[0]} x {mf.dis.delc.array[0]}")
print(f"  Grid area: {(mf.dis.ncol * mf.dis.delr.array[0]) * (mf.dis.nrow * mf.dis.delc.array[0]):,.0f} m²")

Common Types

from datetime import datetime
from typing import Literal, Union
import os

import numpy as np

# Utility function types
ArrayData = Union[int, float, np.ndarray, list]
ExecutionResult = tuple[bool, list[str]]
CheckResult = object

# Time and date types
TimeValue = Union[int, float]
TimeUnit = Literal['days', 'hours', 'minutes', 'seconds', 'years']
DateString = str
DateTimeList = list[datetime]

# File and path types
FilePath = Union[str, os.PathLike]
NameFileEntry = tuple[str, int, str, str]  # (ftype, unit, fname, status)
FileDict = dict[str, str]

# Array and data types
DataType = Union[type, np.dtype]
RecordArray = np.recarray
ArrayShape = tuple[int, ...]
DefaultValue = Union[int, float, str]

# Grid and spatial types
GridMethod = Literal['vertex', 'centroid', 'structured']
Coordinates = tuple[float, float]
LineString = list[tuple[float, float]]
Polygon = list[tuple[float, float]]
CellID = int
IntersectionResult = dict[str, Union[int, float]]

# Model validation types
CheckLevel = Literal[0, 1, 2]  # Basic, Standard, Detailed
ValidationResult = object
CheckSummary = np.ndarray

# Dependency types
ModuleName = str
ImportResult = object
OptionalExtra = str

# Flow analysis types
FlowArray = np.ndarray
DischargeComponents = tuple[np.ndarray, np.ndarray, np.ndarray]
TransmissivityArray = np.ndarray
VelocityField = tuple[np.ndarray, np.ndarray, np.ndarray]

# Format and I/O types
FormatString = str
FieldWidth = int
Precision = int
IOMode = Literal['r', 'w', 'a', 'rb', 'wb']

This documentation covers the FloPy utilities API: model execution, validation, flow analysis, array handling, grid operations, and supporting helper functions. The examples demonstrate common workflows including model checking, spatial analysis, time conversion, and data management.

Install with Tessl CLI

npx tessl i tessl/pypi-flopy
