CuPy: NumPy & SciPy for GPU - A NumPy/SciPy-compatible array library for GPU-accelerated computing with Python, specifically built for CUDA 11.1
—
Direct CUDA functionality providing low-level GPU programming capabilities, memory management, device control, custom kernel integration, and asynchronous execution. This module bridges the gap between high-level array operations and CUDA's powerful parallel computing features.
Control and query CUDA devices for multi-GPU systems and device selection.
class Device:
"""
CUDA device representation and context management.
"""
def __init__(self, device=None):
"""
Initialize device object.
Parameters:
- device: int or Device, device ID or device object
"""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, *args):
"""Context manager exit."""
def use(self):
"""
Use this device in with statement.
Returns:
- context manager for device usage
"""
@property
def id(self):
"""Device ID."""
def synchronize(self):
"""Synchronize device."""
def get_device_id():
"""
Get current device ID.
Returns:
- int: Current device ID
"""
def is_available():
"""
Check if CUDA is available.
Returns:
- bool: True if CUDA is available
"""
def get_cublas_handle():
"""
Get cuBLAS handle for current device.
Returns:
- cuBLAS handle object
"""GPU memory allocation, pools, and efficient memory reuse strategies.
def alloc(size):
"""
Allocate GPU memory.
Parameters:
- size: int, number of bytes to allocate
Returns:
- MemoryPointer: Pointer to allocated memory
"""
def malloc_managed(size):
"""
Allocate managed (unified) memory.
Parameters:
- size: int, number of bytes to allocate
Returns:
- ManagedMemory: Managed memory object
"""
def malloc_async(size, stream=None):
"""
Allocate memory asynchronously.
Parameters:
- size: int, number of bytes to allocate
- stream: Stream, optional, CUDA stream for allocation
Returns:
- MemoryAsync: Asynchronously allocated memory
"""
class MemoryPointer:
"""
Pointer to device memory with automatic deallocation.
"""
def __init__(self, mem, owner):
"""
Initialize memory pointer.
Parameters:
- mem: raw memory pointer
- owner: memory owner object
"""
@property
def ptr(self):
"""Raw memory pointer address."""
@property
def size(self):
"""Memory size in bytes."""
def copy_from_device(self, src, size):
"""Copy from device memory."""
def copy_from_host(self, src, size):
"""Copy from host memory."""
def copy_to_host(self, dst, size):
"""Copy to host memory."""
def memset(self, value, size):
"""Set memory values."""
class MemoryPool:
"""
Memory pool for efficient GPU memory management.
"""
def __init__(self, allocator=None):
"""
Initialize memory pool.
Parameters:
- allocator: callable, custom allocator function
"""
def malloc(self, size):
"""
Allocate memory from pool.
Parameters:
- size: int, number of bytes
Returns:
- MemoryPointer: Allocated memory
"""
def free_all_blocks(self):
"""Free all allocated blocks."""
def free_all_free(self):
"""Free all free blocks."""
def used_bytes(self):
"""
Get used memory bytes.
Returns:
- int: Used memory in bytes
"""
def total_bytes(self):
"""
Get total allocated bytes.
Returns:
- int: Total memory in bytes
"""
def set_allocator(allocator):
"""
Set global memory allocator.
Parameters:
- allocator: callable or None, allocator function
"""
def get_allocator():
"""
Get current memory allocator.
Returns:
- callable: Current allocator function
"""Page-locked host memory for efficient CPU-GPU transfers.
def alloc_pinned_memory(size):
"""
Allocate pinned host memory.
Parameters:
- size: int, number of bytes to allocate
Returns:
- PinnedMemoryPointer: Pointer to pinned memory
"""
class PinnedMemoryPointer:
"""
Pointer to pinned host memory.
"""
def __init__(self, mem, size):
"""
Initialize pinned memory pointer.
Parameters:
- mem: raw memory pointer
- size: int, memory size in bytes
"""
@property
def ptr(self):
"""Raw memory pointer."""
@property
def size(self):
"""Memory size in bytes."""
class PinnedMemoryPool:
"""
Memory pool for pinned host memory.
"""
def malloc(self, size):
"""
Allocate pinned memory from pool.
Parameters:
- size: int, number of bytes
Returns:
- PinnedMemoryPointer: Allocated pinned memory
"""
def free_all_blocks(self):
"""Free all allocated blocks."""
def used_bytes(self):
"""
Used memory in bytes.
Returns:
- int: Used memory
"""
def total_bytes(self):
"""
Total allocated memory in bytes.
Returns:
- int: Total memory
"""
def set_pinned_memory_allocator(allocator):
"""
Set pinned memory allocator.
Parameters:
- allocator: callable or None, allocator function
"""Asynchronous execution control and synchronization primitives.
class Stream:
"""
CUDA stream for asynchronous operations.
"""
def __init__(self, null=False, non_blocking=False, ptds=False):
"""
Initialize CUDA stream.
Parameters:
- null: bool, use null stream
- non_blocking: bool, create non-blocking stream
- ptds: bool, per-thread default stream
"""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, *args):
"""Context manager exit."""
def use(self):
"""
Use stream in context manager.
Returns:
- context manager for stream usage
"""
def synchronize(self):
"""Wait for stream operations to complete."""
def add_callback(self, callback, arg=None):
"""
Add callback to stream.
Parameters:
- callback: callable, callback function
- arg: object, optional argument to callback
"""
@property
def ptr(self):
"""Raw CUDA stream pointer."""
class ExternalStream:
"""
Wrapper for externally created CUDA stream.
"""
def __init__(self, ptr):
"""
Initialize external stream.
Parameters:
- ptr: int, raw CUDA stream pointer
"""
def get_current_stream():
"""
Get current CUDA stream.
Returns:
- Stream: Current stream object
"""
class Event:
"""
CUDA event for synchronization and timing.
"""
def __init__(self, block=True, disable_timing=False, interprocess=False):
"""
Initialize CUDA event.
Parameters:
- block: bool, blocking event
- disable_timing: bool, disable timing measurement
- interprocess: bool, enable interprocess sharing
"""
def record(self, stream=None):
"""
Record event in stream.
Parameters:
- stream: Stream, optional, stream to record in
"""
def synchronize(self):
"""Wait for event completion."""
def elapsed_time(self, end_event):
"""
Compute elapsed time to another event.
Parameters:
- end_event: Event, end event
Returns:
- float: Elapsed time in milliseconds
"""
@property
def ptr(self):
"""Raw CUDA event pointer."""
def get_elapsed_time(start_event, end_event):
"""
Get elapsed time between events.
Parameters:
- start_event: Event, start event
- end_event: Event, end event
Returns:
- float: Elapsed time in milliseconds
"""Capture and replay sequences of operations for performance optimization.
class Graph:
"""
CUDA graph for capturing and replaying operation sequences.
"""
def __init__(self):
"""Initialize empty CUDA graph."""
def capture_begin(self, stream=None, mode='global'):
"""
Begin graph capture.
Parameters:
- stream: Stream, stream to capture
- mode: str, capture mode ('global', 'thread_local', 'relaxed')
"""
def capture_end(self, stream=None):
"""
End graph capture.
Parameters:
- stream: Stream, stream being captured
"""
def launch(self, stream=None):
"""
Launch captured graph.
Parameters:
- stream: Stream, stream to launch in
"""Integration of user-defined CUDA kernels for specialized computations.
class ElementwiseKernel:
"""
User-defined elementwise CUDA kernel.
"""
def __init__(self, in_params, out_params, operation, name='kernel', reduce_dims=True, options=(), loop_prep='', after_loop='', preamble='', **kwargs):
"""
Initialize elementwise kernel.
Parameters:
- in_params: str, input parameter declarations
- out_params: str, output parameter declarations
- operation: str, kernel operation code
- name: str, kernel name
- reduce_dims: bool, reduce dimensions automatically
- options: tuple, compiler options
- loop_prep: str, code before main loop
- after_loop: str, code after main loop
- preamble: str, code before kernel function
"""
def __call__(self, *args, **kwargs):
"""
Execute kernel with given arguments.
Parameters:
- *args: input and output arrays
- size: int, optional, number of elements to process
- stream: Stream, optional, execution stream
Returns:
- output arrays or None
"""
class ReductionKernel:
"""
User-defined reduction CUDA kernel.
"""
def __init__(self, in_params, out_params, map_expr, reduce_expr, post_map_expr='', identity=None, name='reduce_kernel', reduce_type=None, reduce_dims=True, options=(), preamble='', **kwargs):
"""
Initialize reduction kernel.
Parameters:
- in_params: str, input parameter declarations
- out_params: str, output parameter declarations
- map_expr: str, mapping expression
- reduce_expr: str, reduction expression
- post_map_expr: str, post-mapping expression
- identity: str, identity value for reduction
- name: str, kernel name
- reduce_type: type, reduction data type
- reduce_dims: bool, reduce dimensions
- options: tuple, compiler options
- preamble: str, preamble code
"""
def __call__(self, *args, **kwargs):
"""Execute reduction kernel."""
class RawKernel:
"""
User-defined raw CUDA kernel from source code.
"""
def __init__(self, code, name, options=(), backend='auto', translate_cucomplex=True, **kwargs):
"""
Initialize raw kernel from CUDA source.
Parameters:
- code: str, CUDA kernel source code
- name: str, kernel function name
- options: tuple, compilation options
- backend: str, compilation backend
- translate_cucomplex: bool, translate complex types
"""
def __call__(self, grid, block, *args, **kwargs):
"""
Launch kernel with specified grid and block dimensions.
Parameters:
- grid: tuple, grid dimensions
- block: tuple, block dimensions
- *args: kernel arguments
- shared_mem: int, shared memory size
- stream: Stream, execution stream
"""
class RawModule:
"""
CUDA module containing multiple kernels and functions.
"""
def __init__(self, code, options=(), backend='auto', translate_cucomplex=True, **kwargs):
"""
Initialize module from CUDA source code.
Parameters:
- code: str, CUDA module source code
- options: tuple, compilation options
- backend: str, compilation backend
- translate_cucomplex: bool, translate complex types
"""
def get_function(self, name):
"""
Get function from module.
Parameters:
- name: str, function name
Returns:
- Function: CUDA function object
"""
class Function:
"""
CUDA function from compiled module.
"""
def __call__(self, grid, block, *args, **kwargs):
"""
Launch function.
Parameters:
- grid: tuple, grid dimensions
- block: tuple, block dimensions
- *args: function arguments
- shared_mem: int, shared memory size
- stream: Stream, execution stream
"""
@property
def max_threads_per_block(self):
"""Maximum threads per block for this function."""
@property
def num_regs(self):
"""Number of registers used by function."""Dynamic CUDA code compilation and caching.
def compile_with_cache(source, options=(), arch=None, cache_dir=None, prepend_cupy_headers=True, backend='auto', translate_cucomplex=True, **kwargs):
"""
Compile CUDA source code with caching.
Parameters:
- source: str, CUDA source code
- options: tuple, compilation options
- arch: str, target architecture
- cache_dir: str, cache directory path
- prepend_cupy_headers: bool, add CuPy headers
- backend: str, compilation backend
- translate_cucomplex: bool, translate complex types
Returns:
- bytes: Compiled module binary
"""Convenient context managers for resource management.
def using_allocator(allocator=None):
"""
Context manager for temporary allocator change.
Parameters:
- allocator: callable or None, temporary allocator
Returns:
- context manager
"""
def profile():
"""
Context manager for CUDA profiling (deprecated).
Returns:
- context manager
"""System configuration and tool detection.
def get_cuda_path():
"""
Get CUDA installation path.
Returns:
- str: Path to CUDA installation
"""
def get_nvcc_path():
"""
Get nvcc compiler path.
Returns:
- str: Path to nvcc compiler
"""
def get_rocm_path():
"""
Get ROCm installation path.
Returns:
- str: Path to ROCm installation
"""
def get_hipcc_path():
"""
Get hipcc compiler path.
Returns:
- str: Path to hipcc compiler
"""Direct access to CUDA runtime and driver APIs.
# CUDA Runtime API
from cupy_backends.cuda.api import runtime
# CUDA Driver API
from cupy_backends.cuda.api import driver
# cuBLAS library
from cupy_backends.cuda.libs import cublas
# cuRAND library
from cupy_backends.cuda.libs import curand
# cuSOLVER library
from cupy_backends.cuda.libs import cusolver
# cuSPARSE library
from cupy_backends.cuda.libs import cusparse
# NVRTC (Runtime Compilation)
from cupy_backends.cuda.libs import nvrtc
# CUDA Profiler
from cupy_backends.cuda.libs import profiler

