CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cma

CMA-ES, Covariance Matrix Adaptation Evolution Strategy for non-linear numerical optimization in Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/advanced-optimization.md

Advanced Optimization

Advanced optimization techniques including surrogate models, pure Python implementation, and noise handling for specialized use cases.

Surrogate Model Optimization

CMA-ES with linear-quadratic surrogate models (lq-CMA-ES) for expensive objective functions where function evaluations are costly.

fmin_lq_surr2 - Surrogate Model Interface (Recommended)

# API reference stub for cma.fmin_lq_surr2 — the `pass` body is a placeholder,
# not the implementation.
# NOTE(review): the `not_evaluated=np.isnan` default requires numpy to be
# imported as `np` at module scope — confirm the module-level imports.
def fmin_lq_surr2(
    objective_function,
    x0,
    sigma0,
    options=None,
    inject=True,
    restarts=0,
    incpopsize=2,
    keep_model=False,
    not_evaluated=np.isnan,
    callback=None
):
    """
    Minimize objective function using lq-CMA-ES with surrogate models.

    Linear-quadratic CMA-ES builds global surrogate models (linear or quadratic)
    to reduce the number of expensive function evaluations by predicting
    function values for candidate solutions.

    Parameters:
    -----------
    objective_function : callable
        Expensive function to minimize. Called as objective_function(x).

    x0 : array-like or callable
        Initial solution estimate. Can be callable returning different
        starting points for each restart.

    sigma0 : float or array-like
        Initial standard deviation (step size).

    options : dict, optional
        CMA-ES options. See CMAOptions() for available options.
        Important for surrogate models:
        - 'maxfevals': Maximum true function evaluations (crucial for expensive functions)
        - 'ftarget': Target function value for termination

    inject : bool, optional
        Whether to inject best surrogate solution in each iteration (default True).

    restarts : int, optional
        Number of IPOP restarts to perform (default 0).

    incpopsize : float, optional
        Population size multiplier for restarts (default 2).

    keep_model : bool, optional
        Whether to keep surrogate model between restarts (default False).
        If False, new model is built for each restart.

    not_evaluated : callable, optional
        Function to test if value indicates missing evaluation (default np.isnan).

    callback : callable, optional
        Callback function called each iteration.

    Returns:
    --------
    tuple[numpy.ndarray, CMAEvolutionStrategy]
        (xbest, es) where xbest is best solution and es contains optimization details.
        Note: es.result.xfavorite is often the best estimate for the optimum.

    Examples:
    ---------
    >>> import cma
    >>> import numpy as np
    >>>
    >>> # Expensive function simulation
    >>> def expensive_function(x):
    ...     # Simulate expensive computation
    ...     return sum((x - 1)**2) + 0.1 * np.random.randn()
    >>>
    >>> # Optimize with limited evaluations
    >>> x, es = cma.fmin_lq_surr2(
    ...     expensive_function,
    ...     [0, 0, 0],
    ...     0.5,
    ...     options={'maxfevals': 100, 'verb_disp': 0}
    ... )
    >>> print(f"Best solution: {x}")
    >>> print(f"Evaluations used: {es.countevals}")
    >>>
    >>> # With restarts for robustness
    >>> x, es = cma.fmin_lq_surr2(
    ...     expensive_function,
    ...     lambda: np.random.randn(3),  # Random start each restart
    ...     0.5,
    ...     restarts=2,
    ...     options={'maxfevals': 200}
    ... )
    >>>
    >>> # For very expensive functions
    >>> def very_expensive(x):
    ...     # Simulate 10 second computation
    ...     import time
    ...     time.sleep(0.1)  # Reduced for example
    ...     return sum(x**2)
    >>>
    >>> x, es = cma.fmin_lq_surr2(
    ...     very_expensive,
    ...     [1, 1],
    ...     0.3,
    ...     options={'maxfevals': 50, 'ftarget': 1e-6}
    ... )
    """
    pass

fmin_lq_surr - Legacy Surrogate Interface

