CuPy: a NumPy- and SciPy-compatible, GPU-accelerated array library for CUDA computing.

CuPy provides just-in-time compilation and custom CUDA kernel creation for performance-critical applications that require low-level GPU programming. JIT compilation is available through kernel templates, raw CUDA kernels, and the cupyx.jit module for advanced GPU programming.

High-level kernel creation for common GPU computation patterns.
class ElementwiseKernel:
    """Build a user-defined element-wise GPU kernel from CUDA C++ snippets.

    Args:
        in_params: Comma-separated input parameter declarations.
        out_params: Comma-separated output parameter declarations.
        operation: CUDA C++ statement(s) executed for each element.
        name: Kernel name used for caching.
        reduce_dims: Whether contiguous dimensions may be collapsed.
        preamble: Extra CUDA C++ declarations prepended to the kernel source.
        loop_prep: Code emitted before the element loop.
        after_loop: Code emitted after the element loop.

    Example:
        kernel = ElementwiseKernel(
            'float32 x, float32 y',
            'float32 z',
            'z = x * x + y * y',
            'squared_sum'
        )
    """

    def __init__(self, in_params, out_params, operation, name='kernel',
                 reduce_dims=True, preamble='', loop_prep='', after_loop='',
                 **kwargs):
        # Stub: the real implementation compiles and caches the kernel lazily.
        pass

    def __call__(self, *args, **kwargs):
        """Run the kernel on the given array arguments."""
        pass
class ReductionKernel:
    """Build a user-defined reduction kernel from CUDA C++ expressions.

    Args:
        in_params: Comma-separated input parameter declarations.
        out_params: Comma-separated output parameter declarations.
        map_expr: Expression applied to each element (map phase).
        reduce_expr: Binary expression combining two partial results.
        post_map_expr: Expression applied after the reduction completes.
        identity: Identity value the reduction starts from.
        name: Kernel name used for caching.
        reduce_type: C++ type of the reduction accumulator.
        reduce_dims: Whether contiguous dimensions may be collapsed.
        preamble: Extra CUDA C++ declarations prepended to the kernel source.

    Example:
        kernel = ReductionKernel(
            'float32 x',
            'float32 y',
            'x',
            'a + b',
            'y = a',
            '0'
        )
    """

    def __init__(self, in_params, out_params, map_expr, reduce_expr,
                 post_map_expr='', identity=None, name='kernel',
                 reduce_type=None, reduce_dims=True, preamble='', **kwargs):
        # Stub: compilation is deferred in the real implementation.
        pass

    def __call__(self, *args, **kwargs):
        """Run the reduction over the given array arguments."""
        pass
class RawKernel:
    """Wrap a single kernel written as raw CUDA C++ source code.

    Args:
        code: Complete CUDA kernel source code.
        name: Name of the kernel function to extract.
        options: Compiler options.
        backend: Compilation backend ('nvcc' or 'nvrtc').
        translate_cucomplex: Whether to translate cuComplex types.

    Example:
        code = '''
        extern "C" __global__ void my_kernel(float* x, float* y, int n) {
            int i = blockIdx.x * blockDim.x + threadIdx.x;
            if (i < n) {
                y[i] = x[i] * x[i];
            }
        }
        '''
        kernel = RawKernel(code, 'my_kernel')
    """

    def __init__(self, code, name, options=(), backend='nvcc',
                 translate_cucomplex=True, **kwargs):
        # Stub: source is compiled on first launch in the real implementation.
        pass

    def __call__(self, grid, block, args, **kwargs):
        """Launch the kernel with an explicit grid/block configuration."""
        pass
class RawModule:
    """Compile a raw CUDA C++ source module exposing multiple kernels.

    Args:
        code: CUDA source code containing one or more kernel functions.
        options: Compiler options.
        backend: Compilation backend.
        name_expressions: Named expressions used to resolve kernel names.
        log_stream: Stream that receives the compilation log.

    Example:
        code = '''
        extern "C" {
        __global__ void kernel1(float* data) { ... }
        __global__ void kernel2(float* data) { ... }
        }
        '''
        module = RawModule(code)
        kernel1 = module.get_function('kernel1')
    """

    def __init__(self, code, options=(), backend='nvcc',
                 name_expressions=None, log_stream=None, **kwargs):
        # Stub: compilation happens lazily in the real implementation.
        pass

    def get_function(self, name):
        """Look up a compiled kernel function by name."""
        # Fixed: the original line fused `pass` with the following section
        # heading ("Advanced JIT compilation ..."), which is a syntax error.
        pass


