Probabilistic Programming in Python: Bayesian Modeling and Probabilistic Machine Learning with PyTensor
—
PyMC provides comprehensive data handling capabilities and flexible backend systems for managing datasets, mini-batching, and storing sampling results. The data system supports dynamic updates, efficient memory usage, and seamless integration with the broader scientific Python ecosystem.
import pymc as pm
def Data(name, value, *, dims=None, export_index_as_coords=True, **kwargs):
    """
    Create a named, mutable data container inside a PyMC model.

    The returned variable wraps ``value`` in a shared variable so the data
    can later be swapped with ``pm.set_data`` (e.g. for out-of-sample
    prediction) without rebuilding the model graph.

    Parameters
    ----------
    name : str
        Name of the data variable as registered in the model.
    value : array-like
        Initial data values.
    dims : tuple of str, optional
        Dimension names for the data.
    export_index_as_coords : bool
        Export indices as model coordinates.
        NOTE(review): recent PyMC releases renamed this keyword
        (``infer_dims_and_coords``) — confirm against the installed version.
    **kwargs
        Additional options forwarded to the underlying shared variable.

    Returns
    -------
    data_var
        PyTensor shared variable containing the data.
    """
# Example: basic data containers in a linear-regression model.
import numpy as np

# Synthetic training data: 100 observations, 5 features.
X_train = np.random.randn(100, 5)
y_train = np.random.randn(100)

with pm.Model() as model:
    # Wrap the raw arrays so they can be replaced later via pm.set_data.
    X_data = pm.Data('X_data', X_train)
    y_data = pm.Data('y_data', y_train)

    # Linear model built on the data containers.
    beta = pm.Normal('beta', 0, 1, shape=X_train.shape[1])
    mu = pm.math.dot(X_data, beta)
    sigma = pm.HalfNormal('sigma', 1)

    # Likelihood observes the y_data container.
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, observed=y_data)
# Example: data containers with named dimensions.
coords = {'obs': range(100), 'features': ['f1', 'f2', 'f3', 'f4', 'f5']}
with pm.Model(coords=coords) as dimensional_model:
    # Data with explicit dimensions tied to the model's coords.
    X_data = pm.Data('X_data', X_train, dims=['obs', 'features'])
    y_data = pm.Data('y_data', y_train, dims='obs')

    # Model components with matching dimensions.
    beta = pm.Normal('beta', 0, 1, dims='features')
    mu = pm.math.dot(X_data, beta)
    likelihood = pm.Normal('likelihood', mu=mu, sigma=1, observed=y_data)


def set_data(new_data, *, model=None, coords=None):
    """
    Update the values stored in existing data containers.

    Parameters
    ----------
    new_data : dict
        Mapping of data-container names to their new values.
    model : Model, optional
        Target model; defaults to the model on the current context stack.
    coords : dict, optional
        New coordinate values for any resized dimensions.
    """
# Example: swap in test data for out-of-sample prediction.
X_test = np.random.randn(50, 5)

# Sample with the training data still in place.
with model:
    trace = pm.sample(1000)
    # Point the containers at the test inputs (y is unknown at predict time).
    pm.set_data({'X_data': X_test, 'y_data': None})

# Generate predictions with the new data.
with model:
    posterior_predictive = pm.sample_posterior_predictive(trace, var_names=['likelihood'])
    # Restore the original training data afterwards.
    pm.set_data({'X_data': X_train, 'y_data': y_train})


def Minibatch(name, value, batch_size=None, *, dims=None, random_seed=None, **kwargs):
    """
    Create a minibatch data container for large datasets.

    Parameters
    ----------
    name : str
        Name of the minibatch variable.
    value : array-like
        Full dataset to subsample from.
    batch_size : int, optional
        Number of rows per minibatch.
    dims : tuple of str, optional
        Dimension names.
    random_seed : int, optional
        Seed controlling the batch sampling.
    **kwargs
        Additional options.

    Returns
    -------
    minibatch_var
        Minibatch data container.
    """
# Example: minibatched variational inference on a large dataset.
X_large = np.random.randn(10000, 20)
y_large = np.random.randn(10000)

with pm.Model() as minibatch_model:
    # Random-batch containers (128 rows per draw).
    X_mb = pm.Minibatch('X_mb', X_large, batch_size=128)
    y_mb = pm.Minibatch('y_mb', y_large, batch_size=128)

    # Model parameters.
    beta = pm.Normal('beta', 0, 1, shape=20)
    sigma = pm.HalfNormal('sigma', 1)

    # Linear predictor over the current minibatch.
    mu = pm.math.dot(X_mb, beta)

    # total_size rescales the minibatch likelihood to the full dataset.
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma,
                           observed=y_mb, total_size=len(X_large))

    # Variational inference pairs naturally with minibatches.
    approx = pm.fit(n=50000, method='advi')
    trace = approx.sample(2000)


