emcee: the Python ensemble sampling toolkit for affine-invariant MCMC.

The core functionality of emcee is provided by the EnsembleSampler class, which implements the affine-invariant ensemble sampler for MCMC. This sampler coordinates an ensemble of walkers that explore the parameter space collectively, making it highly effective for complex, multimodal distributions.

EnsembleSampler is the main sampling engine: it manages the ensemble of walkers and orchestrates the MCMC sampling process.
class EnsembleSampler:
    """
    Affine-invariant ensemble MCMC sampler.

    The main sampling engine: manages an ensemble of walkers and
    orchestrates the MCMC sampling process.

    NOTE(review): this is an interface stub — method bodies are
    documentation-only and return None until implemented.
    """

    def __init__(self, nwalkers: int, ndim: int, log_prob_fn: callable,
                 pool=None, moves=None, args=None, kwargs=None,
                 backend=None, vectorize: bool = False, blobs_dtype=None,
                 parameter_names=None):
        """
        Initialize the ensemble sampler.

        Args:
            nwalkers: Number of walkers in the ensemble (must be >= 2*ndim)
            ndim: Number of dimensions in parameter space
            log_prob_fn: Function that returns log probability for given parameters
            pool: Parallel processing pool (multiprocessing, MPI, etc.)
            moves: Single move, list of moves, or weighted list of moves
            args: Extra positional arguments for log_prob_fn
            kwargs: Extra keyword arguments for log_prob_fn
            backend: Storage backend (Backend, HDFBackend, etc.)
            vectorize: If True, log_prob_fn accepts list of positions
            blobs_dtype: Data type for blob storage
            parameter_names: Names for parameters (enables dict parameter passing)
        """

    # --- Methods for executing MCMC sampling with various control options ---

    def run_mcmc(self, initial_state, nsteps: int, **kwargs):
        """
        Run MCMC for a fixed number of steps.

        Args:
            initial_state: Starting positions (State object or array [nwalkers, ndim])
            nsteps: Number of MCMC steps to run
            **kwargs: Additional sampling options (tune, skip_initial_state_check, etc.)

        Returns:
            State: Final state of the ensemble
        """

    def sample(self, initial_state, iterations: int = 1, tune: bool = False,
               skip_initial_state_check: bool = False, thin_by: int = 1,
               thin=None, store: bool = True, progress: bool = False):
        """
        Generator function for step-by-step MCMC sampling.

        Args:
            initial_state: Starting positions
            iterations: Number of iterations to yield
            tune: Whether to tune move parameters during sampling
            skip_initial_state_check: Skip walker independence check
            thin_by: Only store every thin_by steps
            thin: Deprecated, use thin_by instead
            store: Whether to store samples in backend
            progress: Show progress bar

        Yields:
            State: Current state after each iteration
        """

    # --- Methods for accessing sampling results and diagnostic information ---

    def get_chain(self, flat: bool = False, thin: int = 1, discard: int = 0):
        """
        Get the stored chain of MCMC samples.

        Args:
            flat: Flatten chain across ensemble dimension
            thin: Take every thin steps
            discard: Discard first discard steps as burn-in

        Returns:
            ndarray: Chain with shape [steps, nwalkers, ndim] or [steps*nwalkers, ndim] if flat
        """

    def get_log_prob(self, flat: bool = False, thin: int = 1, discard: int = 0):
        """
        Get log probability values for each sample.

        Args:
            flat: Flatten across the ensemble dimension
            thin: Take every thin steps
            discard: Discard first discard steps as burn-in

        Returns:
            ndarray: Log probabilities with shape [steps, nwalkers] or [steps*nwalkers] if flat
        """

    def get_blobs(self, flat: bool = False, thin: int = 1, discard: int = 0):
        """
        Get blob data for each sample.

        Args:
            flat: Flatten across the ensemble dimension
            thin: Take every thin steps
            discard: Discard first discard steps as burn-in

        Returns:
            ndarray or None: Blob data if available
        """

    def get_autocorr_time(self, tol: int = 50, c: int = 5, quiet: bool = False):
        """
        Compute integrated autocorrelation time for each parameter.

        Args:
            tol: Tolerance for autocorrelation time convergence
            c: Window factor for autocorrelation analysis
            quiet: Suppress warnings

        Returns:
            ndarray: Autocorrelation times for each parameter
        """

    def get_last_sample(self):
        """
        Get the last sample from the chain.

        Returns:
            State: Last sampled state
        """

    # --- Properties for accessing sampler state and results ---

    @property
    def chain(self):
        """Get the full chain as ndarray [steps, nwalkers, ndim]"""

    @property
    def lnprobability(self):
        """Get log probabilities as ndarray [steps, nwalkers]"""

    @property
    def acceptance_fraction(self):
        """Get acceptance fraction for each walker"""

    @property
    def acor(self):
        """Get autocorrelation time (deprecated, use get_autocorr_time())"""

    @property
    def flatchain(self):
        """Get flattened chain [steps*nwalkers, ndim]"""

    @property
    def flatlnprobability(self):
        """Get flattened log probabilities [steps*nwalkers]"""

    @property
    def backend(self):
        """Get the backend storage object"""

    # --- Methods for controlling and resetting the sampler state ---

    def reset(self):
        """
        Reset the sampler to its initial state.

        Clears all stored samples and resets iteration counter.
        """


# Function for checking whether initial walker positions are sufficiently independent.
def walkers_independent(coords):
    """
    Check if walker positions are linearly independent.

    Args:
        coords: Walker positions [nwalkers, ndim]

    Returns:
        bool: True if walkers are sufficiently independent

    NOTE(review): interface stub — body is documentation-only until implemented.
    """


import emcee
import numpy as np


def log_prob(theta):
    # 2D Gaussian example target: log p(theta) = -0.5 * |theta|^2 (up to a constant)
    return -0.5 * np.sum(theta**2)


# --- Basic usage: set up the ensemble and run ---
nwalkers, ndim = 32, 2
sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob)

# Initialize walkers
pos = np.random.randn(nwalkers, ndim)

# Run sampling
sampler.run_mcmc(pos, nsteps=1000)

# Access results (discard 100 burn-in steps, flatten across walkers)
chain = sampler.get_chain(discard=100, flat=True)
log_prob_vals = sampler.get_log_prob(discard=100, flat=True)

# --- Burn-in followed by production sampling ---
# Initial burn-in with tuning
state = sampler.run_mcmc(pos, 500, tune=True)
# Production sampling
final_state = sampler.run_mcmc(state, 1000, tune=False)
# Check autocorrelation
tau = sampler.get_autocorr_time()
print(f"Autocorrelation time: {tau}")

# --- Step-by-step sampling via the generator interface ---
pos = np.random.randn(nwalkers, ndim)
for i, state in enumerate(sampler.sample(pos, iterations=1000)):
    if i % 100 == 0:
        print(f"Step {i}, acceptance fraction: {np.mean(sampler.acceptance_fraction)}")

# --- Parallel sampling with a multiprocessing pool ---
from multiprocessing import Pool


def log_prob(theta):
    return -0.5 * np.sum(theta**2)


# Use multiprocessing pool
with Pool() as pool:
    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob, pool=pool)
    sampler.run_mcmc(pos, 1000)

# Install with Tessl CLI
npx tessl i tessl/pypi-emcee