CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymc

Probabilistic Programming in Python: Bayesian Modeling and Probabilistic Machine Learning with PyTensor

Pending
Overview
Eval results
Files

data.mddocs/

PyMC Data Handling and Backends

PyMC provides comprehensive data handling capabilities and flexible backend systems for managing datasets, mini-batching, and storing sampling results. The data system supports dynamic updates, efficient memory usage, and seamless integration with the broader scientific Python ecosystem.

Data Containers

Data Function

import pymc as pm

# API stub documenting the pm.Data signature; the real implementation lives in
# the pymc package. NOTE(review): `export_index_as_coords` has been deprecated
# in recent PyMC releases -- confirm against the installed version.
def Data(name, value, *, dims=None, export_index_as_coords=True, **kwargs):
    """
    Create a data container for observed or input data.
    
    Parameters:
    - name (str): Name of the data variable
    - value (array-like): Initial data values
    - dims (tuple, optional): Dimension names for the data
    - export_index_as_coords (bool): Export indices as coordinates
    - **kwargs: Additional PyTensor shared variable options
    
    Returns:
    - data_var: PyTensor shared variable containing the data
    """

# Basic data container: wrap raw numpy arrays so they can be swapped later
import numpy as np

X_train = np.random.randn(100, 5)
y_train = np.random.randn(100)

with pm.Model() as model:
    # Register the arrays as mutable shared data
    X_data = pm.Data('X_data', X_train)
    y_data = pm.Data('y_data', y_train)

    # Linear-regression parameters
    n_features = X_train.shape[1]
    beta = pm.Normal('beta', 0, 1, shape=n_features)
    mu = pm.math.dot(X_data, beta)
    sigma = pm.HalfNormal('sigma', 1)

    # Observed outcome conditioned on the data container
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, observed=y_data)

# Data containers with named dimensions
feature_names = ['f1', 'f2', 'f3', 'f4', 'f5']
coords = {'obs': range(100), 'features': feature_names}

with pm.Model(coords=coords) as dimensional_model:
    # Attach dimension labels to each container
    X_data = pm.Data('X_data', X_train, dims=('obs', 'features'))
    y_data = pm.Data('y_data', y_train, dims='obs')

    # Coefficients share the 'features' dimension with the design matrix
    beta = pm.Normal('beta', 0, 1, dims='features')
    mu = pm.math.dot(X_data, beta)
    likelihood = pm.Normal('likelihood', mu=mu, sigma=1, observed=y_data)

Data Updates

# API stub documenting pm.set_data. When called outside a `with model:` block,
# the model= argument is required to identify which model's containers to update.
def set_data(new_data, *, model=None, coords=None):
    """
    Update data values in existing data containers.
    
    Parameters:
    - new_data (dict): Dictionary mapping data names to new values
    - model (Model, optional): Target model (default: current context)
    - coords (dict, optional): New coordinate values
    """

# Update data for out-of-sample prediction
X_test = np.random.randn(50, 5)

# Sample with training data
with model:
    trace = pm.sample(1000)

# Update data containers for prediction. set_data must know which model to
# update: either run it inside the model context or pass model= explicitly.
# Containers must hold real arrays (None is not a valid value), so supply a
# dummy y of matching length for the new design matrix.
pm.set_data({'X_data': X_test, 'y_data': np.zeros(50)}, model=model)

# Generate predictions with new data
with model:
    posterior_predictive = pm.sample_posterior_predictive(trace, var_names=['likelihood'])

# Restore original data
pm.set_data({'X_data': X_train, 'y_data': y_train}, model=model)

Minibatch Data

# API stub documenting the minibatch helper. NOTE(review): in PyMC 5 the actual
# call form is pm.Minibatch(*variables, batch_size=...) with no name argument --
# confirm against the installed version.
def Minibatch(name, value, batch_size=None, *, dims=None, random_seed=None, **kwargs):
    """
    Create a minibatch data container for large datasets.
    
    Parameters:
    - name (str): Name of the minibatch variable
    - value (array-like): Full dataset
    - batch_size (int, optional): Size of each minibatch
    - dims (tuple, optional): Dimension names
    - random_seed (int, optional): Seed for batch sampling
    - **kwargs: Additional options
    
    Returns:
    - minibatch_var: Minibatch data container
    """