import cupy as cp

# Check CUDA availability
if cp.cuda.is_available():
    print(f"CUDA devices available: {cp.cuda.runtime.getDeviceCount()}")

# Use a specific device: everything inside the with-block runs on device 0.
with cp.cuda.Device(0):
    arr = cp.array([1, 2, 3, 4, 5])
    result = cp.sum(arr)

# Memory pool management: inspect the default pool's usage counters.
mempool = cp.get_default_memory_pool()
print(f"Used memory: {mempool.used_bytes()} bytes")
print(f"Total memory: {mempool.total_bytes()} bytes")

# Release cached blocks that are no longer in use back to the driver.
mempool.free_all_free()
import cupy as cp

# Two independent streams let the queued work below overlap.
stream1 = cp.cuda.Stream()
stream2 = cp.cuda.Stream()

# Queue operations on each stream asynchronously.
with stream1:
    a1 = cp.random.random((1000, 1000))
    result1 = cp.matmul(a1, a1.T)
with stream2:
    a2 = cp.random.random((1000, 1000))
    result2 = cp.matmul(a2, a2.T)

# Block until both streams have drained.
stream1.synchronize()
stream2.synchronize()

# Event-based timing: record an event before and after the region of interest.
start_event = cp.cuda.Event()
end_event = cp.cuda.Event()
start_event.record()
# ... GPU operations ...
end_event.record()

