CuPy: NumPy & SciPy for GPU — a NumPy/SciPy-compatible array library for GPU-accelerated computing with Python, built specifically for the AMD ROCm 4.3 platform.

This module (cupyx) provides extended functionality beyond NumPy compatibility, including SciPy-compatible functions, JIT compilation, optimization utilities, and specialized GPU algorithms — advanced features for high-performance computing and specialized operations not available in standard NumPy.
def scatter_add(a, indices, updates, axis=None):
    """
    Add ``updates`` to array ``a`` at the specified indices.

    Parameters:
    - a: cupy.ndarray, target array to update
    - indices: cupy.ndarray, indices where updates are applied
    - updates: cupy.ndarray, values to add
    - axis: int or None, axis along which to scatter

    Returns:
    cupy.ndarray: Array with scattered additions
    """
def scatter_max(a, indices, updates, axis=None):
    """Apply element-wise maximum at scattered indices."""
def scatter_min(a, indices, updates, axis=None):
    """Apply element-wise minimum at scattered indices."""
def rsqrt(x):
    """
    Reciprocal square root (1/sqrt(x)).

    Parameters:
    - x: array-like, input array with positive values

    Returns:
    cupy.ndarray: Reciprocal square root of each element
    """

# Control floating-point error handling behavior.
def errstate(**kwargs):
    """
    Context manager for floating-point error handling.

    Parameters:
    - all: str, set behavior for all error types
    - divide: str, behavior for division by zero
    - over: str, behavior for overflow
    - under: str, behavior for underflow
    - invalid: str, behavior for invalid operations

    Error behaviors: 'ignore', 'warn', 'raise', 'call', 'print', 'log'

    Usage:
        with cupyx.errstate(divide='ignore'):
            result = a / b  # Division by zero won't raise error
    """
def geterr():
    """
    Get current error handling behavior.

    Returns:
    dict: Current error handling settings
    """
def seterr(**kwargs):
    """
    Set error handling behavior.

    Returns:
    dict: Previous error handling settings
    """

# Control when GPU operations synchronize with CPU.
def allow_synchronize(allow=True):
    """
    Context manager to control synchronization behavior.

    Parameters:
    - allow: bool, whether to allow synchronization

    Usage:
        with cupyx.allow_synchronize(False):
            # Operations run asynchronously
            result = cupy.matmul(a, b)
    """
class DeviceSynchronized:
    """Context manager for device synchronization."""

    def __enter__(self):
        """Enter synchronized context."""

    def __exit__(self, *args):
        """Exit synchronized context."""

# Create arrays in pinned host memory for faster GPU transfers.
def empty_pinned(shape, dtype=float, order='C'):
    """
    Create empty array in pinned host memory.

    Parameters:
    - shape: int or tuple, array shape
    - dtype: data type, array data type
    - order: {'C', 'F'}, memory layout

    Returns:
    numpy.ndarray: Pinned memory array
    """
def empty_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Create empty pinned array with same shape as existing array."""
def zeros_pinned(shape, dtype=float, order='C'):
    """Create zeros array in pinned host memory."""
def zeros_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Create zeros pinned array with same shape as existing array."""

# Create custom ufuncs with advanced broadcasting and type handling.
class GeneralizedUFunc:
    """
    Create generalized universal function.

    Parameters:
    - definition: str, function signature and operation
    - name: str, function name
    - doc: str, documentation string
    """

    def __init__(self, definition, name=None, doc=None): ...

    def __call__(self, *args, **kwargs):
        """Execute generalized ufunc."""

# Get detailed information about CuPy runtime environment.
def get_runtime_info(full=False):
    """
    Get CuPy runtime information.

    Parameters:
    - full: bool, include detailed information

    Returns:
    str: Runtime information including CUDA version, device info, memory usage
    """

# GPU-accelerated versions of SciPy functionality.
def get_array_module(*args):
    """
    Get appropriate array module for SciPy functions.

    Returns:
    module: cupyx.scipy if CuPy arrays present, otherwise scipy
    """

# Just-in-time compilation for custom GPU kernels.
def rawkernel(func=None, *, device=False):
    """
    Decorator for JIT compilation of raw CUDA kernels.

    Parameters:
    - func: function, kernel function to compile
    - device: bool, whether this is a device function

    Usage:
        @cupyx.jit.rawkernel
        def my_kernel(x, y, size):
            tid = jit.threadIdx.x + jit.blockIdx.x * jit.blockDim.x
            if tid < size:
                y[tid] = x[tid] * 2
    """
# CUDA threading model access (read inside JIT kernels, e.g. jit.threadIdx.x)
threadIdx: object # Thread index within block
blockDim: object # Block dimensions
blockIdx: object # Block index within grid
gridDim: object # Grid dimensions
warpsize: int # Warp size constant
# Built-in functions for JIT kernels
def syncthreads():
    """Synchronize threads in block."""

def syncwarp(mask=0xffffffff):
    """Synchronize threads in warp."""

def range(start, stop=None, step=None):
    """Range function for JIT kernels (replaces the builtin inside kernels)."""
# Atomic operations
def atomic_add(array, index, value):
    """Atomic addition."""

def atomic_sub(array, index, value):
    """Atomic subtraction."""

def atomic_max(array, index, value):
    """Atomic maximum."""

def atomic_min(array, index, value):
    """Atomic minimum."""

def atomic_cas(array, index, compare, value):
    """Atomic compare-and-swap."""

