Exploratory analysis of Bayesian models with comprehensive data manipulation, statistical diagnostics, and visualization capabilities
Quality: Pending — it has not yet been reviewed whether this follows best practices.
Impact: Pending — no eval scenarios have been run.
Performance optimization utilities including Numba JIT compilation, Dask parallelization, and interactive backend management for Jupyter environments.
class Numba:
    """
    Numba JIT compilation utilities for performance optimization.

    Enables Just-In-Time compilation of critical ArviZ functions
    for significant performance improvements, especially with large datasets.
    """

    numba_flag: bool
    """Current state of Numba JIT compilation (True if enabled)."""

    @classmethod
    def enable_numba(cls):
        """
        Enable Numba JIT compilation for supported ArviZ functions.

        Improves performance for computationally intensive operations
        like statistical calculations and data transformations.
        """

    @classmethod
    def disable_numba(cls):
        """
        Disable Numba JIT compilation and fall back to pure Python/NumPy.

        Useful for debugging or when Numba installation issues occur.
        """


import arviz as az
# Example: accelerating ArviZ computations with Numba.

# Check current Numba status
print(f"Numba enabled: {az.Numba.numba_flag}")

# Enable Numba acceleration
az.Numba.enable_numba()

# Compute statistics with JIT acceleration
idata = az.load_arviz_data("centered_eight")
summary = az.summary(idata)  # Faster with Numba
rhat = az.rhat(idata)  # Accelerated convergence diagnostics
ess = az.ess(idata)  # Faster ESS computation

# Disable if needed (e.g., for debugging)
az.Numba.disable_numba()


class Dask:
    """
    Dask parallel computation utilities for distributed processing.

    Enables parallel execution of ArviZ computations across multiple
    cores or distributed clusters for improved performance on large datasets.
    """

    dask_flag: bool
    """Current state of Dask parallelization (True if enabled)."""

    dask_kwargs: dict
    """Dictionary of Dask configuration parameters."""

    @classmethod
    def enable_dask(cls, dask_kwargs: dict = None):
        """
        Enable Dask parallel computation for supported ArviZ functions.

        Args:
            dask_kwargs (dict, optional): Dask scheduler and worker configuration
                Example: {"scheduler": "threads", "num_workers": 4}
        """

    @classmethod
    def disable_dask(cls):
        """
        Disable Dask parallelization and use single-threaded computation.
        """


# Check current Dask status
print(f"Dask enabled: {az.Dask.dask_flag}")
print(f"Dask config: {az.Dask.dask_kwargs}")

# Enable Dask with custom configuration
dask_config = {
    "scheduler": "threads",  # or "processes", "distributed"
    "num_workers": 4,  # number of parallel workers
}
az.Dask.enable_dask(dask_config)

# Computations now run in parallel
large_idata = az.load_arviz_data("rugby")
summary = az.summary(large_idata)  # Parallel summary computation
loo_result = az.loo(large_idata)  # Parallel LOO-CV computation

# Disable Dask
az.Dask.disable_dask()

# Distributed computing setup
distributed_config = {
    "scheduler": "distributed",
    "address": "scheduler-address:8786",  # Dask scheduler address
    "num_workers": 8,
}
az.Dask.enable_dask(distributed_config)

# Process-based parallelism (for CPU-bound tasks)
process_config = {
    "scheduler": "processes",
    "num_workers": 4,
    "threads_per_worker": 2,
}
az.Dask.enable_dask(process_config)

# Thread-based parallelism (for I/O-bound tasks)
thread_config = {
    "scheduler": "threads",
    "num_workers": 8,
}
az.Dask.enable_dask(thread_config)


