# CuPy: NumPy & SciPy-compatible array library for GPU-accelerated computing
# with Python that provides a drop-in replacement for NumPy/SciPy on NVIDIA
# CUDA platforms.
#
# Additional functionality through CuPy-X including SciPy compatibility, JIT
# compilation, specialized operations, and advanced GPU programming features.
# These extensions provide enhanced capabilities beyond core CuPy
# functionality.
#
# CuPy-specific mathematical functions and runtime utilities.
def rsqrt(x, out=None):
    """Compute the element-wise reciprocal square root, ``1 / sqrt(x)``.

    A single fused GPU kernel is used, which is faster than evaluating
    the square root and the reciprocal as two separate operations.

    Args:
        x: Input array (array-like).
        out (ndarray, optional): Array that receives the result.

    Returns:
        cupy.ndarray: Reciprocal square root of ``x``.
    """
def get_runtime_info(full=False):
    """Get CuPy runtime information.

    Parameters:
    - full: bool, whether to include detailed information

    Returns:
    dict: runtime information including CUDA version, device details
    """


# Efficient scatter operations for advanced indexing and updates.
def scatter_add(a, indices, b, axis=None):
    """Scatter-add the values of ``b`` into ``a`` at the given indices.

    Additions are performed atomically, so repeated indices and
    concurrent updates to the same memory location are safe.

    Args:
        a (ndarray): Destination array, updated in place.
        indices (array-like): Positions in ``a`` receiving updates.
        b (array-like): Values to accumulate into ``a``.
        axis (int, optional): Axis along which to scatter.

    Returns:
        None: ``a`` is modified in place.
    """
def scatter_max(a, indices, b, axis=None):
    """Store the element-wise maximum of ``a`` and scattered ``b`` values.

    Args:
        a (ndarray): Destination array, updated in place.
        indices (array-like): Positions in ``a`` where the comparison occurs.
        b (array-like): Values compared against the current contents.
        axis (int, optional): Axis along which to scatter.

    Returns:
        None: ``a`` is modified in place.
    """
def scatter_min(a, indices, b, axis=None):
    """Take element-wise minimum between array values and scattered values.

    Parameters:
    - a: ndarray, destination array to be updated
    - indices: array-like, indices where comparison should occur
    - b: array-like, values to compare
    - axis: int, axis along which to scatter

    Returns:
    None: modifies array a in-place
    """


# Arrays in host memory that are accessible to GPU for efficient transfers.
def empty_pinned(shape, dtype=float, order='C'):
    """Return new pinned array with uninitialized entries.

    Pinned (page-locked) host memory enables faster transfers between
    CPU and GPU and allows for asynchronous memory operations.

    Parameters:
    - shape: int or sequence of ints, shape of array
    - dtype: data-type, type of array elements (default: float)
    - order: memory layout order ('C' or 'F')

    Returns:
    numpy.ndarray: array in pinned host memory
    """
def zeros_pinned(shape, dtype=float, order='C'):
    """Return new pinned array of zeros.

    Parameters:
    - shape: int or sequence of ints, shape of array
    - dtype: data-type, type of array elements (default: float)
    - order: memory layout order

    Returns:
    numpy.ndarray: zeros array in pinned host memory
    """
def empty_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Return a new uninitialized pinned-memory array shaped like ``a``.

    Args:
        a: Prototype array whose shape and dtype are reused.
        dtype: Optional override for the element type.
        order: Memory layout of the result.
        subok: Whether array subclasses are passed through.
        shape: Optional override for the result shape.

    Returns:
        numpy.ndarray: array in pinned host memory.
    """
def zeros_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Return pinned array of zeros with same shape and type as given array."""


# Control floating-point error handling behavior.
def errstate(**kwargs):
    """Context manager that temporarily overrides floating-point error handling.

    Inside the ``with`` block, the treatment of division by zero,
    overflow, underflow, and invalid operations can be changed; settings
    revert when the block exits.

    Keyword Args:
        all (str): Behavior applied to every error category.
        divide (str): Behavior for division by zero.
        over (str): Behavior for overflow.
        under (str): Behavior for underflow.
        invalid (str): Behavior for invalid operations.

    Accepted behaviors: 'ignore', 'warn', 'raise', 'call', 'print', 'log'.

    Returns:
        A context manager.
    """
def geterr():
    """Report the current floating-point error handling settings.

    Returns:
        dict: the active behavior for each error category.
    """
