Mathematical functions for the verification, evaluation, and optimization of forecasts, predictions, or models.
Data preprocessing, discretization, bootstrapping, and CDF manipulation tools required for forecast verification workflows. These utilities prepare data for scoring functions and provide statistical resampling capabilities.
Tools for converting continuous data to categorical or binary formats required for categorical scoring.
Discretizes both forecast and observation data using common bin edges.
def comparative_discretise(
    fcst: XarrayLike,
    obs: XarrayLike,
    *,
    bin_edges: Sequence[float],
    reduce_dims: Optional[FlexibleDimensionTypes] = None,
    preserve_dims: Optional[FlexibleDimensionTypes] = None,
) -> Tuple[XarrayLike, XarrayLike]:
    """
    Discretize forecast and observation data using common bin edges.

    Args:
        fcst: Continuous forecast data.
        obs: Continuous observation data.
        bin_edges: Bin edge values for discretization.
        reduce_dims: Dimensions to reduce during binning.
        preserve_dims: Dimensions to preserve.

    Returns:
        Tuple of (discretized_forecast, discretized_observation).

    Notes:
        - Creates consistent categorical bins for both datasets.
        - Bin edges define category boundaries.
        - Returns integer category labels starting from 0.
        - Values outside the bin range are assigned to the nearest bin.
    """


# Converts continuous data to binary events using threshold comparison.
def binary_discretise(
    data: XarrayLike,
    threshold: Union[float, int, XarrayLike],
    *,
    threshold_operator: Callable = operator.ge,
) -> XarrayLike:
    """
    Convert continuous data to binary events using a threshold.

    Args:
        data: Input continuous data.
        threshold: Threshold value(s) for binary conversion.
        threshold_operator: Comparison operator (ge, le, gt, lt).

    Returns:
        Binary data {0, 1} where 1 indicates event occurrence.

    Formula:
        binary_result = threshold_operator(data, threshold)

    Notes:
        - Default: event = data >= threshold.
        - Threshold can be a scalar or an array with compatible dimensions.
        - NaN values are preserved as NaN in the output.
    """


# Creates binary events based on data percentiles rather than fixed thresholds.
def binary_discretise_proportion(
    data: XarrayLike,
    proportion: float,
    *,
    dims: Optional[FlexibleDimensionTypes] = None,
    threshold_operator: Callable = operator.ge,
) -> XarrayLike:
    """
    Convert data to binary events using a proportion-based threshold.

    Args:
        data: Input continuous data.
        proportion: Proportion of data to classify as events (0-1).
        dims: Dimensions over which to calculate percentiles.
        threshold_operator: Comparison operator.

    Returns:
        Binary data where the specified proportion are events.

    Notes:
        - proportion=0.1 creates events for the top 10% of values.
        - The threshold is determined from data percentiles.
        - Useful for relative event definitions.
    """


# Calculates the fraction of data exceeding a specified threshold.
def proportion_exceeding(
    data: XarrayLike,
    threshold: Union[float, int, XarrayLike],
    *,
    dims: Optional[FlexibleDimensionTypes] = None,
    threshold_operator: Callable = operator.ge,
) -> XarrayLike:
    """
    Calculate the proportion of data exceeding a threshold.

    Args:
        data: Input data.
        threshold: Threshold value(s).
        dims: Dimensions over which to calculate the proportion.
        threshold_operator: Comparison operator.

    Returns:
        Proportion values in [0, 1].

    Notes:
        - Returns the fraction of values satisfying the threshold condition.
        - Useful for climate statistics and event frequencies.
    """


# Utilities for aligning and preparing data arrays for scoring.
# Ensures consistent NaN patterns between forecast and observation arrays.
def broadcast_and_match_nan(
    *args: XarrayLike,
) -> Tuple[XarrayLike, ...]:
    """
    Broadcast arrays and match NaN patterns.

    Args:
        *args: Variable number of xarray data arrays.

    Returns:
        Tuple of arrays with matching dimensions and NaN patterns.

    Notes:
        - Broadcasts arrays to a common dimension set.
        - Propagates NaN values across all arrays.
        - Ensures consistent missing-data handling.
        - Required preprocessing for many scoring functions.
    """


