CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-warp-lang

A Python framework for high-performance simulation and graphics programming that JIT compiles Python functions to efficient GPU/CPU kernel code.

Overview
Eval results
Files

docs/utilities.md

Utilities and Profiling

Warp provides comprehensive utilities for performance profiling, context management, timing, and development helpers. These tools are essential for optimizing Warp applications and managing GPU/CPU resources effectively.

Capabilities

Performance Timing

High-precision timing utilities for measuring kernel execution and memory operations.

class ScopedTimer:
    """Times the code executed inside a ``with`` block."""

    def __init__(self, name: str, detailed: bool = False, dict: dict = None):
        """
        Set up a named timer.

        Args:
            name: Label used to identify this timer
            detailed: When True, turn on detailed kernel-level timing
            dict: Optional dictionary in which timing results are stored
        """

    def __enter__(self) -> 'ScopedTimer':
        """Begin timing when the ``with`` block is entered."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Finish timing when the ``with`` block is left."""

    # NOTE(review): upstream Warp appears to report ScopedTimer.elapsed in
    # milliseconds rather than seconds — confirm against the installed version.
    @property
    def elapsed(self) -> float:
        """Elapsed time in seconds."""

class TimingResult:
    """Holds the detailed timing figures gathered during a timing run."""

    @property
    def kernel_time(self) -> float:
        """Time spent executing kernels, summed over the run."""

    @property
    def memcpy_time(self) -> float:
        """Time spent on memory copies, summed over the run."""

    @property
    def memset_time(self) -> float:
        """Time spent on memory sets, summed over the run."""

    @property
    def total_time(self) -> float:
        """Overall execution time."""

def timing_begin() -> None:
    """Begin collecting timing data globally."""

def timing_end() -> TimingResult:
    """
    Stop collecting timing data and hand back what was gathered.

    Returns:
        A TimingResult carrying the detailed performance metrics
    """

def timing_print() -> None:
    """Write the timing results to the console."""

# Timing categories for filtering; each category occupies its own bit
TIMING_KERNEL = 1 << 0          # Kernel execution time
TIMING_KERNEL_BUILTIN = 1 << 1  # Built-in kernel time
TIMING_MEMCPY = 1 << 2          # Memory copy operations
TIMING_MEMSET = 1 << 3          # Memory set operations
TIMING_GRAPH = 1 << 4           # Graph operations
# All timing categories combined (evaluates to 31)
TIMING_ALL = (
    TIMING_KERNEL
    | TIMING_KERNEL_BUILTIN
    | TIMING_MEMCPY
    | TIMING_MEMSET
    | TIMING_GRAPH
)

Context Management

Scoped context managers for automatically managing device state, streams, and memory settings.

class ScopedDevice:
    """Temporarily switches to a given device for the span of a ``with`` block."""

    def __init__(self, device: Device):
        """
        Set up the scoped device switch.

        Args:
            device: Device that is switched to while inside the context
        """

    def __enter__(self) -> Device:
        """Switch over to the device given at construction."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Switch back to the device that was in use before."""

class ScopedStream:
    """Temporarily switches to a given stream for the span of a ``with`` block."""

    def __init__(self, stream: Stream):
        """
        Set up the scoped stream switch.

        Args:
            stream: Stream that is switched to while inside the context
        """

    def __enter__(self) -> Stream:
        """Switch over to the stream given at construction."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Switch back to the stream that was in use before."""

class ScopedMempool:
    """Temporarily overrides the memory pool setting for the span of a ``with`` block."""

    def __init__(self, enabled: bool):
        """
        Set up the scoped memory pool override.

        Args:
            enabled: True to enable memory pooling inside the context,
                False to disable it
        """

    def __enter__(self) -> None:
        """Put the requested memory pool setting into effect."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Reinstate the memory pool setting that was in effect before."""

class ScopedMempoolAccess:
    """Temporarily overrides cross-device memory pool access for a ``with`` block."""

    def __init__(self, enabled: bool):
        """
        Set up the scoped memory pool access override.

        Args:
            enabled: Memory pool access setting applied inside the context
        """

    def __enter__(self) -> None:
        """Put the requested memory pool access setting into effect."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Reinstate the access setting that was in effect before."""

