CMA-ES, Covariance Matrix Adaptation Evolution Strategy for non-linear numerical optimization in Python
—
Quality: Pending — conformance with best practices has not yet been reviewed.
Impact: Pending — no eval scenarios have been run.
Advanced optimization techniques including surrogate models, pure Python implementation, and noise handling for specialized use cases.
CMA-ES with linear-quadratic surrogate models (lq-CMA-ES) for expensive objective functions where function evaluations are costly.
def fmin_lq_surr2(
    objective_function,
    x0,
    sigma0,
    options=None,
    inject=True,
    restarts=0,
    incpopsize=2,
    keep_model=False,
    not_evaluated=np.isnan,
    callback=None,
):
    """Minimize an objective function using lq-CMA-ES with surrogate models.

    Linear-quadratic CMA-ES builds global surrogate models (linear or
    quadratic) to reduce the number of expensive function evaluations by
    predicting function values for candidate solutions.

    Parameters
    ----------
    objective_function : callable
        Expensive function to minimize. Called as ``objective_function(x)``.
    x0 : array-like or callable
        Initial solution estimate. Can be a callable returning different
        starting points for each restart.
    sigma0 : float or array-like
        Initial standard deviation (step size).
    options : dict, optional
        CMA-ES options. See ``CMAOptions()`` for available options.
        Important for surrogate models:

        - ``'maxfevals'``: maximum true function evaluations (crucial for
          expensive functions)
        - ``'ftarget'``: target function value for termination
    inject : bool, optional
        Whether to inject the best surrogate solution in each iteration
        (default True).
    restarts : int, optional
        Number of IPOP restarts to perform (default 0).
    incpopsize : float, optional
        Population size multiplier for restarts (default 2).
    keep_model : bool, optional
        Whether to keep the surrogate model between restarts (default False).
        If False, a new model is built for each restart.
    not_evaluated : callable, optional
        Function to test if a value indicates a missing evaluation
        (default ``np.isnan``).
    callback : callable, optional
        Callback function called each iteration.

    Returns
    -------
    tuple[numpy.ndarray, CMAEvolutionStrategy]
        ``(xbest, es)`` where ``xbest`` is the best solution and ``es``
        contains optimization details.
        Note: ``es.result.xfavorite`` is often the best estimate for the
        optimum.

    Examples
    --------
    >>> import cma
    >>> import numpy as np
    >>>
    >>> # Expensive function simulation
    >>> def expensive_function(x):
    ...     # Simulate expensive computation
    ...     return sum((x - 1)**2) + 0.1 * np.random.randn()
    >>>
    >>> # Optimize with limited evaluations
    >>> x, es = cma.fmin_lq_surr2(
    ...     expensive_function,
    ...     [0, 0, 0],
    ...     0.5,
    ...     options={'maxfevals': 100, 'verb_disp': 0}
    ... )
    >>> print(f"Best solution: {x}")
    >>> print(f"Evaluations used: {es.countevals}")
    >>>
    >>> # With restarts for robustness
    >>> x, es = cma.fmin_lq_surr2(
    ...     expensive_function,
    ...     lambda: np.random.randn(3),  # Random start each restart
    ...     0.5,
    ...     restarts=2,
    ...     options={'maxfevals': 200}
    ... )
    >>>
    >>> # For very expensive functions
    >>> def very_expensive(x):
    ...     # Simulate 10 second computation
    ...     import time
    ...     time.sleep(0.1)  # Reduced for example
    ...     return sum(x**2)
    >>>
    >>> x, es = cma.fmin_lq_surr2(
    ...     very_expensive,
    ...     [1, 1],
    ...     0.3,
    ...     options={'maxfevals': 50, 'ftarget': 1e-6}
    ... )
    """
