Python supercharged for fastai development
Multi-threading and multi-processing utilities with simplified APIs, progress tracking, and seamless integration with fastcore's functional programming patterns. The parallel module provides enhanced executors and decorators for concurrent execution with improved error handling and debugging support.
Simple decorators to convert functions for concurrent execution with automatic result handling.
def threaded(process=False):
"""
Decorator to run a function in a Thread or Process.
Converts a function to run asynchronously in a separate thread or process.
The decorated function returns the Thread/Process object with a 'result'
attribute containing the function's return value.
Parameters:
- process: bool, use Process instead of Thread if True (default: False)
Returns:
Decorator that wraps functions for concurrent execution
Note (review): 'result' is presumably only populated after the thread/process
finishes — confirm against the fastcore implementation before reading it
without joining first.
Usage:
@threaded
def compute(): return expensive_calculation()
@threaded(process=True)
def cpu_intensive(): return heavy_computation()
"""
def startthread(f):
"""
Like threaded, but starts the thread immediately.
Decorator that immediately starts the thread when the decorated
function is called, rather than requiring a manual .start().
Parameters:
- f: function to run in a thread
Returns:
Started Thread object with a 'result' attribute (presumably set on completion)
"""
def startproc(f):
"""
Like threaded(True), but starts the Process immediately.
Decorator that immediately starts the process when the decorated
function is called, providing instant execution.
Parameters:
- f: function to run in a process
Returns:
Started Process object with a 'result' attribute (presumably set on completion)
"""Improved ThreadPoolExecutor and ProcessPoolExecutor with better error handling and serial execution support.
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""
Enhanced ThreadPoolExecutor with serial execution support and error handling.
Extends Python's ThreadPoolExecutor with the ability to run serially
(max_workers=0), better exception handling, and pause functionality
for rate limiting.
Parameters:
- max_workers: int, number of worker threads (0 for serial, None for CPU count)
- on_exc: callable, exception handler function (default: print)
- pause: float, seconds to pause between operations (default: 0)
- **kwargs: additional arguments passed to parent class
Features:
- Serial execution when max_workers=0 (useful for debugging)
- Automatic exception handling and reporting
- Built-in rate limiting with pause parameter
- Thread-safe operation with Manager().Lock()
"""
# NOTE(review): defaults.cpus presumably comes from fastcore.basics.defaults —
# confirm the import context; it is not defined in this document.
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
"""
Enhanced map with error handling and rate limiting.
Parameters:
- f: function to apply to each item
- items: iterable of items to process
- *args: additional positional arguments for f
- timeout: float, timeout for operations
- chunksize: int, items per chunk
- **kwargs: keyword arguments for f
Returns:
Iterator of results
"""
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
"""
Enhanced ProcessPoolExecutor with serial execution support.
Extends Python's ProcessPoolExecutor with the same enhancements
as the enhanced ThreadPoolExecutor, including a serial execution mode and
improved error handling.
Parameters:
- max_workers: int, number of worker processes (0 for serial)
- on_exc: callable, exception handler function
- pause: float, seconds to pause between operations
- **kwargs: additional arguments passed to parent class
Note: serial execution (max_workers=0) is particularly useful for
debugging multiprocessing code without the complexity of separate processes.
"""
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs): ...
Convenient functions for parallel execution with automatic executor management.
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None,
pause=0, method=map, timeout=None, chunksize=1, **kwargs):
# NOTE(review): the default method=map (the builtin) looks suspicious for an
# "execution method" parameter — verify against the fastcore source.
"""
Parallel execution of a function over items with progress tracking.
High-level interface for parallel processing that handles executor
creation, progress tracracking, and result collection automatically.
Parameters:
- f: function to apply to each item
- items: iterable of items to process
- *args: additional positional arguments for f
- n_workers: int, number of workers (0 for serial execution)
- total: int, total items for progress tracking
- progress: bool|callable, progress display (True for default, callable for custom;
presumably requires fastprogress to be installed — confirm)
- pause: float, seconds between operations
- method: callable, execution method (map, starmap, etc.)
- timeout: float, timeout for operations
- chunksize: int, items per chunk for process pools
- **kwargs: keyword arguments for f
Returns:
L: list-like L of results from applying f to each item
"""
def parallel_async(f, items, *args, **kwargs):
"""
Async version of parallel execution.
Provides asynchronous parallel execution for async functions
with the same interface as the synchronous parallel function.
Parameters:
- f: async function to apply
- items: iterable of items
- *args: positional arguments for f
- **kwargs: keyword arguments (same as parallel)
Returns:
Awaitable that resolves to an L of results
"""
def run_procs(f, f_done, args, n_workers=1):
"""
Run processes with completion callbacks.
Executes a function in multiple processes with a callback
that is invoked as processes complete.
Parameters:
- f: function to run in each process
- f_done: callback function called when a process completes
- args: list of argument tuples, one per process
- n_workers: int, number of concurrent processes
"""
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
"""
Generate items in parallel using a class.
Creates instances of cls by processing items in parallel,
useful for data loading and transformation pipelines.
Parameters:
- cls: class to instantiate
- items: items to process
- n_workers: int, number of workers
- **kwargs: additional arguments for the cls constructor
Yields:
Instances of cls created from processed items
"""Helper functions for parallel processing setup and compatibility checking.
def parallelable(param_name, num_workers, f=None):
"""
Check whether a function can be parallelized in the current environment.
Determines whether parallel processing is available, considering
platform limitations, notebook environments, and function location.
Parameters:
- param_name: str, name of the parameter being checked (used in the warning message)
- num_workers: int, requested number of workers
- f: function, function to check (optional)
Returns:
bool: True if parallelization is possible
Note: returns False and prints a warning for Windows + Jupyter + main-module functions
"""from fastcore.parallel import threaded, startthread, startproc
import time
# Simple threaded function: the call returns a Thread object, not the result
@threaded
def slow_calculation(x):
time.sleep(1)
return x ** 2
# Start thread and get result later
thread = slow_calculation(5)
thread.start()
# Do other work...
# NOTE(review): reading .result immediately after start() assumes the thread
# has already finished — presumably .result is set on completion; confirm
# whether a join is required before this line.
result = thread.result # 25
# Immediate thread execution
@startthread
def background_task():
time.sleep(2)
print("Background task completed")
return "done"
# Thread starts immediately, continues in background
thread = background_task()
# Process-based computation for CPU-intensive work
@threaded(process=True)
def cpu_intensive_task(data):
# Heavy computation that benefits from a separate process
return sum(x*x for x in range(data))
proc = cpu_intensive_task(1000000)
proc.start()
result = proc.result
from fastcore.parallel import ThreadPoolExecutor, ProcessPoolExecutor
import requests
# Thread pool with error handling and rate limiting
def fetch_url(url):
response = requests.get(url)
return response.status_code
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
# Serial execution for debugging (max_workers=0); pause=0.5 rate-limits requests
with ThreadPoolExecutor(max_workers=0, pause=0.5) as executor:
results = list(executor.map(fetch_url, urls))
# Parallel execution with error handling
# NOTE(review): uncaught exceptions in workers are presumably routed to the
# executor's on_exc handler (default: print) — confirm against fastcore.
def safe_fetch(url):
try:
return requests.get(url, timeout=5).status_code
except Exception as e:
return f"Error: {e}"
with ThreadPoolExecutor(max_workers=3, on_exc=print) as executor:
results = list(executor.map(safe_fetch, urls))
# Process pool for CPU-intensive tasks
def compute_fibonacci(n):
if n <= 1: return n
return compute_fibonacci(n-1) + compute_fibonacci(n-2)
numbers = [30, 31, 32, 33, 34]
with ProcessPoolExecutor(max_workers=2) as executor:
results = list(executor.map(compute_fibonacci, numbers))
from fastcore.parallel import parallel, parallel_gen
from fastcore.foundation import L
import time

# Simple parallel map
def square(x):
    time.sleep(0.1)  # Simulate work
    return x ** 2

numbers = range(10)
results = parallel(square, numbers, n_workers=4)
print(results)  # L([0, 1, 4, 9, 16, 25, 36, 49, 64, 81])

# Parallel with progress tracking.
# Fixed: progress_bar lives in fastprogress (which fastcore uses internally
# when progress=True), not in the nonexistent fastai.utils.testing module.
from fastprogress.fastprogress import progress_bar

def slow_process(item):
    time.sleep(0.2)
    return item * 10

data = range(20)
results = parallel(
    slow_process,
    data,
    n_workers=4,
    progress=True,  # Shows progress bar
    total=len(data)
)

# Serial execution for debugging (range slicing keeps this a lazy range)
debug_results = parallel(
    slow_process,
    data[:5],
    n_workers=0  # Serial execution
)

# Parallel with additional arguments and kwargs
def process_with_params(item, multiplier=1, offset=0):
    return item * multiplier + offset

results = parallel(
    process_with_params,
    numbers,
    2,  # multiplier argument
    n_workers=3,
    offset=10  # keyword argument
)
)
from fastcore.parallel import run_procs, parallel_async
import asyncio
import time  # Fixed: DataProcessor.expensive_process below calls time.sleep

# Process with completion callbacks
def worker_func(data_chunk):
    # Process a chunk of data
    result = sum(x*x for x in data_chunk)
    return result

def completion_callback(result):
    print(f"Chunk processed with result: {result}")

# Split work into chunks
data_chunks = [range(i*1000, (i+1)*1000) for i in range(4)]
run_procs(
    worker_func,
    completion_callback,
    data_chunks,
    n_workers=2
)

# Async parallel processing
async def async_fetch(url):
    import aiohttp  # third-party; only needed for this example
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://httpbin.org/uuid" for _ in range(5)]
    results = await parallel_async(async_fetch, urls, n_workers=3)
    return results

# Run async parallel
results = asyncio.run(main())

# Parallel object generation
class DataProcessor:
    def __init__(self, raw_data):
        self.processed = self.expensive_process(raw_data)

    def expensive_process(self, data):
        # Simulate expensive processing
        time.sleep(0.1)
        return data.upper() if isinstance(data, str) else str(data)

raw_items = ["hello", "world", "fastcore", "parallel"]
# Process items in parallel to create DataProcessor instances
processors = list(parallel_gen(
    DataProcessor,
    raw_items,
    n_workers=2
))
for proc in processors:
    print(proc.processed)
print(proc.processed)
from fastcore.parallel import parallel, ThreadPoolExecutor
import random
# A function that fails nondeterministically, to exercise error handling
def risky_function(x):
if random.random() < 0.2: # 20% chance of error
raise ValueError(f"Random error with {x}")
return x * 2
# Custom error handler
def log_error(exc):
print(f"Caught exception: {type(exc).__name__}: {exc}")
# Parallel with error handling
try:
results = parallel(
risky_function,
range(20),
n_workers=4
)
except Exception as e:
print(f"Parallel execution failed: {e}")
# Using executor with custom error handler
# NOTE(review): the comment below presumes on_exc suppresses worker errors
# rather than re-raising — confirm against the fastcore implementation.
with ThreadPoolExecutor(max_workers=4, on_exc=log_error) as executor:
# Errors are logged but don't stop processing
results = list(executor.map(risky_function, range(20)))
# Serial debugging mode
def debug_function(x):
print(f"Processing {x}")
if x == 5:
breakpoint() # Debugger will work in serial mode
return x ** 2
# Debug with serial execution
debug_results = parallel(
debug_function,
range(10),
n_workers=0 # Serial - debugger friendly
)
from fastcore.parallel import parallel
from fastcore.foundation import L
from fastcore.basics import listify
# NOTE(review): listify is imported but never used in this example.
# Parallel processing with L collections
data = L(range(100))
# Parallel map maintaining L type
squared = data.map(lambda x: x**2) # Serial map
parallel_squared = parallel(lambda x: x**2, data, n_workers=4) # Parallel
# Filter then parallel process
filtered_data = data.filter(lambda x: x % 2 == 0)
results = parallel(
lambda x: x * 3,
filtered_data,
n_workers=3
)
# Parallel processing with complex transformations
def complex_transform(item):
# Simulate complex processing
import time
time.sleep(0.01)
return {
'original': item,
'squared': item ** 2,
'cubed': item ** 3
}
# Process in parallel, convert back to L
transformed = L(parallel(
complex_transform,
data[:20],
n_workers=4
))
# Extract specific fields in parallel
originals = parallel(lambda x: x['original'], transformed, n_workers=2)
squares = parallel(lambda x: x['squared'], transformed, n_workers=2)

Install with Tessl CLI
npx tessl i tessl/pypi-fastcoredocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10