def seterr(all=None, divide=None, over=None, under=None, invalid=None):
    """Set floating-point error handling behavior.

    Parameters:
    - all: str, set behavior for all error types
    - divide: str, behavior for division by zero
    - over: str, behavior for overflow
    - under: str, behavior for underflow
    - invalid: str, behavior for invalid operations

    Returns:
    dict: previous error handling settings
    """


# Advanced control over GPU synchronization behavior.
def allow_synchronize(allow=True):
    """Context manager controlling whether GPU synchronization is permitted.

    Gives fine-grained control over when GPU operations may synchronize
    with the CPU, which is useful for performance tuning.

    Args:
        allow (bool): Permit synchronization inside the block.

    Returns:
        A context manager.
    """
class DeviceSynchronized:
    """Device synchronization manager for performance control.

    Provides advanced synchronization control for multi-GPU
    applications and performance-critical sections.
    """

    def __enter__(self):
        """Enter synchronized section."""

    def __exit__(self, *args):
        """Exit synchronized section."""


# Just-in-time compilation capabilities for dynamic kernel generation.
def rawkernel(mode='python', device=False):
    """Decorator that turns a Python function into a raw CUDA kernel.

    The decorated function is JIT-compiled from Python syntax into
    optimized GPU code.

    Args:
        mode (str): Compilation mode, 'python' or 'cuda'.
        device (bool): Whether the function runs as a device function.

    Returns:
        callable: the decorated kernel function.
    """
# CUDA Built-in Types and Functions
# Module-level placeholders for CUDA's built-in thread-geometry variables.
# NOTE(review): presumably these are resolved to the real per-thread values
# only inside a JIT-compiled kernel — confirm against cupyx.jit docs.
threadIdx = None # Thread index within block (x, y, z components)
blockDim = None # Block dimensions (x, y, z components)
blockIdx = None # Block index within grid (x, y, z components)
gridDim = None # Grid dimensions (x, y, z components)
def syncthreads():
    """Barrier across every thread of the current CUDA block.

    No thread in the block proceeds past this call until all threads
    in the block have reached it.
    """
def shared_memory(dtype, shape):
    """Allocate a block-shared memory array inside a CUDA kernel.

    Args:
        dtype: Element data type of the shared array.
        shape (int or tuple): Shape of the shared array.

    Returns:
        A shared-memory array visible to every thread in the block.
    """
def atomic_add(array, index, value):
    """Perform atomic addition operation.

    Safely adds value to array[index] in a thread-safe manner,
    preventing race conditions in parallel execution.

    Parameters:
    - array: target array
    - index: index to update
    - value: value to add

    Returns:
    previous value at array[index]
    """


# Comprehensive SciPy-compatible functionality with GPU acceleration.
def get_array_module(*args):
    """Get appropriate array module (NumPy or CuPy) for arguments.

    Automatically selects the correct module based on input types,
    enabling generic code that works with both CPU and GPU arrays.

    Parameters:
    - args: array arguments to analyze

    Returns:
    module: cupy or numpy module appropriate for inputs
    """


# GPU-accelerated sparse matrix operations and formats.
class spmatrix:
    """Common base class shared by all sparse matrix formats.

    Defines the interface that every format implements, with
    GPU-accelerated operations and memory-efficient storage.
    """
class csr_matrix:
    """Compressed Sparse Row matrix.

    Stores nonzeros row by row, making arithmetic and row slicing
    fast; operations are optimized for GPU execution.
    """

    def __init__(self, arg1, shape=None, dtype=None, copy=False):
        """Build a CSR sparse matrix.

        Args:
            arg1: Source data for the matrix (array-like).
            shape (tuple, optional): Matrix dimensions.
            dtype: Element data type.
            copy (bool): Copy the input instead of referencing it.
        """
class csc_matrix:
    """Compressed Sparse Column matrix.

    Well suited to column slicing and to matrix-vector products
    where the matrix is traversed column-wise.
    """
class coo_matrix:
    """COOrdinate-format sparse matrix.

    Holds (row, col, data) triplets, which makes incremental
    construction and conversion to other formats efficient.
    """
class dia_matrix:
    """DIAgonal-format sparse matrix.

    Compact storage for matrices whose nonzeros lie on a small
    number of diagonals; diagonal operations are optimized.
    """