# Large dataset minibatching
X_large = np.random.randn(10000, 20)
y_large = np.random.randn(10000)

with pm.Model() as minibatch_model:
    # Create minibatch containers
    X_mb = pm.Minibatch('X_mb', X_large, batch_size=128)
    y_mb = pm.Minibatch('y_mb', y_large, batch_size=128)
    
    # Model parameters
    beta = pm.Normal('beta', 0, 1, shape=20)
    sigma = pm.HalfNormal('sigma', 1)
    
    # Linear predictor with minibatch data
    mu = pm.math.dot(X_mb, beta)
    
    # Likelihood (automatically scaled for minibatching)
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, 
                          observed=y_mb, total_size=len(X_large))
    
    # Variational inference works well with minibatches
    approx = pm.fit(n=50000, method='advi')
    trace = approx.sample(2000)

Data Utilities

# API stub. NOTE(review): the real pm.get_data(filename) returns a handle to a
# data file shipped inside the PyMC package; named datasets such as 'iris' may
# not be available -- confirm against the installed version.
def get_data(dataset_name):
    """
    Load example datasets for testing and tutorials.
    
    Parameters:
    - dataset_name (str): Name of the dataset to load
    
    Returns:
    - data: Loaded dataset as appropriate data structure
    """

# Load example datasets
boston_housing = pm.get_data('boston_housing')
iris_data = pm.get_data('iris')
radon_data = pm.get_data('radon')

# Report every dataset that was loaded (radon was previously loaded but
# omitted from this summary loop)
print("Available datasets:")
for name, data in [('boston', boston_housing), ('iris', iris_data), ('radon', radon_data)]:
    print(f"  {name}: {data.shape if hasattr(data, 'shape') else type(data)}")

Trace Storage Backends

NDArray Backend (In-Memory)

from pymc.backends import NDArray

# API stub mirroring pymc.backends.NDArray; every method body is a placeholder.
class NDArray:
    """
    In-memory trace storage backend.
    
    Parameters:
    - vars (list, optional): Variables to trace
    - model (Model, optional): PyMC model
    
    Attributes:
    - samples (dict): Dictionary of samples by variable name
    - sampler_stats (dict): Sampler statistics
    """
    
    def __init__(self, vars=None, model=None):
        pass
    
    def setup(self, draws, chain, sampler_vars=None):
        """Set up storage for sampling."""
        pass
    
    def record(self, point, sampler_stats=None):
        """Record a sample point."""
        pass
    
    def close(self):
        """Finalize the trace."""
        pass

# Basic in-memory sampling
with pm.Model() as model:
    alpha = pm.Normal('alpha', 0, 1)
    beta = pm.Normal('beta', 0, 1)
    
    # Use in-memory backend (default)
    trace = pm.sample(1000, trace=pm.backends.NDArray())

# Access samples directly
# NOTE(review): indexing trace.posterior assumes pm.sample returned an ArviZ
# InferenceData object rather than a raw trace -- confirm for the installed
# PyMC version.
print(f"Alpha samples shape: {trace.posterior['alpha'].shape}")
print(f"Beta mean: {trace.posterior['beta'].mean():.3f}")

Zarr Backend (Persistent Storage)

from pymc.backends import ZarrTrace

# API stub mirroring pymc.backends.ZarrTrace; the constructor body is a
# placeholder.
class ZarrTrace:
    """
    Zarr-based persistent trace storage.
    
    Parameters:
    - dir_path (str): Directory path for Zarr storage
    - vars (list, optional): Variables to trace
    - model (Model, optional): PyMC model
    - append (bool): Append to existing trace
    """
    
    def __init__(self, dir_path=None, vars=None, model=None, append=False):
        pass

# Persistent storage with Zarr
import tempfile
import os

with tempfile.TemporaryDirectory() as tmp_dir:
    zarr_path = os.path.join(tmp_dir, 'trace.zarr')
    
    with pm.Model() as model:
        # Large model...
        theta = pm.Normal('theta', 0, 1, shape=100)
        
        # Store trace persistently
        trace = pm.sample(10000, trace=pm.backends.ZarrTrace(zarr_path))
    
    # Trace is automatically saved to disk
    print(f"Trace stored at: {zarr_path}")
    
    # Load trace later
    # NOTE(review): accessing .posterior on a freshly constructed ZarrTrace
    # assumes the backend exposes the stored samples directly -- confirm
    # whether a conversion step (e.g. to InferenceData) is required.
    loaded_trace = pm.backends.ZarrTrace(zarr_path)
    print(f"Loaded trace shape: {loaded_trace.posterior['theta'].shape}")

