"""CuPy is a NumPy/SciPy-compatible array library for GPU-accelerated computing with Python.

cupy.cuda — direct access to CUDA functionality for fine-grained GPU control,
memory management, device handling, and performance optimization. Provides
low-level CUDA operations while maintaining Python integration.
"""
def is_available():
    """Check if CUDA is available.

    Returns:
        bool: True if CUDA devices are available.
    """


def get_device_id():
    """Get the current device ID."""
class Device:
    """CUDA device context manager.

    Parameters:
        device: device ID or None for the current device.
    """

    def __init__(self, device=None): ...

    def __enter__(self): ...

    def __exit__(self, *args): ...
def get_cublas_handle():
    """Get cuBLAS handle for the current device."""


def alloc(size):
    """Allocate GPU memory.

    Parameters:
        size: size in bytes.

    Returns:
        MemoryPointer: pointer to allocated memory.
    """
class Memory:
    """GPU memory object."""

    def __init__(self): ...

    @property
    def ptr(self): ...

    @property
    def size(self): ...


class MemoryPointer:
    """Pointer to GPU memory."""

    def __init__(self): ...

    @property
    def device(self): ...
class MemoryPool:
    """Memory pool for GPU memory allocation.

    Parameters:
        allocator: memory allocator function.
    """

    def __init__(self, allocator=None): ...

    def malloc(self, size): ...

    def free(self, ptr, size): ...

    def free_all_blocks(self): ...

    def free_all_free(self): ...

    def n_free_blocks(self): ...

    def used_bytes(self): ...

    def free_bytes(self): ...

    def total_bytes(self): ...
class MemoryAsync:
    """Asynchronous memory allocation."""


class MemoryAsyncPool:
    """Asynchronous memory pool."""

    def __init__(self): ...


class ManagedMemory:
    """CUDA managed memory allocation."""


class UnownedMemory:
    """Reference to unowned memory."""


class BaseMemory:
    """Base class for memory objects."""
def malloc_managed(size, device=None):
    """Allocate managed memory."""


def malloc_async(size, stream=None):
    """Allocate memory asynchronously."""


def set_allocator(allocator):
    """Set the default memory allocator."""


def get_allocator():
    """Get the current memory allocator."""


class PythonFunctionAllocator:
    """Python function-based allocator."""

    def __init__(self, func, arg): ...
class CFunctionAllocator:
    """C function-based allocator."""

    def __init__(self, func_ptr, arg_ptr): ...


def alloc_pinned_memory(size):
    """Allocate pinned (page-locked) memory.

    Parameters:
        size: size in bytes.

    Returns:
        PinnedMemoryPointer: pointer to pinned memory.
    """
class PinnedMemory:
    """Pinned memory object."""


class PinnedMemoryPointer:
    """Pointer to pinned memory."""


class PinnedMemoryPool:
    """Memory pool for pinned memory.

    Parameters:
        allocator: memory allocator function.
    """

    def __init__(self, allocator=None): ...

    def malloc(self, size): ...

    def free(self, ptr, size): ...
def set_pinned_memory_allocator(allocator):
    """Set the pinned memory allocator."""


class Stream:
    """CUDA stream for asynchronous operations.

    Parameters:
        null: whether to use the null stream.
        non_blocking: whether the stream is non-blocking.
        ptds: use the per-thread default stream.
    """

    def __init__(self, null=False, non_blocking=False, ptds=False): ...

    def synchronize(self): ...

    def add_callback(self, callback, arg): ...

    def record(self, event=None): ...

    def wait_event(self, event): ...

    @property
    def ptr(self): ...
class ExternalStream:
    """External CUDA stream wrapper.

    Parameters:
        ptr: stream pointer.
    """

    def __init__(self, ptr): ...


class Event:
    """CUDA event for timing and synchronization.

    Parameters:
        blocking: whether the event blocks.
        disable_timing: disable timing capability.
        interprocess: enable interprocess sharing.
    """

    def __init__(self, blocking=False, disable_timing=False, interprocess=False): ...

    def record(self, stream=None): ...

    def synchronize(self): ...

    def query(self): ...

    def elapsed_time(self, end_event): ...
def get_current_stream():
    """Get the current CUDA stream."""


def get_elapsed_time(start_event, end_event):
    """Get elapsed time between two events."""


class Function:
    """CUDA function object."""

    def __init__(self): ...

    def __call__(self, grid, block, args, **kwargs): ...


class Module:
    """CUDA module object."""

    def __init__(self): ...

    def get_function(self, name): ...
