CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyleak

Detect leaked asyncio tasks, threads, and event loop blocking in Python, inspired by Go's goleak package

Pending
Overview
Eval results
Files

blocking-detection.mddocs/

Event Loop Blocking Detection

Comprehensive detection of event loop blocking operations with detailed stack trace information. Event loop blocking detection monitors asyncio event loop responsiveness to catch synchronous operations that destroy performance and can cause timeouts.

Capabilities

Basic Event Loop Blocking Detection

The primary function for detecting event loop blocking within a specific scope.

def no_event_loop_blocking(
    action: LeakAction = LeakAction.WARN,
    logger: Optional[logging.Logger] = None,
    *,
    threshold: float = 0.2,
    check_interval: float = 0.05,
    caller_context: CallerContext | None = None,
):
    """
    Context manager/decorator that detects event loop blocking within its scope.

    Args:
        action: Action to take when blocking is detected (LeakAction enum or string)
        logger: Optional logger instance
        threshold: Minimum blocking duration to report (seconds)
        check_interval: How often to check for blocks (seconds)
        caller_context: Context information for filtering stack traces

    Returns:
        _EventLoopBlockContextManager: Context manager that can also be used as decorator

    Example:
        # Basic usage
        async def main():
            with no_event_loop_blocking(threshold=0.05):
                time.sleep(0.1)  # This will be detected with stack trace

        # Handle blocking with detailed stack information
        try:
            with no_event_loop_blocking(action="raise"):
                requests.get("https://httpbin.org/delay/1")  # Synchronous HTTP call
        except EventLoopBlockError as e:
            print(f"Event loop blocked {e.block_count} times")
            print(e.get_block_summary())

        # As decorator
        @no_event_loop_blocking(action="raise")
        async def my_async_function():
            requests.get("https://example.com")  # Synchronous HTTP call
    """

Event Loop Block Error Handling

Exception class for event loop blocking errors with detailed information about blocking events.

class EventLoopBlockError(LeakError):
    """
    Raised when event loop blocking is detected and action is set to RAISE.
    
    Attributes:
        blocking_events: List of EventLoopBlock objects with detailed information
        block_count: Number of blocking events detected
    """
    def __init__(self, message: str, blocking_events: list[EventLoopBlock])
    
    blocking_events: list[EventLoopBlock]
    block_count: int
    
    def get_block_summary(self) -> str:
        """Get a summary of all blocking events."""
    
    def __str__(self) -> str:
        """String representation including blocking event details."""

Event Loop Block Information

Data class containing detailed information about blocking events.

class EventLoopBlock:
    """
    Information about an event loop blocking event.
    
    Attributes:
        block_id: Unique identifier for the blocking event
        duration: How long the event loop was blocked (seconds)
        threshold: The threshold that was exceeded (seconds)
        timestamp: When the blocking occurred
        blocking_stack: Stack trace showing what code caused the blocking
    """
    block_id: int
    duration: float
    threshold: float
    timestamp: float
    blocking_stack: list[traceback.FrameSummary] | None = None

    def format_blocking_stack(self) -> str:
        """Format the blocking stack trace as a string."""
    
    def __str__(self) -> str:
        """String representation of the blocking event."""

Caller Context

Data class for filtering stack traces to relevant code.

class CallerContext:
    """
    Context information about a caller for stack trace filtering.
    
    Attributes:
        filename: File where the caller is located
        name: Function or method name
        lineno: Line number (optional)
    """
    filename: str
    name: str
    lineno: int | None = None

    def __str__(self) -> str:
        """String representation: filename:name:lineno"""

Usage Examples

Basic Detection

import asyncio
import time
from pyleak import no_event_loop_blocking

async def main():
    with no_event_loop_blocking(threshold=0.1):
        # This will be detected as blocking
        time.sleep(0.2)
        
        # This will not be detected (below threshold)
        time.sleep(0.05)

Exception Handling with Stack Traces

import asyncio
import time
from pyleak import EventLoopBlockError, no_event_loop_blocking

async def some_function_with_blocking_code():
    print("starting")
    time.sleep(1)  # Blocking operation
    print("done")

async def main():
    try:
        with no_event_loop_blocking(action="raise", threshold=0.1):
            await some_function_with_blocking_code()
    except EventLoopBlockError as e:
        print(f"Found {e.block_count} blocking events")
        for block in e.blocking_events:
            print(f"Block {block.block_id}: {block.duration:.3f}s")
            print("Stack trace:")
            print(block.format_blocking_stack())

Detecting Synchronous HTTP Calls

import asyncio
import requests
import httpx
from pyleak import no_event_loop_blocking

async def test_sync_vs_async_http():
    # This will detect blocking
    with no_event_loop_blocking(action="warn"):
        response = requests.get("https://httpbin.org/delay/1")  # Synchronous!
        
    # This will not detect blocking  
    with no_event_loop_blocking(action="warn"):
        async with httpx.AsyncClient() as client:
            response = await client.get("https://httpbin.org/delay/1")  # Asynchronous!

