Implementation of Gaussian processes in Python with support for AutoGrad, TensorFlow, PyTorch, and JAX
Efficient computation with lazy evaluation for vectors and matrices that build values on-demand using custom rules and caching. This enables scalable Gaussian process operations by avoiding explicit construction of large matrices until needed.
Foundation class for lazy tensors that index by object identity and provide on-demand value construction through custom building rules.
class LazyTensor:
def __init__(self, rank):
"""
Initialize lazy tensor with specified rank.
Parameters:
- rank: Tensor rank (1 for vectors, 2 for matrices)
"""
def __setitem__(self, key, value):
"""Set value at specified key."""
def __getitem__(self, key):
"""Get value at specified key, building if necessary."""
def _build(self, i):
"""
Abstract method for building values on-demand.
Parameters:
- i: Resolved index to build value for
Returns:
- Built value for the index
"""One-dimensional lazy tensors that build values using custom rules based on index sets and builder functions.
class LazyVector(LazyTensor):
def __init__(self):
"""Initialize lazy vector."""
def add_rule(self, indices, builder):
"""
Add building rule for specified indices.
Note: For performance, indices must already be resolved!
Parameters:
- indices: Set of indices this rule applies to
- builder: Function that takes index and returns corresponding element
"""
def _build(self, i):
"""
Build value for index using registered rules.
Parameters:
- i: Index tuple to build value for
Returns:
- Built value
Raises:
- RuntimeError: If no rule can build the requested index
"""Two-dimensional lazy tensors supporting universal rules and dimension-specific rules for efficient matrix construction.
class LazyMatrix(LazyTensor):
def __init__(self):
"""Initialize lazy matrix."""
def add_rule(self, indices, builder):
"""
Add universal building rule for specified indices.
Note: For performance, indices must already be resolved!
Parameters:
- indices: Set of indices this rule applies to
- builder: Function taking (left_index, right_index) returning element
"""
def add_left_rule(self, i_left, indices, builder):
"""
Add building rule for fixed left index.
Note: For performance, indices must already be resolved!
Parameters:
- i_left: Fixed left index for this rule
- indices: Set of right indices this rule applies to
- builder: Function taking right_index and returning element
"""
def add_right_rule(self, i_right, indices, builder):
"""
Add building rule for fixed right index.
Note: For performance, indices must already be resolved!
Parameters:
- i_right: Fixed right index for this rule
- indices: Set of left indices this rule applies to
- builder: Function taking left_index and returning element
"""
def _build(self, i):
"""
Build matrix element using registered rules.
Parameters:
- i: (left_index, right_index) tuple
Returns:
- Built matrix element
Raises:
- RuntimeError: If no rule can build the requested element
"""Internal functions for converting various key types to resolved indices used by the lazy tensor system.
def _resolve_index(key):
"""Resolve key to index using object identity."""
def _resolve_index(i):
"""Resolve integer index directly."""
def _resolve_index(x):
"""Resolve tuple or sequence of keys recursively."""import stheno
from stheno.lazy import LazyVector
# Create lazy vector
lazy_vec = LazyVector()
# Create some objects to use as indices
gp1 = stheno.GP(kernel=stheno.EQ(), name="gp1")
gp2 = stheno.GP(kernel=stheno.Matern52(), name="gp2")
gp3 = stheno.GP(kernel=stheno.Linear(), name="gp3")
# Add rules for building values
indices_123 = {id(gp1), id(gp2), id(gp3)}
def builder_simple(i):
if i == id(gp1):
return "Value for GP1"
elif i == id(gp2):
return "Value for GP2"
elif i == id(gp3):
return "Value for GP3"
else:
return f"Default value for {i}"
lazy_vec.add_rule(indices_123, builder_simple)
# Access values - built on demand
print(f"GP1 value: {lazy_vec[gp1]}")
print(f"GP2 value: {lazy_vec[gp2]}")
print(f"GP3 value: {lazy_vec[gp3]}")
# Values are cached after first access
print(f"GP1 value (cached): {lazy_vec[gp1]}")from stheno.lazy import LazyMatrix
import numpy as np
# Create lazy matrix
lazy_mat = LazyMatrix()
# Create GP objects as indices
gps = [stheno.GP(kernel=stheno.EQ(), name=f"gp{i}") for i in range(4)]
gp_ids = {id(gp) for gp in gps}
# Universal rule: diagonal elements
def diagonal_builder(i_left, i_right):
if i_left == i_right:
return 1.0 # Diagonal element
return None # Let other rules handle off-diagonal
lazy_mat.add_rule(gp_ids, diagonal_builder)
# Left rule: first GP has special relationship with others
def first_gp_left_rule(i_right):
# When first GP is on the left, return correlation
return 0.5
lazy_mat.add_left_rule(id(gps[0]), gp_ids, first_gp_left_rule)
# Right rule: second GP has special relationship when on right
def second_gp_right_rule(i_left):
# When second GP is on the right, return different correlation
return 0.3
lazy_mat.add_right_rule(id(gps[1]), gp_ids, second_gp_right_rule)
# Access matrix elements
print(f"Diagonal (0,0): {lazy_mat[gps[0], gps[0]]}") # Uses universal rule
print(f"Off-diagonal (0,1): {lazy_mat[gps[0], gps[1]]}") # Uses left rule
print(f"Off-diagonal (2,1): {lazy_mat[gps[2], gps[1]]}") # Uses right rule
print(f"Off-diagonal (2,3): {lazy_mat[gps[2], gps[3]]}") # Uses universal rule (None->0)# Lazy computation is used internally by Stheno measures
measure = stheno.Measure()
# Create GPs in the measure
with measure:
gp_a = stheno.GP(kernel=stheno.EQ(), name="gp_a")
gp_b = stheno.GP(kernel=stheno.Matern52(), name="gp_b")
gp_c = gp_a + gp_b
measure.name(gp_c, "gp_c")
# The measure uses lazy vectors and matrices internally
print(f"Measure processes: {len(measure.ps)}")
print(f"Lazy means type: {type(measure.means)}")
print(f"Lazy kernels type: {type(measure.kernels)}")
# Access mean functions (computed lazily)
mean_a = measure.means[gp_a]
mean_b = measure.means[gp_b]
mean_c = measure.means[gp_c] # Built from gp_a + gp_b rule
print(f"Mean A: {mean_a}")
print(f"Mean C: {mean_c}")
# Access kernel matrix elements (computed lazily)
kernel_aa = measure.kernels[gp_a, gp_a]
kernel_ab = measure.kernels[gp_a, gp_b]
kernel_cc = measure.kernels[gp_c, gp_c] # Built from sum rule
print(f"Kernel (A,A): {kernel_aa}")
print(f"Kernel (C,C) type: {type(kernel_cc)}")# Create custom lazy vector for expensive function evaluations
expensive_cache = LazyVector()
def expensive_function(x):
"""Simulate expensive computation."""
print(f"Computing expensive function for {x}")
import time
time.sleep(0.1) # Simulate computation time
return x ** 2 + np.sin(x)
# Set up objects and their indices
inputs = [0.5, 1.0, 1.5, 2.0, 2.5]
input_ids = {id(x): x for x in inputs} # Map id back to value
all_ids = set(input_ids.keys())
def expensive_builder(obj_id):
x = input_ids[obj_id]
return expensive_function(x)
expensive_cache.add_rule(all_ids, expensive_builder)
# First access computes and caches
print("First access:")
result1 = expensive_cache[inputs[0]]
result2 = expensive_cache[inputs[1]]
print("Second access (cached):")
result1_cached = expensive_cache[inputs[0]] # No computation
result2_cached = expensive_cache[inputs[1]] # No computation
print(f"Results match: {result1 == result1_cached and result2 == result2_cached}")# Create lazy matrix for kernel evaluations
kernel_matrix = LazyMatrix()
# Define inputs and kernel function
inputs = [np.array([i]) for i in range(5)]
input_ids = {id(x): x for x in inputs}
all_ids = set(input_ids.keys())
def kernel_function(x1, x2):
"""RBF kernel function."""
return np.exp(-0.5 * np.sum((x1 - x2)**2))
def kernel_builder(id1, id2):
x1 = input_ids[id1]
x2 = input_ids[id2]
print(f"Computing kernel between {x1.flatten()} and {x2.flatten()}")
return kernel_function(x1, x2)
kernel_matrix.add_rule(all_ids, kernel_builder)
# Build kernel matrix elements on demand
print("Building kernel matrix:")
K_00 = kernel_matrix[inputs[0], inputs[0]] # Diagonal
K_01 = kernel_matrix[inputs[0], inputs[1]] # Off-diagonal
K_10 = kernel_matrix[inputs[1], inputs[0]] # Symmetric element
print(f"K(0,0) = {K_00:.3f}")
print(f"K(0,1) = {K_01:.3f}")
print(f"K(1,0) = {K_10:.3f}")
print(f"Symmetry check: {np.allclose(K_01, K_10)}")
# Second access uses cached values
print("\nAccessing cached values:")
K_00_cached = kernel_matrix[inputs[0], inputs[0]] # No computation
print(f"Cached K(0,0) = {K_00_cached:.3f}")# Demonstrate rule priority in lazy matrices
priority_matrix = LazyMatrix()
# Create test objects
objs = [f"obj_{i}" for i in range(3)]
obj_ids = {id(obj): obj for obj in objs}
all_ids = set(obj_ids.keys())
# Universal rule (lowest priority)
def universal_rule(id1, id2):
return f"Universal: {obj_ids[id1]} × {obj_ids[id2]}"
priority_matrix.add_rule(all_ids, universal_rule)
# Left rule for first object (higher priority)
def left_rule_obj0(id2):
return f"Left rule: obj_0 × {obj_ids[id2]}"
priority_matrix.add_left_rule(id(objs[0]), all_ids, left_rule_obj0)
# Right rule for second object (higher priority)
def right_rule_obj1(id1):
return f"Right rule: {obj_ids[id1]} × obj_1"
priority_matrix.add_right_rule(id(objs[1]), all_ids, right_rule_obj1)
# Test rule priorities
print(f"(0,0): {priority_matrix[objs[0], objs[0]]}") # Left rule wins
print(f"(0,1): {priority_matrix[objs[0], objs[1]]}") # Left rule wins over right
print(f"(2,1): {priority_matrix[objs[2], objs[1]]}") # Right rule wins
print(f"(2,2): {priority_matrix[objs[2], objs[2]]}") # Universal rule used# Demonstrate error handling for missing rules
incomplete_vector = LazyVector()
# Add rule for only some indices
some_objs = ["a", "b"]
some_ids = {id(obj) for obj in some_objs}
def partial_builder(obj_id):
return f"Built for {id}"
incomplete_vector.add_rule(some_ids, partial_builder)
# This works
result = incomplete_vector["a"]
print(f"Success: {result}")
# This raises RuntimeError
try:
result = incomplete_vector["c"] # Not in rule set
except RuntimeError as e:
print(f"Expected error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-stheno