class ScopedPeerAccess:
    """Temporarily overrides peer-to-peer GPU memory access for a ``with`` block."""

    def __init__(self, enabled: bool):
        """
        Set up the scoped peer access override.

        Args:
            enabled: Peer access setting applied inside the context
        """

    def __enter__(self) -> None:
        """Put the requested peer access setting into effect."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Reinstate the peer access setting that was in effect before."""

class ScopedCapture:
    """Captures a CUDA graph over the span of a ``with`` block."""

    def __init__(self, device: Device = None):
        """
        Set up the scoped capture.

        Args:
            device: Optional device for the capture
        """

    def __enter__(self) -> 'ScopedCapture':
        """Start capturing a CUDA graph."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop capturing and build the graph."""

    def launch(self, stream: Stream = None) -> None:
        """
        Run the graph that was captured.

        Args:
            stream: Optional stream for the launch
        """

Stream and Event Management

Utilities for managing CUDA streams and events for asynchronous execution.

class Stream:
    """A CUDA stream on which work executes asynchronously."""

    def __init__(self, device: Device = None):
        """
        Construct a stream on the given device.

        Args:
            device: Optional device on which to create the stream
        """

    def synchronize(self) -> None:
        """Block until every operation on this stream has finished."""

    @property
    def device(self) -> Device:
        """The device this stream is associated with."""

class Event:
    """A CUDA event used for synchronization and timing."""

    def __init__(self, device: Device = None):
        """
        Construct an event on the given device.

        Args:
            device: Optional device on which to create the event
        """

    def record(self, stream: Stream = None) -> None:
        """
        Record this event on a stream.

        Args:
            stream: Optional stream on which to record
        """

    def synchronize(self) -> None:
        """Block until this event has completed."""

    def elapsed_time(self, end_event: 'Event') -> float:
        """
        Measure the time from this event to another one.

        Args:
            end_event: Event marking the end of the measured interval

        Returns:
            Elapsed time between the events, in milliseconds
        """

def get_stream(device: Device = None) -> Stream:
    """
    Return the stream that is current for a device.

    Args:
        device: Optional device to query

    Returns:
        The current stream for the device
    """

def set_stream(stream: Stream) -> None:
    """
    Make a stream the current one for the device it belongs to.

    Args:
        stream: Stream to make current
    """

def wait_stream(stream: Stream, event: Event) -> None:
    """
    Have a stream wait for an event.

    Args:
        stream: Stream that waits
        event: Event to wait for
    """

def synchronize_stream(stream: Stream) -> None:
    """
    Block until the operations on a stream have finished.

    Args:
        stream: Stream to wait on
    """

def record_event(event: Event, stream: Stream = None) -> None:
    """
    Record an event on a stream.

    Args:
        event: Event to record
        stream: Optional stream on which to record the event
    """

def wait_event(event: Event, stream: Stream = None) -> None:
    """
    Have a stream wait for an event.

    Args:
        event: Event to wait for
        stream: Optional stream that waits
    """

def synchronize_event(event: Event) -> None:
    """
    Block until an event has completed.

    Args:
        event: Event to wait on
    """

def get_event_elapsed_time(start: Event, end: Event) -> float:
    """
    Measure the time that passed between two events.

    Args:
        start: Event marking the beginning of the interval
        end: Event marking the end of the interval

    Returns:
        Elapsed time between the events
    """

Mathematical Utilities

Helper functions for common mathematical operations and transformations.

def transform_expand(t: transform) -> mat44:
    """
    Convert a transform into its 4x4 transformation matrix.

    Args:
        t: Transform to expand (rotation + translation)

    Returns:
        The 4x4 transformation matrix
    """

def quat_between_vectors(a: vec3, b: vec3) -> quat:
    """
    Find the quaternion that rotates one vector onto another.

    Args:
        a: Vector to rotate from (source)
        b: Vector to rotate to (target)

    Returns:
        Quaternion for the rotation from a to b
    """

def map(func: Callable,
       inputs: list,
       device: Device = None,
       stream: Stream = None) -> list:
    """
    Run a function over arrays in parallel.

    Args:
        func: Function applied to the arrays
        inputs: Input arrays, given as a list
        device: Device on which to run
        stream: CUDA stream used for execution

    Returns:
        The result arrays, as a list
    """

Memory Management Utilities

Functions for querying and controlling memory pool behavior.