# The end event must have completed before the elapsed time is valid.
end_event.synchronize()
elapsed_time = cp.cuda.get_elapsed_time(start_event, end_event)
print(f"Elapsed time: {elapsed_time} ms")
import cupy as cp

# Elementwise kernel example: computes z = x + 2*y, compiled on first call.
add_kernel = cp.ElementwiseKernel(
    'float32 x, float32 y',  # input parameters
    'float32 z',             # output parameters
    'z = x + y * 2',         # operation
    'add_kernel'             # kernel name
)

# Use the kernel
a = cp.array([1, 2, 3, 4], dtype=cp.float32)
b = cp.array([5, 6, 7, 8], dtype=cp.float32)
c = cp.empty_like(a)
add_kernel(a, b, c)
print("Custom kernel result:", c)

# Raw CUDA kernel
raw_kernel_code = '''
extern "C" __global__ void vector_add(float* a, float* b, float* c, int n) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
c[idx] = a[idx] + b[idx];
}
}
'''
raw_kernel = cp.RawKernel(raw_kernel_code, 'vector_add')

# Launch raw kernel
n = 1000
a_gpu = cp.random.random(n, dtype=cp.float32)
b_gpu = cp.random.random(n, dtype=cp.float32)
c_gpu = cp.empty(n, dtype=cp.float32)
threads_per_block = 256
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
# FIX: scalar kernel arguments must match the C parameter width. A plain
# Python int is passed as a 64-bit value, but the kernel declares `int n`
# (32-bit), which corrupts the argument layout — wrap it in cp.int32.
raw_kernel((blocks_per_grid,), (threads_per_block,),
           (a_gpu, b_gpu, c_gpu, cp.int32(n)))