def get_data(dataset_name):
    """
    Load an example dataset bundled with PyMC for testing and tutorials.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset to load.

    Returns
    -------
    data
        The loaded dataset as an appropriate data structure.
    """
# Example: loading bundled datasets.
boston_housing = pm.get_data('boston_housing')
iris_data = pm.get_data('iris')
radon_data = pm.get_data('radon')

print("Available datasets:")
for name, data in [('boston', boston_housing), ('iris', iris_data)]:
    print(f" {name}: {data.shape if hasattr(data, 'shape') else type(data)}")


from pymc.backends import NDArray
class NDArray:
    """
    In-memory trace storage backend (the default for ``pm.sample``).

    Parameters
    ----------
    vars : list, optional
        Variables to trace.
    model : Model, optional
        PyMC model.

    Attributes
    ----------
    samples : dict
        Dictionary of samples keyed by variable name.
    sampler_stats : dict
        Sampler statistics.
    """

    # NOTE(review): bodies here are documentation stubs; the real
    # implementation lives in pymc.backends.
    def __init__(self, vars=None, model=None):
        pass

    def setup(self, draws, chain, sampler_vars=None):
        """Set up storage before sampling begins."""
        pass

    def record(self, point, sampler_stats=None):
        """Record a single sample point (and optional sampler stats)."""
        pass

    def close(self):
        """Finalize the trace once sampling is complete."""
        pass
# Example: explicit in-memory sampling.
with pm.Model() as model:
    alpha = pm.Normal('alpha', 0, 1)
    beta = pm.Normal('beta', 0, 1)

    # NDArray is the default trace backend; passed explicitly here.
    trace = pm.sample(1000, trace=pm.backends.NDArray())

# Access samples directly from the returned trace.
print(f"Alpha samples shape: {trace.posterior['alpha'].shape}")
print(f"Beta mean: {trace.posterior['beta'].mean():.3f}")


from pymc.backends import ZarrTrace
class ZarrTrace:
    """
    Zarr-based persistent trace storage.

    Samples are written to a Zarr store on disk instead of being held in
    memory, which suits very large traces.

    Parameters
    ----------
    dir_path : str
        Directory path for the Zarr storage.
    vars : list, optional
        Variables to trace.
    model : Model, optional
        PyMC model.
    append : bool
        Append to an existing trace rather than starting fresh.
    """

    # Documentation stub; real implementation is in pymc.backends.
    def __init__(self, dir_path=None, vars=None, model=None, append=False):
        pass
# Example: persistent storage with Zarr.
import tempfile
import os

with tempfile.TemporaryDirectory() as tmp_dir:
    zarr_path = os.path.join(tmp_dir, 'trace.zarr')

    with pm.Model() as model:
        # Large model...
        theta = pm.Normal('theta', 0, 1, shape=100)
        # Samples stream to disk as they are drawn.
        trace = pm.sample(10000, trace=pm.backends.ZarrTrace(zarr_path))

    # Trace is automatically saved to disk.
    print(f"Trace stored at: {zarr_path}")

    # Re-open the stored trace later.
    loaded_trace = pm.backends.ZarrTrace(zarr_path)
    print(f"Loaded trace shape: {loaded_trace.posterior['theta'].shape}")


from pymc.backends.base import BaseTrace, MultiTrace
class BaseTrace:
    """
    Base class for all trace backends.

    Subclasses implement the storage strategy; samplers interact with them
    only through this interface.

    Methods
    -------
    setup
        Initialize trace storage before sampling.
    record
        Record a single sample point.
    close
        Finalize the trace after sampling.
    __getitem__
        Access samples by variable name.
    """
class MultiTrace:
    """
    Container aggregating the traces of several sampling chains.

    Parameters
    ----------
    straces : list
        One single-chain trace per chain.

    Methods
    -------
    get_values
        Extract sampled values for a variable.
    get_sampler_stats
        Extract sampler statistics.
    add_values
        Add new samples to the container.
    """

    def __init__(self, straces):
        # Keep the per-chain traces exactly as supplied.
        self.straces = straces

    def get_values(self, varname, burn=0, thin=1, combine=True, chains=None, squeeze=True):
        """
        Return the samples recorded for one variable across chains.

        Parameters
        ----------
        varname : str
            Variable name.
        burn : int
            Number of initial samples to drop from each chain.
        thin : int
            Keep every ``thin``-th sample.
        combine : bool
            Concatenate the chains into a single array.
        chains : list, optional
            Restrict extraction to these chains.
        squeeze : bool
            Drop singleton dimensions from the result.

        Returns
        -------
        values
            The requested samples.
        """
        return None  # documentation stub