CPU Intensive Operations

import asyncio
from pyleak import EventLoopBlockError, no_event_loop_blocking

async def process_user_data(user_id: int):
    """CPU intensive work that blocks the event loop."""
    print(f"Processing user {user_id}...")
    return sum(i * i for i in range(100_000_000))  # Blocking computation

async def main():
    try:
        with no_event_loop_blocking(action="raise", threshold=0.5):
            user1 = await process_user_data(1)
            user2 = await process_user_data(2)
    except EventLoopBlockError as e:
        print(f"Found {e.block_count} blocking events")
        print("Consider using asyncio.to_thread() or concurrent.futures for CPU-intensive work")

Action Modes

# Warn mode (default) - issues ResourceWarning
with no_event_loop_blocking(action="warn"):
    time.sleep(0.3)

# Log mode - writes to logger
with no_event_loop_blocking(action="log"):
    time.sleep(0.3)

# Cancel mode - warns that blocking can't be cancelled
with no_event_loop_blocking(action="cancel"):
    time.sleep(0.3)  # Will warn about inability to cancel blocking

# Raise mode - raises EventLoopBlockError
with no_event_loop_blocking(action="raise"):
    time.sleep(0.3)

Threshold and Check Interval Configuration

from pyleak import no_event_loop_blocking

# Very sensitive detection (low threshold, frequent checks)
with no_event_loop_blocking(threshold=0.01, check_interval=0.001):
    time.sleep(0.02)  # Will be detected

# Less sensitive detection (higher threshold, less frequent checks)
with no_event_loop_blocking(threshold=1.0, check_interval=0.1):
    time.sleep(0.5)  # Will not be detected

# Default settings (good for most use cases)
with no_event_loop_blocking():  # threshold=0.2, check_interval=0.05
    time.sleep(0.3)  # Will be detected

Decorator Usage

@no_event_loop_blocking(action="raise", threshold=0.1)
async def my_async_function():
    # Any blocking operations will cause EventLoopBlockError to be raised
    time.sleep(0.2)  # This will raise an exception

Testing Event Loop Blocking

import pytest
import time
from pyleak import no_event_loop_blocking, EventLoopBlockError

@pytest.mark.asyncio
async def test_no_blocking():
    """Test that ensures no event loop blocking occurs."""
    with pytest.raises(EventLoopBlockError):
        with no_event_loop_blocking(action="raise", threshold=0.1):
            time.sleep(0.2)  # This should be detected

@pytest.mark.asyncio
async def test_proper_async_usage():
    """Test that properly async code doesn't trigger blocking."""
    # This should not raise an exception
    with no_event_loop_blocking(action="raise", threshold=0.1):
        await asyncio.sleep(0.2)  # Proper async operation

Complex Blocking Detection

import asyncio
import time
from pyleak import EventLoopBlockError, no_event_loop_blocking

async def debug_blocking():
    """Example showing how to debug complex blocking scenarios."""
    
    def cpu_intensive_work():
        return sum(i * i for i in range(1_000_000))
    
    def io_blocking_work():
        time.sleep(0.1)
        return "done"
    
    try:
        with no_event_loop_blocking(action="raise", threshold=0.05):
            # Multiple different types of blocking
            result1 = cpu_intensive_work()
            result2 = io_blocking_work()
            
    except EventLoopBlockError as e:
        print(f"Found {e.block_count} blocking events:")
        
        for i, block in enumerate(e.blocking_events):
            print(f"\nBlocking Event {i+1}:")
            print(f"  Duration: {block.duration:.3f}s")
            print(f"  Timestamp: {block.timestamp}")
            print("  Caused by:")
            print("    " + "\n    ".join(
                block.format_blocking_stack().strip().split("\n")
            ))

if __name__ == "__main__":
    asyncio.run(debug_blocking())

Best Practices

import asyncio
import concurrent.futures
from pyleak import no_event_loop_blocking

async def good_async_patterns():
    """Examples of proper async patterns that won't block."""
    
    with no_event_loop_blocking(threshold=0.1):
        # Use asyncio.sleep instead of time.sleep
        await asyncio.sleep(0.5)
        
        # Use asyncio.to_thread for CPU-intensive work
        result = await asyncio.to_thread(lambda: sum(i*i for i in range(1_000_000)))
        
        # Use thread pool executor for blocking I/O
        loop = asyncio.get_event_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            result = await loop.run_in_executor(executor, time.sleep, 0.5)
        
        # Use async libraries for HTTP requests
        async with httpx.AsyncClient() as client:
            response = await client.get("https://example.com")

Install with Tessl CLI

npx tessl i tessl/pypi-pyleak

docs

blocking-detection.md

index.md

pytest-integration.md

task-detection.md

thread-detection.md

tile.json