# Advanced JIT compilation with Python decorators and runtime code generation.
def rawkernel(mode='CUDA'):
    """Decorator that marks a Python function as a raw GPU kernel.

    Args:
        mode: Compilation mode, either 'CUDA' or 'HIP'.

    Example:
        @rawkernel()
        def my_kernel(x, y, size):
            tid = threadIdx.x + blockIdx.x * blockDim.x
            if tid < size:
                y[tid] = x[tid] * x[tid]

        # Launch kernel
        my_kernel((grid_size,), (block_size,), (x_gpu, y_gpu, n))
    """
def jit(signature=None, device=False, inline=False, cache=True):
    """JIT-compile a Python function for GPU execution.

    Args:
        signature: Optional function signature specification.
        device: Compile as a device (callable-from-kernel) function.
        inline: Permit inlining of the compiled function.
        cache: Reuse previously compiled code when available.

    Returns:
        Compiled function object.
    """
def compile_with_cache(source, name, options=(), arch=None, cachdir=None,
                       prepend_cupy_headers=True, backend='nvcc',
                       translate_cucomplex=True,
                       enable_cooperative_groups=False,
                       name_expressions=None, log_stream=None,
                       cache_in_memory=False, jitify=False):
    """Compile CUDA source, reusing a cached binary when available.

    Args:
        source: CUDA C++ source code.
        name: Function name to extract from the compiled module.
        options: Tuple of compiler options.
        arch: Target GPU architecture.
        cachdir: Cache directory path.
            NOTE(review): the name looks like a typo for ``cachedir``; kept
            unchanged to preserve the call signature — confirm upstream.
        prepend_cupy_headers: Whether to include CuPy headers.
        backend: Compilation backend.
        translate_cucomplex: Whether to handle cuComplex types.
        enable_cooperative_groups: Enable cooperative-groups support.
        name_expressions: Named expressions used to resolve kernel names.
        log_stream: Stream that receives the compilation log.
        cache_in_memory: Use in-memory instead of on-disk caching.
        jitify: Use Jitify for compilation.

    Returns:
        Function: Compiled CUDA function.
    """
    # Fixed: the original closing docstring quote was fused with the next
    # section heading, which is a syntax error.


# Low-level CUDA execution primitives and thread management.
# Thread and Block Indexing


class ThreadIndex:
    """Thread index within its block (CUDA ``threadIdx``)."""
    x: int  # X dimension thread index
    y: int  # Y dimension thread index
    z: int  # Z dimension thread index


class BlockIndex:
    """Block index within the grid (CUDA ``blockIdx``)."""
    x: int  # X dimension block index
    y: int  # Y dimension block index
    z: int  # Z dimension block index


class BlockDimension:
    """Block dimensions (CUDA ``blockDim``)."""
    x: int  # X dimension block size
    y: int  # Y dimension block size
    z: int  # Z dimension block size


class GridDimension:
    """Grid dimensions (CUDA ``gridDim``)."""
    x: int  # X dimension grid size
    y: int  # Y dimension grid size
    z: int  # Z dimension grid size


# Singletons mirroring CUDA's built-in index variables.
# Fixed: these assignments originally preceded the class definitions above,
# which raises NameError at import time; reordered so the names exist first.
threadIdx = ThreadIndex()    # Thread index within block
blockIdx = BlockIndex()      # Block index within grid
blockDim = BlockDimension()  # Block dimensions
gridDim = GridDimension()    # Grid dimensions

warpsize: int = 32  # Warp size constant on current CUDA hardware


def laneid():
    """Return the calling thread's lane ID within its warp (0-31).

    Returns:
        int: Lane ID within the current warp.
    """


