NumPy & SciPy for GPU — a CUDA 11.0-compatible package that provides GPU-accelerated computing in Python through a NumPy/SciPy-compatible array library.
—
Low-level CUDA functionality providing direct access to GPU device management, memory allocation, stream control, and integration with CUDA libraries. Enables fine-grained control over GPU resources and execution.
Control and query GPU devices for multi-GPU computing.
class Device:
    """
    CUDA device context manager.

    Use as a context manager to make a device current within a block,
    or call ``use()`` to make it current permanently.

    Parameters:
    - device: int or None, device ID to use (None selects the current device)
    """
    def __init__(self, device=None): ...
    def __enter__(self): ...
    def __exit__(self, *args): ...

    @property
    def id(self) -> int:
        """Device ID."""

    def synchronize(self):
        """Synchronize the device."""

    def use(self):
        """Make this device current."""
def get_device_id() -> int:
"""Get current device ID."""
def set_device_id(device_id: int):
"""Set current device ID."""
def get_device_count() -> int:
"""Get number of available CUDA devices."""
def is_available() -> bool:
"""Check if CUDA is available."""
def get_compute_capability(device=None) -> tuple:
"""Get compute capability of device."""
def get_device_properties(device=None) -> dict:
"""Get properties of CUDA device."""Advanced GPU memory allocation and management with memory pools.
class MemoryPool:
    """
    GPU memory pool for efficient allocation.

    Caches device allocations so repeated malloc/free cycles avoid
    raw CUDA allocation calls.
    """
    def __init__(self): ...

    def malloc(self, size: int):
        """
        Allocate GPU memory.

        Parameters:
        - size: int, number of bytes to allocate

        Returns:
        MemoryPointer: Pointer to allocated memory
        """

    def free_all_blocks(self):
        """Free all memory blocks in pool."""

    def free_all_free_blocks(self):
        """Free all unused memory blocks."""

    def get_limit(self) -> int:
        """Get memory pool size limit."""

    def set_limit(self, size: int):
        """Set memory pool size limit."""

    # NOTE: used_bytes()/total_bytes() are methods, not properties —
    # this matches how the usage examples call them and the CuPy API.
    def used_bytes(self) -> int:
        """Number of bytes currently in use."""

    def total_bytes(self) -> int:
        """Total number of bytes allocated."""
class PinnedMemoryPool:
    """
    Pinned (page-locked) memory pool for CPU memory.
    """
    def __init__(self): ...

    def malloc(self, size: int): ...

    def free_all_blocks(self): ...
class MemoryPointer:
    """
    Pointer to GPU memory.

    Parameters:
    - mem: underlying memory object
    - offset: int, byte offset from the start of ``mem``
    """
    def __init__(self, mem, offset): ...

    @property
    def device(self) -> Device: ...

    @property
    def ptr(self) -> int:
        """Raw pointer value."""

    def copy_from_device(self, src, size): ...
    def copy_from_host(self, src, size): ...
    def copy_to_host(self, dst, size): ...
def get_allocator():
"""Get current memory allocator function."""
def set_allocator(allocator=None):
"""Set memory allocator function."""
def get_pinned_memory_allocator():
"""Get current pinned memory allocator."""
def set_pinned_memory_allocator(allocator=None):
"""Set pinned memory allocator function."""
def malloc(size: int) -> MemoryPointer:
"""Allocate GPU memory."""
def free(ptr: MemoryPointer):
"""Free GPU memory."""
def malloc_managed(size: int) -> MemoryPointer:
"""Allocate unified memory."""
def mem_info() -> tuple:
"""Get memory information (free, total)."""CUDA streams for asynchronous execution and memory transfers.
class Stream:
    """
    CUDA stream for asynchronous execution.

    Parameters:
    - null: bool, create null stream
    - non_blocking: bool, create non-blocking stream
    - ptds: bool, per-thread default stream
    """
    def __init__(self, null=False, non_blocking=False, ptds=False): ...
    def __enter__(self): ...
    def __exit__(self, *args): ...

    def synchronize(self):
        """Synchronize stream execution."""

    def add_callback(self, callback, arg=None):
        """Add callback to stream."""

    def record(self, event=None):
        """Record event in stream."""

    def wait_event(self, event):
        """Make stream wait for event."""

    @property
    def ptr(self) -> int:
        """Raw stream pointer."""
def get_current_stream() -> Stream:
"""Get current CUDA stream."""
def get_default_stream() -> Stream:
"""Get default CUDA stream."""CUDA events for synchronization and timing.
class Event:
    """
    CUDA event for synchronization.

    Parameters:
    - blocking: bool, create blocking event
    - disable_timing: bool, disable timing capability
    - interprocess: bool, enable interprocess sharing
    """
    def __init__(self, blocking=False, disable_timing=False, interprocess=False): ...

    def record(self, stream=None):
        """Record event in stream."""

    def synchronize(self):
        """Synchronize on event."""

    def elapsed_time(self, end_event) -> float:
        """Compute elapsed time to another event."""

    @property
    def ptr(self) -> int:
        """Raw event pointer."""
