CuPy: NumPy & SciPy for GPU - CUDA 11.x optimized distribution providing GPU-accelerated computing with Python
—
CuPy's CUDA integration offers direct device management, memory operations, kernel execution, stream processing, and low-level CUDA API access, all tuned for high-performance computing applications.
Core CUDA device management for controlling GPU devices and execution contexts.
class Device:
"""
CUDA device context manager.
This class provides a convenient interface for managing CUDA device
contexts and switching between multiple GPUs.
"""
def __init__(self, device=None):
"""
Parameters:
device: int or Device, optional - CUDA device ID or Device object
"""
def __enter__(self): ...
def __exit__(self, *args): ...
def use(self):
"""Use this device for subsequent operations."""
@property
def id(self):
"""Get the device ID."""
def get_device_id():
"""
Get the current CUDA device ID.
"""
def get_cublas_handle():
"""
Get the cuBLAS handle for the current device.
"""
def synchronize():
"""
Synchronize the current device.
"""
def is_available():
"""
Check if CUDA is available.
"""Comprehensive memory management for GPU device memory, including allocators and memory pools.
def alloc(size):
"""
Allocate device memory.
Parameters:
size: int - Size in bytes to allocate
"""
def malloc_managed(size):
"""
Allocate managed memory (Unified Memory).
Parameters:
size: int - Size in bytes to allocate
"""
def malloc_async(size):
"""
Allocate memory asynchronously.
Parameters:
size: int - Size in bytes to allocate
"""
class BaseMemory:
"""
Base class for memory objects.
This is the base class for all memory types in CuPy,
providing common interface for memory management.
"""
def __init__(self, size): ...
@property
def ptr(self):
"""Get memory pointer."""
@property
def size(self):
"""Get memory size in bytes."""
class Memory(BaseMemory):
"""
Device memory object.
Represents a chunk of device memory allocated on GPU.
"""
class ManagedMemory(BaseMemory):
"""
Managed memory object.
Represents unified memory accessible from both CPU and GPU.
"""
class MemoryAsync(BaseMemory):
"""
Asynchronous memory object.
Represents memory allocated asynchronously using memory pools.
"""
class MemoryPointer:
"""
Pointer to a device memory region.
This class represents a pointer to device memory and provides
methods for accessing and manipulating memory contents.
"""
def __init__(self, mem, offset, size, owner=None): ...
def copy_from_device(self, src, size): ...
def copy_from_device_async(self, src, size, stream=None): ...
def copy_from_host(self, mem, size): ...
def copy_from_host_async(self, mem, size, stream=None): ...
def copy_to_host(self, mem, size): ...
def copy_to_host_async(self, mem, size, stream=None): ...
def memset(self, value, size): ...
def memset_async(self, value, size, stream=None): ...
class UnownedMemory:
"""
Unowned memory reference.
Represents a reference to memory that is not owned by this object,
useful for wrapping external memory allocations.
"""Memory pooling systems for efficient memory allocation and reuse.
class MemoryPool:
"""
Memory pool for device memory.
Memory pools reduce allocation overhead by reusing previously
allocated memory blocks.
"""
def __init__(self, allocator=None):
"""
Parameters:
allocator: function, optional - Custom allocator function
"""
def malloc(self, size): ...
def free(self, mem): ...
def free_all_blocks(self): ...
def free_all_free(self): ...
def n_free_blocks(self): ...
def used_bytes(self): ...
def total_bytes(self): ...
class MemoryAsyncPool:
"""
Asynchronous memory pool.
Provides asynchronous memory allocation with stream ordering.
"""
def __init__(self, allocator=None): ...
def set_allocator(allocator):
"""
Set the memory allocator.
Parameters:
allocator: function or Allocator - Memory allocator to use
"""
def get_allocator():
"""
Get the current memory allocator.
"""
class PythonFunctionAllocator:
"""
Memory allocator using a Python function.
Wraps a Python function to provide custom memory allocation.
"""
def __init__(self, func, arg): ...
class CFunctionAllocator:
"""
Memory allocator using a C function.
Wraps a C function pointer for memory allocation.
"""
def __init__(self, func, arg): ...
def using_allocator(allocator=None):
"""
Context manager for temporarily using a different allocator.
Parameters:
allocator: Allocator, optional - Allocator to use temporarily
"""Host-side pinned memory management for efficient host-device transfers.
def alloc_pinned_memory(size):
"""
Allocate pinned host memory.
Parameters:
size: int - Size in bytes to allocate
"""
class PinnedMemory:
"""
Pinned host memory object.
Represents page-locked host memory that can be accessed
by the GPU for faster transfers.
"""
def __init__(self, size): ...
class PinnedMemoryPointer:
"""
Pointer to pinned memory region.
Provides interface for accessing pinned memory contents.
"""
def __init__(self, mem, offset, size, owner): ...
class PinnedMemoryPool:
"""
Memory pool for pinned memory.
Manages allocation and reuse of pinned host memory.
"""
def __init__(self, allocator=None): ...
def malloc(self, size): ...
def free(self, mem): ...
def set_pinned_memory_allocator(allocator):
"""
Set the pinned memory allocator.
Parameters:
allocator: function - Pinned memory allocator function
"""CUDA streams and events for managing asynchronous operations and synchronization.
class Stream:
"""
CUDA stream for asynchronous operations.
Streams allow operations to be executed asynchronously and
can be used to overlap computation and memory transfers.
"""
def __init__(self, null=False, non_blocking=False, ptds=False):
"""
Parameters:
null: bool, optional - Use the default stream if True
non_blocking: bool, optional - Create a non-blocking stream
ptds: bool, optional - Use per-thread default stream
"""
def __enter__(self): ...
def __exit__(self, *args): ...
def synchronize(self): ...
def add_callback(self, callback, arg): ...
def record(self, event=None): ...
def wait_event(self, event): ...
@property
def ptr(self):
"""Get the stream pointer."""
class ExternalStream:
"""
Wrapper for external CUDA stream.
Allows integration with CUDA streams created outside of CuPy.
"""
def __init__(self, ptr): ...
def get_current_stream():
"""
Get the current CUDA stream.
"""
class Event:
"""
CUDA event for synchronization.
Events provide a way to monitor the progress of operations
and synchronize between different streams.
"""
def __init__(self, block=True, disable_timing=False, interprocess=False):
"""
Parameters:
block: bool, optional - Use blocking synchronization
disable_timing: bool, optional - Disable timing measurements
interprocess: bool, optional - Enable interprocess usage
"""
def record(self, stream=None): ...
def synchronize(self): ...
def query(self): ...
def elapsed_time(self, end_event): ...
def get_elapsed_time(start_event, end_event):
"""
Get elapsed time between events.
Parameters:
start_event: Event - Start event
end_event: Event - End event
"""CUDA graphs for optimizing sequences of operations.
class Graph:
"""
CUDA graph for capturing and replaying operation sequences.
Graphs allow capturing a sequence of CUDA operations and
replaying them efficiently with reduced launch overhead.
"""
def __init__(self): ...
def begin_capture(self, stream=None): ...
def end_capture(self, stream=None): ...
def launch(self, stream=None): ...
def debug_dot_print(self, path): ...
CUDA kernel compilation and execution management.
class Function:
"""
CUDA function object.
Represents a compiled CUDA kernel function that can be launched
with specified grid and block dimensions.
"""
def __init__(self, module, name):
"""
Parameters:
module: Module - CUDA module containing the function
name: str - Function name
"""
def __call__(self, grid, block, args, **kwargs): ...
@property
def attributes(self):
"""Get function attributes."""
class Module:
"""
CUDA module containing compiled device code.
Modules contain one or more CUDA kernels and can be loaded
from PTX or CUBIN code.
"""
def __init__(self): ...
def get_function(self, name): ...
def get_global(self, name): ...
def get_texref(self, name): ...
@classmethod
def load_file(cls, filename): ...
@classmethod
def load_from_string(cls, source): ...
Hooks for monitoring and controlling memory allocation behavior.
class MemoryHook:
"""
Base class for memory allocation hooks.
Memory hooks allow monitoring and customization of memory
allocation and deallocation operations.
"""
def alloc_preprocess(self, **kwargs): ...
def alloc_postprocess(self, **kwargs): ...
def free_preprocess(self, **kwargs): ...
def free_postprocess(self, **kwargs): ...
Tools for profiling and debugging CUDA applications.
def profile():
"""
Context manager for CUDA profiling (deprecated).
Note: This is deprecated. Use cupyx.profiler.profile() instead.
"""Functions for querying CUDA runtime and environment information.
def get_local_runtime_version():
"""
Get the local CUDA runtime version.
"""
def get_cuda_path():
"""
Get the CUDA installation path.
"""
def get_nvcc_path():
"""
Get the path to nvcc compiler.
"""
def get_rocm_path():
"""
Get the ROCm installation path (for AMD GPUs).
"""
def get_hipcc_path():
"""
Get the path to hipcc compiler (for AMD GPUs).
"""Access to low-level CUDA APIs for advanced users.
# CUDA Driver API
driver = cupy.cuda.driver
# CUDA Runtime API
runtime = cupy.cuda.runtime
# NVRTC Compiler API
nvrtc = cupy.cuda.nvrtc
# Backend library wrappers (lazy-loaded)
cublas = cupy.cuda.cublas # cuBLAS operations
cusolver = cupy.cuda.cusolver # cuSOLVER linear algebra
cusparse = cupy.cuda.cusparse # cuSPARSE sparse operations
curand = cupy.cuda.curand # cuRAND random numbers
nvtx = cupy.cuda.nvtx # NVTX profiling markers
import cupy as cp
import cupy.cuda as cuda
# Device management
print(f"Current device: {cuda.get_device_id()}")
print(f"CUDA available: {cuda.is_available()}")
# Using specific devices
with cuda.Device(0):
# Operations on device 0
x = cp.array([1, 2, 3])
with cuda.Device(1): # If multiple GPUs available
# Operations on device 1
y = cp.array([4, 5, 6])
# Memory management
# Direct memory allocation
mem = cuda.alloc(1024) # Allocate 1KB
ptr = cuda.MemoryPointer(mem, 0, 1024)
# Using memory pools (recommended)
pool = cuda.MemoryPool()
with cuda.using_allocator(pool.malloc):
# All allocations use the pool
large_array = cp.zeros((10000, 10000))
# Memory pool statistics
print(f"Used memory: {pool.used_bytes()} bytes")
print(f"Total memory: {pool.total_bytes()} bytes")
# Stream management for asynchronous operations
stream1 = cuda.Stream()
stream2 = cuda.Stream()
with stream1:
# Operations executed on stream1
a = cp.random.rand(1000, 1000)
b = cp.random.rand(1000, 1000)
with stream2:
# Operations executed on stream2 (can overlap with stream1)
c = cp.random.rand(1000, 1000)
d = cp.random.rand(1000, 1000)
# Synchronization
stream1.synchronize() # Wait for stream1 to complete
stream2.synchronize() # Wait for stream2 to complete
# Event-based synchronization
event = cuda.Event()
with stream1:
result1 = cp.dot(a, b)
event.record() # Record completion of operations
with stream2:
stream2.wait_event(event) # Wait for stream1 operations
result2 = cp.dot(c, d) + result1 # Uses result from stream1
# Measuring execution time with events
start_event = cuda.Event()
end_event = cuda.Event()
start_event.record()
# Some operations
large_computation = cp.dot(cp.random.rand(5000, 5000),
cp.random.rand(5000, 5000))
end_event.record()
end_event.synchronize()
elapsed_ms = cuda.get_elapsed_time(start_event, end_event)
print(f"Computation took {elapsed_ms} ms")
# Pinned memory for faster transfers
import numpy as np
pinned_mem = cuda.alloc_pinned_memory(1000 * 8)  # room for 1000 float64s
# The returned PinnedMemoryPointer supports the buffer protocol, so the
# page-locked buffer is viewed as a host-side NumPy array (not a cp.ndarray):
pinned_array = np.frombuffer(pinned_mem, dtype=np.float64, count=1000)
# Custom kernel example using RawKernel
kernel_code = r'''
extern "C" __global__
void vector_add(float* x, float* y, float* z, int n) {
int tid = blockDim.x * blockIdx.x + threadIdx.x;
if (tid < n) {
z[tid] = x[tid] + y[tid];
}
}
'''
kernel = cp.RawKernel(kernel_code, 'vector_add')
# Launch custom kernel
n = 1000
x = cp.random.rand(n, dtype=cp.float32)
y = cp.random.rand(n, dtype=cp.float32)
z = cp.zeros(n, dtype=cp.float32)
# Launch with appropriate grid/block size
threads_per_block = 256
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
kernel((blocks_per_grid,), (threads_per_block,), (x, y, z, cp.int32(n)))  # scalar args need fixed-width types
# Memory hooks for monitoring
class MemoryTracker(cuda.MemoryHook):
    name = 'MemoryTracker'
    def __init__(self):
        self.allocated_bytes = 0
        self.freed_bytes = 0
    def alloc_postprocess(self, **kwargs):
        # Hooks receive keyword arguments such as device_id, mem_size, mem_ptr
        self.allocated_bytes += kwargs['mem_size']
        print(f"Allocated {kwargs['mem_size']} bytes")
    def free_preprocess(self, **kwargs):
        self.freed_bytes += kwargs['mem_size']
        print(f"Freed {kwargs['mem_size']} bytes")