# Performance profiling and benchmarking tools.
def profile():
    """
    Context manager for CUDA profiling.

    Usage:
        with cupyx.profiler.profile():
            # Code to profile
            result = cupy.matmul(a, b)
    """
def benchmark(func, args=(), kwargs=None, n_warmup=1, n_repeat=1, name=None, n_sync=1):
    """
    Benchmark function performance.

    Parameters:
    - func: callable, function to benchmark
    - args: tuple, function arguments
    - kwargs: dict, function keyword arguments
    - n_warmup: int, number of warmup runs
    - n_repeat: int, number of timing runs
    - name: str, benchmark name
    - n_sync: int, number of synchronizations per run

    Returns:
    dict: Timing statistics
    """
def time_range(message=None, color_id=None, *, sync=False):
    """
    Context manager for timing code ranges.

    Parameters:
    - message: str, range description
    - color_id: int, color for profiler display
    - sync: bool, synchronize before timing
    """

import cupy as cp
import cupyx

# Scatter operations for sparse updates
indices = cp.array([0, 2, 4, 6, 8])
updates = cp.array([10, 20, 30, 40, 50])
target = cp.zeros(10)

# Add updates at specified indices
result = cupyx.scatter_add(target, indices, updates)
print(result)  # [10, 0, 20, 0, 30, 0, 40, 0, 50, 0]

# Reciprocal square root (common in ML)
x = cp.array([1.0, 4.0, 9.0, 16.0])
rsqrt_result = cupyx.rsqrt(x)  # [1.0, 0.5, 0.333, 0.25]

import cupy as cp
import cupyx

# Handle division by zero gracefully
a = cp.array([1.0, 2.0, 3.0])
b = cp.array([1.0, 0.0, 3.0])

# Without error handling (would raise warning)
# result = a / b

# With error handling
with cupyx.errstate(divide='ignore', invalid='ignore'):
    result = a / b  # [1.0, inf, 1.0] - no warning

# Check current error state
current_settings = cupyx.geterr()
print(current_settings)

import cupy as cp
import cupyx
import numpy as np

# Create pinned memory arrays for faster CPU-GPU transfers
pinned_array = cupyx.zeros_pinned((1000, 1000), dtype=np.float32)

# Fill with data (on CPU)
pinned_array[:] = np.random.rand(1000, 1000).astype(np.float32)

# Fast transfer to GPU
gpu_array = cp.asarray(pinned_array)

# Process on GPU
result = cp.matmul(gpu_array, gpu_array.T)

# Fast transfer back to pinned memory
result_pinned = cupyx.zeros_like_pinned(pinned_array)
result_pinned[:] = cp.asnumpy(result)

import cupy as cp
import cupyx.jit as jit

# JIT-compiled custom kernel
@jit.rawkernel()
def elementwise_multiply(x, y, out, size):
    """Custom element-wise multiplication kernel."""
    tid = jit.threadIdx.x + jit.blockIdx.x * jit.blockDim.x
    if tid < size:
        out[tid] = x[tid] * y[tid]

# Use JIT kernel
a = cp.random.rand(1000000)
b = cp.random.rand(1000000)
result = cp.zeros_like(a)

# Launch kernel
threads_per_block = 256
blocks_per_grid = (len(a) + threads_per_block - 1) // threads_per_block
elementwise_multiply[blocks_per_grid, threads_per_block](a, b, result, len(a))

# More advanced JIT kernel with shared memory
@jit.rawkernel()
def block_sum(data, output, n):
    """Sum elements within each block using shared memory."""
    # Shared memory declaration
    shared = jit.shared_memory.array(256, jit.float32)
    tid = jit.threadIdx.x
    bid = jit.blockIdx.x
    idx = bid * jit.blockDim.x + tid
    # Load data into shared memory
    if idx < n:
        shared[tid] = data[idx]
    else:
        shared[tid] = 0.0
    jit.syncthreads()
    # Parallel reduction
    s = jit.blockDim.x // 2
    while s > 0:
        if tid < s:
            shared[tid] += shared[tid + s]
        jit.syncthreads()
        s //= 2
    # Write result
    if tid == 0:
        output[bid] = shared[0]

import cupy as cp
import cupyx.profiler as profiler

# Benchmark different implementations
def matmul_standard(a, b):
    return cp.matmul(a, b)

def matmul_dot(a, b):
    return cp.dot(a, b)

# Setup test data
a = cp.random.rand(1000, 1000)
b = cp.random.rand(1000, 1000)

# Benchmark both implementations
stats1 = profiler.benchmark(matmul_standard, (a, b), n_repeat=10)
stats2 = profiler.benchmark(matmul_dot, (a, b), n_repeat=10)
print(f"matmul: {stats1['mean']:.4f} ms")
print(f"dot: {stats2['mean']:.4f} ms")

# Profile code sections
with profiler.profile():
    with profiler.time_range("Matrix multiply"):
        result1 = cp.matmul(a, b)
    with profiler.time_range("SVD decomposition"):
        u, s, vh = cp.linalg.svd(result1)
    with profiler.time_range("Eigenvalue computation"):
        eigenvals = cp.linalg.eigvals(result1 @ result1.T)

import cupyx
# Get basic runtime info
info = cupyx.get_runtime_info()
print(info)

# Get detailed runtime info
detailed_info = cupyx.get_runtime_info(full=True)
print(detailed_info)

# Example output includes:
# - CUDA version
# - Device properties
# - Memory information
# - Library versions
# - Compilation settings
# Install with Tessl CLI
# npx tessl i tessl/pypi-cupy-rocm-4-3