CuPy — NumPy & SciPy-compatible array library for GPU-accelerated computing with Python.

This section covers low-level CUDA functionality for memory allocation, device management, and stream operations. These features enable fine-grained control over GPU resources and memory optimization for high-performance computing applications.

Device management: control and query CUDA devices and contexts.
class Device:
    """
    CUDA device context manager.

    Parameters:
    - device: int or None, device ID to use (None for current)
    """

    def __init__(self, device=None): ...

    def __enter__(self): ...

    def __exit__(self, *args): ...

    def use(self): ...

    def synchronize(self): ...

    @property
    def id(self): ...
def get_device_id():
    """
    Get current device ID.

    Returns:
    int: Current CUDA device ID
    """
def is_available():
    """
    Check if CUDA is available.

    Returns:
    bool: True if CUDA devices are available
    """
def get_local_runtime_version():
    """
    Get local CUDA runtime version.

    Returns:
    int: CUDA runtime version
    """
def get_cublas_handle():
    """
    Get cuBLAS handle for current device.

    Returns:
    int: cuBLAS handle
    """


# GPU memory allocation and management with automatic pooling.
def alloc(size):
    """
    Allocate device memory.

    Parameters:
    - size: int, size in bytes to allocate

    Returns:
    Memory: Memory object wrapping allocated GPU memory
    """
def malloc_managed(size):
    """
    Allocate managed (unified) memory.

    Parameters:
    - size: int, size in bytes to allocate

    Returns:
    ManagedMemory: Managed memory object accessible from CPU and GPU
    """
def malloc_async(size, stream=None):
    """
    Allocate memory asynchronously.

    Parameters:
    - size: int, size in bytes to allocate
    - stream: Stream or None, CUDA stream for allocation

    Returns:
    MemoryAsync: Asynchronous memory object
    """
class Memory:
    """Device memory object."""

    @property
    def ptr(self): ...

    @property
    def size(self): ...

    def __int__(self): ...
class ManagedMemory:
    """Managed memory object accessible from CPU and GPU."""

    @property
    def ptr(self): ...

    @property
    def size(self): ...
class MemoryAsync:
    """Asynchronous memory object."""

    @property
    def ptr(self): ...

    @property
    def size(self): ...
class MemoryPointer:
    """
    Pointer to device memory with automatic memory management.

    Parameters:
    - mem: Memory, underlying memory object
    - offset: int, offset in bytes from memory start
    """

    def __init__(self, mem, offset): ...

    @property
    def ptr(self): ...

    @property
    def size(self): ...
class UnownedMemory:
    """Wrapper for externally managed memory."""

    def __init__(self, ptr, size, owner): ...


# Efficient memory allocation through pooling to reduce allocation overhead.
class MemoryPool:
    """
    Memory pool for device memory allocation.

    Parameters:
    - allocator: function or None, custom allocator function
    """

    def __init__(self, allocator=None): ...

    def malloc(self, size):
        """
        Allocate memory from pool.

        Parameters:
        - size: int, size in bytes

        Returns:
        MemoryPointer: Pointer to allocated memory
        """

    def free_all_blocks(self):
        """Free all allocated blocks in pool."""

    def free_all_free(self):
        """Free all currently unused blocks."""

    def n_free_blocks(self):
        """
        Number of free blocks.

        Returns:
        int: Number of free blocks
        """

    def used_bytes(self):
        """
        Total bytes in use.

        Returns:
        int: Bytes currently allocated
        """

    def free_bytes(self):
        """
        Total bytes in free blocks.

        Returns:
        int: Bytes in free blocks
        """

    def total_bytes(self):
        """
        Total bytes managed by pool.

        Returns:
        int: Total bytes (used + free)
        """
class MemoryAsyncPool:
    """Asynchronous memory pool."""

    def __init__(self, allocator=None): ...

    def malloc(self, size, stream=None): ...

    def free_all_blocks(self): ...
def get_default_memory_pool():
    """
    Get default GPU memory pool.

    Returns:
    MemoryPool: Default memory pool for current device
    """
def get_default_pinned_memory_pool():
    """
    Get default pinned memory pool.

    Returns:
    PinnedMemoryPool: Default pinned memory pool
    """


# Custom memory allocation strategies.
class PythonFunctionAllocator:
    """
    Python function-based memory allocator.

    Parameters:
    - func: function, allocator function taking size and returning Memory
    """

    def __init__(self, func): ...