# Legacy API stub — prefer fmin_lq_surr2 for new code (cleaner result
# semantics); the `pass` body is a placeholder, not the implementation.
def fmin_lq_surr(objective_function, x0, sigma0, options=None, **kwargs):
    """
    Minimize objective function using lq-CMA-ES with surrogate models.

    Legacy interface to surrogate model optimization. Use fmin_lq_surr2
    for new code as it provides better control and cleaner results.

    Parameters:
    -----------
    Same as fmin_lq_surr2 but with different defaults and behavior.

    Returns:
    --------
    tuple[numpy.ndarray, CMAEvolutionStrategy]
        (xbest, es) where results may be based on surrogate values.

    Notes:
    ------
    - xbest takes into account only recent history, not all evaluations
    - es.result may be confusing as it's partly based on surrogate values
    - es.best contains solution with best surrogate value (not true value)
    - Model is kept same for each restart (use fmin_lq_surr2 to change this)
    """
    pass

Surrogate Model Usage Patterns

import cma
import numpy as np
import functools

# Pattern 1: Simple expensive function optimization
def expensive_objective(x):
    """Toy stand-in for a costly objective: pause briefly, then return the
    squared distance of `x` from the target (1, 2, 3) plus small Gaussian noise.
    """
    import time
    time.sleep(0.01)  # pretend the evaluation is expensive
    target = np.array([1, 2, 3])
    residual = x - target
    return sum(residual**2) + 0.01 * np.random.randn()

# Optimize with surrogate model
# x0 = [0, 0, 0] and sigma0 = 0.5; the surrogate keeps the number of true
# (expensive) evaluations within the 'maxfevals' budget below.
x_best, es = cma.fmin_lq_surr2(
    expensive_objective,
    [0, 0, 0],
    0.5,
    options={
        'maxfevals': 80,     # Limit true evaluations
        'ftarget': 1e-4,     # Stop when good enough
        'verb_disp': 10      # Show progress every 10 iterations
    }
)

# Report the outcome; countevals counts true objective calls only.
print(f"Found solution: {x_best}")
print(f"True evaluations: {es.countevals}")
print(f"Total iterations: {es.countiter}")

# Pattern 2: Using with functools.partial for additional arguments
def parametric_function(x, a, b):
    """Shifted sphere plus a sine perturbation: ||x - a||^2 + b * sum(sin(x))."""
    quadratic_part = sum((x - a)**2)
    sine_part = sum(np.sin(x))
    return quadratic_part + b * sine_part

# Create partial function with fixed parameters
# functools.partial fixes a and b so the optimizer sees a 1-argument objective.
objective = functools.partial(parametric_function, a=np.array([1, 2]), b=0.1)

x_best, es = cma.fmin_lq_surr2(
    objective,
    [0, 0],
    0.3,
    options={'maxfevals': 60}
)

# Pattern 3: Multiple restarts with different starting points
def random_start():
    """Draw a fresh 5-dimensional starting point, uniform in [-2, 2]."""
    return np.random.uniform(low=-2, high=2, size=5)

# Passing a callable as x0 gives every IPOP restart a fresh starting point.
x_best, es = cma.fmin_lq_surr2(
    lambda x: sum(x**4 - 16*x**2 + 5*x),  # Multi-modal function
    random_start,  # Callable for different starts
    0.5,
    restarts=3,
    incpopsize=1.5,  # Smaller population increase
    options={'maxfevals': 200}
)

# Pattern 4: Custom evaluation detection
def custom_not_evaluated(value):
    """Treat NaN, +/-inf, or anything below -1e10 as a missing evaluation."""
    if np.isnan(value):
        return True
    if np.isinf(value):
        return True
    return value < -1e10

# Supply a custom predicate so the surrogate treats non-finite or sentinel
# values as "not yet evaluated" instead of real fitness values.
x_best, es = cma.fmin_lq_surr2(
    expensive_objective,
    [0, 0, 0],
    0.5,
    not_evaluated=custom_not_evaluated,
    options={'maxfevals': 100}
)

Pure Python Implementation

Pure Python CMA-ES implementation that doesn't require numpy, suitable for environments where numpy is unavailable.

purecma.fmin - Pure Python Function Interface