# Example: working with a MultiTrace.
trace = pm.sample(2000, chains=4)  # Returns MultiTrace

# Extract values for specific variables.
alpha_samples = trace.get_values('alpha', burn=500, thin=2)
beta_samples = trace.get_values('beta', chains=[0, 1], combine=False)

# Pull sampler statistics for diagnostics.
diverging = trace.get_sampler_stats('diverging')
energy = trace.get_sampler_stats('energy')
print(f"Total divergences: {diverging.sum()}")
print(f"Mean energy: {energy.mean():.3f}")


def to_inference_data(trace=None, *, prior=None, posterior_predictive=None,
                      predictions=None, log_likelihood=None, coords=None,
                      dims=None, model=None, save_warmup=False, **kwargs):
    """
    Convert a PyMC trace (plus optional extra groups) to an ArviZ
    InferenceData object.

    Parameters
    ----------
    trace
        PyMC trace object.
    prior
        Prior samples.
    posterior_predictive
        Posterior predictive samples.
    predictions
        Out-of-sample predictions.
    log_likelihood
        Log-likelihood values.
    coords : dict
        Coordinate specifications.
    dims : dict
        Dimension specifications.
    model
        PyMC model.
    save_warmup : bool
        Include warmup samples in the result.

    Returns
    -------
    idata
        ArviZ InferenceData object.
    """
# Example: assembling a comprehensive InferenceData object.
with pm.Model() as model:
    alpha = pm.Normal('alpha', 0, 1)
    beta = pm.Normal('beta', 0, 1, shape=3)
    sigma = pm.HalfNormal('sigma', 1)
    mu = alpha + pm.math.dot(X_data, beta)
    y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_data)

    # Posterior, prior, and posterior-predictive samples.
    trace = pm.sample(1000)
    prior_samples = pm.sample_prior_predictive(500)
    post_pred = pm.sample_posterior_predictive(trace)

# Bundle everything, with coordinate/dimension metadata, into one object.
idata = pm.to_inference_data(
    trace,
    prior=prior_samples,
    posterior_predictive=post_pred,
    coords={'obs': range(len(y_data)), 'features': ['f1', 'f2', 'f3']},
    dims={'beta': ['features'], 'y_obs': ['obs']},
    model=model
)

print("InferenceData groups:")
for group_name in idata._groups:
    print(f" {group_name}: {list(idata[group_name].data_vars.keys())}")


def predictions_to_inference_data(predictions, *, posterior_trace=None,
                                  model=None, coords=None, dims=None, **kwargs):
    """
    Convert out-of-sample predictions to the InferenceData format.

    Parameters
    ----------
    predictions
        Prediction samples.
    posterior_trace
        The posterior trace the predictions were generated from.
    model
        PyMC model.
    coords : dict
        Coordinate specifications.
    dims : dict
        Dimension specifications.

    Returns
    -------
    pred_idata
        InferenceData with a ``predictions`` group.
    """
# Example: out-of-sample predictions with proper metadata.
X_test = np.random.randn(25, 3)
pm.set_data({'X_data': X_test, 'y_data': None})

with model:
    # predictions=True marks these draws as out-of-sample.
    predictions = pm.sample_posterior_predictive(
        trace,
        var_names=['y_obs'],
        predictions=True
    )

# Attach coordinates/dims and store under the predictions group.
pred_idata = pm.predictions_to_inference_data(
    predictions,
    posterior_trace=trace,
    coords={'test_obs': range(25), 'features': ['f1', 'f2', 'f3']},
    dims={'y_obs': ['test_obs']},
    model=model
)


from pymc.blocking import DictToArrayBijection
class DictToArrayBijection:
    """
    Bijection between dictionary and flat-array representations of model
    parameters.

    Parameters
    ----------
    ordering
        Variable ordering specification.
    vmap
        Variable mapping.

    Methods
    -------
    map
        Convert a parameter dictionary to a flat array.
    rmap
        Convert a flat array back to a parameter dictionary.
    """

    # Documentation stub; real implementation is in pymc.blocking.
    def __init__(self, ordering, vmap=None):
        pass

    def map(self, point_dict):
        """Convert a parameter dictionary to a flat array."""
        pass

    def rmap(self, point_array):
        """Convert a flat parameter array back to a dictionary."""
        pass