def issparse(x):
    """Tell whether ``x`` is a sparse matrix.

    Args:
        x: Object to inspect.

    Returns:
        bool: True when ``x`` is a sparse matrix.
    """
def eye(m, n=None, k=0, dtype=float, format=None):
    """Build a sparse matrix with ones on a chosen diagonal.

    Args:
        m (int): Number of rows.
        n (int, optional): Number of columns; defaults to ``m``.
        k (int): Offset of the diagonal carrying the ones.
        dtype: Element data type.
        format (str, optional): Sparse format of the result.

    Returns:
        Sparse identity matrix in the requested format.
    """
def diags(diagonals, offsets=0, shape=None, format=None, dtype=None):
    """Construct sparse matrix from diagonals.

    Parameters:
    - diagonals: array-like, diagonal values
    - offsets: int or array-like, diagonal offsets
    - shape: tuple, matrix shape
    - format: str, output format
    - dtype: data type

    Returns:
    sparse matrix: matrix with specified diagonals
    """


# Specialized functionality for scientific computing and machine learning.
# Linear Algebra Extensions (cupyx.scipy.linalg)
class LinalgExtensions:
    """Linear algebra operations extending the NumPy/SciPy standard set."""
# Signal Processing Extensions (cupyx.scipy.signal)
class SignalExtensions:
    """GPU-accelerated signal processing functions."""
# Image Processing Extensions (cupyx.scipy.ndimage)
class NDImageExtensions:
    """Image processing operations over N-dimensional arrays."""
# Special Functions Extensions (cupyx.scipy.special)
class SpecialExtensions:
    """GPU-supported mathematical special functions."""
# Statistics Extensions (cupyx.scipy.stats)
class StatsExtensions:
    """Statistical distributions and tests with GPU acceleration."""


import cupy as cp
import cupyx

# Create destination array
result = cp.zeros(10, dtype=cp.float32)
indices = cp.array([1, 3, 1, 7, 3])
values = cp.array([1.0, 2.0, 3.0, 4.0, 5.0])

# Scatter add - accumulates values at repeated indices
cupyx.scatter_add(result, indices, values)
print(f"After scatter_add: {result}")  # [0, 4, 0, 7, 0, 0, 0, 4, 0, 0]

# Scatter max - keeps maximum value at each index
result.fill(0)
cupyx.scatter_max(result, indices, values)
print(f"After scatter_max: {result}")  # [0, 3, 0, 5, 0, 0, 0, 4, 0, 0]


# Use in neural network-like operations
def sparse_embedding_lookup(embeddings, indices):
    """Efficient embedding lookup with scatter operations."""
    output = cp.zeros((len(indices), embeddings.shape[1]), dtype=embeddings.dtype)
    for i, idx in enumerate(indices):
        cupyx.scatter_add(output[i:i+1], 0, embeddings[idx:idx+1], axis=0)
    return output


# Example usage
embedding_dim = 128
vocab_size = 10000
embeddings = cp.random.normal(0, 1, (vocab_size, embedding_dim))
word_indices = cp.array([5, 42, 1337, 9999])
embedded = sparse_embedding_lookup(embeddings, word_indices)
print(f"Embedded shape: {embedded.shape}")

import cupy as cp
import cupyx.jit as jit


@jit.rawkernel()
def matrix_multiply_jit(A, B, C, M, N, K):
    """JIT-compiled matrix multiplication kernel."""
    i = jit.blockIdx.x * jit.blockDim.x + jit.threadIdx.x
    j = jit.blockIdx.y * jit.blockDim.y + jit.threadIdx.y
    if i < M and j < N:
        temp = 0.0
        for k in range(K):
            temp += A[i, k] * B[k, j]
        C[i, j] = temp