tracker = MemoryTracker()
# Note: memory hooks are context managers; the exact keyword set depends on CuPy version
# Working with CUDA graphs (CUDA 10.0+)
# In recent CuPy, capture is driven from a non-default Stream:
# begin_capture() starts recording and end_capture() returns the Graph.
x = cp.random.rand(1000, 1000)
y = cp.random.rand(1000, 1000)
stream = cuda.Stream(non_blocking=True)
with stream:
    stream.begin_capture()
    z = x @ y                    # recorded into the graph, not executed yet
    graph = stream.end_capture()
# Replay the captured sequence with reduced launch overhead
for _ in range(10):
    graph.launch(stream)
stream.synchronize()
# Multi-GPU computation example
def multi_gpu_computation(data_list):
"""Distribute computation across multiple GPUs."""
n_gpus = cuda.runtime.getDeviceCount()
streams = []
results = []
for i, data in enumerate(data_list[:n_gpus]):
device_id = i % n_gpus
with cuda.Device(device_id):
stream = cuda.Stream()
streams.append(stream)
with stream:
# Transfer data to this GPU
gpu_data = cp.asarray(data)
# Perform computation
result = cp.sum(gpu_data ** 2)
results.append(result)
# Synchronize all streams
for stream in streams:
stream.synchronize()
return results
# Memory bandwidth benchmark
def memory_bandwidth_test(size_mb=100):
    """Measure host-to-device transfer bandwidth for pageable memory."""
    import time
    import numpy as np
    size_bytes = size_mb * 1024 * 1024
    # Pageable host memory
    host_data = np.random.rand(size_bytes // 8)
    # Warm-up transfer so driver/allocator overhead is excluded from timing
    cp.asarray(host_data)
    cuda.Device().synchronize()
    start = time.time()
    gpu_data = cp.asarray(host_data)
    cuda.Device().synchronize()
    regular_time = time.time() - start
    bandwidth_regular = size_mb / regular_time
    print(f"Pageable memory bandwidth: {bandwidth_regular:.2f} MB/s")
    # For a pinned-memory comparison, stage host_data in a buffer from
    # cuda.alloc_pinned_memory() before the device copy (see above).
# Advanced memory pool configuration
def configure_memory_pool():
"""Configure memory pool for optimal performance."""
# Get the default memory pool
mempool = cp.get_default_memory_pool()
# Set memory pool growth strategy
# mempool.set_limit(size=2**30) # Limit to 1GB
# Monitor memory usage
print(f"Used bytes: {mempool.used_bytes()}")
print(f"Total bytes: {mempool.total_bytes()}")
# Release cached blocks that are not currently in use
mempool.free_all_blocks()
return mempool
# Context management for robust error handling
def safe_gpu_computation():
"""Example of robust GPU computation with proper cleanup."""
stream = None
temp_arrays = []
try:
stream = cuda.Stream()
with stream:
# Temporary arrays that need cleanup
temp1 = cp.random.rand(10000, 10000)
temp2 = cp.random.rand(10000, 10000)
temp_arrays.extend([temp1, temp2])
# Main computation
result = temp1 @ temp2
# Synchronize to ensure completion
stream.synchronize()
return result
except Exception as e:
print(f"GPU computation failed: {e}")
return None
finally:
# Cleanup resources
if stream:
stream.synchronize()
# Force garbage collection of temporary arrays
del temp_arrays
cp.get_default_memory_pool().free_all_blocks()
# Use memory pools to reduce allocation overhead
with cuda.using_allocator(cp.get_default_memory_pool().malloc):
# All allocations reuse memory from the pool
data = cp.zeros((10000, 10000))
# Pre-allocate large arrays when possible
workspace = cp.zeros((10000, 10000)) # Reuse this array
# Use appropriate memory types
regular_mem = cuda.alloc(1024) # Regular device memory
managed_mem = cuda.malloc_managed(1024) # Unified memory
# Overlap computation and memory transfers
compute_stream = cuda.Stream()
transfer_stream = cuda.Stream()
with transfer_stream:
    # Asynchronously stage the next batch (host_data is a placeholder array)
    next_data = cp.asarray(host_data)
with compute_stream:
    # Compute on the current batch in parallel with the transfer
    # (process_current_data and current_data are placeholders)
    result = process_current_data(current_data)
# Choose optimal grid/block dimensions
def optimal_launch_config(n, max_threads_per_block=1024):
"""Calculate optimal CUDA launch configuration."""
if n <= max_threads_per_block:
return (1, n)
else:
threads_per_block = max_threads_per_block
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
return (blocks_per_grid, threads_per_block)
grid, block = optimal_launch_config(1000000)
CuPy's CUDA integration thus provides comprehensive low-level GPU programming capabilities: advanced memory management, asynchronous execution, custom kernel development, and performance tuning, while remaining compatible with the broader CUDA ecosystem.
Install with Tessl CLI
npx tessl i tessl/pypi-cupy-cuda11x