Base Trace Classes

from pymc.backends.base import BaseTrace, MultiTrace

# API stub mirroring pymc.backends.base.BaseTrace; interface documentation
# only, no implementation shown here.
class BaseTrace:
    """
    Base class for all trace backends.
    
    Methods:
    - setup: Initialize trace storage
    - record: Record sample point
    - close: Finalize trace
    - __getitem__: Access samples by variable name
    """

# API stub mirroring pymc.backends.base.MultiTrace; only __init__ has a real
# body, get_values is a placeholder.
class MultiTrace:
    """
    Container for multiple chain traces.
    
    Parameters:
    - straces (list): List of single-chain traces
    
    Methods:
    - get_values: Extract values for variables
    - get_sampler_stats: Extract sampler statistics
    - add_values: Add new samples
    """
    
    def __init__(self, straces):
        # Store the per-chain traces; all other accessors operate over this list
        self.straces = straces
    
    def get_values(self, varname, burn=0, thin=1, combine=True, chains=None, squeeze=True):
        """
        Get values for a variable across chains.
        
        Parameters:
        - varname (str): Variable name
        - burn (int): Burn-in samples to exclude
        - thin (int): Thinning factor
        - combine (bool): Combine chains into single array
        - chains (list, optional): Specific chains to extract
        - squeeze (bool): Squeeze singleton dimensions
        
        Returns:
        - values: Variable samples
        """
        pass

# Working with MultiTrace. Modern PyMC returns an ArviZ InferenceData from
# pm.sample by default, which has no get_values/get_sampler_stats methods --
# request the legacy MultiTrace explicitly.
trace = pm.sample(2000, chains=4, return_inferencedata=False)  # Returns MultiTrace

# Extract specific variable values
alpha_samples = trace.get_values('alpha', burn=500, thin=2)
beta_samples = trace.get_values('beta', chains=[0, 1], combine=False)

# Get sampler statistics
diverging = trace.get_sampler_stats('diverging')
energy = trace.get_sampler_stats('energy')

print(f"Total divergences: {diverging.sum()}")
print(f"Mean energy: {energy.mean():.3f}")

ArviZ Integration

InferenceData Conversion

# API stub documenting pm.to_inference_data; the real implementation lives in
# the pymc package.
def to_inference_data(trace=None, *, prior=None, posterior_predictive=None,
                     predictions=None, log_likelihood=None, coords=None,
                     dims=None, model=None, save_warmup=False, **kwargs):
    """
    Convert PyMC trace to ArviZ InferenceData object.
    
    Parameters:
    - trace: PyMC trace object
    - prior: Prior samples
    - posterior_predictive: Posterior predictive samples  
    - predictions: Out-of-sample predictions
    - log_likelihood: Log-likelihood values
    - coords (dict): Coordinate specifications
    - dims (dict): Dimension specifications
    - model: PyMC model
    - save_warmup (bool): Include warmup samples
    
    Returns:
    - idata: ArviZ InferenceData object
    """

# Convert to ArviZ InferenceData
# NOTE(review): X_data / y_data are assumed to be defined by an earlier
# snippet -- confirm they are in scope and shaped (n_obs, 3) / (n_obs,).
with pm.Model() as model:
    alpha = pm.Normal('alpha', 0, 1)
    beta = pm.Normal('beta', 0, 1, shape=3)
    sigma = pm.HalfNormal('sigma', 1)
    
    mu = alpha + pm.math.dot(X_data, beta)
    y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_data)
    
    # Sample
    trace = pm.sample(1000)
    
    # Prior samples
    prior_samples = pm.sample_prior_predictive(500)
    
    # Posterior predictive samples
    post_pred = pm.sample_posterior_predictive(trace)

# Convert to comprehensive InferenceData
idata = pm.to_inference_data(
    trace,
    prior=prior_samples,
    posterior_predictive=post_pred,
    coords={'obs': range(len(y_data)), 'features': ['f1', 'f2', 'f3']},
    dims={'beta': ['features'], 'y_obs': ['obs']},
    model=model
)