def grid(ndim):
    """Return the linearized grid index of the calling thread.

    Args:
        ndim: Number of dimensions (1, 2, or 3).

    Returns:
        int or tuple: Grid index.
    """


def gridsize(ndim):
    """Return the total grid size.

    Args:
        ndim: Number of dimensions.

    Returns:
        int or tuple: Grid size.
    """
    # Fixed: the original closing docstring quote was fused with the next
    # section heading, which is a syntax error.


# Thread and warp synchronization functions for coordinated GPU execution.
def syncthreads():
    """Synchronize all threads in the current thread block.

    Blocks until all threads in the block have reached this point and all
    of their memory accesses are visible to every thread in the block.
    """


def syncwarp(mask=0xffffffff):
    """Synchronize threads within a warp.

    Args:
        mask: Thread mask specifying which threads to synchronize.

    Note:
        Only threads with the corresponding bit set in ``mask`` participate.
    """


def barrier(scope='block'):
    """Memory barrier with the specified scope.

    Args:
        scope: Barrier scope ('block', 'grid', 'device', 'system').
    """


def memfence_block():
    """Block-level memory fence."""


def memfence_grid():
    """Grid-level memory fence."""


def memfence_system():
    """System-level memory fence."""
    # Fixed: the original closing docstring quote was fused with the next
    # section heading, which is a syntax error.


# Shared memory allocation and access for high-performance block-local storage.
def shared_memory(dtype, shape):
    """Allocate a block-local shared memory array.

    Args:
        dtype: Data type of the array elements.
        shape: Array shape (int or tuple).

    Returns:
        SharedArray: Shared memory array object.

    Example:
        # Allocate 256 float32 values in shared memory
        shared_data = shared_memory(cp.float32, 256)
        # 2D shared memory array
        shared_matrix = shared_memory(cp.float32, (16, 16))
    """


def dynamic_shared_memory(dtype):
    """Access dynamically allocated shared memory.

    Args:
        dtype: Data type used to interpret the shared memory.

    Returns:
        SharedArray: Dynamic shared memory view.

    Note:
        Size is determined by the kernel launch parameters.
    """


class SharedArray:
    """Shared memory array interface."""

    def __getitem__(self, key):
        """Read elements from shared memory."""
        pass

    def __setitem__(self, key, value):
        """Write elements to shared memory."""
        # Fixed: the original line fused `pass` with the next section
        # heading, which is a syntax error.
        pass


# Thread-safe atomic operations for lock-free algorithms and reductions.
def atomic_add(array, index, value):
    """Atomic addition.

    Args:
        array: Target array.
        index: Array index.
        value: Value to add.

    Returns:
        Previous value at ``index``.
    """


def atomic_sub(array, index, value):
    """Atomic subtraction."""


def atomic_exch(array, index, value):
    """Atomic exchange.

    Args:
        array: Target array.
        index: Array index.
        value: New value.

    Returns:
        Previous value at ``index``.
    """


def atomic_min(array, index, value):
    """Atomic minimum operation."""


def atomic_max(array, index, value):
    """Atomic maximum operation."""


def atomic_inc(array, index):
    """Atomic increment.

    Args:
        array: Target array.
        index: Array index.

    Returns:
        Previous value at ``index``.
    """


def atomic_dec(array, index):
    """Atomic decrement."""


def atomic_cas(array, index, compare, value):
    """Atomic compare-and-swap.

    Args:
        array: Target array.
        index: Array index.
        compare: Value to compare against.
        value: New value stored if the comparison succeeds.

    Returns:
        Previous value at ``index``.
    """


def atomic_and(array, index, value):
    """Atomic bitwise AND."""


def atomic_or(array, index, value):
    """Atomic bitwise OR."""


def atomic_xor(array, index, value):
    """Atomic bitwise XOR."""
    # Fixed: the original closing docstring quote was fused with the next
    # section heading, which is a syntax error.