class CFunctionAllocator:
    """
    C function-based memory allocator.

    Parameters:
    - intptr: int, pointer to C allocator function
    """

    def __init__(self, intptr): ...
def set_allocator(allocator):
    """
    Set thread-local memory allocator.

    Parameters:
    - allocator: function or None, allocator function
    """
def get_allocator():
    """
    Get current thread-local allocator.

    Returns:
    function: Current allocator function
    """
def using_allocator(allocator=None):
    """
    Context manager for temporary allocator.

    Parameters:
    - allocator: function or None, temporary allocator

    Returns:
    context manager: Restores previous allocator on exit
    """


# Pinned memory: host memory that can be accessed efficiently by GPU.
def alloc_pinned_memory(size):
    """
    Allocate pinned host memory.

    Parameters:
    - size: int, size in bytes to allocate

    Returns:
    PinnedMemory: Pinned memory object
    """
class PinnedMemory:
    """Pinned host memory object."""

    @property
    def ptr(self): ...

    @property
    def size(self): ...

    def __int__(self): ...
class PinnedMemoryPointer:
    """
    Pointer to pinned memory.

    Parameters:
    - mem: PinnedMemory, underlying memory object
    - offset: int, offset in bytes
    """

    def __init__(self, mem, offset): ...

    @property
    def ptr(self): ...

    @property
    def size(self): ...
class PinnedMemoryPool:
    """
    Memory pool for pinned memory allocation.

    Parameters:
    - allocator: function or None, custom allocator
    """

    def __init__(self, allocator=None): ...

    def malloc(self, size): ...

    def free_all_blocks(self): ...

    def used_bytes(self): ...

    def free_bytes(self): ...

    def total_bytes(self): ...
def set_pinned_memory_allocator(allocator):
    """
    Set pinned memory allocator.

    Parameters:
    - allocator: function or None, allocator function
    """


# CUDA streams for asynchronous execution and memory operations.
class Stream:
    """
    CUDA stream for asynchronous operations.

    Parameters:
    - null: bool, whether to use null (default) stream
    - non_blocking: bool, whether stream can run concurrently with null stream
    - priority: int, stream priority (lower = higher priority)
    """

    def __init__(self, null=False, non_blocking=False, priority=0): ...

    def synchronize(self):
        """Wait for all operations in stream to complete."""

    def add_callback(self, callback, arg):
        """
        Add callback to be called when stream operations complete.

        Parameters:
        - callback: function, callback function
        - arg: object, argument to pass to callback
        """

    def record(self, event=None):
        """
        Record event in stream.

        Parameters:
        - event: Event or None, event to record

        Returns:
        Event: Recorded event
        """

    def wait_event(self, event):
        """
        Make stream wait for event.

        Parameters:
        - event: Event, event to wait for
        """

    @property
    def ptr(self): ...
class ExternalStream:
    """
    Wrapper for externally created CUDA stream.

    Parameters:
    - ptr: int, pointer to existing CUDA stream
    """

    def __init__(self, ptr): ...

    def synchronize(self): ...

    @property
    def ptr(self): ...
def get_current_stream():
    """
    Get current CUDA stream.

    Returns:
    Stream: Current stream for active device
    """


# CUDA events for synchronization and timing.
class Event:
    """
    CUDA event for synchronization and timing.

    Parameters:
    - block: bool, whether to block host thread
    - disable_timing: bool, whether to disable timing capability
    - interprocess: bool, whether event can be shared between processes
    """

    def __init__(self, block=False, disable_timing=False, interprocess=False): ...

    def record(self, stream=None):
        """
        Record event in stream.

        Parameters:
        - stream: Stream or None, stream to record in
        """

    def synchronize(self):
        """Wait for event to complete."""

    def query(self):
        """
        Query event completion status.

        Returns:
        bool: True if event has completed
        """

    @property
    def ptr(self): ...
def get_elapsed_time(start_event, end_event):
    """
    Get elapsed time between events.

    Parameters:
    - start_event: Event, start event
    - end_event: Event, end event

    Returns:
    float: Elapsed time in milliseconds
    """


# CUDA graphs: capture and replay sequences of CUDA operations.
class Graph:
    """CUDA graph for capturing and replaying operation sequences."""

    def __init__(self): ...

    def capture_begin(self, stream=None):
        """
        Begin capturing operations into graph.

        Parameters:
        - stream: Stream or None, stream to capture
        """

    def capture_end(self, stream=None):
        """
        End capturing operations.

        Parameters:
        - stream: Stream or None, stream that was captured
        """

    def launch(self, stream=None):
        """
        Launch (replay) captured graph.

        Parameters:
        - stream: Stream or None, stream to launch in
        """


