CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-warp-lang

A Python framework for high-performance simulation and graphics programming that JIT compiles Python functions to efficient GPU/CPU kernel code.

Overview
Eval results
Files

docs/optimization.md

Optimization

Warp provides gradient-based optimizers for machine learning workflows. These optimizers work seamlessly with Warp's differentiable kernels and integrate with automatic differentiation systems for training neural networks and optimizing physical simulations.

Capabilities

Adam Optimizer

Adaptive learning rate optimizer with momentum and bias correction.

class Adam:
    """Adam optimizer: adaptive learning rates with momentum and bias correction."""

    def __init__(
        self,
        params: list = None,
        lr: float = 0.001,
        betas: tuple = (0.9, 0.999),
        eps: float = 1e-8,
    ):
        """
        Set up the optimizer.

        Args:
            params: Arrays whose values are to be optimized.
            lr: Learning rate (step size).
            betas: Pair ``(beta1, beta2)`` of coefficients for the running averages.
            eps: Small constant added for numerical stability.
        """
        ...

    def step(self) -> None:
        """Apply one update to the parameters from their accumulated gradients."""
        ...

    def zero_grad(self) -> None:
        """Reset the gradient of every parameter."""
        ...

    @property
    def state_dict(self) -> dict:
        """Optimizer state, in a form suitable for checkpointing."""
        ...

    def load_state_dict(self, state_dict: dict) -> None:
        """Restore optimizer state previously saved to a checkpoint."""
        ...

SGD Optimizer

Stochastic gradient descent with optional momentum and weight decay.

class SGD:
    """Stochastic gradient descent with optional momentum and weight decay."""

    def __init__(
        self,
        params: list = None,
        lr: float = 0.001,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False,
    ):
        """
        Set up the optimizer.

        Args:
            params: Arrays whose values are to be optimized.
            lr: Learning rate (step size).
            momentum: Momentum factor.
            dampening: Dampening applied to the momentum.
            weight_decay: Weight of the L2 regularization term.
            nesterov: Whether to use Nesterov momentum.
        """
        ...

    def step(self) -> None:
        """Apply one optimization update."""
        ...

    def zero_grad(self) -> None:
        """Reset the gradient of every parameter."""
        ...

    @property
    def state_dict(self) -> dict:
        """Optimizer state, in a form suitable for checkpointing."""
        ...

    def load_state_dict(self, state_dict: dict) -> None:
        """Restore optimizer state previously saved to a checkpoint."""
        ...

Usage Examples

Basic Neural Network Training

import warp as wp
import numpy as np

# Define neural network parameters; requires_grad=True so each array carries a gradient
W1 = wp.array(np.random.randn(784, 128).astype(np.float32), device='cuda', requires_grad=True)
b1 = wp.zeros(128, dtype=wp.float32, device='cuda', requires_grad=True)
W2 = wp.array(np.random.randn(128, 10).astype(np.float32), device='cuda', requires_grad=True)
b2 = wp.zeros(10, dtype=wp.float32, device='cuda', requires_grad=True)

# Create optimizer
optimizer = wp.optim.Adam([W1, b1, W2, b2], lr=0.001)

# Training loop
# `data_loader`, `forward_layer` and `compute_loss` are supplied by the user.
# Assumes `forward_layer` / `compute_loss` launch Warp kernels and that
# `compute_loss` returns a single-element array created with
# requires_grad=True -- TODO confirm against your implementation.
for epoch in range(100):
    for batch_x, batch_y in data_loader:
        # Convert to Warp arrays
        x = wp.from_numpy(batch_x, device='cuda')
        y_true = wp.from_numpy(batch_y, device='cuda')

        # Record the forward pass on a tape: Warp differentiates only the
        # kernel launches that were captured while the tape was active.
        tape = wp.Tape()
        with tape:
            # Forward pass using Warp kernels
            h1 = forward_layer(x, W1, b1)
            y_pred = forward_layer(h1, W2, b2)

            # Compute loss
            loss = compute_loss(y_pred, y_true)

        # Backward pass: replays the recorded launches in reverse
        tape.backward(loss)

        # Update parameters
        optimizer.step()
        optimizer.zero_grad()

        print(f"Epoch {epoch}, Loss: {loss.numpy()}")

Physics Simulation Optimization

import warp as wp

# Learnable physical constants, each held in a single-element array
spring_stiffness = wp.array([1000.0], requires_grad=True, device='cuda')
damping_coeff = wp.array([0.1], requires_grad=True, device='cuda')

# A single Adam instance updates both constants
optimizer = wp.optim.Adam([spring_stiffness, damping_coeff], lr=0.01)


