CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cupy-cuda113

CuPy: NumPy & SciPy-compatible array library for GPU-accelerated computing with Python that provides a drop-in replacement for NumPy/SciPy on NVIDIA CUDA platforms.

Pending
Overview
Eval results
Files

cupy-extensions.mddocs/

CuPy Extensions

Additional functionality through CuPy-X including SciPy compatibility, JIT compilation, specialized operations, and advanced GPU programming features. These extensions provide enhanced capabilities beyond core CuPy functionality.

Capabilities

Core Extensions

CuPy-specific mathematical functions and runtime utilities.

def rsqrt(x, out=None):
    """Reciprocal square root function: 1 / sqrt(x).
    
    Optimized GPU implementation providing better performance
    than computing reciprocal and square root separately.
    
    Parameters:
    - x: array-like, input array
    - out: ndarray, output array
    
    Returns:
    cupy.ndarray: reciprocal square root of input
    """

def get_runtime_info(full=False):
    """Get CuPy runtime information.
    
    Parameters:
    - full: bool, whether to include detailed information
    
    Returns:
    dict: runtime information including CUDA version, device details
    """

Scatter Operations

Efficient scatter operations for advanced indexing and updates.

def scatter_add(a, indices, b, axis=None):
    """Add values to array at specified indices.
    
    Performs atomic addition operations, making it safe for
    concurrent updates to the same memory locations.
    
    Parameters:
    - a: ndarray, destination array to be updated
    - indices: array-like, indices where values should be added
    - b: array-like, values to add
    - axis: int, axis along which to scatter
    
    Returns:
    None: modifies array a in-place
    """

def scatter_max(a, indices, b, axis=None):
    """Take element-wise maximum between array values and scattered values.
    
    Parameters:
    - a: ndarray, destination array to be updated
    - indices: array-like, indices where comparison should occur
    - b: array-like, values to compare
    - axis: int, axis along which to scatter
    
    Returns:
    None: modifies array a in-place
    """

def scatter_min(a, indices, b, axis=None):
    """Take element-wise minimum between array values and scattered values.
    
    Parameters:
    - a: ndarray, destination array to be updated
    - indices: array-like, indices where comparison should occur
    - b: array-like, values to compare
    - axis: int, axis along which to scatter
    
    Returns:
    None: modifies array a in-place
    """

Pinned Memory Arrays

Arrays in host memory that are accessible to GPU for efficient transfers.

def empty_pinned(shape, dtype=float64, order='C'):
    """Return new pinned array with uninitialized entries.
    
    Pinned memory enables faster transfers between CPU and GPU
    and allows for asynchronous memory operations.
    
    Parameters:
    - shape: int or sequence of ints, shape of array
    - dtype: data-type, type of array elements
    - order: memory layout order ('C' or 'F')
    
    Returns:
    numpy.ndarray: array in pinned host memory
    """

def zeros_pinned(shape, dtype=float64, order='C'):
    """Return new pinned array of zeros.
    
    Parameters:
    - shape: int or sequence of ints, shape of array
    - dtype: data-type, type of array elements
    - order: memory layout order
    
    Returns:
    numpy.ndarray: zeros array in pinned host memory
    """

