"""
Warp: a Python framework for high-performance simulation and graphics
programming that JIT compiles Python functions to efficient GPU/CPU kernel
code.

This module documents Warp's utilities for performance profiling, context
management, timing, and development helpers. These tools are essential for
optimizing Warp applications and managing GPU/CPU resources effectively.

High-precision timing utilities for measuring kernel execution and memory
operations.
"""
class ScopedTimer:
    """Context manager for timing code blocks."""

    def __init__(self, name: str, detailed: bool = False, dict: dict = None):
        """
        Create scoped timer.

        Args:
            name: Timer name for identification
            detailed: Enable detailed kernel-level timing
            dict: Dictionary to store timing results
                (NOTE: shadows the ``dict`` builtin; name kept for
                compatibility with the documented Warp API)
        """

    def __enter__(self) -> "ScopedTimer":
        """Start timing on context entry."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop timing on context exit."""

    @property
    def elapsed(self) -> float:
        """Get elapsed time in seconds."""
class TimingResult:
    """Container for detailed timing information."""

    @property
    def kernel_time(self) -> float:
        """Total kernel execution time."""

    @property
    def memcpy_time(self) -> float:
        """Total memory copy time."""

    @property
    def memset_time(self) -> float:
        """Total memory set time."""

    @property
    def total_time(self) -> float:
        """Total execution time."""
def timing_begin() -> None:
    """Start global timing collection."""


def timing_end() -> "TimingResult":
    """
    End timing collection and return results.

    Returns:
        TimingResult with detailed performance metrics
    """


def timing_print() -> None:
    """Print timing results to console."""
# Timing categories for filtering (bit flags; combine with bitwise OR)
TIMING_KERNEL = 1          # Kernel execution time
TIMING_KERNEL_BUILTIN = 2  # Built-in kernel time
TIMING_MEMCPY = 4          # Memory copy operations
TIMING_MEMSET = 8          # Memory set operations
TIMING_GRAPH = 16          # Graph operations
TIMING_ALL = 31            # All timing categories (OR of all flags above)

# Scoped context managers for automatically managing device state, streams,
# and memory settings.
class ScopedDevice:
    """Context manager for temporary device switching."""

    # NOTE: "Device" is a Warp type declared elsewhere; quoted as a forward
    # reference so these annotations do not fail at class-definition time.
    def __init__(self, device: "Device"):
        """
        Create scoped device context.

        Args:
            device: Device to switch to during context
        """

    def __enter__(self) -> "Device":
        """Switch to specified device."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore previous device."""
class ScopedStream:
    """Context manager for temporary stream switching."""

    # "Stream" is declared later in this module; quoted as a forward reference.
    def __init__(self, stream: "Stream"):
        """Create scoped stream context."""

    def __enter__(self) -> "Stream":
        """Switch to specified stream."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore previous stream."""
class ScopedMempool:
    """Context manager for temporary memory pool settings."""

    def __init__(self, enabled: bool):
        """
        Create scoped memory pool context.

        Args:
            enabled: Enable/disable memory pooling during context
        """

    def __enter__(self) -> None:
        """Apply memory pool setting."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore previous memory pool setting."""
class ScopedMempoolAccess:
    """Context manager for cross-device memory pool access."""

    def __init__(self, enabled: bool):
        """Create scoped memory pool access context."""

    def __enter__(self) -> None:
        """Apply memory pool access setting."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore previous access setting."""
class ScopedPeerAccess:
    """Context manager for peer-to-peer GPU memory access."""

    def __init__(self, enabled: bool):
        """Create scoped peer access context."""

    def __enter__(self) -> None:
        """Apply peer access setting."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore previous peer access setting."""