# Bootstrap methods for uncertainty quantification and confidence interval estimation.
# Block bootstrap resampling for time series data with temporal dependence.
def block_bootstrap(
    data: xr.DataArray,
    *,
    block_length: int,
    n_samples: int = 1000,
    rng: Optional[Union[int, np.random.Generator]] = None,
    dim: str = "time",
) -> xr.DataArray:
    """
    Generate block bootstrap samples from time series data.

    Args:
        data: Input time series data.
        block_length: Length of each bootstrap block.
        n_samples: Number of bootstrap samples to generate.
        rng: Random number generator or seed.
        dim: Name of the time dimension for resampling.

    Returns:
        Bootstrap samples with a new 'bootstrap_sample' dimension.

    Notes:
        - Preserves temporal autocorrelation within blocks.
        - Block length should reflect the data's correlation timescale.
        - Suitable for dependent time series data.
        - Output shape: original + (n_samples,).
    """


# Isotonic fitting for reliability diagrams and calibration analysis.
def isotonic_fit(
    fcst: XarrayLike,
    obs: XarrayLike,
    *,
    reduce_dims: Optional[FlexibleDimensionTypes] = None,
    preserve_dims: Optional[FlexibleDimensionTypes] = None,
) -> XarrayLike:
    """
    Fit isotonic regression for reliability diagram analysis.

    Args:
        fcst: Forecast probability values in [0, 1].
        obs: Binary observation values in {0, 1}.
        reduce_dims: Dimensions to reduce.
        preserve_dims: Dimensions to preserve.

    Returns:
        Isotonic regression fit values.

    Notes:
        - Creates a monotonic mapping from forecast to observed frequency.
        - Used for probability forecast calibration assessment.
        - Output represents the conditional event frequency given the forecast.
    """


# Specialized functions for manipulating Cumulative Distribution Function data.
from scores.processing.cdf import (
round_values,
propagate_nan,
observed_cdf,
integrate_square_piecewise_linear,
add_thresholds,
fill_cdf,
decreasing_cdfs,
cdf_envelope
)
def round_values(
    values: xr.DataArray,
    precision: int = 6
) -> xr.DataArray:
    """Round values for numerical stability in CDF processing.

    Args:
        values: Data to round.
        precision: Number of decimal places to round to (default 6).
    """
def propagate_nan(
    data: xr.DataArray,
    threshold_dim: str
) -> xr.DataArray:
    """Propagate NaN values consistently through CDF dimensions.

    Args:
        data: CDF data, possibly containing NaNs.
        threshold_dim: Name of the threshold dimension.
    """
def observed_cdf(
    obs: xr.DataArray,
    thresholds: xr.DataArray,
    threshold_dim: str
) -> xr.DataArray:
    """Create an empirical CDF from observation data.

    Args:
        obs: Observation values.
        thresholds: Threshold points for the CDF.
        threshold_dim: Name of the threshold dimension.
    """
def integrate_square_piecewise_linear(
    values: xr.DataArray,
    coords: xr.DataArray,
    coord_dim: str
) -> xr.DataArray:
    """Integrate squared piecewise-linear functions for CRPS calculation.

    Args:
        values: Function values at each coordinate.
        coords: Coordinate points of the piecewise-linear function.
        coord_dim: Name of the dimension to integrate over.
    """
def add_thresholds(
    cdf_data: xr.DataArray,
    additional_thresholds: xr.DataArray,
    threshold_dim: str
) -> xr.DataArray:
    """Add additional threshold points to CDF data.

    Args:
        cdf_data: Existing CDF data.
        additional_thresholds: Extra threshold points to insert.
        threshold_dim: Name of the threshold dimension.
    """
def fill_cdf(
    cdf_data: xr.DataArray,
    method: str = "linear",
    threshold_dim: str = "threshold"
) -> xr.DataArray:
    """Fill missing values in a CDF using the specified interpolation method.

    Args:
        cdf_data: CDF data with missing values.
        method: Interpolation method (default "linear").
        threshold_dim: Name of the threshold dimension (default "threshold").
    """
