CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pylama

Code audit tool for python

Pending
Overview
Eval results
Files

docs/async-processing.md

Asynchronous Processing

High-performance parallel processing capabilities for large codebases using multiprocessing. Pylama provides asynchronous file checking to significantly improve performance when analyzing many files.

Capabilities

Parallel File Checking

Process multiple files concurrently using a process pool for maximum performance.

def check_async(
    paths: List[str],
    code: str = None,
    options: Namespace = None,
    rootdir: Path = None
) -> List[Error]:
    """
    Check files asynchronously using a process pool.

    Only ``paths`` is required; ``code``, ``options`` and ``rootdir`` all
    default to None.

    Args:
        paths: List of file paths to check
        code: Source code string (if checking single file with custom code)
        options: Configuration options containing concurrency settings
        rootdir: Root directory for path resolution

    Returns:
        List[Error]: All errors found across all files

    Performance considerations:
        - Uses ProcessPoolExecutor for true parallelism
        - Automatically determines optimal worker count based on CPU cores
        - Distributes files across workers for load balancing
        - Aggregates results from all workers
        - Significantly faster than sequential checking for multiple files
    """

Worker Function

Individual worker function that processes files in separate processes.

def worker(params):
    """
    Worker function for parallel file processing.

    Args:
        params: Tuple containing (path, code, options, rootdir), in that
            order, bundled into a single argument

    Returns:
        List[Error]: Errors found in the processed file

    This function runs in a separate process and:
        - Receives serialized parameters
        - Imports pylama.core.run in the worker process
        - Processes a single file
        - Returns serialized results
    """

Configuration

CPU Detection

Automatic detection of available CPU cores for optimal performance.

CPU_COUNT: int
"""
Number of available CPU cores for parallel processing.

Automatically detected using multiprocessing.cpu_count().
Falls back to 1 if multiprocessing is not available or fails.
Used to determine optimal worker pool size.
"""

Enabling Async Processing

Async processing can be enabled through configuration:

  • Command line: --async or --concurrent flags
  • Configuration file: async = 1 or concurrent = 1
  • Programmatic: Set options.concurrent = True

Usage Examples

Basic Async Usage

from typing import List
from pylama.main import check_paths
from pylama.config import parse_options

# Enable async processing via command line options;
# 'src/' and 'tests/' are passed alongside the flag as path arguments.
options = parse_options(['--async', 'src/', 'tests/'])
# No explicit path list is passed (None) -- presumably check_paths then uses
# the paths parsed into `options`; confirm against check_paths.
errors = check_paths(None, options)  # Uses async processing

# Summarize the total number of reported issues
print(f"Found {len(errors)} issues across all files")

Programmatic Async Control

from typing import List
from pylama.check_async import check_async
from pylama.config import parse_options
from pathlib import Path

# Explicit list of Python files to check: modules, package, then tests
files = (
    ['src/module1.py', 'src/module2.py']
    + ['src/package/__init__.py', 'src/package/core.py']
    + ['tests/test_module1.py', 'tests/test_module2.py']
)

# Restrict the run to the pycodestyle and pyflakes linters
linter_args = ['--linters=pycodestyle,pyflakes']
options = parse_options(linter_args)

# Check all files, using the current working directory as the root directory
errors = check_async(paths=files, options=options, rootdir=Path.cwd())

# Report every finding as "<file>:<line> - <message>"
for error in errors:
    print(f"{error.filename}:{error.lnum} - {error.message}")

Performance Comparison

import time
from typing import List
from pylama.main import check_paths
from pylama.config import parse_options

# Large list of files
files = [f'src/module{i}.py' for i in range(100)]

# Parse both option sets up front so that only the checking itself is timed
options_seq = parse_options(['--linters=pycodestyle,pyflakes'])
options_seq.concurrent = False
options_async = parse_options(['--async', '--linters=pycodestyle,pyflakes'])

