FloPy is a Python package to create, run, and post-process MODFLOW-based models
66
This module provides comprehensive export functionality for model grids, arrays, and results to various formats including NetCDF, VTK, shapefiles, and other GIS-compatible formats. These tools enable interoperability with other modeling software, GIS systems, and visualization platforms.
Primary class for exporting MODFLOW models and results to NetCDF format.
class NetCdf:
"""NetCDF export functionality for MODFLOW models"""
def __init__(
self,
output_filename: str,
model: object,
array_dict: dict = None,
meshtype: str = None,
**kwargs
): ...
def write(self) -> None:
"""Write model and data to NetCDF file"""
...
def add_model(self, model: object) -> None:
"""Add model to NetCDF file"""
...
def add_package(
self,
package: object,
text: str = None,
**kwargs
) -> None:
"""Add package data to NetCDF file"""
...
def add_transient_array(
self,
array: np.ndarray,
name: str,
units: str = None,
**kwargs
) -> None:
"""Add time-varying array data"""
...
def add_array(
self,
array: np.ndarray,
name: str,
units: str = None,
**kwargs
) -> None:
"""Add array data to NetCDF file"""
...
def add_grid(
self,
grid: object,
**kwargs
) -> None:
"""Add grid information to NetCDF file"""
...
def add_coordinate_system(
self,
epsg: int = None,
proj4_str: str = None,
**kwargs
) -> None:
"""Add coordinate reference system"""
...
@property
def nc(self) -> object:
"""NetCDF4 Dataset object"""
...
@property
def logger(self) -> object:
"""Export logger"""
...Logging functionality for export operations.
class Logger:
"""Logging for export operations"""
def __init__(
self,
level: str = 'INFO',
filename: str = None,
**kwargs
): ...
def info(self, message: str) -> None:
"""Log info message"""
...
def warning(self, message: str) -> None:
"""Log warning message"""
...
def error(self, message: str) -> None:
"""Log error message"""
...
def debug(self, message: str) -> None:
"""Log debug message"""
...VTK format export for 3D visualization and analysis.
class Vtk:
"""VTK format export for MODFLOW models"""
def __init__(
self,
model: object = None,
modelgrid: object = None,
vertical_exageration: float = 1.0,
binary: bool = True,
smooth: bool = False,
point_scalars: bool = False,
fmt: str = None,
**kwargs
): ...
def add_model(self, model: object) -> None:
"""Add model to VTK export"""
...
def add_array(
self,
array: np.ndarray,
name: str,
**kwargs
) -> None:
"""Add array data for export"""
...
def add_transient_array(
self,
d: dict,
name: str,
**kwargs
) -> None:
"""Add time-varying array data"""
...
def add_vector(
self,
vectors: tuple[np.ndarray, np.ndarray, np.ndarray],
name: str,
**kwargs
) -> None:
"""Add vector field data"""
...
def add_pathline_points(
self,
pathlines: list,
**kwargs
) -> None:
"""Add MODPATH pathline data"""
...
def add_endpoint_points(
self,
endpoints: np.ndarray,
**kwargs
) -> None:
"""Add MODPATH endpoint data"""
...
def write(
self,
filename: str,
**kwargs
) -> None:
"""Write VTK file"""
...
@property
def arrays(self) -> dict:
"""Dictionary of arrays for export"""
...ParaView data file management for time series VTK files.
class Pvd:
"""ParaView data file management"""
def __init__(
self,
filename: str,
**kwargs
): ...
def add_dataset(
self,
filename: str,
timestep: float,
**kwargs
) -> None:
"""Add VTK dataset to time series"""
...
def write(self) -> None:
"""Write PVD file"""
...Functions for exporting model data to ESRI shapefile format.
def write_grid_shapefile(
filename: str,
mg: object,
array_dict: dict = None,
**kwargs
) -> None:
"""Export model grid to shapefile"""
...
def write_shapefile(
filename: str,
geoms: list,
attribute_dict: dict = None,
**kwargs
) -> None:
"""Write geometries and attributes to shapefile"""
...
def model_attributes_to_shapefile(
filename: str,
ml: object,
package_names: list = None,
array_dict: dict = None,
**kwargs
) -> None:
"""Export model attributes to shapefile"""
...
def package_export_shapefile(
pak: object,
filename: str,
**kwargs
) -> None:
"""Export package data to shapefile"""
...
def boundary_condition_export(
filename: str,
pak: object,
kper: int = 0,
**kwargs
) -> None:
"""Export boundary condition data to shapefile"""
...Attribute Convention for Data Discovery (ACDD) metadata standards.
class acdd:
"""ACDD metadata standards for export files"""
@staticmethod
def get_standard_attributes() -> dict:
"""Get standard ACDD attributes"""
...
@staticmethod
def create_attributes(
title: str,
summary: str,
institution: str = None,
source: str = None,
**kwargs
) -> dict:
"""Create ACDD-compliant attribute dictionary"""
...
@staticmethod
def add_global_attributes(
nc_file: object,
attributes: dict,
**kwargs
) -> None:
"""Add global attributes to NetCDF file"""
...def model_export(
filename: str,
model: object,
fmt: str = 'netcdf',
**kwargs
) -> None:
"""Export complete model to specified format"""
...
def package_export(
filename: str,
package: object,
fmt: str = 'netcdf',
**kwargs
) -> None:
"""Export individual package to specified format"""
...
def array2d_export(
filename: str,
array: np.ndarray,
modelgrid: object,
**kwargs
) -> None:
"""Export 2D array with grid information"""
...
def array3d_export(
filename: str,
array: np.ndarray,
modelgrid: object,
**kwargs
) -> None:
"""Export 3D array with grid information"""
...
def export_array(
filename: str,
array: np.ndarray,
fmt: str = 'netcdf',
**kwargs
) -> None:
"""Generic array export function"""
...
def export_contours(
filename: str,
array: np.ndarray,
levels: list = None,
modelgrid: object = None,
**kwargs
) -> None:
"""Export contour data to shapefile"""
...import flopy
import flopy.export as fpe
import numpy as np
# Load or create MODFLOW model
mf = flopy.modflow.Modflow.load('model.nam')
# Basic NetCDF export
nc_file = fpe.NetCdf('model_export.nc', mf)
# Add model grid and packages
nc_file.add_model(mf)
# Add arrays from packages
if hasattr(mf, 'lpf'):
nc_file.add_array(mf.lpf.hk.array, 'hydraulic_conductivity', units='m/day')
nc_file.add_array(mf.lpf.sy.array, 'specific_yield', units='dimensionless')
if hasattr(mf, 'dis'):
nc_file.add_array(mf.dis.top.array, 'top_elevation', units='m')
nc_file.add_array(mf.dis.botm.array, 'bottom_elevation', units='m')
# Add coordinate system information
nc_file.add_coordinate_system(epsg=32633) # UTM Zone 33N
# Write the NetCDF file
nc_file.write()
print("NetCDF export completed: model_export.nc")import flopy
import flopy.export as fpe
import flopy.utils as fpu
import numpy as np
# Load model and results
mf = flopy.modflow.Modflow.load('model.nam')
# Read head and budget files
hds = fpu.HeadFile('model.hds')
cbb = fpu.CellBudgetFile('model.cbb')
# Create NetCDF export with metadata
nc_file = fpe.NetCdf(
'complete_model_export.nc',
mf,
array_dict=None,
meshtype='structured'
)
# Add ACDD metadata
metadata = fpe.acdd.create_attributes(
title='Regional Groundwater Flow Model',
summary='Three-dimensional groundwater flow model for water resource assessment',
institution='University Research Center',
source='MODFLOW-2005',
creator_name='Jane Doe',
creator_email='jane.doe@university.edu',
project='Regional Water Study',
geospatial_bounds='POLYGON((...))'
)
fpe.acdd.add_global_attributes(nc_file.nc, metadata)
# Add model structure
nc_file.add_model(mf)
# Add static arrays
array_dict = {
'hydraulic_conductivity': (mf.lpf.hk.array, 'm/day'),
'specific_yield': (mf.lpf.sy.array, 'dimensionless'),
'specific_storage': (mf.lpf.ss.array, '1/m'),
'top_elevation': (mf.dis.top.array, 'm'),
'bottom_elevation': (mf.dis.botm.array, 'm'),
'porosity': (np.ones(mf.dis.botm.array.shape) * 0.25, 'dimensionless')
}
for name, (array, units) in array_dict.items():
nc_file.add_array(array, name, units=units)
# Add transient results
# Heads for all time steps
head_dict = {}
times = hds.get_times()
kstpkper_list = hds.get_kstpkper()
for i, (time, kstpkper) in enumerate(zip(times, kstpkper_list)):
head = hds.get_data(kstpkper=kstpkper)
head_dict[time] = head
nc_file.add_transient_array(head_dict, 'head', units='m')
# Add budget components
budget_components = ['STORAGE', 'CONSTANT HEAD', 'WELLS', 'RECHARGE']
for comp in budget_components:
try:
budget_dict = {}
for i, (time, kstpkper) in enumerate(zip(times, kstpkper_list)):
budget_data = cbb.get_data(text=comp, kstpkper=kstpkper)
if budget_data is not None:
budget_dict[time] = budget_data[0]
if budget_dict:
nc_file.add_transient_array(
budget_dict,
f'budget_{comp.lower().replace(" ", "_")}',
units='m3/day'
)
except:
print(f"Could not export {comp} budget data")
# Add coordinate system and finish
nc_file.add_coordinate_system(epsg=4326) # WGS84
nc_file.write()
print("Complete NetCDF export finished")
# Close files
hds.close()
cbb.close()import flopy
import flopy.export as fpe
import flopy.utils as fpu
import numpy as np
# Load model
mf = flopy.modflow.Modflow.load('model.nam')
# Create VTK export object
vtk_file = fpe.Vtk(
model=mf,
vertical_exageration=10.0, # Exaggerate vertical for better visualization
binary=True, # Binary format for smaller files
smooth=True # Smooth surfaces
)
# Add model structure
vtk_file.add_model(mf)
# Add hydraulic conductivity
if hasattr(mf, 'lpf'):
vtk_file.add_array(mf.lpf.hk.array, 'K_horizontal')
vtk_file.add_array(mf.lpf.vka.array, 'K_vertical')
# Add head results
try:
hds = fpu.HeadFile('model.hds')
# Add steady-state heads
head_ss = hds.get_data(kstpkper=(0, 0))
vtk_file.add_array(head_ss, 'head_steady_state')
# Add transient heads (multiple time steps)
head_transient = {}
times = hds.get_times()
kstpkper_list = hds.get_kstpkper()
for time, kstpkper in zip(times, kstpkper_list):
head = hds.get_data(kstpkper=kstpkper)
head_transient[time] = head
vtk_file.add_transient_array(head_transient, 'head_transient')
hds.close()
except FileNotFoundError:
print("Head file not found")
# Add velocity vectors
try:
cbb = fpu.CellBudgetFile('model.cbb')
# Get face flows
frf = cbb.get_data(text='FLOW RIGHT FACE')[0]
fff = cbb.get_data(text='FLOW FRONT FACE')[0]
flf = cbb.get_data(text='FLOW LOWER FACE')[0]
# Convert to cell-centered velocities (simplified)
# Proper conversion would account for cell dimensions and porosity
qx = np.zeros_like(frf)
qy = np.zeros_like(fff)
qz = np.zeros_like(flf)
# Simple averaging (not rigorous)
qx[:, :, :-1] = frf[:, :, :-1]
qy[:, :-1, :] = fff[:, :-1, :]
qz[:-1, :, :] = flf[:-1, :, :]
vtk_file.add_vector((qx, qy, qz), 'velocity_vectors')
cbb.close()
except FileNotFoundError:
print("Budget file not found")
# Add MODPATH results if available
try:
# Pathlines
pth_file = fpu.PathlineFile('model.mppth')
pathlines = pth_file.get_alldata()
vtk_file.add_pathline_points(pathlines[:100]) # First 100 pathlines
# Endpoints
ep_file = fpu.EndpointFile('model.mpend')
endpoints = ep_file.get_alldata()
vtk_file.add_endpoint_points(endpoints)
except FileNotFoundError:
print("MODPATH files not found")
# Write VTK file
vtk_file.write('model_3d.vtu')
print("VTK export completed: model_3d.vtu")
# For time series, create PVD file
pvd_file = fpe.Pvd('model_timeseries.pvd')
for i, time in enumerate(times[:10]): # First 10 time steps
# Create individual VTK files for each time step
vtk_time = fpe.Vtk(model=mf)
vtk_time.add_model(mf)
# Add head for this time step
head = hds.get_data(idx=i)
vtk_time.add_array(head, f'head_t{i}')
filename = f'model_t{i:03d}.vtu'
vtk_time.write(filename)
# Add to PVD file
pvd_file.add_dataset(filename, time)
# Write PVD file
pvd_file.write()
print("Time series VTK export completed")import flopy
import flopy.export as fpe
import flopy.utils as fpu
import numpy as np
# Load model
mf = flopy.modflow.Modflow.load('model.nam')
# Export model grid to shapefile
grid_dict = {
'layer': np.repeat(range(mf.dis.nlay), mf.dis.nrow * mf.dis.ncol),
'row': np.tile(np.repeat(range(mf.dis.nrow), mf.dis.ncol), mf.dis.nlay),
'column': np.tile(range(mf.dis.ncol), mf.dis.nlay * mf.dis.nrow)
}
# Add hydraulic properties
if hasattr(mf, 'lpf'):
grid_dict['k_horizontal'] = mf.lpf.hk.array.ravel()
grid_dict['k_vertical'] = mf.lpf.vka.array.ravel()
grid_dict['specific_yield'] = mf.lpf.sy.array.ravel()
# Add elevations
grid_dict['top_elev'] = np.repeat(mf.dis.top.array.ravel(), mf.dis.nlay)
botm_3d = np.broadcast_to(mf.dis.botm.array, (mf.dis.nlay, mf.dis.nrow, mf.dis.ncol))
grid_dict['bot_elev'] = botm_3d.ravel()
# Export grid with attributes
fpe.write_grid_shapefile('model_grid.shp', mf.modelgrid, array_dict=grid_dict)
print("Grid shapefile exported: model_grid.shp")
# Export head results
try:
hds = fpu.HeadFile('model.hds')
# Final heads
head_final = hds.get_data(kstpkper=(-1, -1))
head_dict = {
'head_final': head_final.ravel(),
'layer': grid_dict['layer'],
'row': grid_dict['row'],
'column': grid_dict['column']
}
fpe.write_grid_shapefile('model_heads.shp', mf.modelgrid, array_dict=head_dict)
hds.close()
except FileNotFoundError:
print("Head file not found")
# Export boundary conditions
bc_packages = ['WEL', 'RIV', 'GHB', 'DRN', 'CHD']
for bc_name in bc_packages:
try:
package = mf.get_package(bc_name)
if package is not None:
fpe.package_export_shapefile(package, f'{bc_name.lower()}_bc.shp')
print(f"Exported {bc_name} package to shapefile")
except:
print(f"Could not export {bc_name} package")
# Export well data with detailed attributes
if hasattr(mf, 'wel'):
wel_package = mf.wel
# Get well data for stress period 0
wel_data = wel_package.stress_period_data[0]
# Create well points with attributes
well_geoms = []
well_attrs = {
'well_id': [],
'layer': [],
'row': [],
'column': [],
'pump_rate': [],
'x_coord': [],
'y_coord': []
}
for i, well in enumerate(wel_data):
lay, row, col, rate = well[0], well[1], well[2], well[3]
# Get coordinates from model grid
x, y = mf.modelgrid.get_coords(row, col)
# Create point geometry (simplified - would use shapely in practice)
well_geoms.append((x, y))
well_attrs['well_id'].append(f'WELL_{i+1:03d}')
well_attrs['layer'].append(lay + 1) # 1-based indexing
well_attrs['row'].append(row + 1)
well_attrs['column'].append(col + 1)
well_attrs['pump_rate'].append(rate)
well_attrs['x_coord'].append(x)
well_attrs['y_coord'].append(y)
# Export wells (would need actual shapefile writing function)
# fpe.write_shapefile('wells.shp', well_geoms, well_attrs)
print(f"Found {len(wel_data)} wells for export")
# Export contours
try:
hds = fpu.HeadFile('model.hds')
head = hds.get_data(kstpkper=(0, 0))
# Define contour levels
levels = np.linspace(head.min(), head.max(), 20)
fpe.export_contours(
'head_contours.shp',
head[0, :, :], # Top layer only
levels=levels,
modelgrid=mf.modelgrid
)
print("Head contours exported")
hds.close()
except FileNotFoundError:
print("Head file not found for contour export")import flopy
import flopy.export as fpe
import flopy.utils as fpu
import os
def export_model_complete(model_name: str, export_formats: list = None):
"""Complete model export to multiple formats"""
if export_formats is None:
export_formats = ['netcdf', 'vtk', 'shapefile']
# Load model
mf = flopy.modflow.Modflow.load(f'{model_name}.nam')
# Create export directory
export_dir = f'{model_name}_export'
os.makedirs(export_dir, exist_ok=True)
print(f"Exporting model {model_name} to formats: {export_formats}")
# NetCDF export
if 'netcdf' in export_formats:
print("Exporting to NetCDF...")
nc_file = fpe.NetCdf(
os.path.join(export_dir, f'{model_name}.nc'),
mf
)
# Add comprehensive metadata
metadata = {
'title': f'MODFLOW Model: {model_name}',
'summary': 'Groundwater flow model with complete package and result data',
'source': 'MODFLOW-2005',
'Conventions': 'CF-1.6, ACDD-1.3',
'date_created': '2024-01-01',
'geospatial_bounds_crs': 'EPSG:4326'
}
fpe.acdd.add_global_attributes(nc_file.nc, metadata)
nc_file.add_model(mf)
# Add results if available
try:
hds = fpu.HeadFile(f'{model_name}.hds')
head_dict = {}
for i, time in enumerate(hds.get_times()):
head = hds.get_data(idx=i)
head_dict[time] = head
nc_file.add_transient_array(head_dict, 'head', units='m')
hds.close()
except:
pass
nc_file.write()
print(f" NetCDF export complete: {model_name}.nc")
# VTK export
if 'vtk' in export_formats:
print("Exporting to VTK...")
vtk_file = fpe.Vtk(model=mf, binary=True)
vtk_file.add_model(mf)
# Add results
try:
hds = fpu.HeadFile(f'{model_name}.hds')
head = hds.get_data(kstpkper=(0, 0))
vtk_file.add_array(head, 'head')
hds.close()
except:
pass
vtk_output = os.path.join(export_dir, f'{model_name}.vtu')
vtk_file.write(vtk_output)
print(f" VTK export complete: {model_name}.vtu")
# Shapefile export
if 'shapefile' in export_formats:
print("Exporting to Shapefiles...")
# Grid with attributes
grid_dict = {
'layer': np.repeat(range(mf.dis.nlay), mf.dis.nrow * mf.dis.ncol),
'k_horiz': mf.lpf.hk.array.ravel() if hasattr(mf, 'lpf') else None,
'top_elev': np.repeat(mf.dis.top.array.ravel(), mf.dis.nlay)
}
# Filter out None values
grid_dict = {k: v for k, v in grid_dict.items() if v is not None}
grid_output = os.path.join(export_dir, f'{model_name}_grid.shp')
fpe.write_grid_shapefile(grid_output, mf.modelgrid, array_dict=grid_dict)
# Boundary conditions
bc_packages = ['WEL', 'RIV', 'GHB', 'DRN']
for bc_name in bc_packages:
try:
package = mf.get_package(bc_name)
if package is not None:
bc_output = os.path.join(export_dir, f'{model_name}_{bc_name.lower()}.shp')
fpe.package_export_shapefile(package, bc_output)
except:
continue
print(f" Shapefile export complete in directory: {export_dir}")
print(f"Model export completed. Files saved in: {export_dir}")
# Example usage
export_model_complete('regional_model', ['netcdf', 'vtk', 'shapefile'])import flopy
import flopy.export as fpe
import numpy as np
import json
def export_model_summary_json(model: object, filename: str):
"""Export model summary information to JSON"""
summary = {
'model_info': {
'name': model.name,
'version': model.version,
'workspace': model.model_ws,
'executable': model.exe_name
},
'discretization': {
'nlay': model.dis.nlay,
'nrow': model.dis.nrow,
'ncol': model.dis.ncol,
'nper': model.dis.nper,
'total_cells': model.dis.nlay * model.dis.nrow * model.dis.ncol
},
'packages': [pak.name[0] for pak in model.packagelist],
'grid_info': {
'delr_min': float(np.min(model.dis.delr.array)),
'delr_max': float(np.max(model.dis.delr.array)),
'delc_min': float(np.min(model.dis.delc.array)),
'delc_max': float(np.max(model.dis.delc.array)),
'extent': model.modelgrid.extent
}
}
# Add package-specific information
if hasattr(model, 'lpf'):
summary['hydraulic_properties'] = {
'k_horizontal_range': [float(np.min(model.lpf.hk.array)),
float(np.max(model.lpf.hk.array))],
'porosity_range': [float(np.min(model.lpf.sy.array)),
float(np.max(model.lpf.sy.array))]
}
if hasattr(model, 'wel'):
wel_data = model.wel.stress_period_data[0]
summary['wells'] = {
'count': len(wel_data),
'total_pumping': float(np.sum([w[3] for w in wel_data if w[3] < 0])),
'total_injection': float(np.sum([w[3] for w in wel_data if w[3] > 0]))
}
# Write JSON file
with open(filename, 'w') as f:
json.dump(summary, f, indent=2)
print(f"Model summary exported to: {filename}")
def export_budget_csv(cbb_file: str, output_file: str):
"""Export budget components to CSV format"""
import pandas as pd
cbb = fpu.CellBudgetFile(cbb_file)
# Get all available records
records = cbb.list_unique_records()
times = cbb.get_times()
# Create DataFrame
budget_data = []
for time in times:
for record in records:
try:
data = cbb.get_data(text=record, totim=time)
if data is not None:
total_flow = np.sum(data[0])
budget_data.append({
'time': time,
'component': record,
'total_flow': total_flow
})
except:
continue
df = pd.DataFrame(budget_data)
df.to_csv(output_file, index=False)
cbb.close()
print(f"Budget data exported to: {output_file}")
# Example usage
mf = flopy.modflow.Modflow.load('model.nam')
export_model_summary_json(mf, 'model_summary.json')
export_budget_csv('model.cbb', 'budget_summary.csv')# Export format types
ExportFormat = Literal['netcdf', 'vtk', 'shapefile', 'csv', 'json']
FileFormat = Literal['binary', 'ascii']
CompressionLevel = int # 0-9 for compression
# Data types for export
ArrayData = np.ndarray
TransientData = dict[float, np.ndarray] # Time -> Array
VectorData = tuple[np.ndarray, np.ndarray, np.ndarray] # (vx, vy, vz)
# Geometry types
GeometryType = Literal['Point', 'LineString', 'Polygon']
ShapefileGeometry = list[tuple[float, ...]]
AttributeDict = dict[str, list]
# Metadata types
MetadataDict = dict[str, Union[str, int, float]]
ACDDAttributes = dict[str, str]
CoordinateSystem = Union[int, str] # EPSG code or proj4 string
# File and path types
FilePath = Union[str, os.PathLike]
OutputDirectory = str
FileExtension = str
# VTK specific types
VTKDataType = Literal['SCALARS', 'VECTORS', 'TENSORS']
VTKFormat = Literal['ascii', 'binary']
TimeStepData = dict[float, str] # Time -> filename mapping
# NetCDF specific types
NCVariable = str
NCDimension = str
NCAttributes = dict[str, Union[str, int, float, list]]
# Shapefile specific types
FieldType = Literal['C', 'N', 'F', 'L', 'D'] # Character, Numeric, Float, Logical, Date
FieldDefinition = tuple[str, str, int, int] # (name, type, width, precision)This comprehensive documentation covers the complete data export API for FloPy including NetCDF, VTK, and shapefile export capabilities. The examples demonstrate basic to advanced export scenarios including complete model export workflows, multi-format exports, and custom export functions for various use cases.
Install with Tessl CLI
npx tessl i tessl/pypi-flopydocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10