def is_mempool_supported(device: Device = None) -> bool:
    """
    Report whether a device supports memory pooling.

    Args:
        device: Optional device to query

    Returns:
        True if memory pooling is supported on the device
    """

def is_mempool_enabled(device: Device = None) -> bool:
    """
    Report whether memory pooling is turned on for a device.

    Args:
        device: Optional device to query

    Returns:
        True if memory pooling is enabled on the device
    """

def set_mempool_enabled(enabled: bool, device: Device = None) -> None:
    """
    Turn memory pooling on or off for a device.

    Args:
        enabled: True to enable memory pooling, False to disable it
        device: Optional device to configure
    """

def get_mempool_release_threshold(device: Device = None) -> int:
    """
    Return the release threshold of a device's memory pool.

    Args:
        device: Optional device to query

    Returns:
        The release threshold, in bytes
    """

def set_mempool_release_threshold(threshold: int, device: Device = None) -> None:
    """
    Change the release threshold of a device's memory pool.

    Args:
        threshold: New release threshold
        device: Optional device to configure
    """

def get_mempool_used_mem_current(device: Device = None) -> int:
    """
    Return how much of a device's memory pool is in use right now.

    Args:
        device: Optional device to query

    Returns:
        Current memory pool usage, in bytes
    """

def get_mempool_used_mem_high(device: Device = None) -> int:
    """
    Return the peak usage of a device's memory pool.

    Args:
        device: Optional device to query

    Returns:
        Peak memory pool usage, in bytes
    """

def is_mempool_access_supported(device: Device = None) -> bool:
    """
    Report whether cross-device memory pool access is supported.

    Args:
        device: Optional device to query

    Returns:
        True if cross-device memory pool access is supported
    """

def is_mempool_access_enabled(device: Device = None) -> bool:
    """
    Report whether cross-device memory pool access is turned on.

    Args:
        device: Optional device to query

    Returns:
        True if cross-device memory pool access is enabled
    """

def set_mempool_access_enabled(enabled: bool, device: Device = None) -> None:
    """
    Turn cross-device memory pool access on or off.

    Args:
        enabled: True to enable access, False to disable it
        device: Optional device to configure
    """

def is_peer_access_supported(device_a: Device, device_b: Device) -> bool:
    """
    Report whether peer access between two devices is supported.

    Args:
        device_a: First device of the pair
        device_b: Second device of the pair

    Returns:
        True if peer access is supported between the devices
    """

def is_peer_access_enabled(device_a: Device, device_b: Device) -> bool:
    """
    Report whether peer access between two devices is turned on.

    Args:
        device_a: First device of the pair
        device_b: Second device of the pair

    Returns:
        True if peer access is enabled between the devices
    """

def set_peer_access_enabled(enabled: bool, device_a: Device, device_b: Device) -> None:
    """
    Turn peer access between two devices on or off.

    Args:
        enabled: True to enable peer access, False to disable it
        device_a: First device of the pair
        device_b: Second device of the pair
    """

Usage Examples

Performance Profiling

import warp as wp

# Initialize Warp
wp.init()
# Kernel backward-pass generation setting; it is unrelated to timing
wp.config.enable_backward = True

# Basic timing with context manager.
# Bind the timer with `as` so its elapsed time can be read after the block.
with wp.ScopedTimer("matrix_multiply") as timer:
    wp.launch(matrix_mult_kernel, dim=1000000, inputs=[a, b, c])

print(f"Matrix multiplication took {timer.elapsed:.3f} seconds")

# Detailed timing collection
wp.timing_begin()

# Run multiple operations
wp.launch(kernel1, dim=100000, inputs=[data1])
wp.launch(kernel2, dim=200000, inputs=[data2])
wp.launch(kernel3, dim=150000, inputs=[data3])

# Get detailed results
timing_result = wp.timing_end()
print(f"Total kernel time: {timing_result.kernel_time:.3f}s")
print(f"Memory copy time: {timing_result.memcpy_time:.3f}s")
print(f"Total time: {timing_result.total_time:.3f}s")

# Print formatted timing report
wp.timing_print()

Device and Stream Management

import warp as wp

# Multi-GPU computation with scoped contexts:
# run process_kernel once per CUDA device, each on its own stream
devices = wp.get_cuda_devices()