def synchronize():
"""Synchronize all CUDA operations."""Access to major CUDA libraries for specialized computations.
# cuBLAS - Basic Linear Algebra Subprograms
class cublas:
    """cuBLAS library interface."""
    @staticmethod
    def getVersion() -> int: ...

    @staticmethod
    def create() -> int: ...

    @staticmethod
    def destroy(handle: int): ...
# cuSOLVER - Dense and Sparse Linear Algebra
class cusolver:
    """cuSOLVER library interface."""
    @staticmethod
    def getVersion() -> tuple: ...
# cuSPARSE - Sparse Matrix Operations
class cusparse:
    """cuSPARSE library interface."""
    @staticmethod
    def getVersion() -> int: ...
# cuRAND - Random Number Generation
class curand:
    """cuRAND library interface."""
    @staticmethod
    def getVersion() -> int: ...
# cuFFT - Fast Fourier Transform
class cufft:
    """cuFFT library interface."""
    @staticmethod
    def getVersion() -> int: ...
# NCCL - Collective Communications
class nccl:
"""NCCL library interface."""
@staticmethod
def get_version() -> int: ...Query CUDA runtime and driver information.
def get_cuda_path() -> str:
    """Get CUDA installation path."""

def get_nvcc_path() -> str:
    """Get nvcc compiler path."""

def runtime_version() -> int:
    """Get CUDA runtime version."""

def driver_version() -> int:
    """Get CUDA driver version."""

def get_local_mem_info() -> dict:
    """Get local memory information."""

def get_memory_info() -> tuple:
    """Get device memory information."""

import cupy as cp
# Check CUDA availability
if cp.cuda.is_available():
    print(f"CUDA devices available: {cp.cuda.get_device_count()}")

# Use specific device
with cp.cuda.Device(0):
    # Operations run on device 0
    data = cp.zeros((1000, 1000))
    result = cp.sum(data)

# Switch devices
cp.cuda.set_device_id(1)
data_dev1 = cp.ones((500, 500))

# Use custom memory pool
memory_pool = cp.get_default_memory_pool()
pinned_memory_pool = cp.get_default_pinned_memory_pool()

# Monitor memory usage
print(f"Used: {memory_pool.used_bytes()} bytes")
print(f"Total: {memory_pool.total_bytes()} bytes")

# Set memory limit
memory_pool.set_limit(size=2**30)  # 1GB limit

# Free unused memory
memory_pool.free_all_free_blocks()

# Direct memory allocation
ptr = cp.cuda.malloc(1024)  # Allocate 1KB
cp.cuda.free(ptr)  # Free memory

# Create streams for concurrent execution
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()

# Asynchronous operations
with stream1:
    data1 = cp.random.random((1000, 1000))
    result1 = cp.dot(data1, data1.T)

with stream2:
    data2 = cp.random.random((1000, 1000))
    result2 = cp.linalg.svd(data2)

# Synchronize streams
stream1.synchronize()
stream2.synchronize()

# Event-based synchronization
event = cp.cuda.Event()
with stream1:
    event.record()
with stream2:
    stream2.wait_event(event)  # Wait for stream1

# Time operations using events
start_event = cp.cuda.Event()
end_event = cp.cuda.Event()

start_event.record()
# GPU operations
data = cp.random.random((5000, 5000))
result = cp.linalg.inv(data)
end_event.record()
# Wait for the end event before reading the timing result
end_event.synchronize()

elapsed_time = cp.cuda.get_elapsed_time(start_event, end_event)
print(f"Operation took {elapsed_time:.2f} ms")

# Pinned memory for faster transfers
import numpy as np  # needed below for the host-side array

pinned_array = cp.cuda.PinnedMemoryPool().malloc(1024)

# Asynchronous memory transfers
cpu_data = np.random.random((1000, 1000))
gpu_data = cp.asarray(cpu_data)  # CPU to GPU

# Transfer back to CPU asynchronously
stream = cp.cuda.Stream()
cpu_result = cp.asnumpy(gpu_data, stream=stream)
stream.synchronize()

# Distribute computation across multiple GPUs
n_devices = cp.cuda.get_device_count()
if n_devices > 1:
# Split work across devices
data_size = 10000
chunk_size = data_size // n_devices
results = []
streams = []
for device_id in range(n_devices):
with cp.cuda.Device(device_id):
stream = cp.cuda.Stream()
streams.append(stream)
with stream:
start = device_id * chunk_size
end = start + chunk_size
chunk = cp.arange(start, end)
result = cp.sum(chunk ** 2)
results.append(result)
# Synchronize all devices
for stream in streams:
stream.synchronize()
# Combine results
total_result = sum(cp.asnumpy(r) for r in results)Install with Tessl CLI
npx tessl i tessl/pypi-cupy-cuda110