class interactive_backend:
    """
    Context manager for interactive plotting backends in Jupyter environments.

    Manages switching between inline static plots and interactive plots
    that can be displayed in separate windows or embedded widgets.
    """

    def __init__(self, backend: str = ""):
        """
        Initialize interactive backend context manager.

        Args:
            backend (str, optional): Interactive backend to use
                Options: "notebook", "lab", "colab", "kaggle"
                If empty, automatically detects environment
        """

    def __enter__(self):
        """Enter interactive plotting mode."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit interactive mode and restore previous settings."""


# Basic interactive plotting
with az.interactive_backend():
    az.plot_trace(idata)  # Opens in interactive window
    az.plot_posterior(idata)  # Interactive plot with zoom/pan

# Specific backend for Jupyter Lab
with az.interactive_backend("lab"):
    az.plot_pair(idata)  # Interactive pair plot in JupyterLab

# Auto-detect environment
with az.interactive_backend():
    # Automatically uses appropriate backend:
    # - "notebook" for Jupyter Notebook
    # - "lab" for JupyterLab
    # - "colab" for Google Colab
    # - "kaggle" for Kaggle Notebooks
    az.plot_forest(idata)

# Optimal configuration for large-scale analysis
def setup_high_performance():
    """Configure ArviZ for maximum performance.

    Enables Numba JIT compilation and thread-based Dask parallelism
    (4 workers), then reports that high-performance mode is active.
    """
    # Enable JIT compilation
    az.Numba.enable_numba()
    # Enable parallel processing
    az.Dask.enable_dask({
        "scheduler": "threads",
        "num_workers": 4,
    })
    print("High-performance mode enabled")


# Use for computationally intensive tasks
setup_high_performance()

# Large dataset processing
# NOTE(review): large_idata_list is assumed to be defined elsewhere — confirm.
large_models = {f"model_{i}": large_idata_list[i] for i in range(10)}
comparison = az.compare(large_models)  # Fast parallel model comparison

# Configuration for memory-constrained environments
def setup_memory_efficient():
    """Configure ArviZ for memory efficiency.

    Turns on lazy data loading, disables warmup saving, enables Numba,
    and restricts Dask to two process-based workers to cap memory use.
    """
    # Use lazy loading
    az.rcParams["data.load"] = "lazy"
    # Disable warmup saving to reduce memory
    az.rcParams["data.save_warmup"] = False
    # Enable Numba for faster processing (less memory overhead)
    az.Numba.enable_numba()
    # Use process-based parallelism to avoid memory sharing
    az.Dask.enable_dask({
        "scheduler": "processes",
        "num_workers": 2,  # Fewer workers to conserve memory
    })


setup_memory_efficient()

import time
def benchmark_configuration():
    """Compare az.summary() performance under different configurations.

    Times three runs on the "rugby" example dataset — baseline (no
    optimization), Numba only, and Numba + Dask — and prints elapsed
    times with relative speedups.
    """
    # Load test data
    idata = az.load_arviz_data("rugby")

    # Baseline (no optimization)
    az.Numba.disable_numba()
    az.Dask.disable_dask()
    # perf_counter is monotonic — unaffected by wall-clock adjustments,
    # unlike time.time(), so it is the correct clock for benchmarking.
    start = time.perf_counter()
    summary1 = az.summary(idata)
    baseline_time = time.perf_counter() - start

    # With Numba
    az.Numba.enable_numba()
    start = time.perf_counter()
    summary2 = az.summary(idata)
    numba_time = time.perf_counter() - start

    # With Numba + Dask
    az.Dask.enable_dask({"scheduler": "threads", "num_workers": 4})
    start = time.perf_counter()
    summary3 = az.summary(idata)
    combined_time = time.perf_counter() - start

    print(f"Baseline: {baseline_time:.2f}s")
    print(f"Numba: {numba_time:.2f}s ({baseline_time/numba_time:.1f}x speedup)")
    print(f"Numba+Dask: {combined_time:.2f}s ({baseline_time/combined_time:.1f}x speedup)")
benchmark_configuration()