# Process data on multiple GPUs; input_data[i] is the input for the i-th device
results = []
for i, device in enumerate(devices):
    with wp.ScopedDevice(device):
        # Create stream for this device
        stream = wp.Stream(device)

        with wp.ScopedStream(stream):
            # Allocate the input and a matching output array on this device
            data = wp.array(input_data[i], device=device)
            result = wp.zeros_like(data)

            # Launch kernel asynchronously
            wp.launch(process_kernel, dim=data.size, inputs=[data, result])

            # Keep the per-device output; it is only complete after the
            # synchronization loop below
            results.append(result)

# Synchronize all devices
for device in devices:
    wp.synchronize_device(device)

Memory Pool Optimization

import warp as wp

# Configure memory pools on every CUDA device
for device in wp.get_cuda_devices():
    with wp.ScopedDevice(device):
        # Enable memory pooling
        wp.set_mempool_enabled(True)

        # Set the release threshold to 1 GiB (1024 * 1024 * 1024 bytes)
        wp.set_mempool_release_threshold(1024 * 1024 * 1024)

        # Enable cross-device access for multi-GPU
        wp.set_mempool_access_enabled(True)

# Use scoped memory pool settings
with wp.ScopedMempool(enabled=False):
    # Pooling is disabled only for this allocation (1,000,000,000 float32 elements)
    large_array = wp.zeros(1000000000, dtype=wp.float32)

# Monitor memory usage: current and peak pool usage, in bytes
print(f"Current pool usage: {wp.get_mempool_used_mem_current()} bytes")
print(f"Peak pool usage: {wp.get_mempool_used_mem_high()} bytes")

Asynchronous Execution with Events

import warp as wp

# Create two streams and an event used to order work between them
stream1 = wp.Stream()
stream2 = wp.Stream()
event = wp.Event()

# Launch work on first stream
wp.launch(kernel1, dim=100000, inputs=[data1], stream=stream1)

# Record completion event on the first stream, after kernel1
wp.record_event(event, stream1)

# Launch dependent work on second stream
wp.wait_event(event, stream2)  # stream2 waits for the event recorded after kernel1
wp.launch(kernel2, dim=100000, inputs=[data2], stream=stream2)

# Measure timing between operations by bracketing the launch with two events
# NOTE(review): upstream Warp appears to require events created with
# enable_timing=True for elapsed-time queries — confirm against the installed version
start_event = wp.Event()
end_event = wp.Event()

wp.record_event(start_event)
wp.launch(timed_kernel, dim=50000, inputs=[data])
wp.record_event(end_event)

# Synchronize before reading the elapsed time, which is printed in milliseconds
wp.synchronize()
elapsed = wp.get_event_elapsed_time(start_event, end_event)
print(f"Kernel execution time: {elapsed:.3f} ms")

CUDA Graph Capture

import warp as wp

# Capture sequence of operations as CUDA graph
with wp.ScopedCapture() as capture:
    # Launch the sequence of kernels to capture; each consumes the previous output
    wp.launch(kernel1, dim=1000, inputs=[a, b])
    wp.launch(kernel2, dim=1000, inputs=[b, c])
    wp.launch(kernel3, dim=1000, inputs=[c, d])

# Replay the captured graph 1000 times without re-issuing the individual launches
for iteration in range(1000):
    capture.launch()

# Synchronize once after all replays have been issued
wp.synchronize()

Multi-threaded Execution

import warp as wp
import threading
import queue

def worker_thread(device_id: int, work_queue: queue.Queue, result_queue: queue.Queue):
    """
    Worker thread for processing on specific GPU.

    Pulls ``(data, params)`` work items from ``work_queue``, runs
    ``worker_kernel`` on the GPU selected by ``device_id``, and puts each
    result on ``result_queue`` as a host-side array. A ``None`` work item is
    the shutdown signal.

    Args:
        device_id: Index of the CUDA device this worker uses
        work_queue: Queue of ``(data, params)`` tuples, or ``None`` to stop
        result_queue: Queue that receives one result per processed work item
    """
    device = wp.get_cuda_device(device_id)

    with wp.ScopedDevice(device):
        stream = wp.Stream()

        with wp.ScopedStream(stream):
            while True:
                # Only the queue read can raise queue.Empty, so keep the try
                # body minimal; a timeout just means "no work yet, poll again".
                try:
                    work_item = work_queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                if work_item is None:  # Shutdown signal
                    break

                # Process work item
                host_data, params = work_item

                # Work items are submitted as CPU arrays. Move the input onto
                # this worker's GPU so the kernel's arrays live on the device
                # it is launched on (zeros_like follows the input's device).
                data = host_data.to(device)
                result = wp.zeros_like(data)

                wp.launch(worker_kernel,
                         dim=data.size,
                         inputs=[data, result, params])

                # Copy result back to CPU
                result_cpu = result.numpy()
                result_queue.put(result_cpu)