# Sequential processing
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() can jump if the system clock is adjusted.
start_time = time.perf_counter()
errors_seq = check_paths(files, options_seq)
seq_time = time.perf_counter() - start_time

# Async processing
start_time = time.perf_counter()
errors_async = check_paths(files, options_async)
async_time = time.perf_counter() - start_time

print(f"Sequential: {seq_time:.2f}s, Async: {async_time:.2f}s")
# Guard against a zero denominator when the async run is too fast to measure
if async_time > 0:
    print(f"Speedup: {seq_time/async_time:.1f}x")

Custom Worker Pool Size

import multiprocessing
from typing import List
from concurrent.futures import ProcessPoolExecutor
from pylama.check_async import worker

def custom_async_check(files, options, max_workers=None):
    """Check files in a process pool with a configurable worker count.

    Args:
        files: Iterable of file paths to check.
        options: Options object forwarded unchanged to every worker.
        max_workers: Size of the process pool. When None, the value of
            multiprocessing.cpu_count() is used.

    Returns:
        A flat list containing the errors returned by the workers for all
        files, in the order the files were given. Empty when ``files`` is
        empty.
    """
    # Imported locally so the function works even though the surrounding
    # example does not import these names at module level.
    from itertools import chain
    from pathlib import Path

    # Materialize once so generators are supported and emptiness can be tested
    files = list(files)
    if not files:
        # Nothing to check: skip creating a process pool altogether
        return []

    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    # Resolve the root directory once instead of once per file
    rootdir = Path.cwd()

    # Prepare parameters for workers: (path, code, options, rootdir)
    params_list = [(file_path, None, options, rootdir) for file_path in files]

    # Run with custom worker pool
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields one error list per file, in submission order;
        # flatten them into a single list while the pool is still open
        return list(chain.from_iterable(executor.map(worker, params_list)))

Error Handling in Async Mode

from typing import List
from pylama.check_async import check_async
from pylama.config import parse_options

# Prepare inputs outside the try block so the fallback below can always rely
# on `files` and `options` being defined.
files = ['src/valid.py', 'src/invalid_syntax.py']
options = parse_options(['--linters=pyflakes'])

try:
    # Keep the guarded region minimal: only the async run itself
    errors = check_async(files, options=options)
except Exception as e:
    print(f"Async processing failed: {e}")
    # Fallback to sequential processing
    from pylama.main import check_paths
    options.concurrent = False
    errors = check_paths(files, options)

# Separate syntax errors from style issues (runs for both the async result
# and the sequential fallback result)
syntax_errors = [err for err in errors if 'SyntaxError' in err.message]
style_errors = [err for err in errors if 'SyntaxError' not in err.message]

print(f"Syntax errors: {len(syntax_errors)}")
print(f"Style issues: {len(style_errors)}")

Monitoring Progress

import time
from typing import List
from concurrent.futures import ProcessPoolExecutor, as_completed
from pylama.check_async import worker

def check_with_progress(files, options):
    """Check files in a process pool, printing progress as each one finishes.

    Args:
        files: Iterable of file paths to check.
        options: Options object forwarded unchanged to every worker.

    Returns:
        A list with the errors from every file whose worker finished without
        raising. Files whose worker raised are reported on stdout and
        contribute no errors. Results are collected in completion order.
    """
    # Imported locally so the function works even though the surrounding
    # example does not import Path at module level.
    from pathlib import Path

    all_errors = []

    # Materialize once so generators are supported and the total is known
    files = list(files)
    if not files:
        # Nothing to check: skip creating a process pool altogether
        return all_errors

    total = len(files)
    # Resolve the root directory once instead of once per file
    rootdir = Path.cwd()
    params_list = [(file_path, None, options, rootdir) for file_path in files]

    with ProcessPoolExecutor() as executor:
        # Submit all tasks, remembering which file each future belongs to
        future_to_file = {
            executor.submit(worker, params): params[0]
            for params in params_list
        }

        # Count every finished task (successful or not) so the progress
        # figure keeps advancing after a failure.
        finished = enumerate(as_completed(future_to_file), start=1)
        for completed, future in finished:
            file_path = future_to_file[future]
            try:
                errors = future.result()
            except Exception as e:
                # Best-effort: report the failure and continue with the rest
                print(f"Error processing {file_path}: {e}")
            else:
                all_errors.extend(errors)
                print(f"Processed {completed}/{total}: {file_path}")

    return all_errors