# Per-particle force kernel: spring pulling toward the origin plus velocity damping
@wp.kernel
def simulate_springs(positions: wp.array(dtype=wp.vec3),
                    velocities: wp.array(dtype=wp.vec3),
                    forces: wp.array(dtype=wp.vec3),
                    stiffness: wp.array(dtype=float),
                    damping: wp.array(dtype=float),
                    dt: float):
    # One thread per particle. `dt` is accepted but not used in this kernel.
    tid = wp.tid()

    # Restoring force proportional to displacement from the origin
    restoring = -stiffness[0] * positions[tid]

    # Drag proportional to velocity
    drag = -damping[0] * velocities[tid]

    forces[tid] = restoring + drag

# Target trajectory
# Assumes `target_data` has one 3-vector per particle -- TODO confirm.
target_positions = wp.array(target_data, dtype=wp.vec3, device='cuda')


# Mean squared error over all 3*N position components, accumulated into loss[0].
# Written as a kernel so that the tape can differentiate it.
@wp.kernel
def position_mse(positions: wp.array(dtype=wp.vec3),
                 targets: wp.array(dtype=wp.vec3),
                 scale: float,
                 loss: wp.array(dtype=float)):
    i = wp.tid()
    diff = positions[i] - targets[i]
    wp.atomic_add(loss, 0, scale * wp.dot(diff, diff))


# Optimization loop
for iteration in range(1000):
    # Reset simulation state.
    # wp.clone allocates and returns a new array; wp.copy(dest, src) copies
    # into an existing array and returns nothing.
    positions = wp.clone(initial_positions)
    velocities = wp.zeros_like(positions)
    loss = wp.zeros(1, dtype=float, device='cuda', requires_grad=True)

    # Record the whole rollout so gradients reach the physical parameters
    tape = wp.Tape()
    with tape:
        # Run simulation
        for step in range(simulation_steps):
            forces = wp.zeros_like(positions)

            wp.launch(simulate_springs,
                      dim=num_particles,
                      inputs=[positions, velocities, forces,
                              spring_stiffness, damping_coeff, dt])

            # Update positions and velocities
            # NOTE(review): if `update_physics` overwrites `positions` /
            # `velocities` in place, gradients through earlier steps may be
            # wrong -- confirm it is differentiable as used here.
            update_physics(positions, velocities, forces, dt)

        # Compute loss against target
        wp.launch(position_mse,
                  dim=num_particles,
                  inputs=[positions, target_positions, 1.0 / (3.0 * num_particles)],
                  outputs=[loss])

    # Backward pass
    tape.backward(loss)

    # Update parameters
    optimizer.step()
    optimizer.zero_grad()

    if iteration % 100 == 0:
        print(f"Iteration {iteration}, Loss: {loss.numpy()}")

Custom Optimization with Multiple Optimizers

import warp as wp

# Two parameter groups that are trained with different learning rates
fast_params = [weight_matrix]  # updated with the larger learning rate
slow_params = [bias_vector]    # updated with the smaller learning rate

# One optimizer per group
fast_optimizer = wp.optim.Adam(fast_params, lr=0.01)
slow_optimizer = wp.optim.SGD(slow_params, lr=0.001, momentum=0.9)

# Training step
def training_step(loss):
    """Backpropagate `loss`, step both optimizers, then clear both sets of gradients."""
    # NOTE(review): `wp.backward` is not defined anywhere in this document;
    # the automatic-differentiation section records work on `wp.Tape()` and
    # calls `tape.backward(loss)` instead -- confirm which is intended.
    wp.backward(loss)

    optimizers = (fast_optimizer, slow_optimizer)

    # Every optimizer steps before any gradient is cleared
    for opt in optimizers:
        opt.step()

    for opt in optimizers:
        opt.zero_grad()

