Python helpers to limit the number of threads used in threadpool-backed native libraries for scientific computing
Temporarily limit the number of threads used by thread pool libraries using context managers or decorators. This capability is essential for preventing oversubscription in workloads with nested parallelism.
The primary interface for temporarily limiting thread pools.
class threadpool_limits:
"""
Context manager and decorator for limiting thread pool sizes.
Can be used as:
1. Context manager: with threadpool_limits(limits=1): ...
2. Direct call: limiter = threadpool_limits(limits=1)
3. Decorator: @threadpool_limits.wrap(limits=1)
Parameters:
limits: int | dict | str | None
Thread limit specification:
- int: Set global limit for all selected libraries
- dict: Per-API or per-library limits {api/prefix: limit}
- 'sequential_blas_under_openmp': Special case for nested parallelism
- None: No limits applied (no-op)
user_api: str | None
API type to limit when limits is int:
- 'blas': Limit only BLAS libraries
- 'openmp': Limit only OpenMP libraries
- None: Limit all detected libraries
"""
def __init__(self, limits=None, user_api=None): ...
def __enter__(self):
"""Enter context manager, returns self."""
def __exit__(self, type, value, traceback):
"""Exit context manager, restores original limits."""
def restore_original_limits(self):
"""Manually restore original thread limits."""
def unregister(self):
"""Alias for restore_original_limits() (backward compatibility)."""
def get_original_num_threads(self):
"""
Get original thread counts before limiting.
Returns:
dict[str, int]: Original thread counts by user_api
"""
@classmethod
def wrap(cls, limits=None, user_api=None):
"""
Create decorator version that delays limit setting.
Returns:
Decorator function for use with @threadpool_limits.wrap(...)
"""

from threadpoolctl import threadpool_limits
import numpy as np
# Global thread limiting
with threadpool_limits(limits=1):
# All thread pools limited to 1 thread
result = np.dot(large_matrix_a, large_matrix_b)
# API-specific limiting
with threadpool_limits(limits=2, user_api='blas'):
# Only BLAS libraries limited to 2 threads
# OpenMP libraries keep their original limits
result = np.linalg.solve(A, b)
# Per-library limiting using dict
with threadpool_limits(limits={'libmkl_rt': 1, 'libgomp': 4}):
# MKL limited to 1 thread, GNU OpenMP to 4 threads
result = compute_intensive_operation()
# Special case for nested parallelism
with threadpool_limits(limits='sequential_blas_under_openmp'):
# Automatically handle BLAS/OpenMP interaction
with parallel_backend('threading', n_jobs=4): # joblib backend, as used by scikit-learn
result = some_parallel_computation()

from threadpoolctl import threadpool_limits
@threadpool_limits.wrap(limits=1)
def single_threaded_computation():
"""This function always runs with 1 thread."""
return np.linalg.eigvals(large_matrix)
@threadpool_limits.wrap(limits=2, user_api='blas')
def blas_limited_computation():
"""BLAS operations limited to 2 threads."""
return np.dot(A, B) + np.linalg.inv(C)
# Using the wrap class method
@threadpool_limits.wrap(limits={'libopenblas': 1})
def openblas_sequential():
"""OpenBLAS operations run sequentially."""
return np.fft.fft2(image_data)

from threadpoolctl import threadpool_limits, threadpool_info
# Conditional limiting based on current state
current_info = threadpool_info()
if any(lib['num_threads'] > 8 for lib in current_info):
limits = 4 # Reduce high thread counts
else:
limits = None # No limiting needed
with threadpool_limits(limits=limits):
result = expensive_computation()
# Nested limiting (inner limits override outer)
with threadpool_limits(limits=4): # Outer limit
result1 = computation1()
with threadpool_limits(limits=1): # Inner limit overrides
result2 = computation2() # Runs with 1 thread
result3 = computation3() # Back to 4 threads
# Manual control with restore
limiter = threadpool_limits(limits=1)
try:
result = computation()
finally:
limiter.restore_original_limits()

from threadpoolctl import threadpool_limits
# Context manager handles errors gracefully
try:
with threadpool_limits(limits=1):
result = potentially_failing_computation()
except ComputationError:
# Thread limits are still restored even if computation fails
pass
# Check original limits before/after
limiter = threadpool_limits(limits=2)
original = limiter.get_original_num_threads()
print(f"Original BLAS threads: {original.get('blas', 'N/A')}")
print(f"Original OpenMP threads: {original.get('openmp', 'N/A')}")

For nested parallelism scenarios where outer parallelism uses OpenMP and inner operations use BLAS:
from threadpoolctl import threadpool_limits
# Automatically handles BLAS/OpenMP interaction
with threadpool_limits(limits='sequential_blas_under_openmp'):
# Special logic:
# - If OpenBLAS with OpenMP threading: no limits applied
# - Otherwise: BLAS libraries limited to 1 thread
parallel_workload_with_blas_operations()

from threadpoolctl import threadpool_limits, threadpool_info
# Get specific library prefixes
info = threadpool_info()
mkl_prefix = next((lib['prefix'] for lib in info if lib['internal_api'] == 'mkl'), None)
if mkl_prefix:
with threadpool_limits(limits={mkl_prefix: 1}):
# Only MKL limited, other libraries unchanged
result = computation()

Install with Tessl CLI
npx tessl i tessl/pypi-threadpoolctl