def detect_environment():
    """Detect current computational environment and optimize accordingly.

    Inspects sys.modules to distinguish Colab / JupyterLab / Jupyter
    Notebook / plain script environments and applies a matching plotting
    backend and Numba/Dask configuration.
    """
    import sys

    # Detect Jupyter environments
    if 'ipykernel' in sys.modules:
        if 'google.colab' in sys.modules:
            print("Google Colab detected")
            # Colab-specific optimizations
            az.rcParams["plot.backend"] = "matplotlib"
            az.Numba.enable_numba()
        elif 'ipywidgets' in sys.modules:
            # NOTE(review): presence of ipywidgets is a heuristic for
            # JupyterLab, not a guarantee — confirm before relying on it.
            print("JupyterLab detected")
            # JupyterLab optimizations
            az.rcParams["plot.backend"] = "bokeh"
            az.Numba.enable_numba()
            az.Dask.enable_dask({"scheduler": "threads", "num_workers": 2})
        else:
            print("Jupyter Notebook detected")
            az.rcParams["plot.backend"] = "matplotlib"
            az.Numba.enable_numba()
    else:
        print("Script/CLI environment detected")
        # Command-line optimizations
        az.Numba.enable_numba()
        az.Dask.enable_dask({"scheduler": "processes", "num_workers": 4})


# Auto-configure based on environment
detect_environment()

# Graceful degradation when Numba is unavailable or misconfigured
try:
    az.Numba.enable_numba()
    print("Numba enabled successfully")
except ImportError:
    print("Numba not available. Install with: pip install numba")
except Exception as e:
    print(f"Numba error: {e}")
    print("Falling back to pure Python implementation")
    az.Numba.disable_numba()

# Graceful degradation when Dask is unavailable or misconfigured
try:
    az.Dask.enable_dask({"scheduler": "threads", "num_workers": 4})
    print("Dask enabled successfully")
except ImportError:
    print("Dask not available. Install with: pip install dask")
except Exception as e:
    print(f"Dask error: {e}")
    print("Using single-threaded computation")
    az.Dask.disable_dask()


def handle_memory_constraints():
    """Configure ArviZ for memory-constrained environments.

    Reads available RAM via psutil and picks conservative, balanced, or
    high-performance Numba/Dask settings at the <4 GB / <8 GB / >=8 GB
    thresholds.
    """
    import psutil

    # Check available memory
    available_gb = psutil.virtual_memory().available / (1024**3)
    if available_gb < 4:
        print("Limited memory detected. Using conservative settings.")
        az.rcParams["data.load"] = "lazy"
        az.rcParams["data.save_warmup"] = False
        az.Dask.enable_dask({"scheduler": "threads", "num_workers": 1})
    elif available_gb < 8:
        print("Moderate memory available. Using balanced settings.")
        az.Dask.enable_dask({"scheduler": "threads", "num_workers": 2})
        az.Numba.enable_numba()
    else:
        print("Sufficient memory available. Using high-performance settings.")
        az.Dask.enable_dask({"scheduler": "threads", "num_workers": 4})
        az.Numba.enable_numba()


handle_memory_constraints()


def flatten_inference_data_to_dict(data: InferenceData, *, var_names: list = None, groups: list = None, dimensions: dict = None, group_info: bool = False, var_name_format: str = None, index_origin: int = None) -> dict:
    """
    Flatten InferenceData to dictionary format for external use.

    Converts ArviZ InferenceData objects to flat dictionary structures
    that can be used with other libraries or data analysis tools.

    Args:
        data (InferenceData): Input inference data to flatten
        var_names (list, optional): Variables to include in output
        groups (list, optional): Groups to include (default: all)
        dimensions (dict, optional): Dimension specifications
        group_info (bool): Whether to include group information (default False)
        var_name_format (str, optional): Format string for variable names
        index_origin (int, optional): Starting index for array indexing

    Returns:
        dict: Flattened dictionary with data and metadata
    """


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-arviz