Integration with Configuration

from typing import List
from pylama.config import parse_options
from pylama.main import check_paths

# Configuration file with async settings
config_content = """
[pylama]
async = 1
linters = pycodestyle,pyflakes,mccabe
paths = src/,tests/
"""

# Write config file (explicit encoding so the result does not depend on the
# platform's default locale). Note: this overwrites any existing pylama.ini
# in the current directory.
with open('pylama.ini', 'w', encoding='utf-8') as f:
    f.write(config_content)

# Load configuration (async will be enabled automatically)
options = parse_options([])
print(f"Async enabled: {options.concurrent}")

# Check files (will use async processing)
errors = check_paths(None, options)

Memory Considerations

import os
import psutil
from typing import List
from pylama.check_async import check_async

def check_with_memory_monitoring(files, options):
    """Run check_async and print this process's memory use before and after.

    The resident set size (RSS) of the current process is sampled once before
    and once after the check, and both values plus their difference are
    printed in megabytes.

    NOTE(review): only the process identified by os.getpid() is sampled;
    memory used by any separate worker processes would not show up in these
    figures -- confirm whether that is intended.

    Args:
        files: File paths passed to check_async.
        options: Options passed to check_async as ``options=``.

    Returns:
        Whatever check_async returns for the given files.
    """
    current_process = psutil.Process(os.getpid())

    def rss_in_mb():
        # bytes -> KB -> MB
        return current_process.memory_info().rss / 1024 / 1024

    initial_memory = rss_in_mb()
    print(f"Initial memory usage: {initial_memory:.1f} MB")

    # Run the asynchronous check between the two samples
    found = check_async(files, options=options)

    final_memory = rss_in_mb()
    print(f"Final memory usage: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")

    return found

Performance Guidelines

When to Use Async Processing

Recommended for:

  • Projects with 50+ Python files
  • CI/CD pipelines with time constraints
  • Large codebases (>10,000 lines of code)
  • Multiple linters enabled simultaneously

Not recommended for:

  • Single file checking
  • Very small projects (<10 files)
  • Memory-constrained environments
  • Systems with limited CPU cores

Optimization Tips

# parse_options must be imported for this snippet to run on its own
from pylama.config import parse_options

# Optimal configuration for async processing
recommended_options = [
    '--async',                    # Enable async processing
    '--linters=pycodestyle,pyflakes',  # Use fast linters
    '--ignore=E501',              # Ignore non-critical issues
    '--skip=migrations/*,build/*' # Skip non-essential directories
]

options = parse_options(recommended_options)

Troubleshooting Async Issues

from typing import List
from pathlib import Path  # needed for Path.cwd() below
from pylama.check_async import CPU_COUNT
import multiprocessing

# Compare the value pylama detected with what multiprocessing reports now
print(f"Detected CPU cores: {CPU_COUNT}")
print(f"multiprocessing.cpu_count(): {multiprocessing.cpu_count()}")

# Test worker function
from pylama.check_async import worker
from pylama.config import parse_options

# Parameters in the order the worker expects: (path, code, options, rootdir)
test_params = ('test_file.py', 'print("test")', parse_options([]), Path.cwd())
try:
    # Called directly in this process (no pool) to isolate worker problems
    result = worker(test_params)
    print(f"Worker test successful: {len(result)} errors")
except Exception as e:
    print(f"Worker test failed: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-pylama

docs

async-processing.md

configuration.md

error-processing.md

index.md

main-interface.md

plugin-development.md

pytest-integration.md

vcs-hooks.md

tile.json