class ScopedCapture:
    """Context manager for CUDA graph capture."""

    # "Device" / "Stream" are Warp types; quoted as forward references.
    def __init__(self, device: "Device" = None):
        """Create scoped capture context."""

    def __enter__(self) -> "ScopedCapture":
        """Begin CUDA graph capture."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """End capture and create graph."""

    def launch(self, stream: "Stream" = None) -> None:
        """Launch captured graph."""

# Utilities for managing CUDA streams and events for asynchronous execution.
class Stream:
    """CUDA stream for asynchronous execution."""

    # "Device" is a Warp type declared elsewhere; quoted forward reference.
    def __init__(self, device: "Device" = None):
        """Create stream on specified device."""

    def synchronize(self) -> None:
        """Wait for all operations on stream to complete."""

    @property
    def device(self) -> "Device":
        """Device associated with stream."""
class Event:
    """CUDA event for synchronization and timing."""

    # "Device" / "Stream" are Warp types; quoted as forward references.
    def __init__(self, device: "Device" = None):
        """Create event on specified device."""

    def record(self, stream: "Stream" = None) -> None:
        """Record event on stream."""

    def synchronize(self) -> None:
        """Wait for event to complete."""

    def elapsed_time(self, end_event: "Event") -> float:
        """Get elapsed time between events in milliseconds."""
# Module-level stream/event API. "Device", "Stream" and "Event" are Warp
# types; quoted as forward references so the signatures resolve lazily.
def get_stream(device: "Device" = None) -> "Stream":
    """Get current stream for device."""


def set_stream(stream: "Stream") -> None:
    """Set current stream for stream's device."""


def wait_stream(stream: "Stream", event: "Event") -> None:
    """Make stream wait for event."""


def synchronize_stream(stream: "Stream") -> None:
    """Wait for stream operations to complete."""


def record_event(event: "Event", stream: "Stream" = None) -> None:
    """Record event on stream."""


def wait_event(event: "Event", stream: "Stream" = None) -> None:
    """Make stream wait for event."""


def synchronize_event(event: "Event") -> None:
    """Wait for event to complete."""


def get_event_elapsed_time(start: "Event", end: "Event") -> float:
    """Get elapsed time between events."""

# Helper functions for common mathematical operations and transformations.
# "transform" / "mat44" are Warp math types; quoted as forward references.
def transform_expand(t: "transform") -> "mat44":
    """
    Expand transform to 4x4 transformation matrix.

    Args:
        t: Transform (rotation + translation)

    Returns:
        4x4 transformation matrix
    """
# "vec3" / "quat" are Warp math types; quoted as forward references.
def quat_between_vectors(a: "vec3", b: "vec3") -> "quat":
    """
    Compute quaternion rotation between two vectors.

    Args:
        a: Source vector
        b: Target vector

    Returns:
        Quaternion representing rotation from a to b
    """
def map(func: "Callable",
        inputs: list,
        device: "Device" = None,
        stream: "Stream" = None) -> list:
    """
    Apply function to arrays in parallel.

    NOTE: shadows the ``map`` builtin at module scope; the name is kept for
    compatibility with the documented Warp API. Type names are quoted
    forward references ("Callable" is not imported in this module).

    Args:
        func: Function to apply
        inputs: List of input arrays
        device: Target device
        stream: CUDA stream for execution

    Returns:
        List of result arrays
    """

# Functions for querying and controlling memory pool behavior.
# Memory pool query/control API. "Device" is a Warp type declared elsewhere;
# quoted as a forward reference throughout.
def is_mempool_supported(device: "Device" = None) -> bool:
    """Check if memory pooling is supported on device."""


def is_mempool_enabled(device: "Device" = None) -> bool:
    """Check if memory pooling is enabled on device."""


def set_mempool_enabled(enabled: bool, device: "Device" = None) -> None:
    """Enable/disable memory pooling on device."""


def get_mempool_release_threshold(device: "Device" = None) -> int:
    """Get memory pool release threshold in bytes."""


def set_mempool_release_threshold(threshold: int, device: "Device" = None) -> None:
    """Set memory pool release threshold."""


def get_mempool_used_mem_current(device: "Device" = None) -> int:
    """Get current memory pool usage in bytes."""


def get_mempool_used_mem_high(device: "Device" = None) -> int:
    """Get peak memory pool usage in bytes."""


def is_mempool_access_supported(device: "Device" = None) -> bool:
    """Check if cross-device memory pool access is supported."""


def is_mempool_access_enabled(device: "Device" = None) -> bool:
    """Check if cross-device memory pool access is enabled."""


def set_mempool_access_enabled(enabled: bool, device: "Device" = None) -> None:
    """Enable/disable cross-device memory pool access."""