# Efficient warp-level collective operations for high-performance algorithms.
def shfl_sync(mask, var, srcLane, width=32):
    """Warp shuffle operation.

    Args:
        mask: Thread participation mask.
        var: Variable to shuffle.
        srcLane: Source lane index.
        width: Warp width (power of 2, at most 32).

    Returns:
        Value held by the source lane.
    """


def shfl_up_sync(mask, var, delta, width=32):
    """Warp shuffle up operation.

    Args:
        mask: Thread participation mask.
        var: Variable to shuffle.
        delta: Offset to the source lane.
        width: Warp width.

    Returns:
        Value from lane (current - delta).
    """


def shfl_down_sync(mask, var, delta, width=32):
    """Warp shuffle down operation.

    Args:
        mask: Thread participation mask.
        var: Variable to shuffle.
        delta: Offset to the source lane.
        width: Warp width.

    Returns:
        Value from lane (current + delta).
    """


def shfl_xor_sync(mask, var, laneMask, width=32):
    """Warp shuffle XOR operation.

    Args:
        mask: Thread participation mask.
        var: Variable to shuffle.
        laneMask: XOR mask used to select the source lane.
        width: Warp width.

    Returns:
        Value from lane (current ^ laneMask).
    """


def vote_all_sync(mask, predicate):
    """Test whether the predicate is true for all threads in the mask.

    Args:
        mask: Thread participation mask.
        predicate: Boolean expression to test.

    Returns:
        bool: True if every participating thread's predicate is true.
    """


def vote_any_sync(mask, predicate):
    """Test whether the predicate is true for any thread in the mask."""


def vote_uni_sync(mask, predicate):
    """Test whether the predicate has the same value on all threads."""


def ballot_sync(mask, predicate):
    """Collect per-thread predicate results across the warp.

    Args:
        mask: Thread participation mask.
        predicate: Boolean expression.

    Returns:
        int: Bitmask of predicate results.
    """


def activemask():
    """Return the mask of currently active threads in the warp.

    Returns:
        int: Bitmask of active threads.
    """
    # Fixed: the original closing docstring quote was fused with the next
    # section heading, which is a syntax error.


# GPU-optimized mathematical functions for kernel development.
def fma(x, y, z):
    """Fused multiply-add: ``x * y + z`` with a single rounding step."""


def rsqrt(x):
    """Fast reciprocal square root: ``1 / sqrt(x)``."""


def rcp(x):
    """Fast reciprocal: ``1 / x``."""


def sin_pi(x):
    """Compute ``sin(pi * x)`` accurately."""


def cos_pi(x):
    """Compute ``cos(pi * x)`` accurately."""


def sincos(x):
    """Compute sine and cosine simultaneously.

    Returns:
        tuple: ``(sin(x), cos(x))``.
    """


def exp2(x):
    """Base-2 exponential: ``2 ** x``."""


def log2(x):
    """Base-2 logarithm."""


# NOTE(review): `pow` mirrors the CUDA intrinsic name but shadows the
# Python builtin at module scope — kept for API compatibility.
def pow(x, y):
    """Power function: ``x ** y``."""


def sqrt(x):
    """Square root."""


def cbrt(x):
    """Cube root."""


def hypot(x, y):
    """Euclidean distance: ``sqrt(x**2 + y**2)``."""


def remainder(x, y):
    """Floating point remainder."""


def fmod(x, y):
    """Floating point modulo."""


def copysign(x, y):
    """Copy the sign of ``y`` onto the magnitude of ``x``."""


def ldexp(x, exp):
    """Compute ``x * 2**exp``."""


def frexp(x):
    """Extract mantissa and exponent.

    Returns:
        tuple: ``(mantissa, exponent)``.
    """
    # Fixed: the original closing docstring quote was fused with the
    # following import statement, which is a syntax error.


import cupy as cp
from cupy import ElementwiseKernel

# --- ElementwiseKernel usage examples ---

# Simple arithmetic kernel: z[i] = x[i] + y[i]
add_kernel = ElementwiseKernel(
    'float32 x, float32 y',
    'float32 z',
    'z = x + y',
    'elementwise_add'
)

