Warp is a Python framework for high-performance simulation and graphics programming that JIT-compiles Python functions into efficient GPU/CPU kernel code.

Warp provides gradient-based optimizers for machine learning workflows. They operate on Warp arrays created with `requires_grad=True`, compose with Warp's differentiable kernels, and integrate with its automatic differentiation system, making them suitable for training neural networks and optimizing physical simulations.
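A minimal sketch of the overall workflow, assuming a user-defined differentiable computation `compute_loss` that returns a scalar loss array (a hypothetical name; the `wp.optim.Adam` and `wp.backward` interfaces are documented below):

```python
import warp as wp
import numpy as np

# Hypothetical parameter array; requires_grad=True enables gradient storage
params = wp.array(np.random.randn(16).astype(np.float32), requires_grad=True)

optimizer = wp.optim.Adam([params], lr=1e-3)

for step in range(100):
    loss = compute_loss(params)  # user-defined differentiable computation
    wp.backward(loss)            # accumulate gradients into params.grad
    optimizer.step()             # apply the Adam update
    optimizer.zero_grad()        # clear gradients for the next step
```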
`Adam` is an adaptive learning-rate optimizer with momentum and bias correction.
```python
class Adam:
    """Adam optimizer for gradient-based optimization."""

    def __init__(self,
                 params: list = None,
                 lr: float = 0.001,
                 betas: tuple = (0.9, 0.999),
                 eps: float = 1e-8):
        """
        Initialize Adam optimizer.

        Args:
            params: List of arrays to optimize
            lr: Learning rate
            betas: Coefficients for running averages (beta1, beta2)
            eps: Small constant for numerical stability
        """

    def step(self) -> None:
        """
        Perform a single optimization step.

        Updates parameters using accumulated gradients.
        """

    def zero_grad(self) -> None:
        """Clear gradients of all parameters."""

    @property
    def state_dict(self) -> dict:
        """Get optimizer state for checkpointing."""

    def load_state_dict(self, state_dict: dict) -> None:
        """Load optimizer state from checkpoint."""
```
`SGD` implements stochastic gradient descent with optional momentum and weight decay.

```python
class SGD:
    """Stochastic Gradient Descent optimizer."""

    def __init__(self,
                 params: list = None,
                 lr: float = 0.001,
                 momentum: float = 0.0,
                 dampening: float = 0.0,
                 weight_decay: float = 0.0,
                 nesterov: bool = False):
        """
        Initialize SGD optimizer.

        Args:
            params: List of arrays to optimize
            lr: Learning rate
            momentum: Momentum factor
            dampening: Dampening for momentum
            weight_decay: L2 regularization weight
            nesterov: Enable Nesterov momentum
        """

    def step(self) -> None:
        """Perform a single optimization step."""

    def zero_grad(self) -> None:
        """Clear gradients of all parameters."""

    @property
    def state_dict(self) -> dict:
        """Get optimizer state for checkpointing."""

    def load_state_dict(self, state_dict: dict) -> None:
        """Load optimizer state from checkpoint."""
```
The following example trains a small two-layer network with Adam; `forward_layer`, `compute_loss`, and `data_loader` are assumed to be user-defined.

```python
import warp as wp
import numpy as np

# Define neural network parameters
W1 = wp.array(np.random.randn(784, 128).astype(np.float32), device='cuda', requires_grad=True)
b1 = wp.zeros(128, dtype=wp.float32, device='cuda', requires_grad=True)
W2 = wp.array(np.random.randn(128, 10).astype(np.float32), device='cuda', requires_grad=True)
b2 = wp.zeros(10, dtype=wp.float32, device='cuda', requires_grad=True)

# Create optimizer
optimizer = wp.optim.Adam([W1, b1, W2, b2], lr=0.001)

# Training loop
for epoch in range(100):
    for batch_x, batch_y in data_loader:
        # Convert to Warp arrays
        x = wp.from_numpy(batch_x, device='cuda')
        y_true = wp.from_numpy(batch_y, device='cuda')

        # Forward pass using Warp kernels
        h1 = forward_layer(x, W1, b1)
        y_pred = forward_layer(h1, W2, b2)

        # Compute loss
        loss = compute_loss(y_pred, y_true)

        # Backward pass (automatic differentiation)
        wp.backward(loss)

        # Update parameters
        optimizer.step()
        optimizer.zero_grad()

    print(f"Epoch {epoch}, Loss: {loss.numpy()}")
```
This example optimizes the physical parameters of a spring simulation against a target trajectory; `target_data`, `initial_positions`, `num_particles`, `simulation_steps`, `dt`, and `update_physics` are assumed to be provided by the surrounding simulation code.

```python
import warp as wp

# Physical parameters to optimize
spring_stiffness = wp.array([1000.0], requires_grad=True, device='cuda')
damping_coeff = wp.array([0.1], requires_grad=True, device='cuda')

# Create optimizer
optimizer = wp.optim.Adam([spring_stiffness, damping_coeff], lr=0.01)

# Define physics simulation kernel
@wp.kernel
def simulate_springs(positions: wp.array(dtype=wp.vec3),
                     velocities: wp.array(dtype=wp.vec3),
                     forces: wp.array(dtype=wp.vec3),
                     stiffness: wp.array(dtype=float),
                     damping: wp.array(dtype=float),
                     dt: float):
    i = wp.tid()
    pos = positions[i]
    vel = velocities[i]
    # Spring force (toward the origin)
    spring_force = -stiffness[0] * pos
    # Damping force
    damping_force = -damping[0] * vel
    forces[i] = spring_force + damping_force

# Target trajectory
target_positions = wp.array(target_data, device='cuda')

# Optimization loop
for iteration in range(1000):
    # Reset simulation state
    positions = wp.clone(initial_positions)
    velocities = wp.zeros_like(positions)

    # Run simulation
    for step in range(simulation_steps):
        forces = wp.zeros_like(positions)
        wp.launch(simulate_springs,
                  dim=num_particles,
                  inputs=[positions, velocities, forces,
                          spring_stiffness, damping_coeff, dt])
        # Update positions and velocities
        update_physics(positions, velocities, forces, dt)

    # Compute loss against target
    loss = wp.mean((positions - target_positions) ** 2)

    # Backward pass
    wp.backward(loss)

    # Update parameters
    optimizer.step()
    optimizer.zero_grad()

    if iteration % 100 == 0:
        print(f"Iteration {iteration}, Loss: {loss.numpy()}")
```
Separate optimizers can be used for parameter groups that need different learning rates, combined with a simple step-decay schedule; `weight_matrix` and `bias_vector` are assumed to be existing Warp arrays.

```python
import warp as wp

# Different parameter groups with different learning rates
fast_params = [weight_matrix]  # High learning rate
slow_params = [bias_vector]    # Low learning rate

# Create separate optimizers
fast_optimizer = wp.optim.Adam(fast_params, lr=0.01)
slow_optimizer = wp.optim.SGD(slow_params, lr=0.001, momentum=0.9)

# Training step
def training_step(loss):
    # Compute gradients
    wp.backward(loss)

    # Update with different schedules
    fast_optimizer.step()
    slow_optimizer.step()

    # Clear gradients
    fast_optimizer.zero_grad()
    slow_optimizer.zero_grad()

# Learning rate scheduling
def adjust_learning_rate(optimizer, epoch):
    """Decay the learning rate by a factor of 0.1 every 30 epochs."""
    # optimizer.lr holds the base learning rate; the decayed value is
    # written into each parameter group
    lr = optimizer.lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
```
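Driving that schedule from the epoch loop might look like the following sketch, reusing the names defined above (`data_loader` and `compute_loss` remain assumptions):

```python
for epoch in range(90):
    # Re-derive the decayed learning rate for both optimizers each epoch
    adjust_learning_rate(fast_optimizer, epoch)
    adjust_learning_rate(slow_optimizer, epoch)

    for batch in data_loader:
        loss = compute_loss(batch)
        training_step(loss)
```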
Optimizer and parameter state can be checkpointed with `pickle`; `model_params`, `num_epochs`, `data_loader`, and `compute_loss` are assumed to be defined elsewhere.

```python
import warp as wp
import pickle

# Create optimizer
optimizer = wp.optim.Adam(model_params, lr=0.001)

# Training with checkpointing
for epoch in range(num_epochs):
    # Training loop
    for batch in data_loader:
        loss = compute_loss(batch)
        wp.backward(loss)
        optimizer.step()
        optimizer.zero_grad()

    # Save a checkpoint every 10 epochs
    if epoch % 10 == 0:
        checkpoint = {
            'epoch': epoch,
            'model_state': [param.numpy() for param in model_params],
            'optimizer_state': optimizer.state_dict,
            'loss': loss.numpy()
        }
        with open(f'checkpoint_epoch_{epoch}.pkl', 'wb') as f:
            pickle.dump(checkpoint, f)

# Load checkpoint
def load_checkpoint(checkpoint_path, model_params, optimizer):
    with open(checkpoint_path, 'rb') as f:
        checkpoint = pickle.load(f)

    # Restore model parameters
    for param, saved_param in zip(model_params, checkpoint['model_state']):
        param.assign(wp.from_numpy(saved_param, device=param.device))

    # Restore optimizer state
    optimizer.load_state_dict(checkpoint['optimizer_state'])
    return checkpoint['epoch'], checkpoint['loss']
```
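Resuming is then a matter of restoring state before re-entering the training loop; a sketch, with `checkpoint_epoch_50.pkl` as a hypothetical path:

```python
# Restore parameters and optimizer state, then continue training
start_epoch, last_loss = load_checkpoint('checkpoint_epoch_50.pkl',
                                         model_params, optimizer)

for epoch in range(start_epoch + 1, num_epochs):
    for batch in data_loader:
        loss = compute_loss(batch)
        wp.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
```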
An optimizer can be wrapped with global-norm gradient clipping. For simplicity this sketch computes the norms on the host via NumPy; a production version would do this in a kernel to avoid device-to-host copies.

```python
import warp as wp
import numpy as np

class OptimizerWithClipping:
    def __init__(self, optimizer, max_grad_norm=1.0):
        self.optimizer = optimizer
        self.max_grad_norm = max_grad_norm

    def clip_gradients(self, parameters):
        """Clip gradients to prevent exploding gradients."""
        # Compute the total gradient norm across all parameters
        total_norm = 0.0
        for param in parameters:
            if param.grad is not None:
                grad = param.grad.numpy()
                total_norm += float(np.sum(grad * grad))
        total_norm = total_norm ** 0.5

        # Scale gradients if the norm exceeds the threshold
        if total_norm > self.max_grad_norm:
            clip_coef = self.max_grad_norm / (total_norm + 1e-6)
            for param in parameters:
                if param.grad is not None:
                    param.grad.assign(param.grad.numpy() * clip_coef)

    def step(self, parameters):
        self.clip_gradients(parameters)
        self.optimizer.step()

    def zero_grad(self):
        self.optimizer.zero_grad()

# Usage
base_optimizer = wp.optim.Adam(model_params, lr=0.001)
optimizer = OptimizerWithClipping(base_optimizer, max_grad_norm=1.0)

# Training with gradient clipping
for batch in data_loader:
    loss = compute_loss(batch)
    wp.backward(loss)
    optimizer.step(model_params)
    optimizer.zero_grad()
```
Differentiation can also be driven manually with `wp.Tape`, applying a hand-written gradient-descent update; `input_data`, `weights`, `output_data`, `target_data`, and `data_size` are assumed to be defined elsewhere. Note the kernel is defined before the tape context, since only launches need to be recorded.

```python
import warp as wp

# Warp kernel: a simple linear transformation y = w0 * x + w1
@wp.kernel
def neural_network_kernel(x: wp.array(dtype=float),
                          w: wp.array(dtype=float),
                          y: wp.array(dtype=float)):
    i = wp.tid()
    y[i] = w[0] * x[i] + w[1]

# Enable tape for automatic differentiation
tape = wp.Tape()

# Forward pass with tape recording
with tape:
    # Launch kernel
    wp.launch(neural_network_kernel, dim=data_size,
              inputs=[input_data, weights, output_data])

    # Compute loss
    loss = wp.mean((output_data - target_data) ** 2)

# Backward pass
tape.backward(loss)

# Extract gradients
weight_gradients = tape.gradients[weights]

# Manual optimizer step
learning_rate = 0.01
weights.assign(weights - learning_rate * weight_gradients)

# Reset tape for the next iteration
tape.zero()
```
The optimizers above share a common base interface and per-parameter state layout:

```python
# Optimizer base interface
class Optimizer:
    """Base class for optimizers."""

    def __init__(self, params: list, lr: float):
        """Initialize optimizer with parameters and learning rate."""

    def step(self) -> None:
        """Perform optimization step."""

    def zero_grad(self) -> None:
        """Clear parameter gradients."""

    @property
    def param_groups(self) -> list:
        """List of parameter groups with optimization settings."""

    @property
    def state_dict(self) -> dict:
        """Optimizer state for checkpointing."""

# Parameter group structure
class ParameterGroup:
    """Group of parameters with shared optimization settings."""
    params: list          # List of parameter arrays
    lr: float             # Learning rate
    weight_decay: float   # L2 regularization

# Optimizer state for individual parameters
class ParameterState:
    """Per-parameter optimization state."""
    step: int               # Number of optimization steps taken
    exp_avg: array          # Exponential moving average of gradients (Adam)
    exp_avg_sq: array       # Exponential moving average of squared gradients (Adam)
    momentum_buffer: array  # Momentum buffer (SGD)
```
Install with the Tessl CLI:

```sh
npx tessl i tessl/pypi-warp-lang
```