def purecma.fmin(
    objective_fct,
    xstart,
    sigma,
    args=(),
    maxfevals='1e3 * N**2',
    ftarget=None,
    verb_disp=100,
    verb_log=1,
    verb_save=1000
):
    """
    Pure Python CMA-ES minimization without numpy dependency.
    
    Minimalistic implementation for reading/understanding CMA-ES algorithm
    or when numpy is not available. For serious applications with numpy
    available, use cma.fmin2 or cma.CMAEvolutionStrategy instead.
    
    Parameters:
    -----------
    objective_fct : callable
        Function to minimize. Takes list of floats, returns single float.
        Should be minimized (lower values are better).
        
    xstart : list or sequence
        Initial solution vector. Length defines search space dimension.
        
    sigma : float
        Initial step-size (standard deviation in any coordinate).
        
    args : tuple, optional
        Additional arguments passed to objective_fct as objective_fct(x, *args).
        
    maxfevals : int or str, optional
        Maximum function evaluations. String is evaluated with N as dimension
        (default '1e3 * N**2').
        
    ftarget : float, optional
        Target function value for termination.
        
    verb_disp : int, optional
        Display progress every verb_disp iterations (default 100, 0 for never).
        
    verb_log : int, optional
        Log data every verb_log iterations (default 1, 0 for never).
        
    verb_save : int, optional
        Save plot every verb_save iterations (default 1000, 0 for never).
        
    Returns:
    --------
    tuple[list, dict]
        (xbest, dict_info) where:
        - xbest: best solution found (list of floats)
        - dict_info: dictionary with optimization information including
          'fbest', 'evals', 'iterations', 'stop'
          
    Examples:
    ---------
    >>> import cma.purecma as purecma
    >>> 
    >>> # Simple optimization without numpy
    >>> def sphere(x):
    ...     return sum(xi**2 for xi in x)
    >>> 
    >>> xbest, info = purecma.fmin(sphere, [1, 2, 3], 0.5)
    >>> print(f"Best solution: {xbest}")
    >>> print(f"Best fitness: {info['fbest']}")
    >>> print(f"Evaluations: {info['evals']}")
    >>> 
    >>> # With additional arguments
    >>> def shifted_sphere(x, shift):
    ...     return sum((xi - shi)**2 for xi, shi in zip(x, shift))
    >>> 
    >>> xbest, info = purecma.fmin(
    ...     shifted_sphere, 
    ...     [0, 0, 0], 
    ...     0.5, 
    ...     args=([1, 2, 3],)  # Shift vector
    ... )
    >>> 
    >>> # With termination criteria
    >>> xbest, info = purecma.fmin(
    ...     sphere,
    ...     [1, 2, 3, 4, 5],
    ...     0.3,
    ...     maxfevals=1000,
    ...     ftarget=1e-8,
    ...     verb_disp=50
    ... )
    """
    pass

purecma.CMAES - Pure Python Class Interface