def fmin_lq_surr(objective_function, x0, sigma0, options=None, **kwargs):
    """Minimize an objective function using lq-CMA-ES with surrogate models.

    Legacy interface to surrogate-model optimization. Use ``fmin_lq_surr2``
    for new code as it provides better control and cleaner results.

    Parameters
    ----------
    Same as ``fmin_lq_surr2`` but with different defaults and behavior.

    Returns
    -------
    tuple[numpy.ndarray, CMAEvolutionStrategy]
        ``(xbest, es)`` where results may be based on surrogate values.

    Notes
    -----
    - ``xbest`` takes into account only recent history, not all evaluations.
    - ``es.result`` may be confusing as it is partly based on surrogate values.
    - ``es.best`` contains the solution with the best surrogate value (not
      the true value).
    - The model is kept the same for each restart (use ``fmin_lq_surr2`` to
      change this).
    """
import cma
import numpy as np
import functools

# Pattern 1: Simple expensive function optimization
def expensive_objective(x):
    """Simulate expensive function with 1-second delay."""
    import time
    time.sleep(0.01)  # Simulate computation time
    return sum((x - np.array([1, 2, 3]))**2) + 0.01 * np.random.randn()

# Optimize with surrogate model
x_best, es = cma.fmin_lq_surr2(
    expensive_objective,
    [0, 0, 0],
    0.5,
    options={
        'maxfevals': 80,  # Limit true evaluations
        'ftarget': 1e-4,  # Stop when good enough
        'verb_disp': 10   # Show progress every 10 iterations
    }
)
print(f"Found solution: {x_best}")
print(f"True evaluations: {es.countevals}")
print(f"Total iterations: {es.countiter}")

# Pattern 2: Using with functools.partial for additional arguments
def parametric_function(x, a, b):
    """Function with additional parameters."""
    return sum((x - a)**2) + b * sum(np.sin(x))

# Create partial function with fixed parameters
objective = functools.partial(parametric_function, a=np.array([1, 2]), b=0.1)
x_best, es = cma.fmin_lq_surr2(
    objective,
    [0, 0],
    0.3,
    options={'maxfevals': 60}
)

# Pattern 3: Multiple restarts with different starting points
def random_start():
    """Generate random starting point for each restart."""
    return np.random.uniform(-2, 2, 5)

x_best, es = cma.fmin_lq_surr2(
    lambda x: sum(x**4 - 16*x**2 + 5*x),  # Multi-modal function
    random_start,  # Callable for different starts
    0.5,
    restarts=3,
    incpopsize=1.5,  # Smaller population increase
    options={'maxfevals': 200}
)

# Pattern 4: Custom evaluation detection
def custom_not_evaluated(value):
    """Custom function to detect missing evaluations."""
    return np.isnan(value) or np.isinf(value) or value < -1e10

x_best, es = cma.fmin_lq_surr2(
    expensive_objective,
    [0, 0, 0],
    0.5,
    not_evaluated=custom_not_evaluated,
    options={'maxfevals': 100}
)