# Start one worker thread per GPU; all workers share the same two queues
num_gpus = wp.get_cuda_device_count()
work_queue = queue.Queue()
result_queue = queue.Queue()

threads = []
for gpu_id in range(num_gpus):
    thread = threading.Thread(target=worker_thread,
                             args=(gpu_id, work_queue, result_queue))
    thread.start()
    threads.append(thread)

# Submit 100 work items; each input array is created on the CPU
for i in range(100):
    work_data = wp.array(generate_work_data(i), device='cpu')
    work_params = generate_params(i)
    work_queue.put((work_data, work_params))

# Collect one result per submitted work item (blocks until each is available)
results = []
for i in range(100):
    result = result_queue.get()
    results.append(result)

# Shutdown workers: one None sentinel per worker thread
for _ in range(num_gpus):
    work_queue.put(None)

# Wait for every worker thread to exit
for thread in threads:
    thread.join()

Development and Debugging Utilities

import warp as wp

# Debug timing breakdown: every ScopedTimer below is given the same dictionary
timing_dict = {}

# Time initialization and allocation
with wp.ScopedTimer("initialization", dict=timing_dict):
    wp.init()
    data = wp.zeros(1000000, dtype=float)

# Time the kernel launch
with wp.ScopedTimer("computation", dict=timing_dict):
    wp.launch(compute_kernel, dim=1000000, inputs=[data])

# Time the conversion of the array to NumPy
with wp.ScopedTimer("readback", dict=timing_dict):
    result = data.numpy()

# Print timing breakdown, one line per timer name
for name, time in timing_dict.items():
    print(f"{name}: {time:.3f}s")

# Transform utilities: build a transform from a rotation of pi/4 about the
# axis (0, 1, 0) and a translation of (1, 2, 3)
rotation = wp.quat_from_axis_angle(wp.vec3(0, 1, 0), wp.pi / 4)
translation = wp.vec3(1, 2, 3)
transform = wp.transform(translation, rotation)

# Convert to matrix for OpenGL/rendering
matrix = wp.transform_expand(transform)
print(f"Transformation matrix:\n{matrix}")

# Vector rotation utility: quaternion between two normalized vectors
v1 = wp.normalize(wp.vec3(1, 0, 0))
v2 = wp.normalize(wp.vec3(0, 1, 0))
rotation_quat = wp.quat_between_vectors(v1, v2)
print(f"Rotation between vectors: {rotation_quat}")

Types

# Timing types
class Timer:
    """A high-precision timer with explicit start and stop."""

    def start(self) -> None:
        """Begin timing."""

    def stop(self) -> None:
        """End timing."""

    def elapsed(self) -> float:
        """Return the elapsed time, in seconds."""

# Stream and event types
class StreamState:
    """Describes the state of a stream."""

    device: Device  # Device recorded for the stream
    priority: int   # Stream priority value
    flags: int      # Stream flags value

class EventState:
    """Describes the state of an event."""

    device: Device  # Device recorded for the event
    recorded: bool  # Recorded flag for the event
    flags: int      # Event flags value

# Memory pool statistics
class MempoolStats:
    """Usage figures for a memory pool."""

    # Bytes in use right now
    used_current: int
    # Highest number of bytes in use (peak)
    used_high: int
    # Bytes reserved
    reserved: int
    # Bytes free
    free: int

# Context manager base
class ScopedContext:
    """Common base for the scoped context managers."""

    def __enter__(self):
        """Called when the ``with`` block is entered."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Called when the ``with`` block is left; performs cleanup."""

Install with Tessl CLI

npx tessl i tessl/pypi-warp-lang

docs

core-execution.md

fem.md

framework-integration.md

index.md

kernel-programming.md

optimization.md

rendering.md

types-arrays.md

utilities.md

tile.json