def empty_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Return new pinned array with same shape and type as given array."""

def zeros_like_pinned(a, dtype=None, order='K', subok=True, shape=None):
    """Return pinned array of zeros with same shape and type as given array."""

Error State Management

Control floating-point error handling behavior.

def errstate(**kwargs):
    """Context manager for floating-point error handling.
    
    Temporarily override how floating-point errors are handled,
    including division by zero, overflow, underflow, and invalid operations.
    
    Parameters:
    - all: str, set behavior for all error types
    - divide: str, behavior for division by zero
    - over: str, behavior for overflow
    - under: str, behavior for underflow
    - invalid: str, behavior for invalid operations
    
    Error behaviors: 'ignore', 'warn', 'raise', 'call', 'print', 'log'
    
    Returns:
    context manager
    """

def geterr():
    """Get current floating-point error handling settings.
    
    Returns:
    dict: current error handling settings for each error type
    """

def seterr(all=None, divide=None, over=None, under=None, invalid=None):
    """Set floating-point error handling behavior.
    
    Parameters:
    - all: str, set behavior for all error types
    - divide: str, behavior for division by zero
    - over: str, behavior for overflow  
    - under: str, behavior for underflow
    - invalid: str, behavior for invalid operations
    
    Returns:
    dict: previous error handling settings
    """

Synchronization Control

Advanced control over GPU synchronization behavior.

def allow_synchronize(allow=True):
    """Context manager to control synchronization behavior.
    
    Allows fine-grained control over when GPU operations
    synchronize with CPU, useful for performance optimization.
    
    Parameters:
    - allow: bool, whether to allow synchronization
    
    Returns:
    context manager
    """

class DeviceSynchronized:
    """Device synchronization manager for performance control.
    
    Provides advanced synchronization control for multi-GPU
    applications and performance-critical sections.
    """
    
    def __enter__(self):
        """Enter synchronized section."""
        
    def __exit__(self, *args):
        """Exit synchronized section."""

JIT Compilation

Just-in-time compilation capabilities for dynamic kernel generation.

def rawkernel(mode='python', device=False):
    """Decorator for creating raw kernels from Python functions.
    
    Enables writing CUDA kernels using Python syntax with JIT
    compilation to optimized GPU code.
    
    Parameters:
    - mode: str, compilation mode ('python', 'cuda')
    - device: bool, whether function runs on device
    
    Returns:
    callable: decorated kernel function
    """

# CUDA Built-in Types and Functions
threadIdx = None  # Thread index within block (x, y, z components)
blockDim = None   # Block dimensions (x, y, z components)
blockIdx = None   # Block index within grid (x, y, z components)
gridDim = None    # Grid dimensions (x, y, z components)

def syncthreads():
    """Synchronize all threads within a CUDA block.
    
    Ensures all threads in the current block reach this point
    before any thread continues execution.
    """

def shared_memory(dtype, shape):
    """Allocate shared memory array within CUDA block.
    
    Parameters:
    - dtype: data type of shared memory elements
    - shape: int or tuple, shape of shared memory array
    
    Returns:
    shared memory array accessible to all threads in block
    """

def atomic_add(array, index, value):
    """Perform atomic addition operation.
    
    Safely adds value to array[index] in a thread-safe manner,
    preventing race conditions in parallel execution.
    
    Parameters:
    - array: target array
    - index: index to update
    - value: value to add
    
    Returns:
    previous value at array[index]
    """

SciPy Compatibility

Comprehensive SciPy-compatible functionality with GPU acceleration.

def get_array_module(*args):
    """Get appropriate array module (NumPy or CuPy) for arguments.
    
    Automatically selects the correct module based on input types,
    enabling generic code that works with both CPU and GPU arrays.
    
    Parameters:
    - args: array arguments to analyze
    
    Returns:
    module: cupy or numpy module appropriate for inputs
    """

Sparse Matrix Support

GPU-accelerated sparse matrix operations and formats.

class spmatrix:
    """Base class for sparse matrices.
    
    Provides common interface for all sparse matrix formats
    with GPU-accelerated operations and memory efficiency.
    """

class csr_matrix:
    """Compressed Sparse Row matrix format.
    
    Efficient sparse matrix format for arithmetic operations
    and fast row slicing, optimized for GPU computation.
    """
    
    def __init__(self, arg1, shape=None, dtype=None, copy=False):
        """Initialize CSR sparse matrix.
        
        Parameters:
        - arg1: array-like, sparse matrix data
        - shape: tuple, matrix dimensions
        - dtype: data type
        - copy: bool, whether to copy data
        """

class csc_matrix:
    """Compressed Sparse Column matrix format.
    
    Efficient for column slicing and matrix-vector products
    where the matrix is accessed column-wise.
    """

class coo_matrix:
    """COOrdinate sparse matrix format.
    
    Efficient for sparse matrix construction and format conversion,
    stores (row, col, data) triplets.
    """

class dia_matrix:
    """DIAgonal sparse matrix format.
    
    Efficient storage for matrices with few diagonals,
    optimized for diagonal matrix operations.
    """

def issparse(x):
    """Check if input is a sparse matrix.
    
    Parameters:
    - x: input object to check
    
    Returns:
    bool: True if x is a sparse matrix
    """

def eye(m, n=None, k=0, dtype=float, format=None):
    """Create sparse identity matrix.
    
    Parameters:
    - m: int, number of rows
    - n: int, number of columns (defaults to m)
    - k: int, diagonal offset
    - dtype: data type
    - format: str, sparse matrix format
    
    Returns:
    sparse matrix: identity matrix in specified format
    """

def diags(diagonals, offsets=0, shape=None, format=None, dtype=None):
    """Construct sparse matrix from diagonals.
    
    Parameters:
    - diagonals: array-like, diagonal values
    - offsets: int or array-like, diagonal offsets
    - shape: tuple, matrix shape
    - format: str, output format
    - dtype: data type
    
    Returns:
    sparse matrix: matrix with specified diagonals
    """

Advanced Extensions

Specialized functionality for scientific computing and machine learning.

# Linear Algebra Extensions (cupyx.scipy.linalg)
class LinalgExtensions:
    """Extended linear algebra operations beyond NumPy/SciPy standard."""

# Signal Processing Extensions (cupyx.scipy.signal) 
class SignalExtensions:
    """Signal processing functions with GPU acceleration."""

# Image Processing Extensions (cupyx.scipy.ndimage)
class NDImageExtensions:
    """N-dimensional image processing operations."""

# Special Functions Extensions (cupyx.scipy.special)
class SpecialExtensions:
    """Mathematical special functions with GPU support."""

# Statistics Extensions (cupyx.scipy.stats)
class StatsExtensions:
    """Statistical distributions and tests with GPU acceleration."""

Usage Examples

Scatter Operations

import cupy as cp
import cupyx

# Create destination array
result = cp.zeros(10, dtype=cp.float32)
indices = cp.array([1, 3, 1, 7, 3])
values = cp.array([1.0, 2.0, 3.0, 4.0, 5.0])

# Scatter add - accumulates values at repeated indices
cupyx.scatter_add(result, indices, values)
print(f"After scatter_add: {result}")  # [0, 4, 0, 7, 0, 0, 0, 4, 0, 0]

# Scatter max - keeps maximum value at each index
result.fill(0)
cupyx.scatter_max(result, indices, values)
print(f"After scatter_max: {result}")  # [0, 3, 0, 5, 0, 0, 0, 4, 0, 0]

# Use in neural network-like operations
def sparse_embedding_lookup(embeddings, indices):
    """Efficient embedding lookup with scatter operations."""
    output = cp.zeros((len(indices), embeddings.shape[1]), dtype=embeddings.dtype)
    for i, idx in enumerate(indices):
        cupyx.scatter_add(output[i:i+1], 0, embeddings[idx:idx+1], axis=0)
    return output

# Example usage
embedding_dim = 128
vocab_size = 10000
embeddings = cp.random.normal(0, 1, (vocab_size, embedding_dim))
word_indices = cp.array([5, 42, 1337, 9999])

embedded = sparse_embedding_lookup(embeddings, word_indices)
print(f"Embedded shape: {embedded.shape}")

JIT Compilation

import cupy as cp
import cupyx.jit as jit

@jit.rawkernel()
def matrix_multiply_jit(A, B, C, M, N, K):
    """JIT-compiled matrix multiplication kernel."""
    i = jit.blockIdx.x * jit.blockDim.x + jit.threadIdx.x
    j = jit.blockIdx.y * jit.blockDim.y + jit.threadIdx.y
    
    if i < M and j < N:
        temp = 0.0
        for k in range(K):
            temp += A[i, k] * B[k, j]
        C[i, j] = temp

# Example usage
M, N, K = 512, 512, 256
A = cp.random.random((M, K), dtype=cp.float32)
B = cp.random.random((K, N), dtype=cp.float32)
C = cp.zeros((M, N), dtype=cp.float32)

# Configure launch parameters
threads_per_block = (16, 16)
blocks_per_grid = ((M + 15) // 16, (N + 15) // 16)

# Launch JIT kernel
matrix_multiply_jit[blocks_per_grid, threads_per_block](A, B, C, M, N, K)

# Verify result
expected = cp.dot(A, B)
print(f"JIT result matches cp.dot: {cp.allclose(C, expected)}")

# Advanced JIT example with shared memory
@jit.rawkernel()
def reduction_with_shared_memory(input_array, output_array, n):
    """Parallel reduction using shared memory."""
    # Allocate shared memory
    shared_data = jit.shared_memory(cp.float32, 256)
    
    tid = jit.threadIdx.x
    bid = jit.blockIdx.x
    i = bid * jit.blockDim.x + tid
    
    # Load data into shared memory
    if i < n:
        shared_data[tid] = input_array[i]
    else:
        shared_data[tid] = 0.0
    
    jit.syncthreads()
    
    # Perform reduction in shared memory
    stride = jit.blockDim.x // 2
    while stride > 0:
        if tid < stride:
            shared_data[tid] += shared_data[tid + stride]
        jit.syncthreads()
        stride //= 2
    
    # Write result for this block
    if tid == 0:
        jit.atomic_add(output_array, 0, shared_data[0])

# Test reduction kernel
data = cp.random.random(10000, dtype=cp.float32)
result = cp.zeros(1, dtype=cp.float32)

block_size = 256
grid_size = (len(data) + block_size - 1) // block_size

reduction_with_shared_memory[grid_size, block_size](data, result, len(data))
print(f"Reduction result: {result[0]}")
print(f"Expected (cp.sum): {cp.sum(data)}")

Sparse Matrix Operations

import cupy as cp
import cupyx.scipy.sparse as sp

# Create sparse matrices
# COO format - good for construction
row = cp.array([0, 1, 2, 0, 1, 2])
col = cp.array([0, 1, 2, 1, 2, 0])
data = cp.array([1, 2, 3, 4, 5, 6])

coo_matrix = sp.coo_matrix((data, (row, col)), shape=(3, 3))
print(f"COO matrix:\n{coo_matrix.toarray()}")

# Convert to CSR for efficient arithmetic
csr_matrix = coo_matrix.tocsr()
print(f"CSR format - data: {csr_matrix.data}")
print(f"CSR format - indices: {csr_matrix.indices}")
print(f"CSR format - indptr: {csr_matrix.indptr}")

# Sparse matrix operations
vector = cp.array([1, 2, 3])
result = csr_matrix.dot(vector)  # Matrix-vector multiplication
print(f"Matrix-vector product: {result}")

# Create large sparse matrix efficiently
n = 10000
# Tridiagonal matrix
main_diag = cp.ones(n) * 2
off_diag = cp.ones(n-1) * -1

tridiag = sp.diags([off_diag, main_diag, off_diag], 
                   offsets=[-1, 0, 1], 
                   shape=(n, n), 
                   format='csr')

print(f"Tridiagonal matrix shape: {tridiag.shape}")
print(f"Non-zero elements: {tridiag.nnz}")

# Solve linear system (requires SciPy compatibility)
b = cp.ones(n)
# x = sp.linalg.spsolve(tridiag, b)  # Would solve Ax = b

Pinned Memory for Optimal Transfers

import cupy as cp
import cupyx
import time
import numpy as np

# Compare regular vs pinned memory transfer performance
size = 10000000  # 10M elements

# Regular CPU array
regular_cpu = np.random.random(size).astype(np.float32)

# Pinned CPU array
pinned_cpu = cupyx.zeros_pinned(size, dtype=cp.float32)
pinned_cpu[:] = np.random.random(size).astype(np.float32)

# Benchmark transfers
def benchmark_transfer(cpu_array, n_iterations=100):
    """Benchmark CPU-GPU transfer performance."""
    times = []
    
    for _ in range(n_iterations):
        start = time.time()
        gpu_array = cp.asarray(cpu_array)
        cp.cuda.Stream.null.synchronize()
        times.append(time.time() - start)
    
    return np.mean(times), np.std(times)

# Regular memory transfer
regular_mean, regular_std = benchmark_transfer(regular_cpu)

# Pinned memory transfer  
pinned_mean, pinned_std = benchmark_transfer(pinned_cpu)

print(f"Regular memory: {regular_mean:.4f} ± {regular_std:.4f} seconds")
print(f"Pinned memory:  {pinned_mean:.4f} ± {pinned_std:.4f} seconds")
print(f"Speedup: {regular_mean / pinned_mean:.2f}x")

# Asynchronous transfer example
async_stream = cp.cuda.Stream()

with async_stream:
    # Start asynchronous transfer
    gpu_async = cp.asarray(pinned_cpu)
    
    # Do other work while transfer happens
    other_work = cp.random.random((1000, 1000))
    result = cp.sum(other_work)
    
    # Synchronize to ensure transfer is complete
    async_stream.synchronize()

print(f"Asynchronous transfer completed, result: {result}")

Error State Management

import cupy as cp
import cupyx

# Example with potential floating point errors
a = cp.array([1.0, 0.0, -1.0])
b = cp.array([0.0, 0.0, 0.0])

# Default behavior (may warn or ignore)
result1 = a / b
print(f"Default behavior: {result1}")  # [inf, nan, -inf]

# Temporarily change error handling
with cupyx.errstate(divide='raise', invalid='raise'):
    try:
        result2 = a / b
    except FloatingPointError as e:
        print(f"Caught error: {e}")

# Check current error state
current_state = cupyx.geterr()
print(f"Current error handling: {current_state}")

# Set specific error handling
old_state = cupyx.seterr(divide='warn', over='ignore', invalid='print')
print(f"Previous state: {old_state}")

# Example with overflow
large_numbers = cp.array([1e308, 1e308])
overflow_result = large_numbers * 10  # Will overflow to inf

# Restore previous state
cupyx.seterr(**old_state)

# Advanced error handling for numerical algorithms
def safe_divide(a, b, default=0.0):
    """Safe division with custom error handling."""
    with cupyx.errstate(divide='ignore', invalid='ignore'):
        result = a / b
        # Replace inf and nan with default value
        result = cp.where(cp.isfinite(result), result, default)
    return result

# Test safe division
numerator = cp.array([1.0, 2.0, 3.0])
denominator = cp.array([2.0, 0.0, 4.0])
safe_result = safe_divide(numerator, denominator, default=0.0)
print(f"Safe division result: {safe_result}")  # [0.5, 0.0, 0.75]

High-Performance Computing Patterns

import cupy as cp
import cupyx
import time

def gpu_accelerated_monte_carlo(n_samples=10000000):
    """Monte Carlo π estimation with CuPy extensions."""
    
    # Use rsqrt for better performance
    def estimate_pi_optimized(samples):
        x = cp.random.uniform(-1, 1, samples)
        y = cp.random.uniform(-1, 1, samples)
        
        # Use optimized rsqrt instead of sqrt
        distances_squared = x*x + y*y
        # inside = distances_squared <= 1 is more efficient than sqrt comparison
        inside_circle = distances_squared <= 1.0
        
        return 4.0 * cp.mean(inside_circle)
    
    # Benchmark with synchronization control
    with cupyx.allow_synchronize(False):
        start_time = time.time()
        pi_estimate = estimate_pi_optimized(n_samples)
        # Explicit synchronization when needed
        cp.cuda.Stream.null.synchronize()
        end_time = time.time()
    
    return float(pi_estimate), end_time - start_time

# Run Monte Carlo estimation
pi_est, computation_time = gpu_accelerated_monte_carlo(50000000)
error = abs(pi_est - cp.pi)

print(f"π estimate: {pi_est:.6f}")
print(f"Error: {error:.6f}")
print(f"Computation time: {computation_time:.4f} seconds")
print(f"Rate: {50000000 / computation_time / 1e6:.2f} M samples/second")

# Memory-efficient streaming computation
def streaming_mean_computation(total_size, chunk_size=1000000):
    """Compute mean of very large dataset using streaming."""
    running_sum = 0.0
    running_count = 0
    
    # Use pinned memory for efficient transfers
    chunk_pinned = cupyx.empty_pinned(chunk_size, dtype=cp.float32)
    
    for start_idx in range(0, total_size, chunk_size):
        end_idx = min(start_idx + chunk_size, total_size)
        current_chunk_size = end_idx - start_idx
        
        # Generate data in pinned memory
        chunk_pinned[:current_chunk_size] = np.random.normal(
            5.0, 2.0, current_chunk_size
        ).astype(np.float32)
        
        # Transfer to GPU and process
        gpu_chunk = cp.asarray(chunk_pinned[:current_chunk_size])
        chunk_sum = cp.sum(gpu_chunk)
        
        running_sum += float(chunk_sum)
        running_count += current_chunk_size
        
        if start_idx % (chunk_size * 10) == 0:
            current_mean = running_sum / running_count
            print(f"Processed {running_count:,} samples, current mean: {current_mean:.4f}")
    
    return running_sum / running_count

# Process 100M samples in chunks
final_mean = streaming_mean_computation(100000000, chunk_size=2000000)
print(f"Final streaming mean: {final_mean:.6f}")
print(f"Expected mean: 5.0")

Install with Tessl CLI

npx tessl i tessl/pypi-cupy-cuda113

docs

array-operations.md

cuda-integration.md

cupy-extensions.md

custom-kernels.md

fft-operations.md

index.md

linear-algebra.md

math-functions.md

random-generation.md

statistical-functions.md

tile.json