# Pure Python CMA-ES implementation that doesn't require numpy, suitable for
# environments where numpy is unavailable.
# NOTE(review): the published name is ``cma.purecma.fmin``; a dotted name is
# not valid in a ``def`` statement, so the function is defined here as
# ``fmin`` (it lives in the ``cma.purecma`` module).
def fmin(
    objective_fct,
    xstart,
    sigma,
    args=(),
    maxfevals='1e3 * N**2',
    ftarget=None,
    verb_disp=100,
    verb_log=1,
    verb_save=1000,
):
    """Pure Python CMA-ES minimization without a numpy dependency.

    Minimalistic implementation for reading/understanding the CMA-ES
    algorithm or when numpy is not available. For serious applications with
    numpy available, use ``cma.fmin2`` or ``cma.CMAEvolutionStrategy``
    instead.

    Parameters
    ----------
    objective_fct : callable
        Function to minimize. Takes a list of floats, returns a single
        float. Should be minimized (lower values are better).
    xstart : list or sequence
        Initial solution vector. Length defines the search space dimension.
    sigma : float
        Initial step-size (standard deviation in any coordinate).
    args : tuple, optional
        Additional arguments passed to ``objective_fct`` as
        ``objective_fct(x, *args)``.
    maxfevals : int or str, optional
        Maximum function evaluations. A string is evaluated with ``N`` as
        dimension (default ``'1e3 * N**2'``).
    ftarget : float, optional
        Target function value for termination.
    verb_disp : int, optional
        Display progress every ``verb_disp`` iterations (default 100,
        0 for never).
    verb_log : int, optional
        Log data every ``verb_log`` iterations (default 1, 0 for never).
    verb_save : int, optional
        Save plot every ``verb_save`` iterations (default 1000, 0 for never).

    Returns
    -------
    tuple[list, dict]
        ``(xbest, dict_info)`` where:

        - ``xbest``: best solution found (list of floats)
        - ``dict_info``: dictionary with optimization information including
          ``'fbest'``, ``'evals'``, ``'iterations'``, ``'stop'``

    Examples
    --------
    >>> import cma.purecma as purecma
    >>>
    >>> # Simple optimization without numpy
    >>> def sphere(x):
    ...     return sum(xi**2 for xi in x)
    >>>
    >>> xbest, info = purecma.fmin(sphere, [1, 2, 3], 0.5)
    >>> print(f"Best solution: {xbest}")
    >>> print(f"Best fitness: {info['fbest']}")
    >>> print(f"Evaluations: {info['evals']}")
    >>>
    >>> # With additional arguments
    >>> def shifted_sphere(x, shift):
    ...     return sum((xi - shi)**2 for xi, shi in zip(x, shift))
    >>>
    >>> xbest, info = purecma.fmin(
    ...     shifted_sphere,
    ...     [0, 0, 0],
    ...     0.5,
    ...     args=([1, 2, 3],)  # Shift vector
    ... )
    >>>
    >>> # With termination criteria
    >>> xbest, info = purecma.fmin(
    ...     sphere,
    ...     [1, 2, 3, 4, 5],
    ...     0.3,
    ...     maxfevals=1000,
    ...     ftarget=1e-8,
    ...     verb_disp=50
    ... )
    """
# NOTE(review): the published name is ``cma.purecma.CMAES``; a dotted name is
# not valid in a ``class`` statement, so the class is defined here as
# ``CMAES`` (it lives in the ``cma.purecma`` module).
class CMAES:
    """Pure Python CMA-ES class providing an ask-and-tell interface.

    Minimalistic implementation without a numpy dependency. Suitable for
    understanding the CMA-ES algorithm or when numpy is unavailable.
    """

    def __init__(self, xmean, sigma, opts=None):
        """Initialize the pure Python CMA-ES optimizer.

        Parameters
        ----------
        xmean : list
            Initial mean (starting point), determines the dimension.
        sigma : float
            Initial step size (standard deviation).
        opts : dict, optional
            Options dictionary. Available options:

            - 'maxiter': maximum iterations
            - 'maxfevals': maximum function evaluations
            - 'ftarget': target function value
            - 'popsize': population size (default 4 + floor(3*log(N)))
            - 'seed': random seed
            - 'verb_disp': display verbosity

        Examples
        --------
        >>> import cma.purecma as purecma
        >>>
        >>> # Basic initialization
        >>> es = purecma.CMAES([0, 0, 0], 0.5)
        >>>
        >>> # With options
        >>> opts = {'maxfevals': 1000, 'popsize': 10, 'seed': 42}
        >>> es = purecma.CMAES([1, 2, 3], 0.3, opts)
        """
        pass

    def ask(self):
        """Sample candidate solutions from the current distribution.

        Returns
        -------
        list[list]
            List of candidate solutions (each solution is a list of floats).

        Examples
        --------
        >>> es = purecma.CMAES([0, 0], 0.5)
        >>> solutions = es.ask()
        >>> len(solutions) == es.popsize
        True
        >>> isinstance(solutions[0], list)
        True
        """
        pass

    def tell(self, arx, arf):
        """Update the distribution based on function evaluations.

        This is where the main CMA-ES algorithm implementation occurs.

        Parameters
        ----------
        arx : list[list]
            List of candidate solutions (same as returned by ``ask()``).
        arf : list[float]
            Corresponding fitness values (lower is better).

        Examples
        --------
        >>> import cma.purecma as purecma
        >>>
        >>> def objective(x):
        ...     return sum(xi**2 for xi in x)
        >>>
        >>> es = purecma.CMAES([1, 2], 0.5)
        >>> while not es.stop():
        ...     solutions = es.ask()
        ...     fitness = [objective(x) for x in solutions]
        ...     es.tell(solutions, fitness)
        >>>
        >>> print(f"Best solution: {es.result()[0]}")
        """
        pass

    def stop(self):
        """Check termination criteria.

        Returns
        -------
        dict or False
            Termination conditions if stopped, False otherwise.
        """
        pass

    def result(self):
        """Return the current best result.

        Returns
        -------
        tuple[list, float, int, int]
            ``(xbest, fbest, evals, iterations)``
        """
        pass

    def disp(self, verb_modulo=1):
        """Display the current optimization status.

        Parameters
        ----------
        verb_modulo : int, optional
            Display every ``verb_modulo`` iterations.
        """