class purecma.CMAES:
    """
    Pure Python CMA-ES class providing ask-and-tell interface.
    
    Minimalistic implementation without numpy dependency. Suitable for
    understanding the CMA-ES algorithm or when numpy is unavailable.
    """
    
    def __init__(self, xmean, sigma, opts=None):
        """
        Initialize pure Python CMA-ES optimizer.
        
        Parameters:
        -----------
        xmean : list
            Initial mean (starting point), determines dimension.
            
        sigma : float
            Initial step size (standard deviation).
            
        opts : dict, optional
            Options dictionary. Available options:
            - 'maxiter': maximum iterations
            - 'maxfevals': maximum function evaluations  
            - 'ftarget': target function value
            - 'popsize': population size (default 4 + floor(3*log(N)))
            - 'seed': random seed
            - 'verb_disp': display verbosity
            
        Examples:
        ---------
        >>> import cma.purecma as purecma
        >>> 
        >>> # Basic initialization
        >>> es = purecma.CMAES([0, 0, 0], 0.5)
        >>> 
        >>> # With options
        >>> opts = {'maxfevals': 1000, 'popsize': 10, 'seed': 42}
        >>> es = purecma.CMAES([1, 2, 3], 0.3, opts)
        """
        pass
    
    def ask(self):
        """
        Sample candidate solutions from current distribution.
        
        Returns:
        --------
        list[list]
            List of candidate solutions (each solution is a list of floats).
            
        Examples:
        ---------
        >>> es = purecma.CMAES([0, 0], 0.5)
        >>> solutions = es.ask()
        >>> len(solutions) == es.popsize
        True
        >>> isinstance(solutions[0], list)
        True
        """
        pass
    
    def tell(self, arx, arf):
        """
        Update distribution based on function evaluations.
        
        This is where the main CMA-ES algorithm implementation occurs.
        
        Parameters:
        -----------
        arx : list[list]
            List of candidate solutions (same as returned by ask()).
            
        arf : list[float]
            Corresponding fitness values (lower is better).
            
        Examples:
        ---------
        >>> import cma.purecma as purecma
        >>> 
        >>> def objective(x):
        ...     return sum(xi**2 for xi in x)
        >>> 
        >>> es = purecma.CMAES([1, 2], 0.5)
        >>> while not es.stop():
        ...     solutions = es.ask()
        ...     fitness = [objective(x) for x in solutions]
        ...     es.tell(solutions, fitness)
        >>> 
        >>> print(f"Best solution: {es.result()[0]}")
        """
        pass
    
    def stop(self):
        """
        Check termination criteria.
        
        Returns:
        --------
        dict or False
            Termination conditions if stopped, False otherwise.
        """
        pass
    
    def result(self):
        """
        Return current best result.
        
        Returns:
        --------
        tuple[list, float, int, int]
            (xbest, fbest, evals, iterations)
        """
        pass
    
    def disp(self, verb_modulo=1):
        """
        Display current optimization status.
        
        Parameters:
        -----------
        verb_modulo : int, optional
            Display every verb_modulo iterations.
        """
        pass

Pure Python Usage Patterns

import cma.purecma as purecma

# Pattern 1: Simple function minimization
def rosenbrock(x):
    """Classic Rosenbrock banana function, computed without numpy.

    Global minimum 0 at x = (1, ..., 1).
    """
    total = 0
    for a, b in zip(x[:-1], x[1:]):
        total += 100 * (b - a**2)**2 + (1 - a)**2
    return total

# Using functional interface
xbest, info = purecma.fmin(rosenbrock, [0, 0], 0.5, verb_disp=50)
print(f"Functional result: {xbest}, fitness: {info['fbest']}")

# Pattern 2: Ask-and-tell interface with custom loop
# Termination is whichever of maxfevals / ftarget triggers first.
es = purecma.CMAES([0, 0], 0.5, {'maxfevals': 2000, 'ftarget': 1e-8})

iteration = 0
while not es.stop():
    solutions = es.ask()
    fitness_values = [rosenbrock(x) for x in solutions]
    es.tell(solutions, fitness_values)

    # Print a status line every 50 iterations.
    iteration += 1
    if iteration % 50 == 0:
        es.disp()

result = es.result()
print(f"Ask-tell result: {result[0]}, fitness: {result[1]}")

# Pattern 3: Multi-dimensional problem without numpy
def high_dim_sphere(x):
    """Sphere function (sum of squared coordinates), pure Python."""
    total = 0
    for coordinate in x:
        total += coordinate * coordinate
    return total

# 20-dimensional problem
# Budget scales with dimension; verb_disp prints a line every 100 iterations.
dimension = 20
initial_point = [0.5] * dimension
xbest, info = purecma.fmin(
    high_dim_sphere, 
    initial_point, 
    0.3,
    maxfevals=dimension * 1000,
    verb_disp=100
)

print(f"20D result: fitness = {info['fbest']}, evals = {info['evals']}")

# Pattern 4: Custom termination and monitoring
class OptimizationLogger:
    """Accumulates (iteration, fbest, sigma) snapshots of an optimization run."""

    def __init__(self):
        # Chronological list of recorded snapshots.
        self.history = []

    def log(self, iteration, fbest, sigma):
        """Record one progress snapshot."""
        snapshot = (iteration, fbest, sigma)
        self.history.append(snapshot)