# Learning rate scheduling
def adjust_learning_rate(optimizer, epoch, gamma=0.1, step_size=30):
    """Apply step decay: multiply the base rate by `gamma` every `step_size` epochs.

    With the defaults this decays the learning rate by a factor of 0.1 every
    30 epochs.

    Args:
        optimizer: Object exposing `lr` (the base rate, read but never
            modified here) and `param_groups` (an iterable of mappings whose
            'lr' entry is overwritten).
        epoch: Current epoch index.
        gamma: Multiplicative decay factor applied once per completed period.
        step_size: Number of epochs in one decay period; must be positive.

    Raises:
        ValueError: If `step_size` is not positive.
    """
    if step_size <= 0:
        raise ValueError("step_size must be a positive integer")

    # Always derived from the base rate, so repeated calls do not compound.
    lr = optimizer.lr * (gamma ** (epoch // step_size))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

Checkpointing and State Management

import warp as wp
import pickle

# Create optimizer
optimizer = wp.optim.Adam(model_params, lr=0.001)

# Training with checkpointing
# `model_params`, `num_epochs`, `data_loader` and `compute_loss` are supplied by the user.
for epoch in range(num_epochs):
    # Training loop
    for batch in data_loader:
        # Record the loss computation on a tape so it can be differentiated.
        # Assumes `compute_loss` launches Warp kernels and returns a
        # single-element array with requires_grad=True -- TODO confirm.
        tape = wp.Tape()
        with tape:
            loss = compute_loss(batch)
        tape.backward(loss)
        optimizer.step()
        optimizer.zero_grad()

    # Save checkpoint every 10 epochs (epochs 0, 10, 20, ...)
    if epoch % 10 == 0:
        checkpoint = {
            'epoch': epoch,
            # Parameters are stored as host-side NumPy copies
            'model_state': [param.numpy() for param in model_params],
            'optimizer_state': optimizer.state_dict,
            # Loss of the last batch processed in this epoch
            'loss': loss.numpy()
        }

        with open(f'checkpoint_epoch_{epoch}.pkl', 'wb') as f:
            pickle.dump(checkpoint, f)

# Load checkpoint
def load_checkpoint(checkpoint_path, model_params, optimizer):
    """Restore model parameters and optimizer state from a pickled checkpoint.

    Args:
        checkpoint_path: Path of a checkpoint file holding a pickled dict with
            the keys 'epoch', 'model_state', 'optimizer_state' and 'loss'.
        model_params: Parameter arrays to overwrite, in the same order in
            which they were saved.
        optimizer: Optimizer whose `load_state_dict` receives the saved state.

    Returns:
        Tuple `(epoch, loss)` as stored in the checkpoint.

    Raises:
        ValueError: If the number of saved arrays differs from the number of
            `model_params`. Nothing is modified in that case.
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file;
    # only load checkpoints that come from a trusted source.
    with open(checkpoint_path, 'rb') as f:
        checkpoint = pickle.load(f)

    saved_params = list(checkpoint['model_state'])
    model_params = list(model_params)

    # zip() would silently stop at the shorter sequence and leave the model
    # only partly restored, so validate the counts before touching anything.
    if len(model_params) != len(saved_params):
        raise ValueError(
            f"checkpoint holds {len(saved_params)} parameter arrays "
            f"but {len(model_params)} were supplied"
        )

    # Restore model parameters
    for param, saved_param in zip(model_params, saved_params):
        param.assign(wp.from_numpy(saved_param, device=param.device))

    # Restore optimizer state
    optimizer.load_state_dict(checkpoint['optimizer_state'])

    return checkpoint['epoch'], checkpoint['loss']

Gradient Clipping and Regularization

import warp as wp

class OptimizerWithClipping:
    """Wrap an optimizer so gradients are clipped to a maximum global L2 norm before each step."""

    def __init__(self, optimizer, max_grad_norm=1.0):
        """
        Args:
            optimizer: Wrapped optimizer; must provide `step()` and `zero_grad()`.
            max_grad_norm: Largest allowed L2 norm of all gradients taken together.
        """
        self.optimizer = optimizer
        self.max_grad_norm = max_grad_norm

    def clip_gradients(self, parameters):
        """Clip gradients to prevent exploding gradients.

        The norm is computed over every gradient jointly. When it exceeds
        `max_grad_norm`, each gradient is scaled in place by the same factor.
        Parameters whose `grad` is None are skipped.

        Args:
            parameters: Iterable of arrays exposing a `grad` array (or None).
                It is consumed once, so a generator is accepted.
        """
        # Collect once: the gradients are needed for the norm and again for scaling.
        grads = [param.grad for param in parameters if param.grad is not None]

        # array.numpy() gives a host-side view/copy of the gradient. For GPU
        # arrays this synchronizes and transfers; acceptable for an example.
        host_grads = [grad.numpy() for grad in grads]

        # Compute total gradient norm, accumulating in float64 for accuracy
        total_sq = 0.0
        for values in host_grads:
            wide = values.astype("float64")
            total_sq += float((wide * wide).sum())
        total_norm = total_sq ** 0.5

        # Scale gradients if norm exceeds threshold
        if total_norm > self.max_grad_norm:
            # The small epsilon keeps the division well defined
            clip_coef = self.max_grad_norm / (total_norm + 1e-6)
            for grad, values in zip(grads, host_grads):
                # Multiplying by a Python float keeps the gradient's dtype;
                # assign() copies the scaled values back into the array.
                grad.assign(values * clip_coef)

    def step(self, parameters):
        """Clip the gradients of `parameters`, then step the wrapped optimizer."""
        self.clip_gradients(parameters)
        self.optimizer.step()

    def zero_grad(self):
        """Clear gradients through the wrapped optimizer."""
        self.optimizer.zero_grad()

# Usage
base_optimizer = wp.optim.Adam(model_params, lr=0.001)
optimizer = OptimizerWithClipping(base_optimizer, max_grad_norm=1.0)

# Training with gradient clipping
# `model_params`, `data_loader` and `compute_loss` are supplied by the user.
for batch in data_loader:
    # Record the loss computation on a tape so it can be differentiated.
    # Assumes `compute_loss` launches Warp kernels and returns a
    # single-element array with requires_grad=True -- TODO confirm.
    tape = wp.Tape()
    with tape:
        loss = compute_loss(batch)
    tape.backward(loss)

    # Clips the gradients of `model_params`, then steps the wrapped optimizer
    optimizer.step(model_params)
    optimizer.zero_grad()

Integration with Automatic Differentiation

import warp as wp

# Kernels are defined once, outside the tape; only their launches are recorded.

# Forward model: y = w[0] * x + w[1]
@wp.kernel
def neural_network_kernel(x: wp.array(dtype=float),
                          w: wp.array(dtype=float),
                          y: wp.array(dtype=float)):
    i = wp.tid()
    # Simple linear transformation
    y[i] = w[0] * x[i] + w[1]


# Mean squared error accumulated into loss[0]; `inv_n` is 1 / number of samples
@wp.kernel
def mse_loss_kernel(y: wp.array(dtype=float),
                    target: wp.array(dtype=float),
                    inv_n: float,
                    loss: wp.array(dtype=float)):
    i = wp.tid()
    diff = y[i] - target[i]
    wp.atomic_add(loss, 0, diff * diff * inv_n)


# Plain gradient-descent update: w <- w - lr * grad
@wp.kernel
def gradient_descent_kernel(w: wp.array(dtype=float),
                            grad: wp.array(dtype=float),
                            lr: float):
    i = wp.tid()
    w[i] = w[i] - lr * grad[i]


# `input_data`, `weights`, `output_data`, `target_data` and `data_size` are
# supplied by the user. Assumes `weights` and `output_data` were created with
# requires_grad=True so gradients can flow back to `weights` -- TODO confirm.
loss = wp.zeros(1, dtype=float, requires_grad=True)

# Enable tape for automatic differentiation
tape = wp.Tape()

# Forward pass with tape recording
with tape:
    # Launch kernel
    wp.launch(neural_network_kernel, dim=data_size,
              inputs=[input_data, weights, output_data])

    # Compute loss (as a kernel, so that the tape can differentiate it)
    wp.launch(mse_loss_kernel, dim=data_size,
              inputs=[output_data, target_data, 1.0 / data_size],
              outputs=[loss])

# Backward pass
tape.backward(loss)

# Extract gradients
weight_gradients = tape.gradients[weights]

# Manual optimizer step, launched outside the tape so it is not recorded
learning_rate = 0.01
wp.launch(gradient_descent_kernel, dim=weights.shape[0],
          inputs=[weights, weight_gradients, learning_rate])

# Reset tape for next iteration
tape.zero()

Types

# Common interface shared by the optimizers
class Optimizer:
    """Base class for optimizers."""

    def __init__(self, params: list, lr: float):
        """Set up the optimizer with the parameters to update and a learning rate."""
        ...

    def step(self) -> None:
        """Apply one optimization step."""
        ...

    def zero_grad(self) -> None:
        """Reset the gradients of the parameters."""
        ...

    @property
    def param_groups(self) -> list:
        """Parameter groups, each carrying its own optimization settings."""
        ...

    @property
    def state_dict(self) -> dict:
        """Optimizer state, in a form suitable for checkpointing."""
        ...

# Shape of one parameter group
class ParameterGroup:
    """Set of parameters that share the same optimization settings.

    Attributes:
        params: The parameter arrays in this group.
        lr: Learning rate applied to the group.
        weight_decay: L2 regularization strength applied to the group.
    """

    params: list
    lr: float
    weight_decay: float
    
# Optimizer state for individual parameters
class ParameterState:
    """Per-parameter optimization state.

    The array-typed fields are annotated with the string "wp.array" so the
    class can be defined without a bare `array` name in scope; evaluating the
    unquoted annotation would raise NameError.
    """

    step: int                      # Number of optimization steps
    exp_avg: "wp.array"            # Exponential moving average of gradients (Adam)
    exp_avg_sq: "wp.array"         # Exponential moving average of squared gradients (Adam)
    momentum_buffer: "wp.array"    # Momentum buffer (SGD)

Install with Tessl CLI

npx tessl i tessl/pypi-warp-lang

docs

core-execution.md

fem.md

framework-integration.md

index.md

kernel-programming.md

optimization.md

rendering.md

types-arrays.md

utilities.md

tile.json