# Complex expression kernel: per-element 2D vector norm
norm_kernel = ElementwiseKernel(
    'float32 x, float32 y',
    'float32 norm',
    'norm = sqrt(x * x + y * y)',
    'vector_norm'
)

# Multi-output kernel: Cartesian -> polar conversion (two outputs)
polar_kernel = ElementwiseKernel(
    'float32 x, float32 y',
    'float32 r, float32 theta',
    '''
    r = sqrt(x * x + y * y);
    theta = atan2(y, x);
    ''',
    'cartesian_to_polar'
)

# Usage
x = cp.random.randn(1000000).astype(cp.float32)
y = cp.random.randn(1000000).astype(cp.float32)
result = add_kernel(x, y)
norms = norm_kernel(x, y)
# Fixed: the original last line was fused with the next example's import.
r, theta = polar_kernel(x, y)

import cupy as cp
from cupy import ReductionKernel

# --- ReductionKernel usage examples ---

# Sum reduction
sum_kernel = ReductionKernel(
    'float32 x',
    'float32 y',
    'x',        # map expression
    'a + b',    # reduce expression
    'y = a',    # post-map expression
    '0',        # identity value
    'sum_reduction'
)

# Maximum reduction
max_kernel = ReductionKernel(
    'float32 x',
    'float32 y',
    'x',
    'max(a, b)',
    'y = a',
    '-INFINITY',
    'max_reduction'
)

# Variance reduction: sum of squared deviations, normalized by (n - 1)
variance_kernel = ReductionKernel(
    'float32 x, float32 mean',
    'float32 var',
    '(x - mean) * (x - mean)',
    'a + b',
    'var = a / (_in_ind.size() - 1)',
    '0',
    'variance_reduction'
)

# Usage
data = cp.random.randn(1000000).astype(cp.float32)
total = sum_kernel(data)
maximum = max_kernel(data)
mean_val = cp.mean(data)
# Fixed: the original last line was fused with the next example's import.
var_val = variance_kernel(data, mean_val)

import cupy as cp
from cupy import RawKernel

# --- RawKernel usage examples ---

# Matrix multiplication kernel: C (MxN) = A (MxK) @ B (KxN)
matmul_code = '''
extern "C" __global__ void matmul(
    const float* A, const float* B, float* C,
    int M, int N, int K
) {
    int row = blockIdx.y * blockDim.y + threadIdx.y;
    int col = blockIdx.x * blockDim.x + threadIdx.x;
    if (row < M && col < N) {
        float sum = 0.0f;
        for (int k = 0; k < K; k++) {
            sum += A[row * K + k] * B[k * N + col];
        }
        C[row * N + col] = sum;
    }
}
'''
matmul_kernel = RawKernel(matmul_code, 'matmul')

# Per-block tree reduction using dynamically-sized shared memory
reduction_code = '''
extern "C" __global__ void block_reduce_sum(
    const float* input, float* output, int n
) {
    extern __shared__ float shared_data[];
    int tid = threadIdx.x;
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    // Load data into shared memory
    shared_data[tid] = (i < n) ? input[i] : 0.0f;
    __syncthreads();
    // Reduction in shared memory
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            shared_data[tid] += shared_data[tid + s];
        }
        __syncthreads();
    }
    // Write result
    if (tid == 0) {
        output[blockIdx.x] = shared_data[0];
    }
}
'''
reduce_kernel = RawKernel(reduction_code, 'block_reduce_sum')

# Usage
A = cp.random.randn(512, 256).astype(cp.float32)
B = cp.random.randn(256, 128).astype(cp.float32)
C = cp.zeros((512, 128), dtype=cp.float32)