# Create bijection for efficient array operations
with pm.Model() as model:
alpha = pm.Normal('alpha', 0, 1)
beta = pm.Normal('beta', 0, 1, shape=3)
# Create bijection
bijection = pm.DictToArrayBijection(model.free_RVs)
# Convert point formats
point_dict = {'alpha': 0.5, 'beta': np.array([1.0, -0.5, 0.2])}
point_array = bijection.map(point_dict)
recovered_dict = bijection.rmap(point_array)
print(f"Array representation: {point_array}")
print(f"Recovered dict: {recovered_dict}")class DataLoader:
"""Custom data loader for streaming/dynamic data."""
def __init__(self, data_source, batch_size=128):
    # data_source: any sliceable sequence of samples (list, ndarray, ...).
    self.data_source = data_source
    # Number of samples handed out per get_batch() call.
    self.batch_size = batch_size
    # Index of the next batch to serve; advanced by get_batch().
    self.current_batch = 0
def get_batch(self):
"""Get next batch of data."""
start_idx = self.current_batch * self.batch_size
end_idx = start_idx + self.batch_size
batch_data = self.data_source[start_idx:end_idx]
self.current_batch += 1
return batch_data
def reset(self):
    """Rewind the loader so iteration starts from the first batch again."""
    self.current_batch = 0
# Example: streaming variational inference with a custom loader.
data_loader = DataLoader(large_dataset, batch_size=256)

with pm.Model() as streaming_model:
    # Initialize the minibatch containers from the first batch.
    initial_batch = data_loader.get_batch()
    X_mb = pm.Minibatch('X_mb', initial_batch['X'], batch_size=256)
    y_mb = pm.Minibatch('y_mb', initial_batch['y'], batch_size=256)
    # Model definition...

    # Stream through the data for several epochs.
    for epoch in range(10):
        data_loader.reset()
        # Process all batches in the epoch.
        while True:
            try:
                batch = data_loader.get_batch()
                # Guard: slicing past the end of a plain sequence yields an
                # empty batch instead of raising, which would loop forever.
                if not len(batch):
                    break
                pm.set_data({'X_mb': batch['X'], 'y_mb': batch['y']})
                # Update the variational approximation on this batch.
                approx = pm.fit(n=1000, method='advi')
            except IndexError:  # End of data
                break
breakclass CustomBackend(pm.backends.base.BaseTrace):
"""Custom trace backend for specialized storage needs."""
def __init__(self, custom_storage, vars=None, model=None):
self.custom_storage = custom_storage
super().__init__(vars=vars, model=model)
def setup(self, draws, chain, sampler_vars=None):
"""Initialize custom storage."""
self.chain = chain
self.draws = draws
self.draw_idx = 0
# Initialize storage structures
self.custom_storage.init_chain(chain, draws, self.varnames)
def record(self, point, sampler_stats=None):
"""Record sample to custom storage."""
self.custom_storage.store_sample(
self.chain,
self.draw_idx,
point,
sampler_stats
)
self.draw_idx += 1
def close(self):
"""Finalize custom storage."""
self.custom_storage.finalize_chain(self.chain)
def __getitem__(self, key):
"""Access stored samples."""
return self.custom_storage.get_samples(self.chain, key)
# Use the custom backend (requires a concrete storage implementation):
# custom_storage = YourCustomStorage() # Implement your storage system
# trace = pm.sample(1000, trace=CustomBackend(custom_storage))

# Memory-efficient sampling for large models.
with pm.Model() as large_model:
    # Very large parameter space.
    theta = pm.Normal('theta', 0, 1, shape=10000)

    # Use on-disk (memory-mapped) storage so samples never fill RAM.
    with tempfile.TemporaryDirectory() as tmp_dir:
        zarr_path = os.path.join(tmp_dir, 'large_trace.zarr')
        # Sample with disk storage.
        trace = pm.sample(
            draws=5000,
            trace=pm.backends.ZarrTrace(zarr_path),
            compute_convergence_checks=False, # Save memory
            progressbar=True
        )

        # Process the trace in chunks to avoid memory overflow.
        chunk_size = 1000
        for i in range(0, 5000, chunk_size):
            chunk = trace.posterior['theta'][..., i:i+chunk_size, :]
            # Process chunk...
            processed_chunk = chunk.mean(dim='draw')
            print(f"Processed chunk {i//chunk_size + 1}")

PyMC's data handling system provides flexible and efficient tools for managing datasets of any size while maintaining seamless integration with the broader PyMC ecosystem and external data sources.
Install with Tessl CLI
npx tessl i tessl/pypi-pymc