print("InferenceData groups:")
# Use the public groups() API instead of the private _groups attribute
for group_name in idata.groups():
    print(f"  {group_name}: {list(idata[group_name].data_vars.keys())}")

Predictions Integration

# API stub documenting pm.predictions_to_inference_data; the real
# implementation lives in the pymc package.
def predictions_to_inference_data(predictions, *, posterior_trace=None,
                                model=None, coords=None, dims=None, **kwargs):
    """
    Convert predictions to InferenceData format.
    
    Parameters:
    - predictions: Prediction samples
    - posterior_trace: Original posterior trace
    - model: PyMC model
    - coords (dict): Coordinate specifications
    - dims (dict): Dimension specifications
    
    Returns:
    - pred_idata: InferenceData with predictions group
    """

# Out-of-sample predictions with proper metadata
X_test = np.random.randn(25, 3)
# set_data must know which model to update: either run it inside the model
# context or pass model= explicitly. Containers need real arrays rather than
# None, so supply a dummy y of matching length.
pm.set_data({'X_data': X_test, 'y_data': np.zeros(25)}, model=model)

with model:
    # Generate predictions
    predictions = pm.sample_posterior_predictive(
        trace, 
        var_names=['y_obs'],
        predictions=True
    )

# Convert predictions to InferenceData
pred_idata = pm.predictions_to_inference_data(
    predictions,
    posterior_trace=trace,
    coords={'test_obs': range(25), 'features': ['f1', 'f2', 'f3']},
    dims={'y_obs': ['test_obs']},
    model=model
)

Data Processing Utilities

Dictionary-Array Mapping

from pymc.blocking import DictToArrayBijection

# API stub mirroring pymc.blocking.DictToArrayBijection; all bodies are
# placeholders. NOTE(review): in PyMC 5 this class exposes static map()/rmap()
# helpers and takes no constructor arguments -- confirm against the installed
# version.
class DictToArrayBijection:
    """
    Bijection between dictionary and array representations of parameters.
    
    Parameters:
    - ordering: Variable ordering specification
    - vmap: Variable mapping
    
    Methods:
    - map: Convert dictionary to array
    - rmap: Convert array to dictionary
    """
    
    def __init__(self, ordering, vmap=None):
        pass
    
    def map(self, point_dict):
        """Convert parameter dictionary to array."""
        pass
    
    def rmap(self, point_array):  
        """Convert parameter array to dictionary."""
        pass

# Create bijection for efficient array operations
with pm.Model() as model:
    alpha = pm.Normal('alpha', 0, 1)
    beta = pm.Normal('beta', 0, 1, shape=3)
    
    # Create bijection
    # NOTE(review): passing model.free_RVs as the `ordering` argument differs
    # from the PyMC 5 API, where DictToArrayBijection is used via its static
    # map()/rmap() methods -- confirm before relying on this call form.
    bijection = pm.DictToArrayBijection(model.free_RVs)
    
    # Convert point formats (dict of per-variable values <-> one flat array)
    point_dict = {'alpha': 0.5, 'beta': np.array([1.0, -0.5, 0.2])}
    point_array = bijection.map(point_dict)
    recovered_dict = bijection.rmap(point_array)
    
    print(f"Array representation: {point_array}")
    print(f"Recovered dict: {recovered_dict}")

Advanced Data Patterns

Dynamic Data Loading

class DataLoader:
    """Custom data loader for streaming/dynamic data.

    Parameters:
    - data_source: Sliceable dataset (anything supporting len() and [a:b])
    - batch_size (int): Number of rows per batch
    """
    
    def __init__(self, data_source, batch_size=128):
        self.data_source = data_source
        self.batch_size = batch_size
        self.current_batch = 0
    
    def get_batch(self):
        """Return the next batch of data.

        Raises:
        - IndexError: when the data source is exhausted. Slicing past the end
          of a sequence silently returns an empty batch, so without this
          signal consumers that loop `while True` would never terminate.
        """
        start_idx = self.current_batch * self.batch_size
        if start_idx >= len(self.data_source):
            raise IndexError("DataLoader exhausted; call reset() to rewind")
        end_idx = start_idx + self.batch_size
        
        batch_data = self.data_source[start_idx:end_idx]
        self.current_batch += 1
        
        return batch_data
    
    def reset(self):
        """Reset batch counter so iteration restarts from the first batch."""
        self.current_batch = 0