# Launch matrix multiplication (one thread per output element)
block_size = (16, 16)
grid_size = ((128 + block_size[0] - 1) // block_size[0],
             (512 + block_size[1] - 1) // block_size[1])
# Fixed: the original last line was fused with the next example's import.
matmul_kernel(grid_size, block_size, (A, B, C, 512, 128, 256))

import cupy as cp
from cupyx.jit import rawkernel

# --- cupyx.jit rawkernel decorator examples ---


@rawkernel()
def saxpy_kernel(a, x, y, n):
    """SAXPY: y = a*x + y"""
    tid = cp.cuda.grid(1)
    if tid < n:
        y[tid] = a * x[tid] + y[tid]


@rawkernel()
def transpose_kernel(input_mat, output_mat, width, height):
    """Matrix transpose kernel"""
    col = cp.cuda.blockIdx.x * cp.cuda.blockDim.x + cp.cuda.threadIdx.x
    row = cp.cuda.blockIdx.y * cp.cuda.blockDim.y + cp.cuda.threadIdx.y
    if col < width and row < height:
        output_mat[col * height + row] = input_mat[row * width + col]


@rawkernel()
def stencil_kernel(input_arr, output_arr, width, height):
    """5-point stencil computation"""
    col = cp.cuda.blockIdx.x * cp.cuda.blockDim.x + cp.cuda.threadIdx.x
    row = cp.cuda.blockIdx.y * cp.cuda.blockDim.y + cp.cuda.threadIdx.y
    # Interior points only: the stencil needs all four neighbors.
    if 1 <= col < width - 1 and 1 <= row < height - 1:
        idx = row * width + col
        result = (input_arr[idx] +
                  input_arr[idx - 1] + input_arr[idx + 1] +
                  input_arr[idx - width] + input_arr[idx + width]) * 0.2
        output_arr[idx] = result


# Usage
n = 1000000
a = 2.5
x = cp.random.randn(n).astype(cp.float32)
y = cp.random.randn(n).astype(cp.float32)

# Launch SAXPY kernel with enough blocks to cover all n elements
block_size = 256
grid_size = (n + block_size - 1) // block_size
# Fixed: the original last line was fused with the next example's import.
saxpy_kernel((grid_size,), (block_size,), (a, x, y, n))

import cupy as cp
from cupyx.jit import rawkernel

# --- Shared memory + atomics examples ---


@rawkernel()
def histogram_kernel(data, bins, hist, n, num_bins):
    """Compute histogram using shared memory and atomics"""
    # Shared memory for a per-block local histogram
    shared_hist = cp.cuda.shared_memory(cp.int32, 256)
    tid = cp.cuda.threadIdx.x
    bid = cp.cuda.blockIdx.x
    # Initialize shared memory
    if tid < num_bins:
        shared_hist[tid] = 0
    cp.cuda.syncthreads()
    # Process data elements with a grid-wide stride
    idx = bid * cp.cuda.blockDim.x + tid
    while idx < n:
        bin_idx = int(data[idx] * num_bins)
        if 0 <= bin_idx < num_bins:
            cp.cuda.atomic_add(shared_hist, bin_idx, 1)
        idx += cp.cuda.gridDim.x * cp.cuda.blockDim.x
    cp.cuda.syncthreads()
    # Merge the block-local histogram into the global one
    if tid < num_bins:
        cp.cuda.atomic_add(hist, tid, shared_hist[tid])


@rawkernel()
def prefix_sum_kernel(data, result, n):
    """Parallel prefix sum using shared memory"""
    shared_data = cp.cuda.shared_memory(cp.float32, 512)
    tid = cp.cuda.threadIdx.x
    bid = cp.cuda.blockIdx.x
    block_size = cp.cuda.blockDim.x
    # Load data (zero-pad past the end of the input)
    idx = bid * block_size + tid
    shared_data[tid] = data[idx] if idx < n else 0.0
    cp.cuda.syncthreads()
    # Up-sweep phase
    offset = 1
    while offset < block_size:
        if (tid + 1) % (2 * offset) == 0:
            shared_data[tid] += shared_data[tid - offset]
        offset *= 2
        cp.cuda.syncthreads()
    # Down-sweep phase
    if tid == block_size - 1:
        shared_data[tid] = 0.0
    offset = block_size // 2
    while offset > 0:
        cp.cuda.syncthreads()
        if (tid + 1) % (2 * offset) == 0:
            temp = shared_data[tid - offset]
            shared_data[tid - offset] = shared_data[tid]
            shared_data[tid] += temp
        offset //= 2
    cp.cuda.syncthreads()
    # Store result
    if idx < n:
        result[idx] = shared_data[tid]


# Usage examples
data = cp.random.rand(1000000).astype(cp.float32)
hist = cp.zeros(256, dtype=cp.int32)

# Compute histogram
# NOTE(review): the `bins` argument is passed as None and never read by
# the kernel body — presumably a leftover parameter; confirm upstream.
block_size = 256
grid_size = 128
histogram_kernel((grid_size,), (block_size,),
                 (data, None, hist, len(data), 256))

# Prefix sum
prefix_result = cp.zeros_like(data)
# Fixed: the original last line was fused with the next example's import.
prefix_sum_kernel((grid_size,), (block_size,), (data, prefix_result, len(data)))

import cupy as cp
from cupyx.jit import rawkernel

# --- Performance-oriented kernel examples ---


@rawkernel()
def optimized_gemm_kernel(A, B, C, M, N, K, tile_size=16):
    """Optimized matrix multiplication with tiling"""
    # Shared memory tiles (sized to match the default tile_size of 16)
    tile_A = cp.cuda.shared_memory(cp.float32, (16, 16))
    tile_B = cp.cuda.shared_memory(cp.float32, (16, 16))
    # Thread and block indices
    tx, ty = cp.cuda.threadIdx.x, cp.cuda.threadIdx.y
    bx, by = cp.cuda.blockIdx.x, cp.cuda.blockIdx.y
    # Calculate output position
    row = by * tile_size + ty
    col = bx * tile_size + tx
    result = 0.0
    # Tile across K dimension
    for tile in range((K + tile_size - 1) // tile_size):
        # Load tile into shared memory (zero-pad outside the matrices)
        if row < M and tile * tile_size + tx < K:
            tile_A[ty, tx] = A[row * K + tile * tile_size + tx]
        else:
            tile_A[ty, tx] = 0.0
        if col < N and tile * tile_size + ty < K:
            tile_B[ty, tx] = B[(tile * tile_size + ty) * N + col]
        else:
            tile_B[ty, tx] = 0.0
        cp.cuda.syncthreads()
        # Compute partial result
        for k in range(tile_size):
            result += tile_A[ty, k] * tile_B[k, tx]
        cp.cuda.syncthreads()
    # Store result
    if row < M and col < N:
        C[row * N + col] = result


# Memory coalescing example
@rawkernel()
def coalesced_transpose(input_mat, output_mat, width, height, tile_size=32):
    """Memory-coalesced matrix transpose"""
    # Extra column (33 vs 32) to avoid shared-memory bank conflicts
    tile = cp.cuda.shared_memory(cp.float32, (32, 33))
    x = cp.cuda.blockIdx.x * tile_size + cp.cuda.threadIdx.x
    y = cp.cuda.blockIdx.y * tile_size + cp.cuda.threadIdx.y
    # Load tile with coalesced access
    if x < width and y < height:
        tile[cp.cuda.threadIdx.y, cp.cuda.threadIdx.x] = input_mat[y * width + x]
    cp.cuda.syncthreads()
    # Transpose coordinates for output
    x = cp.cuda.blockIdx.y * tile_size + cp.cuda.threadIdx.x
    y = cp.cuda.blockIdx.x * tile_size + cp.cuda.threadIdx.y
    # Store with coalesced access
    if x < height and y < width:
        output_mat[y * height + x] = tile[cp.cuda.threadIdx.x, cp.cuda.threadIdx.y]


# Usage with performance considerations
M, N, K = 2048, 2048, 2048
A = cp.random.randn(M, K).astype(cp.float32)
B = cp.random.randn(K, N).astype(cp.float32)
C = cp.zeros((M, N), dtype=cp.float32)

# Optimized GEMM launch: one tile per block
tile_size = 16
grid_dim = ((N + tile_size - 1) // tile_size, (M + tile_size - 1) // tile_size)
block_dim = (tile_size, tile_size)
# Fixed: the original last line was fused with the trailing install note,
# which is now a comment.
optimized_gemm_kernel(grid_dim, block_dim, (A, B, C, M, N, K, tile_size))

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-cupy-cuda114