CuPy: NumPy & SciPy-compatible array library for GPU-accelerated computing with Python that provides a drop-in replacement for NumPy/SciPy on NVIDIA CUDA platforms.
—
Direct CUDA functionality including device management, stream control, memory management, and custom kernel execution. These features enable advanced GPU programming and performance optimization for CuPy applications.
Control and query GPU devices in multi-GPU systems.
class Device:
"""CUDA device context manager.
Provides context management for GPU device selection and ensures
operations execute on the specified device.
"""
def __init__(self, device=None):
"""Initialize device context.
Parameters:
- device: int or None, device ID to use (None for current device)
"""
def __enter__(self):
"""Enter device context."""
def __exit__(self, *args):
"""Exit device context and restore previous device."""
@property
def id(self):
"""Get device ID."""
def use(self):
"""Make this device current."""
def get_device_id():
"""Get current device ID.
Returns:
int: current CUDA device ID
"""
def is_available():
"""Check if CUDA is available.
Returns:
bool: True if CUDA is available
"""
Manage CUDA streams for asynchronous operations and overlapping computation.
class Stream:
"""CUDA stream for asynchronous operations.
Enables overlapping of computation and memory transfers,
and provides synchronization control for GPU operations.
"""
def __init__(self, null=False, non_blocking=False, ptds=False):
"""Create CUDA stream.
Parameters:
- null: bool, create null stream (default stream)
- non_blocking: bool, create non-blocking stream
- ptds: bool, create per-thread default stream
"""
def __enter__(self):
"""Enter stream context."""
def __exit__(self, *args):
"""Exit stream context."""
def synchronize(self):
"""Synchronize stream execution."""
def add_callback(self, callback, arg):
"""Add callback to stream."""
@property
def ptr(self):
"""Get stream pointer."""
class ExternalStream:
"""Wrap external CUDA stream pointer.
Allows integration with external CUDA streams from other libraries.
"""
def __init__(self, ptr):
"""Wrap external stream.
Parameters:
- ptr: int, external stream pointer
"""
def get_current_stream():
"""Get current CUDA stream.
Returns:
Stream: current stream object
"""
CUDA events for timing and synchronization.
class Event:
"""CUDA event for timing and synchronization.
Provides mechanisms for measuring elapsed time and
synchronizing between different streams.
"""
def __init__(self, blocking=False, disable_timing=False, interprocess=False):
"""Create CUDA event.
Parameters:
- blocking: bool, create blocking event
- disable_timing: bool, disable timing capability
- interprocess: bool, enable interprocess sharing
"""
def record(self, stream=None):
"""Record event in stream."""
def synchronize(self):
"""Synchronize on event completion."""
def elapsed_time(self, end_event):
"""Calculate elapsed time to another event.
Parameters:
- end_event: Event, ending event
Returns:
float: elapsed time in milliseconds
"""
def get_elapsed_time(start_event, end_event):
"""Get elapsed time between events.
Parameters:
- start_event: Event, starting event
- end_event: Event, ending event
Returns:
float: elapsed time in milliseconds
"""
Advanced GPU memory allocation and management.
class Memory:
"""GPU memory allocation.
Represents a contiguous block of GPU memory with
automatic deallocation and reference counting.
"""
def __init__(self, size):
"""Allocate GPU memory.
Parameters:
- size: int, size in bytes
"""
@property
def ptr(self):
"""Get memory pointer."""
@property
def size(self):
"""Get memory size in bytes."""
class MemoryPointer:
"""Pointer to GPU memory with offset and size information."""
def __init__(self, mem, offset):
"""Create memory pointer.
Parameters:
- mem: Memory, memory object
- offset: int, offset from memory start
"""
class MemoryPool:
"""Memory pool for efficient GPU memory allocation.
Maintains a pool of allocated memory blocks to reduce
allocation overhead and memory fragmentation.
"""
def __init__(self, allocator=None):
"""Create memory pool.
Parameters:
- allocator: callable, custom memory allocator
"""
def malloc(self, size):
"""Allocate memory from pool.
Parameters:
- size: int, size in bytes
Returns:
MemoryPointer: pointer to allocated memory
"""
def free_all_blocks(self):
"""Free all unused memory blocks."""
def free_all_free(self):
"""Free all cached but unused memory."""
def used_bytes(self):
"""Get used memory in bytes.
Returns:
int: bytes currently in use
"""
def total_bytes(self):
"""Get total allocated memory in bytes.
Returns:
int: total bytes allocated from GPU
"""
def alloc(size):
"""Allocate GPU memory.
Parameters:
- size: int, size in bytes
Returns:
MemoryPointer: pointer to allocated memory
"""
def set_allocator(allocator=None):
"""Set GPU memory allocator.
Parameters:
- allocator: callable or None, memory allocator function
"""
def get_allocator():
"""Get current GPU memory allocator.
Returns:
callable: current allocator function
"""
Host memory allocation for efficient GPU transfers.
class PinnedMemory:
"""Pinned (page-locked) host memory allocation.
Enables faster transfers between CPU and GPU by
preventing the OS from paging memory to disk.
"""
def __init__(self, size):
"""Allocate pinned memory.
Parameters:
- size: int, size in bytes
"""
class PinnedMemoryPool:
"""Memory pool for pinned host memory allocations."""
def malloc(self, size):
"""Allocate pinned memory from pool."""
def alloc_pinned_memory(size):
"""Allocate pinned host memory.
Parameters:
- size: int, size in bytes
Returns:
PinnedMemoryPointer: pointer to pinned memory
"""
def set_pinned_memory_allocator(allocator=None):
"""Set pinned memory allocator."""
Access to specialized CUDA libraries through CuPy wrappers.
# cuBLAS integration
def get_cublas_handle():
"""Get cuBLAS handle for current device.
Returns:
int: cuBLAS handle pointer
"""
# Library modules available
class runtime:
"""CUDA Runtime API wrapper."""
class driver:
"""CUDA Driver API wrapper."""
class nvrtc:
"""NVIDIA Runtime Compilation API."""
class cublas:
"""cuBLAS Basic Linear Algebra Subprograms."""
class curand:
"""cuRAND Random Number Generation."""
class cusolver:
"""cuSOLVER Dense and Sparse Linear Algebra."""
class cusparse:
"""cuSPARSE Sparse Matrix Operations."""
class cufft:
"""cuFFT Fast Fourier Transform."""
class nvtx:
"""NVIDIA Tools Extension for profiling."""
class profiler:
"""CUDA Profiler control."""
Tools for performance measurement and optimization.
def profile(*, warmup=1, repeat=5, preprocess=None, postprocess=None):
"""Context manager for performance profiling.
Parameters:
- warmup: int, number of warmup iterations
- repeat: int, number of measurement iterations
- preprocess: callable, setup function
- postprocess: callable, cleanup function
Returns:
context manager for profiling
"""
def compile_with_cache(source, filename, dirname=None, **kwargs):
"""Compile CUDA source with caching.
Parameters:
- source: str, CUDA source code
- filename: str, source filename
- dirname: str, cache directory
- kwargs: additional compilation options
Returns:
compiled module object
"""
import cupy as cp
# Check available devices
print(f"Current device: {cp.cuda.get_device_id()}")
print(f"CUDA available: {cp.cuda.is_available()}")
# Use specific device
with cp.cuda.Device(1):
# Operations run on device 1
array = cp.zeros((1000, 1000))
result = cp.sum(array)
# Multi-GPU computation
devices = [0, 1]
arrays = []
for device_id in devices:
with cp.cuda.Device(device_id):
arrays.append(cp.random.random((5000, 5000)))
# Synchronize all devices
for device_id in devices:
with cp.cuda.Device(device_id):
cp.cuda.Stream.null.synchronize()
import cupy as cp
# Create custom stream
stream = cp.cuda.Stream()
# Asynchronous operations
with stream:
a = cp.random.random((10000, 10000))
b = cp.random.random((10000, 10000))
c = cp.dot(a, b) # Runs asynchronously
# Synchronize stream
stream.synchronize()
# Multiple streams for overlapping
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()
with stream1:
result1 = cp.fft.fft(cp.random.random(1000000))
with stream2:
result2 = cp.linalg.svd(cp.random.random((1000, 1000)))
# Both operations can run concurrently
stream1.synchronize()
stream2.synchronize()
import cupy as cp
# Get default memory pool
pool = cp.get_default_memory_pool()
print(f"Used memory: {pool.used_bytes()} bytes")
print(f"Total memory: {pool.total_bytes()} bytes")
# Create large arrays
large_arrays = []
for i in range(10):
large_arrays.append(cp.zeros((1000, 1000), dtype=cp.float32))
print(f"After allocation - Used: {pool.used_bytes()} bytes")
# Free arrays (but memory stays in pool)
del large_arrays
print(f"After deletion - Used: {pool.used_bytes()} bytes")
# Actually free memory
pool.free_all_blocks()
print(f"After free_all_blocks - Used: {pool.used_bytes()} bytes")
import cupy as cp
# Using events for precise timing
start_event = cp.cuda.Event()
end_event = cp.cuda.Event()
# Time a computation
start_event.record()
result = cp.linalg.svd(cp.random.random((5000, 5000)))
end_event.record()
# Get elapsed time
end_event.synchronize()
elapsed_time = cp.cuda.get_elapsed_time(start_event, end_event)
print(f"SVD took {elapsed_time:.2f} milliseconds")
# Using profile context manager
def my_computation():
a = cp.random.random((2000, 2000))
return cp.linalg.inv(a)
with cp.cuda.profile():
result = my_computation()
import cupy as cp
import numpy as np
# Allocate pinned memory for faster transfers
size = 1000000
pinned_array = cp.cuda.alloc_pinned_memory(size * 4) # 4 bytes per float32
# Create numpy array using pinned memory
np_array = np.frombuffer(pinned_array, dtype=np.float32).reshape((1000, 1000))
np_array[:] = np.random.random((1000, 1000))
# Fast transfer to GPU
gpu_array = cp.asarray(np_array)
# Process on GPU
result = cp.fft.fft2(gpu_array)
# Fast transfer back to CPU
cpu_result = cp.asnumpy(result)
import cupy as cp
# Create streams and events
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()
event = cp.cuda.Event()
# Launch work in stream1
with stream1:
a = cp.random.random((5000, 5000))
b = cp.dot(a, a.T)
event.record() # Mark completion
# Wait for stream1 completion in stream2
with stream2:
stream2.wait_event(event) # Wait for event
c = cp.linalg.inv(b) # Depends on stream1 result
# Synchronize both streams
stream1.synchronize()
stream2.synchronize()
Install with Tessl CLI
npx tessl i tessl/pypi-cupy-cuda113