def decreasing_cdfs(
    cdf_data: xr.DataArray,
    threshold_dim: str
) -> xr.DataArray:
    """Handle and correct decreasing CDF values.

    Args:
        cdf_data: CDF data that may violate monotonicity.
        threshold_dim: Name of the threshold dimension.
    """
def cdf_envelope(
    cdf_data: xr.DataArray,
    threshold_dim: str
) -> xr.DataArray:
    """Create a CDF envelope for an ensemble of CDFs.

    Args:
        cdf_data: Ensemble of CDFs.
        threshold_dim: Name of the threshold dimension.
    """


from scores.processing import (
    binary_discretise,
    comparative_discretise,
    broadcast_and_match_nan,
)
import xarray as xr
import numpy as np

# Create sample continuous data
forecast = xr.DataArray(np.random.exponential(3, 1000))
observation = xr.DataArray(np.random.exponential(3, 1000))

# Convert to binary events (>= 5.0)
fcst_binary = binary_discretise(forecast, 5.0)
obs_binary = binary_discretise(observation, 5.0)
print(f"Event frequency in forecast: {fcst_binary.mean().values:.3f}")
print(f"Event frequency in observations: {obs_binary.mean().values:.3f}")

# Create multicategorical data
bin_edges = [0, 1, 5, 10, 20, 100]
fcst_categorical, obs_categorical = comparative_discretise(
    forecast, observation, bin_edges=bin_edges
)
print(f"Categorical forecast range: {fcst_categorical.min().values} to {fcst_categorical.max().values}")

# Define events based on data distribution
from scores.processing import binary_discretise_proportion, proportion_exceeding

# Top 20% of values as events
fcst_top20 = binary_discretise_proportion(forecast, 0.2)
obs_top20 = binary_discretise_proportion(observation, 0.2)

# Verify proportion
print(f"Forecast top 20% frequency: {fcst_top20.mean().values:.3f}")
print(f"Observation top 20% frequency: {obs_top20.mean().values:.3f}")

# Calculate actual proportions exceeding various thresholds
thresholds = [1, 2, 5, 10, 15]
for threshold in thresholds:
    fcst_prop = proportion_exceeding(forecast, threshold)
    obs_prop = proportion_exceeding(observation, threshold)
    print(f"Threshold {threshold:2d}: fcst={fcst_prop.values:.3f}, obs={obs_prop.values:.3f}")

# Handle misaligned data with NaN patterns
forecast_with_nan = forecast.copy()
observation_with_nan = observation.copy()

# Introduce different NaN patterns
forecast_with_nan[::10] = np.nan  # Every 10th value
observation_with_nan[5::15] = np.nan  # Different pattern

# Align and match NaN patterns
aligned_fcst, aligned_obs = broadcast_and_match_nan(
    forecast_with_nan, observation_with_nan
)
print(f"Original forecast NaN count: {forecast_with_nan.isnull().sum().values}")
print(f"Original observation NaN count: {observation_with_nan.isnull().sum().values}")
print(f"Aligned data NaN count: {aligned_fcst.isnull().sum().values}")

from scores.processing import block_bootstrap
from scores.continuous import mse
import pandas as pd  # required for pd.date_range below; was missing from the original example

# Create time series data
time_index = pd.date_range("2023-01-01", periods=365, freq="D")
forecast_ts = xr.DataArray(
    np.random.normal(10, 2, 365) + 3 * np.sin(np.arange(365) * 2 * np.pi / 365),
    dims=["time"],
    coords={"time": time_index},
)
observation_ts = xr.DataArray(
    forecast_ts.values + np.random.normal(0, 1, 365),
    dims=["time"],
    coords={"time": time_index},
)

# Generate bootstrap samples
bootstrap_samples = block_bootstrap(
    forecast_ts,
    block_length=14,  # 2-week blocks
    n_samples=1000,
    dim="time",
)