import cma.purecma as purecma

# Pattern 1: Simple function minimization
def rosenbrock(x):
    """Pure Python Rosenbrock function."""
    return sum(100 * (x[i+1] - x[i]**2)**2 + (1 - x[i])**2
               for i in range(len(x) - 1))

# Using functional interface
xbest, info = purecma.fmin(rosenbrock, [0, 0], 0.5, verb_disp=50)
print(f"Functional result: {xbest}, fitness: {info['fbest']}")

# Pattern 2: Ask-and-tell interface with custom loop
es = purecma.CMAES([0, 0], 0.5, {'maxfevals': 2000, 'ftarget': 1e-8})
iteration = 0
while not es.stop():
    solutions = es.ask()
    fitness_values = [rosenbrock(x) for x in solutions]
    es.tell(solutions, fitness_values)
    iteration += 1
    if iteration % 50 == 0:
        es.disp()
result = es.result()
print(f"Ask-tell result: {result[0]}, fitness: {result[1]}")

# Pattern 3: Multi-dimensional problem without numpy
def high_dim_sphere(x):
    """High-dimensional sphere function."""
    return sum(xi**2 for xi in x)

# 20-dimensional problem
dimension = 20
initial_point = [0.5] * dimension
xbest, info = purecma.fmin(
    high_dim_sphere,
    initial_point,
    0.3,
    maxfevals=dimension * 1000,
    verb_disp=100
)
print(f"20D result: fitness = {info['fbest']}, evals = {info['evals']}")

# Pattern 4: Custom termination and monitoring
class OptimizationLogger:
    def __init__(self):
        self.history = []

    def log(self, iteration, fbest, sigma):
        self.history.append((iteration, fbest, sigma))

def custom_optimization():
    """Custom optimization with logging and early termination."""
    def objective(x):
        # Noisy sphere function
        import random
        return sum(xi**2 for xi in x) + 0.01 * random.gauss(0, 1)

    logger = OptimizationLogger()
    es = purecma.CMAES([1, 1, 1], 0.5, {'maxfevals': 5000})
    best_ever = float('inf')
    no_improvement_count = 0
    while not es.stop():
        solutions = es.ask()
        fitness_values = [objective(x) for x in solutions]
        es.tell(solutions, fitness_values)
        current_best = min(fitness_values)
        logger.log(es.countiter, current_best, es.sigma)
        # Custom termination: no improvement for 100 iterations
        if current_best < best_ever:
            best_ever = current_best
            no_improvement_count = 0
        else:
            no_improvement_count += 1
            if no_improvement_count > 100:
                break
        if es.countiter % 50 == 0:
            print(f"Iter {es.countiter}: best = {current_best:.6f}, "
                  f"sigma = {es.sigma:.6f}")
    return es.result(), logger.history

result, history = custom_optimization()
print(f"Custom optimization completed: {result[1]:.6f}")