def compile_with_cache(source, options=(), arch=None, cache_dir=None,
                       prepend_cupy_headers=True, backend='nvcc',
                       translate_cucomplex=True, enable_cooperative_groups=False,
                       name_expressions=None, log_stream=None,
                       cache_in_memory=False, jitify=False):
    """Compile CUDA source code with caching.

    Parameters:
        source: CUDA source code.
        options: compiler options.
        arch: target architecture.
        cache_dir: cache directory.
        prepend_cupy_headers: whether to prepend CuPy headers.
        backend: compiler backend.
        translate_cucomplex: translate cuComplex types.
        enable_cooperative_groups: enable cooperative groups.
        name_expressions: name expressions for kernel parameters.
        log_stream: log stream for compilation messages.
        cache_in_memory: cache compiled modules in memory.
        jitify: use Jitify for compilation.

    Returns:
        Module: compiled CUDA module.
    """


def using_allocator(allocator=None):
    """Context manager for using a specific allocator.

    Parameters:
        allocator: memory allocator function.

    Returns:
        context manager.
    """


class MemoryHook:
    """Base class for memory allocation hooks."""

    def alloc_preprocess(self, **kwargs): ...

    def alloc_postprocess(self, mem_ptr): ...

    def free_preprocess(self, mem_ptr): ...

    def free_postprocess(self, mem_ptr): ...


# Sub-modules providing CUDA library access
import cupy.cuda.driver    # CUDA Driver API
import cupy.cuda.runtime   # CUDA Runtime API
import cupy.cuda.cublas    # cuBLAS library
import cupy.cuda.curand    # cuRAND library
import cupy.cuda.cusolver  # cuSOLVER library
import cupy.cuda.cusparse  # cuSPARSE library
import cupy.cuda.nvrtc     # NVRTC library
import cupy.cuda.profiler  # CUDA Profiler
import cupy.cuda.nvtx      # NVIDIA Tools Extension (optional)
import cupy.cuda.thrust    # Thrust library (optional)
import cupy.cuda.cub       # CUB library
import cupy.cuda.jitify    # Jitify library (optional)


def get_cuda_path():
    """Get CUDA installation path."""


def get_nvcc_path():
    """Get NVCC compiler path."""


def get_rocm_path():
    """Get ROCm installation path."""


def get_hipcc_path():
    """Get HIPCC compiler path."""


# Example: device management
import cupy as cp
# Check CUDA availability
if cp.cuda.is_available():
    print("CUDA is available")
    device_count = cp.cuda.runtime.getDeviceCount()
    print(f"Number of devices: {device_count}")
else:
    print("CUDA is not available")

# Get current device
current_device = cp.cuda.get_device_id()
print(f"Current device: {current_device}")

# Use specific device: Device is a context manager that makes the given
# device current for the duration of the block.
with cp.cuda.Device(1):  # Use device 1
    data = cp.random.random((1000, 1000))
    result = cp.sum(data)
    print(f"Computed on device: {cp.cuda.get_device_id()}")

import cupy as cp
# Get default memory pool
mempool = cp.get_default_memory_pool()
# Check memory usage
print(f"Used bytes: {mempool.used_bytes()}")
print(f"Total bytes: {mempool.total_bytes()}")
# Allocate raw memory
raw_memory = cp.cuda.alloc(1024 * 1024)  # 1MB
print(f"Allocated memory at: {raw_memory.ptr}")


# Use custom allocator: the callable receives a size in bytes and must
# return a MemoryPointer.
def custom_allocator(size):
    print(f"Allocating {size} bytes")
    # NOTE(review): confirm cp.cuda.memory.malloc exists in the installed
    # version; cp.cuda.alloc is the documented top-level allocator.
    return cp.cuda.memory.malloc(size)


with cp.cuda.using_allocator(custom_allocator):
    array = cp.zeros(1000)  # Uses custom allocator

# Clean up memory
# NOTE(review): free_all_free() is a deprecated alias of free_all_blocks().
mempool.free_all_free()

import cupy as cp
import numpy as np

# Allocate pinned memory for faster host<->device transfers
pinned_mem = cp.cuda.alloc_pinned_memory(1000 * 8)  # 1000 float64s
# View the pinned buffer as a NumPy array (PinnedMemoryPointer supports
# the buffer protocol — TODO confirm for the installed version).
pinned_array = np.frombuffer(pinned_mem, dtype=np.float64)
pinned_array[:] = np.random.random(1000)
# Transfer to GPU (faster with pinned memory)
gpu_array = cp.asarray(pinned_array)
# Pinned memory pool stats: n_free_blocks() reports free blocks, not usage.
pinned_pool = cp.get_default_pinned_memory_pool()
print(f"Pinned memory free blocks: {pinned_pool.n_free_blocks()}")

import cupy as cp
# Create CUDA streams
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()
# Create events for timing
start_event = cp.cuda.Event()
end_event = cp.cuda.Event()

# Asynchronous operations: inside "with stream", record() with no argument
# records on the current (entered) stream.
with stream1:
    start_event.record()
    # Compute on stream1
    data1 = cp.random.random((5000, 5000))
    result1 = cp.linalg.svd(data1)
    end_event.record()