def is_peer_access_supported(device_a: "Device", device_b: "Device") -> bool:
    """Check if peer access is supported between devices."""


def is_peer_access_enabled(device_a: "Device", device_b: "Device") -> bool:
    """Check if peer access is enabled between devices."""
def set_peer_access_enabled(enabled: bool, device_a: "Device", device_b: "Device") -> None:
    """Enable/disable peer access between devices."""

# --- Usage examples (extracted documentation; the import below was fused
# onto the docstring line above in the original extraction). ---
import warp as wp
# Initialize Warp with timing enabled
wp.init()
wp.config.enable_backward = True

# Basic timing with context manager.
# BUG FIX: the original example printed ``timer.elapsed`` but never bound
# the timer — the context manager must be captured with ``as timer``.
with wp.ScopedTimer("matrix_multiply") as timer:
    result = wp.launch(matrix_mult_kernel, dim=1000000, inputs=[a, b, c])

print(f"Matrix multiplication took {timer.elapsed:.3f} seconds")

# Detailed timing collection
wp.timing_begin()

# Run multiple operations
wp.launch(kernel1, dim=100000, inputs=[data1])
wp.launch(kernel2, dim=200000, inputs=[data2])
wp.launch(kernel3, dim=150000, inputs=[data3])

# Get detailed results
timing_result = wp.timing_end()
print(f"Total kernel time: {timing_result.kernel_time:.3f}s")
print(f"Memory copy time: {timing_result.memcpy_time:.3f}s")
print(f"Total time: {timing_result.total_time:.3f}s")

# Print formatted timing report
wp.timing_print()

import warp as wp
# Multi-GPU computation with scoped contexts
devices = wp.get_cuda_devices()

# Process data on multiple GPUs
results = []
for i, device in enumerate(devices):
    with wp.ScopedDevice(device):
        # Create stream for this device
        stream = wp.Stream(device)
        with wp.ScopedStream(stream):
            # Allocate data on current device
            data = wp.array(input_data[i], device=device)
            result = wp.zeros_like(data)
            # Launch kernel asynchronously
            wp.launch(process_kernel, dim=data.size, inputs=[data, result])
            results.append(result)

# Synchronize all devices
for device in devices:
    wp.synchronize_device(device)

import warp as wp
# Configure memory pools for better performance
for device in wp.get_cuda_devices():
    with wp.ScopedDevice(device):
        # Enable memory pooling
        wp.set_mempool_enabled(True)
        # Set 1GB release threshold
        wp.set_mempool_release_threshold(1024 * 1024 * 1024)
        # Enable cross-device access for multi-GPU
        wp.set_mempool_access_enabled(True)

# Use scoped memory pool settings
with wp.ScopedMempool(enabled=False):
    # Disable pooling for this allocation
    large_array = wp.zeros(1000000000, dtype=wp.float32)

# Monitor memory usage
print(f"Current pool usage: {wp.get_mempool_used_mem_current()} bytes")
print(f"Peak pool usage: {wp.get_mempool_used_mem_high()} bytes")

import warp as wp
# Create streams and events
stream1 = wp.Stream()
stream2 = wp.Stream()
event = wp.Event()

# Launch work on first stream
wp.launch(kernel1, dim=100000, inputs=[data1], stream=stream1)

# Record completion event
wp.record_event(event, stream1)

# Launch dependent work on second stream
wp.wait_event(event, stream2)  # Wait for first kernel
wp.launch(kernel2, dim=100000, inputs=[data2], stream=stream2)

# Measure timing between operations
start_event = wp.Event()
end_event = wp.Event()

wp.record_event(start_event)
wp.launch(timed_kernel, dim=50000, inputs=[data])
wp.record_event(end_event)

wp.synchronize()
elapsed = wp.get_event_elapsed_time(start_event, end_event)
print(f"Kernel execution time: {elapsed:.3f} ms")

import warp as wp
# Capture sequence of operations as CUDA graph
with wp.ScopedCapture() as capture:
    # Launch sequence of kernels
    wp.launch(kernel1, dim=1000, inputs=[a, b])
    wp.launch(kernel2, dim=1000, inputs=[b, c])
    wp.launch(kernel3, dim=1000, inputs=[c, d])

# Replay captured graph multiple times (much faster)
for iteration in range(1000):
    capture.launch()