def custom_optimization():
    """Custom optimization with logging and early termination.

    Returns (es.result(), logger.history): the purecma result tuple and the
    list of (iteration, best-of-iteration, sigma) snapshots.
    """

    def objective(x):
        # Noisy sphere function
        # NOTE(review): `import random` executes on every call; harmless,
        # but it could be hoisted to module scope.
        import random
        return sum(xi**2 for xi in x) + 0.01 * random.gauss(0, 1)

    logger = OptimizationLogger()
    es = purecma.CMAES([1, 1, 1], 0.5, {'maxfevals': 5000})

    best_ever = float('inf')
    no_improvement_count = 0

    while not es.stop():
        solutions = es.ask()
        fitness_values = [objective(x) for x in solutions]
        es.tell(solutions, fitness_values)

        # assumes purecma.CMAES exposes `countiter` and `sigma` — TODO confirm
        current_best = min(fitness_values)
        logger.log(es.countiter, current_best, es.sigma)

        # Custom termination: no improvement for 100 iterations
        # (best-of-iteration is noisy, so this is a heuristic stop rule)
        if current_best < best_ever:
            best_ever = current_best
            no_improvement_count = 0
        else:
            no_improvement_count += 1

        if no_improvement_count > 100:
            break

        if es.countiter % 50 == 0:
            print(f"Iter {es.countiter}: best = {current_best:.6f}, "
                  f"sigma = {es.sigma:.6f}")

    return es.result(), logger.history

result, history = custom_optimization()
print(f"Custom optimization completed: {result[1]:.6f}")

Noise Handling

Tools for handling noisy objective functions where function values are subject to uncertainty.

NoiseHandler Class

# API reference stub for cma.NoiseHandler — `pass` bodies are placeholders.
# NOTE(review): requires numpy imported as `np` at module scope for the
# `aggregate=np.median` default; the mutable list default for `maxevals`
# mirrors the documented upstream cma signature and is never mutated here.
class NoiseHandler:
    """
    Noise handling for uncertain/noisy objective functions.

    Implements adaptive noise handling according to Hansen et al. 2009
    "A Method for Handling Uncertainty in Evolutionary Optimization".

    Controls noise via step-size increase and number of re-evaluations.
    """

    def __init__(self, N, maxevals=[1, 1, 30], aggregate=np.median):
        """
        Initialize noise handler.

        Parameters:
        -----------
        N : int
            Search space dimension.

        maxevals : int or list, optional
            Maximum evaluations per solution. If list:
            - Single value: max evaluations
            - Two values: [min_evals, max_evals]
            - Three values: [min_evals, start_evals, max_evals]
            Default [1, 1, 30].

        aggregate : callable, optional
            Function to aggregate multiple evaluations (default np.median).
            Common choices: np.mean, np.median, min, max.

        Examples:
        ---------
        >>> import cma
        >>> import numpy as np
        >>>
        >>> # Create noise handler for 5D problem
        >>> noise_handler = cma.NoiseHandler(5, maxevals=[1, 2, 10])
        >>>
        >>> # Noisy objective function
        >>> def noisy_sphere(x, noise_level=0.1):
        ...     true_value = sum(x**2)
        ...     noise = noise_level * np.random.randn()
        ...     return true_value + noise
        >>>
        >>> # Use with fmin2
        >>> x, es = cma.fmin2(
        ...     noisy_sphere,
        ...     5 * [0.1],
        ...     0.5,
        ...     noise_handler=noise_handler,
        ...     options={'maxfevals': 2000}
        ... )
        """
        pass

    def __call__(self, func, x, *args, **kwargs):
        """
        Evaluate function with noise handling.

        Parameters:
        -----------
        func : callable
            Noisy function to evaluate.

        x : array-like
            Solution to evaluate.

        *args, **kwargs :
            Additional arguments for func.

        Returns:
        --------
        float
            Aggregated function value after multiple evaluations.
        """
        pass

    @property
    def evaluations(self):
        """Current number of evaluations per solution."""
        pass

    def reeval_if_needed(self, es):
        """
        Re-evaluate solutions if noise level suggests it's beneficial.

        Parameters:
        -----------
        es : CMAEvolutionStrategy
            Evolution strategy instance to potentially re-evaluate.
        """
        pass