with stream2:
    # Compute on stream2 simultaneously
    data2 = cp.random.random((3000, 3000))
    result2 = cp.fft.fft2(data2)

# Wait for completion and get timing
stream1.synchronize()
stream2.synchronize()
elapsed_time = cp.cuda.get_elapsed_time(start_event, end_event)
print(f"Stream1 computation took: {elapsed_time} ms")

import cupy as cp
# Simple CUDA kernel source
kernel_source = '''
extern "C" __global__
void vector_add(float* a, float* b, float* c, int n) {
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    if (idx < n) {
        c[idx] = a[idx] + b[idx];
    }
}
'''
# Compile kernel
module = cp.cuda.compile_with_cache(kernel_source)
kernel = module.get_function('vector_add')

# Prepare data
n = 1000000
a = cp.random.random(n, dtype=cp.float32)
b = cp.random.random(n, dtype=cp.float32)
c = cp.zeros(n, dtype=cp.float32)

# Launch kernel
block_size = 256
grid_size = (n + block_size - 1) // block_size
# Pass n as a 32-bit scalar to match the kernel's ``int n`` parameter;
# a plain Python int would be marshalled as a 64-bit value.
kernel((grid_size,), (block_size,), (a, b, c, cp.int32(n)))

# Verify result
expected = a + b
error = cp.linalg.norm(c - expected)
print(f"Kernel result error: {error}")

import cupy as cp
class ProfilingHook(cp.cuda.MemoryHook):
    """Memory hook that counts and logs allocations and frees."""

    # CuPy identifies hooks by a ``name`` attribute.
    name = 'ProfilingHook'

    def __init__(self):
        self.alloc_count = 0
        self.free_count = 0
        self.total_allocated = 0

    def alloc_preprocess(self, **kwargs):
        size = kwargs.get('size', 0)
        self.alloc_count += 1
        self.total_allocated += size
        print(f"Allocating {size} bytes (total: {self.total_allocated})")

    def free_preprocess(self, mem_ptr):
        self.free_count += 1
        print(f"Freeing memory (free count: {self.free_count})")


# Install hook: MemoryHook instances are activated as context managers;
# CuPy has no set_memory_hook() function (NOTE(review): verify against
# the installed CuPy version).
hook = ProfilingHook()
with hook:
    # Operations inside the block are now logged
    data = cp.random.random((1000, 1000))
    result = cp.sum(data)
    del data, result  # Trigger memory free
print(f"Allocations: {hook.alloc_count}, Frees: {hook.free_count}")

import cupy as cp
# Check available devices
device_count = cp.cuda.runtime.getDeviceCount()
print(f"Available devices: {device_count}")

if device_count > 1:
    # Split computation across multiple GPUs
    data = cp.random.random((10000, 10000))
    # Split data
    mid = data.shape[0] // 2

    # Process first half on device 0
    with cp.cuda.Device(0):
        data1 = data[:mid].copy()
        result1 = cp.linalg.svd(data1, compute_uv=False)

    # Process second half on device 1
    with cp.cuda.Device(1):
        # NOTE(review): ``data`` lives on device 0; copying a slice while
        # device 1 is current relies on peer-to-peer access — route the
        # data through the host if peer access is unavailable.
        data2 = data[mid:].copy()
        result2 = cp.linalg.svd(data2, compute_uv=False)

    # Combine results (move to device 0)
    with cp.cuda.Device(0):
        # result2 lives on device 1; CuPy cannot combine arrays that live
        # on different devices, so bring it to device 0 via the host first.
        result2_on_0 = cp.asarray(cp.asnumpy(result2))
        combined_result = cp.concatenate([result1, result2_on_0])

import cupy as cp
import time

# Deprecated profile context manager (use cupyx.profiler instead)
# with cp.cuda.profile():
#     # Operations to profile
#     pass


# Manual timing with events
def time_operation(func, *args, **kwargs):
    """Run ``func`` and return (result, elapsed milliseconds) measured
    with CUDA events."""
    start = cp.cuda.Event()
    end = cp.cuda.Event()
    start.record()
    result = func(*args, **kwargs)
    end.record()
    # Synchronizing on the end event ensures the GPU work has finished
    # before the elapsed time is read.
    end.synchronize()
    elapsed = cp.cuda.get_elapsed_time(start, end)
    return result, elapsed


# Time different operations
data = cp.random.random((5000, 5000))
svd_result, svd_time = time_operation(cp.linalg.svd, data, compute_uv=False)
fft_result, fft_time = time_operation(cp.fft.fft2, data)
print(f"SVD time: {svd_time:.2f} ms")
print(f"FFT time: {fft_time:.2f} ms")

# Install with Tessl CLI
#   npx tessl i tessl/pypi-cupy-cuda12x