# Example usage
M, N, K = 512, 512, 256
A = cp.random.random((M, K), dtype=cp.float32)
B = cp.random.random((K, N), dtype=cp.float32)
C = cp.zeros((M, N), dtype=cp.float32)
# Configure launch parameters
threads_per_block = (16, 16)
blocks_per_grid = ((M + 15) // 16, (N + 15) // 16)
# Launch JIT kernel
matrix_multiply_jit[blocks_per_grid, threads_per_block](A, B, C, M, N, K)
# Verify result
expected = cp.dot(A, B)
print(f"JIT result matches cp.dot: {cp.allclose(C, expected)}")


# Advanced JIT example with shared memory
@jit.rawkernel()
def reduction_with_shared_memory(input_array, output_array, n):
    """Parallel reduction using shared memory."""
    # Allocate shared memory
    shared_data = jit.shared_memory(cp.float32, 256)
    tid = jit.threadIdx.x
    bid = jit.blockIdx.x
    i = bid * jit.blockDim.x + tid
    # Load data into shared memory
    if i < n:
        shared_data[tid] = input_array[i]
    else:
        shared_data[tid] = 0.0
    jit.syncthreads()
    # Perform reduction in shared memory
    stride = jit.blockDim.x // 2
    while stride > 0:
        if tid < stride:
            shared_data[tid] += shared_data[tid + stride]
        jit.syncthreads()
        stride //= 2
    # Write result for this block
    if tid == 0:
        jit.atomic_add(output_array, 0, shared_data[0])


# Test reduction kernel
data = cp.random.random(10000, dtype=cp.float32)
result = cp.zeros(1, dtype=cp.float32)
block_size = 256
grid_size = (len(data) + block_size - 1) // block_size
reduction_with_shared_memory[grid_size, block_size](data, result, len(data))
print(f"Reduction result: {result[0]}")
print(f"Expected (cp.sum): {cp.sum(data)}")

import cupy as cp
import cupyx.scipy.sparse as sp

# Create sparse matrices
# COO format - good for construction
row = cp.array([0, 1, 2, 0, 1, 2])
col = cp.array([0, 1, 2, 1, 2, 0])
data = cp.array([1, 2, 3, 4, 5, 6])
coo_matrix = sp.coo_matrix((data, (row, col)), shape=(3, 3))
print(f"COO matrix:\n{coo_matrix.toarray()}")

# Convert to CSR for efficient arithmetic
csr_matrix = coo_matrix.tocsr()
print(f"CSR format - data: {csr_matrix.data}")
print(f"CSR format - indices: {csr_matrix.indices}")
print(f"CSR format - indptr: {csr_matrix.indptr}")

# Sparse matrix operations
vector = cp.array([1, 2, 3])
result = csr_matrix.dot(vector)  # Matrix-vector multiplication
print(f"Matrix-vector product: {result}")

# Create large sparse matrix efficiently
n = 10000
# Tridiagonal matrix
main_diag = cp.ones(n) * 2
off_diag = cp.ones(n-1) * -1
tridiag = sp.diags([off_diag, main_diag, off_diag],
                   offsets=[-1, 0, 1],
                   shape=(n, n),
                   format='csr')
print(f"Tridiagonal matrix shape: {tridiag.shape}")
print(f"Non-zero elements: {tridiag.nnz}")

# Solve linear system (requires SciPy compatibility)
b = cp.ones(n)
# x = sp.linalg.spsolve(tridiag, b)  # Would solve Ax = b

import cupy as cp
import cupyx
import time
import numpy as np

# Compare regular vs pinned memory transfer performance
size = 10000000  # 10M elements
# Regular CPU array
regular_cpu = np.random.random(size).astype(np.float32)
# Pinned CPU array
pinned_cpu = cupyx.zeros_pinned(size, dtype=cp.float32)
pinned_cpu[:] = np.random.random(size).astype(np.float32)


# Benchmark transfers
def benchmark_transfer(cpu_array, n_iterations=100):
    """Benchmark CPU-GPU transfer performance."""
    times = []
    for _ in range(n_iterations):
        start = time.time()
        gpu_array = cp.asarray(cpu_array)
        cp.cuda.Stream.null.synchronize()
        times.append(time.time() - start)
    return np.mean(times), np.std(times)


# Regular memory transfer
regular_mean, regular_std = benchmark_transfer(regular_cpu)
# Pinned memory transfer
pinned_mean, pinned_std = benchmark_transfer(pinned_cpu)
print(f"Regular memory: {regular_mean:.4f} ± {regular_std:.4f} seconds")
print(f"Pinned memory: {pinned_mean:.4f} ± {pinned_std:.4f} seconds")
print(f"Speedup: {regular_mean / pinned_mean:.2f}x")

# Asynchronous transfer example
async_stream = cp.cuda.Stream()
with async_stream:
    # Start asynchronous transfer
    gpu_async = cp.asarray(pinned_cpu)
    # Do other work while transfer happens
    other_work = cp.random.random((1000, 1000))
    result = cp.sum(other_work)
# Synchronize to ensure transfer is complete
async_stream.synchronize()
print(f"Asynchronous transfer completed, result: {result}")

import cupy as cp
import cupyx

# Example with potential floating point errors
a = cp.array([1.0, 0.0, -1.0])
b = cp.array([0.0, 0.0, 0.0])
# Default behavior (may warn or ignore)
result1 = a / b
print(f"Default behavior: {result1}")  # [inf, nan, -inf]

# Temporarily change error handling
with cupyx.errstate(divide='raise', invalid='raise'):
    try:
        result2 = a / b
    except FloatingPointError as e:
        print(f"Caught error: {e}")

# Check current error state
current_state = cupyx.geterr()
print(f"Current error handling: {current_state}")

# Set specific error handling
old_state = cupyx.seterr(divide='warn', over='ignore', invalid='print')
print(f"Previous state: {old_state}")

# Example with overflow
large_numbers = cp.array([1e308, 1e308])
overflow_result = large_numbers * 10  # Will overflow to inf

# Restore previous state
cupyx.seterr(**old_state)


# Advanced error handling for numerical algorithms
def safe_divide(a, b, default=0.0):
    """Safe division with custom error handling."""
    with cupyx.errstate(divide='ignore', invalid='ignore'):
        result = a / b
        # Replace inf and nan with default value
        result = cp.where(cp.isfinite(result), result, default)
    return result


# Test safe division
numerator = cp.array([1.0, 2.0, 3.0])
denominator = cp.array([2.0, 0.0, 4.0])
safe_result = safe_divide(numerator, denominator, default=0.0)
print(f"Safe division result: {safe_result}")  # [0.5, 0.0, 0.75]

import cupy as cp
import cupyx
import time
import numpy as np  # required below: np.random.normal in the streaming loop


def gpu_accelerated_monte_carlo(n_samples=10000000):
    """Monte Carlo π estimation with CuPy extensions."""
    # Use rsqrt for better performance
    def estimate_pi_optimized(samples):
        x = cp.random.uniform(-1, 1, samples)
        y = cp.random.uniform(-1, 1, samples)
        # Use optimized rsqrt instead of sqrt
        distances_squared = x*x + y*y
        # inside = distances_squared <= 1 is more efficient than sqrt comparison
        inside_circle = distances_squared <= 1.0
        return 4.0 * cp.mean(inside_circle)

    # Benchmark with synchronization control
    with cupyx.allow_synchronize(False):
        start_time = time.time()
        pi_estimate = estimate_pi_optimized(n_samples)
        # Explicit synchronization when needed
        cp.cuda.Stream.null.synchronize()
        end_time = time.time()
    return float(pi_estimate), end_time - start_time


# Run Monte Carlo estimation
pi_est, computation_time = gpu_accelerated_monte_carlo(50000000)
error = abs(pi_est - cp.pi)
print(f"π estimate: {pi_est:.6f}")
print(f"Error: {error:.6f}")
print(f"Computation time: {computation_time:.4f} seconds")
print(f"Rate: {50000000 / computation_time / 1e6:.2f} M samples/second")


# Memory-efficient streaming computation
def streaming_mean_computation(total_size, chunk_size=1000000):
    """Compute mean of very large dataset using streaming."""
    running_sum = 0.0
    running_count = 0
    # Use pinned memory for efficient transfers
    chunk_pinned = cupyx.empty_pinned(chunk_size, dtype=cp.float32)
    for start_idx in range(0, total_size, chunk_size):
        end_idx = min(start_idx + chunk_size, total_size)
        current_chunk_size = end_idx - start_idx
        # Generate data in pinned memory
        chunk_pinned[:current_chunk_size] = np.random.normal(
            5.0, 2.0, current_chunk_size
        ).astype(np.float32)
        # Transfer to GPU and process
        gpu_chunk = cp.asarray(chunk_pinned[:current_chunk_size])
        chunk_sum = cp.sum(gpu_chunk)
        running_sum += float(chunk_sum)
        running_count += current_chunk_size
        if start_idx % (chunk_size * 10) == 0:
            current_mean = running_sum / running_count
            print(f"Processed {running_count:,} samples, current mean: {current_mean:.4f}")
    return running_sum / running_count


# Process 100M samples in chunks
final_mean = streaming_mean_computation(100000000, chunk_size=2000000)
print(f"Final streaming mean: {final_mean:.6f}")
print(f"Expected mean: 5.0")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-cupy-cuda113