0
# PyMC Data Handling and Backends
1
2
PyMC provides comprehensive data handling capabilities and flexible backend systems for managing datasets, mini-batching, and storing sampling results. The data system supports dynamic updates, efficient memory usage, and seamless integration with the broader scientific Python ecosystem.
3
4
## Data Containers
5
6
### Data Function
7
8
```python { .api }
9
import pymc as pm
10
11
def Data(name, value, *, dims=None, export_index_as_coords=True, **kwargs):
12
"""
13
Create a data container for observed or input data.
14
15
Parameters:
16
- name (str): Name of the data variable
17
- value (array-like): Initial data values
18
- dims (tuple, optional): Dimension names for the data
19
- export_index_as_coords (bool): Export indices as coordinates
20
- **kwargs: Additional PyTensor shared variable options
21
22
Returns:
23
- data_var: PyTensor shared variable containing the data
24
"""
25
26
# Basic data container
27
import numpy as np
28
29
X_train = np.random.randn(100, 5)
30
y_train = np.random.randn(100)
31
32
with pm.Model() as model:
33
# Create data containers
34
X_data = pm.Data('X_data', X_train)
35
y_data = pm.Data('y_data', y_train)
36
37
# Model using data containers
38
beta = pm.Normal('beta', 0, 1, shape=X_train.shape[1])
39
mu = pm.math.dot(X_data, beta)
40
sigma = pm.HalfNormal('sigma', 1)
41
42
# Likelihood using data container
43
likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, observed=y_data)
44
45
# Data containers with dimensions
46
coords = {'obs': range(100), 'features': ['f1', 'f2', 'f3', 'f4', 'f5']}
47
48
with pm.Model(coords=coords) as dimensional_model:
49
# Data with explicit dimensions
50
X_data = pm.Data('X_data', X_train, dims=['obs', 'features'])
51
y_data = pm.Data('y_data', y_train, dims='obs')
52
53
# Model components with matching dimensions
54
beta = pm.Normal('beta', 0, 1, dims='features')
55
mu = pm.math.dot(X_data, beta)
56
likelihood = pm.Normal('likelihood', mu=mu, sigma=1, observed=y_data)
57
```
58
59
### Data Updates
60
61
```python { .api }
62
def set_data(new_data, *, model=None, coords=None):
63
"""
64
Update data values in existing data containers.
65
66
Parameters:
67
- new_data (dict): Dictionary mapping data names to new values
68
- model (Model, optional): Target model (default: current context)
69
- coords (dict, optional): New coordinate values
70
"""
71
72
# Update data for out-of-sample prediction
73
X_test = np.random.randn(50, 5)
74
75
# Sample with training data
76
with model:
77
trace = pm.sample(1000)
78
79
# Update data containers for prediction
80
pm.set_data({'X_data': X_test, 'y_data': None})
81
82
# Generate predictions with new data
83
with model:
84
posterior_predictive = pm.sample_posterior_predictive(trace, var_names=['likelihood'])
85
86
# Restore original data
87
pm.set_data({'X_data': X_train, 'y_data': y_train})
88
```
89
90
### Minibatch Data
91
92
```python { .api }
93
def Minibatch(name, value, batch_size=None, *, dims=None, random_seed=None, **kwargs):
94
"""
95
Create a minibatch data container for large datasets.
96
97
Parameters:
98
- name (str): Name of the minibatch variable
99
- value (array-like): Full dataset
100
- batch_size (int, optional): Size of each minibatch
101
- dims (tuple, optional): Dimension names
102
- random_seed (int, optional): Seed for batch sampling
103
- **kwargs: Additional options
104
105
Returns:
106
- minibatch_var: Minibatch data container
107
"""
108
109
# Large dataset minibatching
110
X_large = np.random.randn(10000, 20)
111
y_large = np.random.randn(10000)
112
113
with pm.Model() as minibatch_model:
114
# Create minibatch containers
115
X_mb = pm.Minibatch('X_mb', X_large, batch_size=128)
116
y_mb = pm.Minibatch('y_mb', y_large, batch_size=128)
117
118
# Model parameters
119
beta = pm.Normal('beta', 0, 1, shape=20)
120
sigma = pm.HalfNormal('sigma', 1)
121
122
# Linear predictor with minibatch data
123
mu = pm.math.dot(X_mb, beta)
124
125
# Likelihood (automatically scaled for minibatching)
126
likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma,
127
observed=y_mb, total_size=len(X_large))
128
129
# Variational inference works well with minibatches
130
approx = pm.fit(n=50000, method='advi')
131
trace = approx.sample(2000)
132
```
133
134
### Data Utilities
135
136
```python { .api }
137
def get_data(dataset_name):
138
"""
139
Load example datasets for testing and tutorials.
140
141
Parameters:
142
- dataset_name (str): Name of the dataset to load
143
144
Returns:
145
- data: Loaded dataset as appropriate data structure
146
"""
147
148
# Load example datasets
149
boston_housing = pm.get_data('boston_housing')
150
iris_data = pm.get_data('iris')
151
radon_data = pm.get_data('radon')
152
153
print("Available datasets:")
154
for name, data in [('boston', boston_housing), ('iris', iris_data)]:
155
print(f" {name}: {data.shape if hasattr(data, 'shape') else type(data)}")
156
```
157
158
## Trace Storage Backends
159
160
### NDArray Backend (In-Memory)
161
162
```python { .api }
163
from pymc.backends import NDArray
164
165
class NDArray:
166
"""
167
In-memory trace storage backend.
168
169
Parameters:
170
- vars (list, optional): Variables to trace
171
- model (Model, optional): PyMC model
172
173
Attributes:
174
- samples (dict): Dictionary of samples by variable name
175
- sampler_stats (dict): Sampler statistics
176
"""
177
178
def __init__(self, vars=None, model=None):
179
pass
180
181
def setup(self, draws, chain, sampler_vars=None):
182
"""Set up storage for sampling."""
183
pass
184
185
def record(self, point, sampler_stats=None):
186
"""Record a sample point."""
187
pass
188
189
def close(self):
190
"""Finalize the trace."""
191
pass
192
193
# Basic in-memory sampling
194
with pm.Model() as model:
195
alpha = pm.Normal('alpha', 0, 1)
196
beta = pm.Normal('beta', 0, 1)
197
198
# Use in-memory backend (default)
199
trace = pm.sample(1000, trace=pm.backends.NDArray())
200
201
# Access samples directly
202
print(f"Alpha samples shape: {trace.posterior['alpha'].shape}")
203
print(f"Beta mean: {trace.posterior['beta'].mean():.3f}")
204
```
205
206
### Zarr Backend (Persistent Storage)
207
208
```python { .api }
209
from pymc.backends import ZarrTrace
210
211
class ZarrTrace:
212
"""
213
Zarr-based persistent trace storage.
214
215
Parameters:
216
- dir_path (str): Directory path for Zarr storage
217
- vars (list, optional): Variables to trace
218
- model (Model, optional): PyMC model
219
- append (bool): Append to existing trace
220
"""
221
222
def __init__(self, dir_path=None, vars=None, model=None, append=False):
223
pass
224
225
# Persistent storage with Zarr
226
import tempfile
227
import os
228
229
with tempfile.TemporaryDirectory() as tmp_dir:
230
zarr_path = os.path.join(tmp_dir, 'trace.zarr')
231
232
with pm.Model() as model:
233
# Large model...
234
theta = pm.Normal('theta', 0, 1, shape=100)
235
236
# Store trace persistently
237
trace = pm.sample(10000, trace=pm.backends.ZarrTrace(zarr_path))
238
239
# Trace is automatically saved to disk
240
print(f"Trace stored at: {zarr_path}")
241
242
# Load trace later
243
loaded_trace = pm.backends.ZarrTrace(zarr_path)
244
print(f"Loaded trace shape: {loaded_trace.posterior['theta'].shape}")
245
```
246
247
### Base Trace Classes
248
249
```python { .api }
250
from pymc.backends.base import BaseTrace, MultiTrace
251
252
class BaseTrace:
253
"""
254
Base class for all trace backends.
255
256
Methods:
257
- setup: Initialize trace storage
258
- record: Record sample point
259
- close: Finalize trace
260
- __getitem__: Access samples by variable name
261
"""
262
263
class MultiTrace:
264
"""
265
Container for multiple chain traces.
266
267
Parameters:
268
- straces (list): List of single-chain traces
269
270
Methods:
271
- get_values: Extract values for variables
272
- get_sampler_stats: Extract sampler statistics
273
- add_values: Add new samples
274
"""
275
276
def __init__(self, straces):
277
self.straces = straces
278
279
def get_values(self, varname, burn=0, thin=1, combine=True, chains=None, squeeze=True):
280
"""
281
Get values for a variable across chains.
282
283
Parameters:
284
- varname (str): Variable name
285
- burn (int): Burn-in samples to exclude
286
- thin (int): Thinning factor
287
- combine (bool): Combine chains into single array
288
- chains (list, optional): Specific chains to extract
289
- squeeze (bool): Squeeze singleton dimensions
290
291
Returns:
292
- values: Variable samples
293
"""
294
pass
295
296
# Working with MultiTrace
297
trace = pm.sample(2000, chains=4) # Returns MultiTrace
298
299
# Extract specific variable values
300
alpha_samples = trace.get_values('alpha', burn=500, thin=2)
301
beta_samples = trace.get_values('beta', chains=[0, 1], combine=False)
302
303
# Get sampler statistics
304
diverging = trace.get_sampler_stats('diverging')
305
energy = trace.get_sampler_stats('energy')
306
307
print(f"Total divergences: {diverging.sum()}")
308
print(f"Mean energy: {energy.mean():.3f}")
309
```
310
311
## ArviZ Integration
312
313
### InferenceData Conversion
314
315
```python { .api }
316
def to_inference_data(trace=None, *, prior=None, posterior_predictive=None,
317
predictions=None, log_likelihood=None, coords=None,
318
dims=None, model=None, save_warmup=False, **kwargs):
319
"""
320
Convert PyMC trace to ArviZ InferenceData object.
321
322
Parameters:
323
- trace: PyMC trace object
324
- prior: Prior samples
325
- posterior_predictive: Posterior predictive samples
326
- predictions: Out-of-sample predictions
327
- log_likelihood: Log-likelihood values
328
- coords (dict): Coordinate specifications
329
- dims (dict): Dimension specifications
330
- model: PyMC model
331
- save_warmup (bool): Include warmup samples
332
333
Returns:
334
- idata: ArviZ InferenceData object
335
"""
336
337
# Convert to ArviZ InferenceData
338
with pm.Model() as model:
339
alpha = pm.Normal('alpha', 0, 1)
340
beta = pm.Normal('beta', 0, 1, shape=3)
341
sigma = pm.HalfNormal('sigma', 1)
342
343
mu = alpha + pm.math.dot(X_data, beta)
344
y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y_data)
345
346
# Sample
347
trace = pm.sample(1000)
348
349
# Prior samples
350
prior_samples = pm.sample_prior_predictive(500)
351
352
# Posterior predictive samples
353
post_pred = pm.sample_posterior_predictive(trace)
354
355
# Convert to comprehensive InferenceData
356
idata = pm.to_inference_data(
357
trace,
358
prior=prior_samples,
359
posterior_predictive=post_pred,
360
coords={'obs': range(len(y_data)), 'features': ['f1', 'f2', 'f3']},
361
dims={'beta': ['features'], 'y_obs': ['obs']},
362
model=model
363
)
364
365
print("InferenceData groups:")
366
for group_name in idata._groups:
367
print(f" {group_name}: {list(idata[group_name].data_vars.keys())}")
368
```
369
370
### Predictions Integration
371
372
```python { .api }
373
def predictions_to_inference_data(predictions, *, posterior_trace=None,
374
model=None, coords=None, dims=None, **kwargs):
375
"""
376
Convert predictions to InferenceData format.
377
378
Parameters:
379
- predictions: Prediction samples
380
- posterior_trace: Original posterior trace
381
- model: PyMC model
382
- coords (dict): Coordinate specifications
383
- dims (dict): Dimension specifications
384
385
Returns:
386
- pred_idata: InferenceData with predictions group
387
"""
388
389
# Out-of-sample predictions with proper metadata
390
X_test = np.random.randn(25, 3)
391
pm.set_data({'X_data': X_test, 'y_data': None})
392
393
with model:
394
# Generate predictions
395
predictions = pm.sample_posterior_predictive(
396
trace,
397
var_names=['y_obs'],
398
predictions=True
399
)
400
401
# Convert predictions to InferenceData
402
pred_idata = pm.predictions_to_inference_data(
403
predictions,
404
posterior_trace=trace,
405
coords={'test_obs': range(25), 'features': ['f1', 'f2', 'f3']},
406
dims={'y_obs': ['test_obs']},
407
model=model
408
)
409
```
410
411
## Data Processing Utilities
412
413
### Dictionary-Array Mapping
414
415
```python { .api }
416
from pymc.blocking import DictToArrayBijection
417
418
class DictToArrayBijection:
419
"""
420
Bijection between dictionary and array representations of parameters.
421
422
Parameters:
423
- ordering: Variable ordering specification
424
- vmap: Variable mapping
425
426
Methods:
427
- map: Convert dictionary to array
428
- rmap: Convert array to dictionary
429
"""
430
431
def __init__(self, ordering, vmap=None):
432
pass
433
434
def map(self, point_dict):
435
"""Convert parameter dictionary to array."""
436
pass
437
438
def rmap(self, point_array):
439
"""Convert parameter array to dictionary."""
440
pass
441
442
# Create bijection for efficient array operations
443
with pm.Model() as model:
444
alpha = pm.Normal('alpha', 0, 1)
445
beta = pm.Normal('beta', 0, 1, shape=3)
446
447
# Create bijection
448
bijection = pm.DictToArrayBijection(model.free_RVs)
449
450
# Convert point formats
451
point_dict = {'alpha': 0.5, 'beta': np.array([1.0, -0.5, 0.2])}
452
point_array = bijection.map(point_dict)
453
recovered_dict = bijection.rmap(point_array)
454
455
print(f"Array representation: {point_array}")
456
print(f"Recovered dict: {recovered_dict}")
457
```
458
459
## Advanced Data Patterns
460
461
### Dynamic Data Loading
462
463
```python { .api }
464
class DataLoader:
465
"""Custom data loader for streaming/dynamic data."""
466
467
def __init__(self, data_source, batch_size=128):
468
self.data_source = data_source
469
self.batch_size = batch_size
470
self.current_batch = 0
471
472
def get_batch(self):
473
"""Get next batch of data."""
474
start_idx = self.current_batch * self.batch_size
475
end_idx = start_idx + self.batch_size
476
477
batch_data = self.data_source[start_idx:end_idx]
478
self.current_batch += 1
479
480
return batch_data
481
482
def reset(self):
483
"""Reset batch counter."""
484
self.current_batch = 0
485
486
# Use with PyMC for streaming inference
487
data_loader = DataLoader(large_dataset, batch_size=256)
488
489
with pm.Model() as streaming_model:
490
# Initialize with first batch
491
initial_batch = data_loader.get_batch()
492
X_mb = pm.Minibatch('X_mb', initial_batch['X'], batch_size=256)
493
y_mb = pm.Minibatch('y_mb', initial_batch['y'], batch_size=256)
494
495
# Model definition...
496
497
# Stream through data for variational inference
498
for epoch in range(10):
499
data_loader.reset()
500
501
# Process all batches in epoch
502
while True:
503
try:
504
batch = data_loader.get_batch()
505
pm.set_data({'X_mb': batch['X'], 'y_mb': batch['y']})
506
507
# Update variational approximation
508
approx = pm.fit(n=1000, method='advi')
509
510
except IndexError: # End of data
511
break
512
```
513
514
### Custom Backend Implementation
515
516
```python { .api }
517
class CustomBackend(pm.backends.base.BaseTrace):
518
"""Custom trace backend for specialized storage needs."""
519
520
def __init__(self, custom_storage, vars=None, model=None):
521
self.custom_storage = custom_storage
522
super().__init__(vars=vars, model=model)
523
524
def setup(self, draws, chain, sampler_vars=None):
525
"""Initialize custom storage."""
526
self.chain = chain
527
self.draws = draws
528
self.draw_idx = 0
529
530
# Initialize storage structures
531
self.custom_storage.init_chain(chain, draws, self.varnames)
532
533
def record(self, point, sampler_stats=None):
534
"""Record sample to custom storage."""
535
self.custom_storage.store_sample(
536
self.chain,
537
self.draw_idx,
538
point,
539
sampler_stats
540
)
541
self.draw_idx += 1
542
543
def close(self):
544
"""Finalize custom storage."""
545
self.custom_storage.finalize_chain(self.chain)
546
547
def __getitem__(self, key):
548
"""Access stored samples."""
549
return self.custom_storage.get_samples(self.chain, key)
550
551
# Use custom backend
552
# custom_storage = YourCustomStorage() # Implement your storage system
553
# trace = pm.sample(1000, trace=CustomBackend(custom_storage))
554
```
555
556
### Memory Management
557
558
```python { .api }
559
# Memory-efficient sampling for large models
560
with pm.Model() as large_model:
561
# Very large parameter space
562
theta = pm.Normal('theta', 0, 1, shape=10000)
563
564
# Use memory-mapped storage
565
with tempfile.TemporaryDirectory() as tmp_dir:
566
zarr_path = os.path.join(tmp_dir, 'large_trace.zarr')
567
568
# Sample with disk storage
569
trace = pm.sample(
570
draws=5000,
571
trace=pm.backends.ZarrTrace(zarr_path),
572
compute_convergence_checks=False, # Save memory
573
progressbar=True
574
)
575
576
# Process trace in chunks to avoid memory overflow
577
chunk_size = 1000
578
for i in range(0, 5000, chunk_size):
579
chunk = trace.posterior['theta'][..., i:i+chunk_size, :]
580
# Process chunk...
581
processed_chunk = chunk.mean(dim='draw')
582
print(f"Processed chunk {i//chunk_size + 1}")
583
```
584
585
PyMC's data handling system provides flexible and efficient tools for managing datasets of any size while maintaining seamless integration with the broader PyMC ecosystem and external data sources.