# Calculate MSE for each bootstrap sample
mse_bootstrap = []
for i in range(1000):
    boot_fcst = bootstrap_samples.isel(bootstrap_sample=i)
    # boot_fcst.time carries datetime labels, not integer positions, so
    # selection must be label-based (.sel) rather than positional (.isel).
    # NOTE(review): assumes block_bootstrap preserves resampled time labels
    # on the output — confirm against the scores documentation.
    boot_obs = observation_ts.sel(time=boot_fcst.time)
    mse_val = mse(boot_fcst, boot_obs)
    mse_bootstrap.append(mse_val.values)
mse_bootstrap = np.array(mse_bootstrap)
print(f"Bootstrap MSE: {np.mean(mse_bootstrap):.3f} ± {np.std(mse_bootstrap):.3f}")
print(f"95% CI: [{np.percentile(mse_bootstrap, 2.5):.3f}, {np.percentile(mse_bootstrap, 97.5):.3f}]")

from scores.processing import isotonic_fit
# Create probability forecast and binary observations
prob_forecast = xr.DataArray(np.random.beta(2, 2, 500))

# Generate observations with some relationship to forecast
binary_obs = xr.DataArray(
    np.random.binomial(1, 0.3 + 0.4 * prob_forecast.values)
)

# Fit isotonic regression for reliability analysis
reliability_fit = isotonic_fit(prob_forecast, binary_obs)
print(f"Original forecast range: [{prob_forecast.min().values:.3f}, {prob_forecast.max().values:.3f}]")
print(f"Reliability fit range: [{reliability_fit.min().values:.3f}, {reliability_fit.max().values:.3f}]")

# Perfect reliability would have reliability_fit ≈ prob_forecast
reliability_bias = (reliability_fit - prob_forecast).mean()
print(f"Mean reliability bias: {reliability_bias.values:.3f}")

# Process multi-dimensional data
forecast_3d = xr.DataArray(
    np.random.exponential(2, (50, 20, 30)),  # time, lat, lon
    dims=["time", "lat", "lon"],
)
observation_3d = xr.DataArray(
    np.random.exponential(2, (50, 20, 30)),
    dims=["time", "lat", "lon"],
)

# Binary discretization preserves all dimensions
fcst_events_3d = binary_discretise(forecast_3d, 5.0)
obs_events_3d = binary_discretise(observation_3d, 5.0)

# Calculate event frequencies over different dimensions
temporal_frequency = fcst_events_3d.mean(dim="time")  # Spatial event frequency
spatial_frequency = fcst_events_3d.mean(dim=["lat", "lon"])  # Temporal event frequency
overall_frequency = fcst_events_3d.mean()  # Overall frequency
print(f"Spatial frequency range: {temporal_frequency.min().values:.3f} to {temporal_frequency.max().values:.3f}")
print(f"Overall event frequency: {overall_frequency.values:.3f}")

# Proportion-based events calculated over specific dimensions
# Top 10% events at each grid point over time
spatial_top10 = binary_discretise_proportion(
    forecast_3d, 0.1, dims="time"
)
print(f"Spatial top 10% events shape: {spatial_top10.shape}")

from scores.processing.cdf import fill_cdf, add_thresholds, observed_cdf
# Create sample CDF data with missing values
thresholds = xr.DataArray(np.linspace(0, 20, 21))
cdf_values = xr.DataArray(
    np.cumsum(np.random.exponential(0.1, (100, 21)), axis=1),
    dims=["sample", "threshold"],
    coords={"threshold": thresholds},
)

# Normalize to [0,1]
cdf_values = cdf_values / cdf_values.isel(threshold=-1, drop=True)

# Introduce missing values
cdf_values = cdf_values.where(np.random.random(cdf_values.shape) > 0.05)

# Fill missing values
filled_cdf = fill_cdf(cdf_values, method="linear", threshold_dim="threshold")

# Add additional threshold points
additional_thresh = xr.DataArray([2.5, 7.5, 12.5, 17.5])
extended_cdf = add_thresholds(filled_cdf, additional_thresh, "threshold")
print(f"Original CDF shape: {cdf_values.shape}")
print(f"Extended CDF shape: {extended_cdf.shape}")
print(f"Missing values before: {cdf_values.isnull().sum().values}")
print(f"Missing values after: {extended_cdf.isnull().sum().values}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-scores