# Tools for handling noisy objective functions where function values are
# subject to uncertainty.
class NoiseHandler:
    """Noise handling for uncertain/noisy objective functions.

    Implements adaptive noise handling according to Hansen et al. 2009,
    "A Method for Handling Uncertainty in Evolutionary Optimization".
    Controls noise via step-size increase and number of re-evaluations.
    """

    def __init__(self, N, maxevals=[1, 1, 30], aggregate=np.median):
        """Initialize the noise handler.

        Parameters
        ----------
        N : int
            Search space dimension.
        maxevals : int or list, optional
            Maximum evaluations per solution. If a list:

            - Single value: max evaluations
            - Two values: [min_evals, max_evals]
            - Three values: [min_evals, start_evals, max_evals]

            Default ``[1, 1, 30]``.
        aggregate : callable, optional
            Function to aggregate multiple evaluations (default
            ``np.median``). Common choices: ``np.mean``, ``np.median``,
            ``min``, ``max``.

        Examples
        --------
        >>> import cma
        >>> import numpy as np
        >>>
        >>> # Create noise handler for 5D problem
        >>> noise_handler = cma.NoiseHandler(5, maxevals=[1, 2, 10])
        >>>
        >>> # Noisy objective function
        >>> def noisy_sphere(x, noise_level=0.1):
        ...     true_value = sum(x**2)
        ...     noise = noise_level * np.random.randn()
        ...     return true_value + noise
        >>>
        >>> # Use with fmin2
        >>> x, es = cma.fmin2(
        ...     noisy_sphere,
        ...     5 * [0.1],
        ...     0.5,
        ...     noise_handler=noise_handler,
        ...     options={'maxfevals': 2000}
        ... )
        """
        # NOTE(review): the mutable list default mirrors the published cma
        # API signature; this stub never mutates it.
        pass

    def __call__(self, func, x, *args, **kwargs):
        """Evaluate a function with noise handling.

        Parameters
        ----------
        func : callable
            Noisy function to evaluate.
        x : array-like
            Solution to evaluate.
        *args, **kwargs :
            Additional arguments for ``func``.

        Returns
        -------
        float
            Aggregated function value after multiple evaluations.
        """
        pass

    @property
    def evaluations(self):
        """Current number of evaluations per solution."""
        pass

    def reeval_if_needed(self, es):
        """Re-evaluate solutions if the noise level suggests it is beneficial.

        Parameters
        ----------
        es : CMAEvolutionStrategy
            Evolution strategy instance to potentially re-evaluate.
        """
import cma
import numpy as np

# Pattern 1: Simple noisy optimization
def noisy_rosenbrock(x, noise_std=0.1):
    """Rosenbrock function with additive Gaussian noise."""
    true_value = sum(100*(x[1:] - x[:-1]**2)**2 + (1 - x[:-1])**2)
    noise = noise_std * np.random.randn()
    return true_value + noise

# Use noise handler with fmin2
noise_handler = cma.NoiseHandler(4, maxevals=[1, 2, 20])
x_best, es = cma.fmin2(
    noisy_rosenbrock,
    4 * [0.1],
    0.5,
    noise_handler=noise_handler,
    options={'maxfevals': 3000, 'verb_disp': 100}
)
print(f"Noisy optimization result: {x_best}")
print(f"Final evaluations per point: {noise_handler.evaluations}")

# Pattern 2: Custom noise aggregation
def robust_aggregation(values):
    """Custom aggregation using trimmed mean."""
    values = np.array(values)
    if len(values) <= 2:
        return np.mean(values)
    # Remove extreme 20% of values
    trim = max(1, int(0.1 * len(values)))
    sorted_vals = np.sort(values)
    return np.mean(sorted_vals[trim:-trim])

noise_handler_robust = cma.NoiseHandler(
    3,
    maxevals=[2, 5, 15],
    aggregate=robust_aggregation
)

# Pattern 3: Manual ask-and-tell with noise handler
def manual_noisy_optimization():
    """Manual optimization loop with noise handling."""
    def very_noisy_function(x):
        # High noise level
        return sum(x**2) + 0.5 * np.random.randn()

    es = cma.CMAEvolutionStrategy([1, 1, 1], 0.5, {'maxfevals': 2000})
    noise_handler = cma.NoiseHandler(3, maxevals=[1, 3, 10])
    while not es.stop():
        solutions = es.ask()
        # Evaluate with noise handling
        fitness_values = []
        for x in solutions:
            # Use noise handler directly
            f_val = noise_handler(very_noisy_function, x)
            fitness_values.append(f_val)
        es.tell(solutions, fitness_values)
        # Let noise handler decide about re-evaluations
        noise_handler.reeval_if_needed(es)
        if es.countiter % 50 == 0:
            print(f"Iter {es.countiter}: "
                  f"evals/point = {noise_handler.evaluations}, "
                  f"best = {es.result.fbest:.6f}")
    return es.result