Noise Handling Usage Patterns

import cma
import numpy as np

# Pattern 1: Simple noisy optimization
def noisy_rosenbrock(x, noise_std=0.1):
    """Rosenbrock value of `x` (a numpy array) plus N(0, noise_std) noise."""
    head = x[:-1]
    tail = x[1:]
    deterministic = sum(100*(tail - head**2)**2 + (1 - head)**2)
    return deterministic + noise_std * np.random.randn()

# Use noise handler with fmin2
# maxevals=[1, 2, 20]: start with 2 evaluations per solution, adapt within [1, 20].
noise_handler = cma.NoiseHandler(4, maxevals=[1, 2, 20])

x_best, es = cma.fmin2(
    noisy_rosenbrock,
    4 * [0.1],
    0.5,
    noise_handler=noise_handler,
    options={'maxfevals': 3000, 'verb_disp': 100}
)

print(f"Noisy optimization result: {x_best}")
print(f"Final evaluations per point: {noise_handler.evaluations}")

# Pattern 2: Custom noise aggregation
def robust_aggregation(values):
    """Trimmed mean: drop ~10% of samples at each extreme, then average.

    With two or fewer samples there is nothing to trim, so the plain mean
    is returned.
    """
    samples = np.array(values)
    n_samples = len(samples)
    if n_samples <= 2:
        return np.mean(samples)
    # Always drop at least one sample from each tail.
    n_drop = max(1, int(0.1 * n_samples))
    ordered = np.sort(samples)
    return np.mean(ordered[n_drop:-n_drop])

# Any callable taking a sequence of samples can serve as the aggregator.
noise_handler_robust = cma.NoiseHandler(
    3, 
    maxevals=[2, 5, 15], 
    aggregate=robust_aggregation
)

# Pattern 3: Manual ask-and-tell with noise handler
def manual_noisy_optimization():
    """Manual optimization loop with noise handling.

    Returns es.result — assumes `result` is an attribute/property on
    CMAEvolutionStrategy with an `fbest` field (TODO confirm against cma docs).
    """

    def very_noisy_function(x):
        # High noise level
        return sum(x**2) + 0.5 * np.random.randn()

    es = cma.CMAEvolutionStrategy([1, 1, 1], 0.5, {'maxfevals': 2000})
    noise_handler = cma.NoiseHandler(3, maxevals=[1, 3, 10])

    while not es.stop():
        solutions = es.ask()

        # Evaluate with noise handling
        fitness_values = []
        for x in solutions:
            # Use noise handler directly: it re-evaluates and aggregates
            f_val = noise_handler(very_noisy_function, x)
            fitness_values.append(f_val)

        es.tell(solutions, fitness_values)

        # Let noise handler decide about re-evaluations
        noise_handler.reeval_if_needed(es)

        if es.countiter % 50 == 0:
            print(f"Iter {es.countiter}: "
                  f"evals/point = {noise_handler.evaluations}, "
                  f"best = {es.result.fbest:.6f}")

    return es.result

result = manual_noisy_optimization()
print(f"Manual noisy result: {result.fbest}")

# Pattern 4: Adaptive noise handling parameters
def adaptive_noise_optimization():
    """Optimization with noise level that changes over time.

    Returns (x_best, es) from cma.fmin2.
    """

    # A one-element list gives the nested function a mutable cell to update
    # (a `nonlocal` int would work equally well in Python 3).
    iteration_counter = [0]  # Use list for mutable closure

    def time_varying_noisy_function(x):
        iteration_counter[0] += 1
        true_val = sum((x - np.array([1, 2, 3]))**2)

        # Noise decreases over time (learning/adaptation scenario)
        noise_level = 1.0 / (1 + iteration_counter[0] / 1000)
        noise = noise_level * np.random.randn()
        return true_val + noise

    # Start with conservative noise handling
    noise_handler = cma.NoiseHandler(3, maxevals=[2, 4, 20])

    x_best, es = cma.fmin2(
        time_varying_noisy_function,
        [0, 0, 0],
        0.5,
        noise_handler=noise_handler,
        options={'maxfevals': 2500}
    )

    return x_best, es