# High-level utilities for CPU-GPU data transfer.
def asnumpy(a, stream=None, order='C', out=None, *, blocking=True):
    """
    Transfer CuPy array to NumPy array on CPU.

    Parameters:
    - a: cupy.ndarray, GPU array to transfer
    - stream: Stream or None, CUDA stream for async transfer
    - order: {'C', 'F', 'A'}, memory layout of result
    - out: numpy.ndarray or None, pre-allocated output array
    - blocking: bool, whether to block until transfer complete

    Returns:
    numpy.ndarray: CPU array with copied data
    """
def get_array_module(*args):
    """
    Get appropriate array module (cupy or numpy) based on input types.

    Parameters:
    - args: array-like objects to check

    Returns:
    module: cupy if any arg is CuPy array, otherwise numpy
    """


# --- Usage examples ---

import cupy as cp
# Device management
print(f"CUDA available: {cp.cuda.is_available()}")
print(f"Current device: {cp.cuda.get_device_id()}")

# Switch devices
with cp.cuda.Device(1):  # use device 1 within this context
    arr = cp.zeros((1000, 1000))
    print(f"Array on device: {arr.device.id}")

# Memory pool management
pool = cp.get_default_memory_pool()
print(f"Memory usage: {pool.used_bytes()} bytes")
print(f"Free blocks: {pool.n_free_blocks()}")

# Free unused memory
pool.free_all_free()

# Custom allocator for memory tracking
def tracking_allocator(size):
    # Log every allocation before delegating to the default device allocator.
    print(f"Allocating {size} bytes")
    return cp.cuda.alloc(size)

# Use custom allocator temporarily
with cp.cuda.using_allocator(tracking_allocator):
    arr = cp.ones((1000, 1000))  # will print allocation size

# Pinned memory for faster transfers
pinned_mem = cp.cuda.alloc_pinned_memory(1000 * 8)  # 1000 float64s
gpu_arr = cp.zeros(1000)

# Async memory allocation (when supported)
stream = cp.cuda.Stream()
async_mem = cp.cuda.malloc_async(1000 * 4, stream)

# Create streams for concurrent execution
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()

# Perform operations on different streams
with stream1:
    arr1 = cp.random.random((1000, 1000))
    result1 = cp.dot(arr1, arr1)
with stream2:
    arr2 = cp.random.random((1000, 1000))
    result2 = cp.dot(arr2, arr2)

# Synchronize streams
stream1.synchronize()
stream2.synchronize()

# Events for timing and synchronization
start_event = cp.cuda.Event()
end_event = cp.cuda.Event()
start_event.record()
# ... GPU operations ...
end_event.record()
end_event.synchronize()
elapsed_time = cp.cuda.get_elapsed_time(start_event, end_event)
print(f"Operation took {elapsed_time:.2f} ms")

# Asynchronous transfers with streams
import numpy as np  # host-side arrays; numpy was not imported earlier in these examples

stream = cp.cuda.Stream()

# CPU array
cpu_data = np.random.random((10000, 1000))

# Transfer to GPU (synchronous by default)
gpu_data = cp.asarray(cpu_data)

# For truly async transfer, use lower-level operations
gpu_buffer = cp.empty_like(cpu_data)
# ... use CUDA runtime API for async memcpy ...

# Transfer results back to CPU
with stream:
    result_gpu = cp.dot(gpu_data, gpu_data.T)
    # Async transfer back (non-blocking)
    result_cpu = cp.asnumpy(result_gpu, stream=stream, blocking=False)
stream.synchronize()  # wait for completion

# Monitor memory usage
def print_memory_info():
    # Report default-pool usage in gigabytes.
    pool = cp.get_default_memory_pool()
    print(f"Used: {pool.used_bytes() / 1e9:.2f} GB")
    print(f"Free: {pool.free_bytes() / 1e9:.2f} GB")

print_memory_info()

# Large computation with memory management
for i in range(100):
    # Large temporary arrays
    temp = cp.random.random((5000, 5000))
    result = cp.dot(temp, temp)
    # Explicit cleanup every 10 iterations
    if i % 10 == 0:
        del temp, result
        cp.get_default_memory_pool().free_all_free()
        print_memory_info()

# Use memory mapping for very large datasets
# (requires careful memory management)

Install with the Tessl CLI:

npx tessl i tessl/pypi-cupy