result = manual_noisy_optimization()
print(f"Manual noisy result: {result.fbest}")

# Pattern 4: Adaptive noise handling parameters
def adaptive_noise_optimization():
    """Optimization with noise level that changes over time."""
    iteration_counter = [0]  # Use list for mutable closure

    def time_varying_noisy_function(x):
        iteration_counter[0] += 1
        true_val = sum((x - np.array([1, 2, 3]))**2)
        # Noise decreases over time (learning/adaptation scenario)
        noise_level = 1.0 / (1 + iteration_counter[0] / 1000)
        noise = noise_level * np.random.randn()
        return true_val + noise

    # Start with conservative noise handling
    noise_handler = cma.NoiseHandler(3, maxevals=[2, 4, 20])
    x_best, es = cma.fmin2(
        time_varying_noisy_function,
        [0, 0, 0],
        0.5,
        noise_handler=noise_handler,
        options={'maxfevals': 2500}
    )
    return x_best, es

x_best, es = adaptive_noise_optimization()
print(f"Adaptive noise result: {x_best}")

import cma
import numpy as np

def expensive_noisy_function(x):
    """Simulate expensive function with noise and computational cost."""
    import time
    time.sleep(0.01)  # Simulate computation time
    true_value = sum((x - np.array([1, 2, 3]))**2)
    noise = 0.1 * np.random.randn()
    return true_value + noise

# Cannot directly combine lq-surr with NoiseHandler,
# but can use noise-aware evaluation
def noise_aware_expensive_function(x, num_evals=3):
    """Evaluate expensive function multiple times for noise reduction."""
    evaluations = [expensive_noisy_function(x) for _ in range(num_evals)]
    return np.median(evaluations)  # Robust aggregation

# Use surrogate model with noise-reduced evaluations
x_best, es = cma.fmin_lq_surr2(
    lambda x: noise_aware_expensive_function(x, 2),  # 2 evaluations per point
    [0, 0, 0],
    0.5,
    options={'maxfevals': 60}  # Very limited budget
)
print(f"Expensive noisy optimization: {x_best}")
print(f"Total expensive evaluations: ~{es.countevals * 2}")

import cma.purecma as purecma
def pure_python_with_constraints():
    """Pure Python optimization with simple box constraints."""
    def constrained_objective(x):
        # Penalty for constraint violations (simple box constraints)
        penalty = 0
        for i, xi in enumerate(x):
            if xi < -2:
                penalty += (xi + 2)**2 * 1000
            elif xi > 2:
                penalty += (xi - 2)**2 * 1000
        # Original objective
        obj = sum((xi - (i+1)*0.5)**2 for i, xi in enumerate(x))
        return obj + penalty

    # Optimize with pure Python CMA-ES
    es = purecma.CMAES([0, 0, 0, 0], 0.3, {'maxfevals': 2000})
    best_feasible = None
    best_feasible_f = float('inf')
    while not es.stop():
        solutions = es.ask()
        fitness_values = [constrained_objective(x) for x in solutions]
        es.tell(solutions, fitness_values)
        # Track best feasible solution
        for x, f in zip(solutions, fitness_values):
            if all(-2 <= xi <= 2 for xi in x):  # Feasible
                if f < best_feasible_f:
                    best_feasible = x[:]
                    best_feasible_f = f
        if es.countiter % 100 == 0:
            es.disp()

    result = es.result()
    print(f"Pure Python with constraints:")
    print(f"Best CMA-ES solution: {result[0]}")
    print(f"Best feasible solution: {best_feasible}")
    return best_feasible, best_feasible_f

pure_python_with_constraints()

# Install with Tessl CLI:
#     npx tessl i tessl/pypi-cma