x_best, es = adaptive_noise_optimization()
print(f"Adaptive noise result: {x_best}")

Integration Examples

Combining Surrogate Models with Noise Handling

import cma
import numpy as np

def expensive_noisy_function(x):
    """Pause briefly, then return the squared distance of `x` from (1, 2, 3)
    perturbed by Gaussian noise with standard deviation 0.1."""
    import time
    time.sleep(0.01)  # stand-in for a costly computation

    offset = x - np.array([1, 2, 3])
    return sum(offset**2) + 0.1 * np.random.randn()

# Cannot directly combine lq-surr with NoiseHandler, 
# but can use noise-aware evaluation
def noise_aware_expensive_function(x, num_evals=3):
    """Reduce noise by evaluating the expensive objective `num_evals` times
    and returning the median of the samples."""
    samples = []
    for _ in range(num_evals):
        samples.append(expensive_noisy_function(x))
    # Median is robust to the occasional outlier draw.
    return np.median(samples)

# Use surrogate model with noise-reduced evaluations
# Each surrogate-counted evaluation triggers 2 true evaluations inside the
# wrapper, so the real budget is roughly twice 'maxfevals'.
x_best, es = cma.fmin_lq_surr2(
    lambda x: noise_aware_expensive_function(x, 2),  # 2 evaluations per point
    [0, 0, 0],
    0.5,
    options={'maxfevals': 60}  # Very limited budget
)

print(f"Expensive noisy optimization: {x_best}")
print(f"Total expensive evaluations: ~{es.countevals * 2}")

Pure Python with Custom Features

import cma.purecma as purecma

def pure_python_with_constraints():
    """Minimize a 4-D shifted sphere under box constraints [-2, 2]^4.

    Constraints are enforced with a quadratic penalty added to the objective;
    the best penalty-free (feasible) solution seen during the run is tracked
    separately.

    Returns:
        tuple: (best_feasible, best_feasible_f) — best feasible solution
        found (None if no candidate was ever feasible) and its objective value.
    """

    def constrained_objective(x):
        # Quadratic penalty for leaving the box [-2, 2] in any coordinate.
        # (Plain iteration — the coordinate index is not needed here.)
        penalty = 0
        for xi in x:
            if xi < -2:
                penalty += (xi + 2)**2 * 1000
            elif xi > 2:
                penalty += (xi - 2)**2 * 1000

        # Original objective: shifted sphere, optimum at (0.5, 1.0, 1.5, 2.0).
        obj = sum((xi - (i+1)*0.5)**2 for i, xi in enumerate(x))
        return obj + penalty

    # Optimize with pure Python CMA-ES
    es = purecma.CMAES([0, 0, 0, 0], 0.3, {'maxfevals': 2000})

    best_feasible = None
    best_feasible_f = float('inf')

    while not es.stop():
        solutions = es.ask()
        fitness_values = [constrained_objective(x) for x in solutions]
        es.tell(solutions, fitness_values)

        # Track the best solution that satisfies the box constraints.
        for x, f in zip(solutions, fitness_values):
            if all(-2 <= xi <= 2 for xi in x) and f < best_feasible_f:
                best_feasible = x[:]  # copy: purecma may reuse the list
                best_feasible_f = f

        if es.countiter % 100 == 0:  # periodic progress display
            es.disp()

    result = es.result()
    print("Pure Python with constraints:")
    print(f"Best CMA-ES solution: {result[0]}")
    print(f"Best feasible solution: {best_feasible}")

    return best_feasible, best_feasible_f

pure_python_with_constraints()

Install with Tessl CLI

npx tessl i tessl/pypi-cma

docs

advanced-optimization.md

configuration.md

constraints-boundaries.md

core-optimization.md

fitness-functions.md

index.md

logging-analysis.md

samplers-adaptation.md

tile.json