# Use with PyMC for streaming inference
# NOTE(review): large_dataset must be defined by the caller and support both
# slicing (for DataLoader) and 'X'/'y' key access on each batch -- confirm.
data_loader = DataLoader(large_dataset, batch_size=256)

with pm.Model() as streaming_model:
    # Initialize with first batch
    initial_batch = data_loader.get_batch()
    X_mb = pm.Minibatch('X_mb', initial_batch['X'], batch_size=256)
    y_mb = pm.Minibatch('y_mb', initial_batch['y'], batch_size=256)
    
    # Model definition...
    
    # Stream through data for variational inference
    for epoch in range(10):
        data_loader.reset()
        
        # Process all batches in epoch. Keep the try narrow so an IndexError
        # raised by batch['X'] or the fit itself is not mistaken for
        # end-of-data.
        while True:
            try:
                batch = data_loader.get_batch()
            except IndexError:  # loader signals end of data
                break
            if len(batch) == 0:
                # Guard against loaders that return an empty slice past the
                # end instead of raising -- otherwise this loops forever.
                break
            pm.set_data({'X_mb': batch['X'], 'y_mb': batch['y']})
            
            # Update variational approximation
            approx = pm.fit(n=1000, method='advi')

Custom Backend Implementation

# Example of subclassing the BaseTrace interface; delegates all persistence to
# a user-supplied storage object.
class CustomBackend(pm.backends.base.BaseTrace):
    """Custom trace backend for specialized storage needs."""
    
    def __init__(self, custom_storage, vars=None, model=None):
        # custom_storage must provide init_chain/store_sample/finalize_chain/
        # get_samples (see the calls below)
        self.custom_storage = custom_storage
        super().__init__(vars=vars, model=model)
    
    def setup(self, draws, chain, sampler_vars=None):
        """Initialize custom storage."""
        self.chain = chain
        self.draws = draws
        self.draw_idx = 0
        
        # Initialize storage structures
        # NOTE(review): self.varnames is assumed to be populated by
        # BaseTrace.__init__ -- confirm against the installed PyMC version.
        self.custom_storage.init_chain(chain, draws, self.varnames)
    
    def record(self, point, sampler_stats=None):
        """Record sample to custom storage."""
        self.custom_storage.store_sample(
            self.chain, 
            self.draw_idx, 
            point, 
            sampler_stats
        )
        # Advance the write cursor one draw per record() call
        self.draw_idx += 1
    
    def close(self):
        """Finalize custom storage."""
        self.custom_storage.finalize_chain(self.chain)
    
    def __getitem__(self, key):
        """Access stored samples."""
        return self.custom_storage.get_samples(self.chain, key)

# Use custom backend
# custom_storage = YourCustomStorage()  # Implement your storage system
# trace = pm.sample(1000, trace=CustomBackend(custom_storage))

Memory Management

# Memory-efficient sampling for large models
with pm.Model() as large_model:
    # Very large parameter space
    theta = pm.Normal('theta', 0, 1, shape=10000)
    
    # Use memory-mapped storage
    with tempfile.TemporaryDirectory() as tmp_dir:
        zarr_path = os.path.join(tmp_dir, 'large_trace.zarr')
        
        # Sample with disk storage
        trace = pm.sample(
            draws=5000,
            trace=pm.backends.ZarrTrace(zarr_path),
            compute_convergence_checks=False,  # Save memory
            progressbar=True
        )
        
        # Process trace in chunks to avoid memory overflow. Select the draw
        # dimension by name (isel) instead of positional slicing so the code
        # is robust to the order of the (chain, draw, ...) dimensions.
        chunk_size = 1000
        for i in range(0, 5000, chunk_size):
            chunk = trace.posterior['theta'].isel(draw=slice(i, i + chunk_size))
            # Process chunk...
            processed_chunk = chunk.mean(dim='draw')
            print(f"Processed chunk {i//chunk_size + 1}")

PyMC's data handling system provides flexible and efficient tools for managing datasets of any size while maintaining seamless integration with the broader PyMC ecosystem and external data sources.

Install with Tessl CLI

npx tessl i tessl/pypi-pymc

docs

data.md

distributions.md

gp.md

index.md

math.md

model.md

ode.md

sampling.md

stats.md

variational.md

tile.json