import cupy as cp
import numpy as np

# Pinned (page-locked) host memory enables faster host<->device transfers.
size = 10000000
pinned_mem = cp.cuda.alloc_pinned_memory(size * 4)  # 4 bytes per float32

# View the pinned buffer as a NumPy array (zero-copy via the buffer protocol).
pinned_array = np.frombuffer(pinned_mem, dtype=np.float32).reshape(-1)
pinned_array[:] = np.random.random(size)

# Fast transfer from pinned memory to GPU
gpu_array = cp.asarray(pinned_array)

# Async transfer with streams
stream = cp.cuda.Stream()
with stream:
    gpu_result = cp.sum(gpu_array)

# Transfer result back asynchronously into another pinned buffer.
# FIX: was cp.cuda.pinned_memory.alloc_pinned_memory — use the public
# cp.cuda.alloc_pinned_memory spelling, consistent with the call above.
result_pinned = cp.cuda.alloc_pinned_memory(4)
# NOTE(review): gpu_result is a 0-d array while frombuffer yields shape (1,);
# confirm ndarray.get(out=...) accepts this, or reshape the out array to ().
gpu_result.get(out=np.frombuffer(result_pinned, dtype=np.float32))
import cupy as cp

# Build an empty graph and a stream to capture work on.
graph = cp.cuda.Graph()
stream = cp.cuda.Stream()

# Begin graph capture
graph.capture_begin(stream)
with stream:
    # These operations are recorded into the graph rather than executed now.
    a = cp.random.random((1000, 1000))
    b = cp.random.random((1000, 1000))
    c = cp.matmul(a, b)
    result = cp.sum(c)
# End capture
graph.capture_end(stream)

# Replaying the captured graph avoids per-launch overhead (very efficient).
for _ in range(100):
    graph.launch(stream)
    stream.synchronize()

Install with Tessl CLI:
npx tessl i tessl/pypi-cupy-cuda111