wp.synchronize()

import warp as wp
import threading
import queue


def worker_thread(device_id: int, work_queue: queue.Queue, result_queue: queue.Queue):
    """Worker thread for processing on specific GPU."""
    device = wp.get_cuda_device(device_id)
    with wp.ScopedDevice(device):
        stream = wp.Stream()
        with wp.ScopedStream(stream):
            while True:
                # Keep the try body minimal: only queue.get raises queue.Empty.
                try:
                    work_item = work_queue.get(timeout=1.0)
                except queue.Empty:
                    continue
                if work_item is None:  # Shutdown signal
                    break
                # Process work item
                data, params = work_item
                result = wp.zeros_like(data)
                wp.launch(worker_kernel,
                          dim=data.size,
                          inputs=[data, result, params])
                # Copy result back to CPU
                result_cpu = result.numpy()
                result_queue.put(result_cpu)


# Start worker threads for each GPU
num_gpus = wp.get_cuda_device_count()
work_queue = queue.Queue()
result_queue = queue.Queue()

threads = []
for gpu_id in range(num_gpus):
    thread = threading.Thread(target=worker_thread,
                              args=(gpu_id, work_queue, result_queue))
    thread.start()
    threads.append(thread)

# Submit work
for i in range(100):
    work_data = wp.array(generate_work_data(i), device='cpu')
    work_params = generate_params(i)
    work_queue.put((work_data, work_params))

# Collect results
results = []
for i in range(100):
    result = result_queue.get()
    results.append(result)

# Shutdown workers
for _ in range(num_gpus):
    work_queue.put(None)
for thread in threads:
    thread.join()

import warp as wp
# Debug timing breakdown
timing_dict = {}

with wp.ScopedTimer("initialization", dict=timing_dict):
    wp.init()
    data = wp.zeros(1000000, dtype=float)

with wp.ScopedTimer("computation", dict=timing_dict):
    wp.launch(compute_kernel, dim=1000000, inputs=[data])

with wp.ScopedTimer("readback", dict=timing_dict):
    result = data.numpy()

# Print timing breakdown ("elapsed" avoids shadowing the time module)
for name, elapsed in timing_dict.items():
    print(f"{name}: {elapsed:.3f}s")

# Transform utilities
rotation = wp.quat_from_axis_angle(wp.vec3(0, 1, 0), wp.pi / 4)
translation = wp.vec3(1, 2, 3)
# "xform" avoids shadowing the wp.transform constructor used above
xform = wp.transform(translation, rotation)

# Convert to matrix for OpenGL/rendering
matrix = wp.transform_expand(xform)
print(f"Transformation matrix:\n{matrix}")

# Vector rotation utility
v1 = wp.normalize(wp.vec3(1, 0, 0))
v2 = wp.normalize(wp.vec3(0, 1, 0))
rotation_quat = wp.quat_between_vectors(v1, v2)
print(f"Rotation between vectors: {rotation_quat}")

# Timing types
class Timer:
    """High-precision timer."""

    def start(self) -> None:
        """Start timer."""

    def stop(self) -> None:
        """Stop timer."""

    def elapsed(self) -> float:
        """Get elapsed time in seconds."""
# Stream and event types
class StreamState:
    """Stream state information."""

    # "Device" is quoted: class-level annotations are evaluated eagerly and
    # the name is declared elsewhere in the Warp package.
    device: "Device"  # Device that owns the stream
    priority: int     # Stream priority
    flags: int        # Stream creation flags
class EventState:
    """Event state information."""

    # "Device" quoted as a forward reference (declared elsewhere in Warp).
    device: "Device"  # Device that owns the event
    recorded: bool    # Whether the event has been recorded on a stream
    flags: int        # Event creation flags
# Memory pool statistics
class MempoolStats:
    """Memory pool usage statistics."""

    used_current: int  # Current usage in bytes
    used_high: int     # Peak usage in bytes
    reserved: int      # Reserved memory in bytes
    free: int          # Free memory in bytes
# Context manager base
class ScopedContext:
    """Base class for scoped context managers."""

    def __enter__(self):
        """Context entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context exit with cleanup."""

# Install with Tessl CLI
# npx tessl i tessl/pypi-warp-lang