CuPy is a NumPy/SciPy-compatible array library for GPU-accelerated computing with Python.
—
Framework for creating custom GPU kernels in CuPy, enabling high-performance operations tailored to specific needs. Provides ElementwiseKernel for element-wise operations, ReductionKernel for reductions, and RawKernel for arbitrary CUDA code.
class ElementwiseKernel:
    """
    User-defined elementwise kernel.

    Compiles a CUDA kernel that applies `operation` to every element of
    the input arrays.

    Parameters:
    - in_params: string of input parameters (comma-separated, e.g. 'float32 x, float32 y')
    - out_params: string of output parameters
    - operation: string of element-wise operation (CUDA C code)
    - name: kernel name
    - reduce_dims: whether to reduce dimensions
    - options: compiler options
    - preamble: code to insert at beginning of the generated source
    - loop_prep: code to insert before loop
    - after_loop: code to insert after loop
    """

    def __init__(self, in_params, out_params, operation, name='kernel',
                 reduce_dims=True, options=(), preamble='', loop_prep='',
                 after_loop='', **kwargs): ...
def __call__(self, *args, **kwargs): ...class ReductionKernel:
"""
User-defined reduction kernel.
Parameters:
- in_params: string of input parameters
- out_params: string of output parameters
- map_expr: mapping expression
- reduce_expr: reduction expression
- post_map_expr: post-mapping expression
- identity: identity value for reduction
- name: kernel name
- reduce_type: type for reduction
- reduce_dims: whether to reduce dimensions
- preamble: code to insert at beginning
- options: compiler options
"""
def __init__(self, in_params, out_params, map_expr, reduce_expr,
post_map_expr='', identity=None, name='kernel',
reduce_type=None, reduce_dims=True, preamble='',
options=(), **kwargs): ...
def __call__(self, *args, **kwargs): ...class RawKernel:
"""
User-defined raw CUDA kernel.
Parameters:
- code: CUDA kernel code
- name: kernel function name
- options: compiler options
- backend: compiler backend ('nvcc' or 'nvrtc')
- translate_cucomplex: translate cuComplex types
- enable_cooperative_groups: enable cooperative groups
- jitify: use Jitify for compilation
"""
def __init__(self, code, name, options=(), backend='nvcc',
translate_cucomplex=True, enable_cooperative_groups=False,
jitify=False, **kwargs): ...
def __call__(self, grid, block, args, **kwargs): ...class RawModule:
"""
User-defined raw CUDA module.
Parameters:
- code: CUDA module code
- options: compiler options
- backend: compiler backend
- translate_cucomplex: translate cuComplex types
- enable_cooperative_groups: enable cooperative groups
- jitify: use Jitify for compilation
"""
def __init__(self, code, options=(), backend='nvcc',
translate_cucomplex=True, enable_cooperative_groups=False,
jitify=False, **kwargs): ...
def get_function(self, name): ...def create_ufunc(name, ops, routine=None, preamble='', doc=None,
default_casting=None, loop_prep='', out_ops=None):
"""
Create universal function from operations.
Parameters:
- name: function name
- ops: operation specifications
- routine: routine function
- preamble: preamble code
- doc: documentation string
- default_casting: default casting rule
- loop_prep: loop preparation code
- out_ops: output operations
Returns:
ufunc: created universal function
"""
def create_reduction_func(name, ops, routine=None, identity=None,
preamble='', **kwargs):
"""
Create reduction function.
Parameters:
- name: function name
- ops: operation specifications
- routine: routine function
- identity: identity value
- preamble: preamble code
Returns:
function: created reduction function
"""def clear_memo():
"""Clear memoization cache."""
def memoize(for_each_device=False):
"""
Memoization decorator.
Parameters:
- for_each_device: memoize for each device separately
Returns:
decorator: memoization decorator
"""import cupy as cp
# Simple element-wise operation
add_kernel = cp.ElementwiseKernel(
'float32 x, float32 y', # input parameters
'float32 z', # output parameters
'z = x + y', # operation
'add_kernel' # kernel name
)
# Use the kernel
a = cp.random.random(1000).astype(cp.float32)
b = cp.random.random(1000).astype(cp.float32)
c = cp.empty(1000, dtype=cp.float32)
add_kernel(a, b, c)
# Verify result
expected = a + b
error = cp.linalg.norm(c - expected)
print(f"Add kernel error: {error}")import cupy as cp
# More complex element-wise operation with conditionals
clamp_kernel = cp.ElementwiseKernel(
'float32 x, float32 min_val, float32 max_val',
'float32 y',
'''
if (x < min_val) {
y = min_val;
} else if (x > max_val) {
y = max_val;
} else {
y = x;
}
''',
'clamp_kernel'
)
# Test the kernel
data = cp.random.uniform(-2, 2, 1000).astype(cp.float32)
result = cp.empty_like(data)
clamp_kernel(data, -1.0, 1.0, result)
# Verify clamping
print(f"Min value: {cp.min(result)}") # Should be >= -1.0
print(f"Max value: {cp.max(result)}") # Should be <= 1.0import cupy as cp
# Kernel with helper functions in preamble
sigmoid_kernel = cp.ElementwiseKernel(
'float32 x',
'float32 y',
'y = sigmoid(x)',
'sigmoid_kernel',
preamble='''
__device__ float sigmoid(float x) {
return 1.0f / (1.0f + expf(-x));
}
'''
)
# Use the kernel
x = cp.linspace(-5, 5, 1000).astype(cp.float32)
y = cp.empty_like(x)
sigmoid_kernel(x, y)
# Compare with CuPy implementation
expected = 1 / (1 + cp.exp(-x))
error = cp.linalg.norm(y - expected)
print(f"Sigmoid kernel error: {error}")import cupy as cp
# Custom sum reduction
sum_kernel = cp.ReductionKernel(
'float32 x', # input
'float32 y', # output
'x', # map expression (identity)
'a + b', # reduce expression
'y = a', # post-map expression
'0', # identity value
'sum_kernel' # kernel name
)
# Test the kernel
data = cp.random.random(10000).astype(cp.float32)
result = sum_kernel(data)
# Compare with built-in sum
expected = cp.sum(data)
error = abs(result - expected)
print(f"Sum kernel error: {error}")import cupy as cp
# Root mean square reduction
rms_kernel = cp.ReductionKernel(
'float32 x',
'float32 y',
'x * x', # map: square each element
'a + b', # reduce: sum squares
'y = sqrt(a / _in_ind.size())', # post: sqrt of mean
'0', # identity
'rms_kernel'
)
# Test the kernel
data = cp.random.random(1000).astype(cp.float32)
rms_result = rms_kernel(data)
# Compare with manual calculation
manual_rms = cp.sqrt(cp.mean(data * data))
error = abs(rms_result - manual_rms)
print(f"RMS kernel error: {error}")import cupy as cp
# Matrix multiplication kernel
matmul_code = '''
extern "C" __global__
void matmul(float* A, float* B, float* C, int M, int N, int K) {
int row = blockIdx.y * blockDim.y + threadIdx.y;
int col = blockIdx.x * blockDim.x + threadIdx.x;
if (row < M && col < N) {
float sum = 0.0f;
for (int k = 0; k < K; k++) {
sum += A[row * K + k] * B[k * N + col];
}
C[row * N + col] = sum;
}
}
'''
matmul_kernel = cp.RawKernel(matmul_code, 'matmul')
# Test the kernel
M, N, K = 512, 512, 512
A = cp.random.random((M, K)).astype(cp.float32)
B = cp.random.random((K, N)).astype(cp.float32)
C = cp.zeros((M, N), dtype=cp.float32)
# Launch kernel
block = (16, 16)
grid = ((N + block[0] - 1) // block[0], (M + block[1] - 1) // block[1])
matmul_kernel(grid, block, (A, B, C, M, N, K))
# Compare with CuPy matmul
expected = cp.dot(A, B)
error = cp.linalg.norm(C - expected) / cp.linalg.norm(expected)
print(f"Matmul kernel relative error: {error}")import cupy as cp
# Convolution kernel with shared memory
conv_code = '''
extern "C" __global__
void conv2d(float* input, float* kernel, float* output,
int height, int width, int kernel_size) {
extern __shared__ float shared_mem[];
int tx = threadIdx.x;
int ty = threadIdx.y;
int bx = blockIdx.x;
int by = blockIdx.y;
int bdx = blockDim.x;
int bdy = blockDim.y;
int row = by * bdy + ty;
int col = bx * bdx + tx;
int shared_width = bdx + kernel_size - 1;
int shared_height = bdy + kernel_size - 1;
// Load data into shared memory (simplified)
if (row < height && col < width) {
shared_mem[ty * shared_width + tx] = input[row * width + col];
}
__syncthreads();
// Perform convolution
if (row < height && col < width) {
float sum = 0.0f;
int half_kernel = kernel_size / 2;
for (int ky = 0; ky < kernel_size; ky++) {
for (int kx = 0; kx < kernel_size; kx++) {
int y_idx = ty + ky;
int x_idx = tx + kx;
if (y_idx < shared_height && x_idx < shared_width) {
sum += shared_mem[y_idx * shared_width + x_idx] *
kernel[ky * kernel_size + kx];
}
}
}
output[row * width + col] = sum;
}
}
'''
conv_kernel = cp.RawKernel(conv_code, 'conv2d')
# Test the kernel (simplified example)
height, width = 256, 256
kernel_size = 3
input_data = cp.random.random((height, width)).astype(cp.float32)
kernel_data = cp.random.random((kernel_size, kernel_size)).astype(cp.float32)
output_data = cp.zeros((height, width), dtype=cp.float32)
# Calculate shared memory size
block = (16, 16)
shared_size = (block[0] + kernel_size - 1) * (block[1] + kernel_size - 1) * 4 # 4 bytes per float
grid = ((width + block[0] - 1) // block[0], (height + block[1] - 1) // block[1])
conv_kernel(grid, block, (input_data, kernel_data, output_data, height, width, kernel_size),
shared_mem=shared_size)import cupy as cp
# Module with multiple related kernels
vector_ops_code = '''
extern "C" __global__
void vector_add(float* a, float* b, float* c, int n) {
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    if (idx < n) {
        c[idx] = a[idx] + b[idx];
    }
}
extern "C" __global__
void vector_mult(float* a, float* b, float* c, int n) {
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    if (idx < n) {
        c[idx] = a[idx] * b[idx];
    }
}
extern "C" __global__
void vector_norm(float* a, float* result, int n) {
    extern __shared__ float sdata[];
    int tid = threadIdx.x;
    int idx = blockDim.x * blockIdx.x + threadIdx.x;
    // Load and square
    sdata[tid] = (idx < n) ? a[idx] * a[idx] : 0.0f;
    __syncthreads();
    // Reduction in shared memory
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            sdata[tid] += sdata[tid + s];
        }
        __syncthreads();
    }
    // Write result
    if (tid == 0) {
        atomicAdd(result, sdata[0]);
    }
}
'''
vector_module = cp.RawModule(code=vector_ops_code)

# Get individual kernels
add_kernel = vector_module.get_function('vector_add')
mult_kernel = vector_module.get_function('vector_mult')
norm_kernel = vector_module.get_function('vector_norm')

# Test the kernels
n = 1000000
a = cp.random.random(n).astype(cp.float32)
b = cp.random.random(n).astype(cp.float32)
c = cp.zeros(n, dtype=cp.float32)
block_size = 256
grid_size = (n + block_size - 1) // block_size
# The kernels declare `int n`; pass an explicit 32-bit scalar (a plain
# Python int would be passed as a 64-bit value and corrupt the argument
# layout).
n32 = cp.int32(n)

# Vector addition
add_kernel((grid_size,), (block_size,), (a, b, c, n32))
add_expected = a + b
add_error = cp.linalg.norm(c - add_expected)
print(f"Add error: {add_error}")

# Vector multiplication
mult_kernel((grid_size,), (block_size,), (a, b, c, n32))
mult_expected = a * b
mult_error = cp.linalg.norm(c - mult_expected)
print(f"Mult error: {mult_error}")

# Vector norm — the tree reduction halves blockDim.x each step, so it
# assumes block_size is a power of two (256 satisfies that)
norm_result = cp.zeros(1, dtype=cp.float32)
shared_mem_size = block_size * 4  # 4 bytes per float
norm_kernel((grid_size,), (block_size,), (a, norm_result, n32),
            shared_mem=shared_mem_size)
norm_expected = cp.linalg.norm(a)**2
norm_error = abs(float(norm_result) - float(norm_expected))
print(f"Norm error: {norm_error}")import cupy as cp
import time
# Compare ElementwiseKernel vs built-in operations
n = 10000000
# Built-in operation timing
a = cp.random.random(n).astype(cp.float32)
b = cp.random.random(n).astype(cp.float32)
start = time.time()
for _ in range(100):
c_builtin = a + b
cp.cuda.Device().synchronize()
builtin_time = time.time() - start
# Custom kernel timing
add_kernel = cp.ElementwiseKernel(
'float32 x, float32 y',
'float32 z',
'z = x + y',
'custom_add'
)
c_custom = cp.empty_like(a)
start = time.time()
for _ in range(100):
add_kernel(a, b, c_custom)
cp.cuda.Device().synchronize()
custom_time = time.time() - start
print(f"Built-in time: {builtin_time:.4f}s")
print(f"Custom kernel time: {custom_time:.4f}s")
print(f"Speedup: {builtin_time/custom_time:.2f}x")
# Verify correctness
error = cp.linalg.norm(c_builtin - c_custom)
print(f"Accuracy error: {error}")Install with Tessl CLI
npx tessl i tessl/pypi-cupy-cuda12x