CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyleak

Detect leaked asyncio tasks, threads, and event loop blocking in Python, inspired by Go's goleak package

Pending
Overview
Eval results
Files

task-detection.mddocs/

AsyncIO Task Detection

Comprehensive detection of asyncio task leaks with detailed stack trace information. Task leak detection helps identify unfinished tasks that could cause memory leaks, prevent graceful shutdown, or make debugging difficult.

Capabilities

Basic Task Leak Detection

The primary function for detecting asyncio task leaks within a specific scope.

def no_task_leaks(
    action: Union[LeakAction, str] = LeakAction.WARN,
    name_filter: Optional[Union[str, re.Pattern]] = None,
    logger: Optional[logging.Logger] = None,
    *,
    enable_creation_tracking: bool = False,
):
    """
    Context manager/decorator that detects task leaks within its scope.

    Args:
        action: Action to take when leaks are detected ("warn", "log", "cancel", "raise")
        name_filter: Optional filter for task names (string or regex)
        logger: Optional logger instance
        enable_creation_tracking: Whether to enable automatic task creation tracking

    Returns:
        _AsyncTaskLeakContextManager: Context manager that can also be used as decorator

    Example:
        # As context manager
        async with no_task_leaks():
            await some_async_function()

        # As decorator
        @no_task_leaks()
        async def my_function():
            await some_async_function()

        # Handle exceptions with full stack traces
        try:
            async with no_task_leaks(action="raise"):
                # Code that leaks tasks
                pass
        except TaskLeakError as e:
            print(f"Found {e.task_count} leaked tasks")
            for task_info in e.leaked_tasks:
                if task_info.task_ref and not task_info.task_ref.done():
                    task_info.task_ref.cancel()
    """

Task Leak Error Handling

Exception class for task leak errors with detailed information about leaked tasks.

class TaskLeakError(LeakError):
    """
    Raised when task leaks are detected and action is set to RAISE.
    
    Attributes:
        leaked_tasks: List of LeakedTask objects with detailed information
        task_count: Number of leaked tasks detected
    """
    def __init__(self, message: str, leaked_tasks: List[LeakedTask])
    
    leaked_tasks: List[LeakedTask]
    task_count: int
    
    def get_stack_summary(self) -> str:
        """Get a summary of all stack traces."""
    
    def __str__(self) -> str:
        """String representation including stack traces."""

Leaked Task Information

Data class containing detailed information about leaked tasks.

class LeakedTask:
    """
    Information about a leaked asyncio task.
    
    Attributes:
        task_id: Unique identifier for the task
        name: Task name (from task.get_name())
        state: Current task state (RUNNING, CANCELLED, DONE)
        current_stack: Stack trace showing where task is currently executing
        creation_stack: Stack trace showing where task was created (if tracking enabled)
        task_ref: Reference to the actual asyncio.Task object
    """
    task_id: int
    name: str
    state: TaskState
    current_stack: Optional[List[traceback.FrameSummary]] = None
    creation_stack: Optional[List[traceback.FrameSummary]] = None
    task_ref: Optional[asyncio.Task] = None

    @classmethod
    def from_task(cls, task: asyncio.Task) -> "LeakedTask":
        """Create a LeakedTask object from an asyncio.Task."""
    
    def format_current_stack(self) -> str:
        """Format the current stack trace as a string."""
    
    def format_creation_stack(self) -> str:
        """Format the creation stack trace as a string."""
    
    def __str__(self) -> str:
        """String representation of the leaked task."""

Task States

Enumeration of possible asyncio task states.

class TaskState(str, Enum):
    """State of an asyncio task."""
    RUNNING = "running"
    CANCELLED = "cancelled"
    DONE = "done"

Usage Examples

Basic Detection

import asyncio
from pyleak import no_task_leaks

async def main():
    async with no_task_leaks():
        # This task will be detected as leaked
        asyncio.create_task(asyncio.sleep(10))
        await asyncio.sleep(0.1)

Exception Handling with Stack Traces

import asyncio
from pyleak import TaskLeakError, no_task_leaks

async def leaky_function():
    async def background_task():
        print("background task started")
        await asyncio.sleep(10)

    print("creating a long running task")
    asyncio.create_task(background_task())

async def main():
    try:
        async with no_task_leaks(action="raise"):
            await leaky_function()
    except TaskLeakError as e:
        print(f"Found {e.task_count} leaked tasks")
        print(e.get_stack_summary())

Creation Stack Tracking

async def main():
    try:
        async with no_task_leaks(action="raise", enable_creation_tracking=True):
            await leaky_function()
    except TaskLeakError as e:
        for task_info in e.leaked_tasks:
            print(f"Task: {task_info.name}")
            print("Currently executing:")
            print(task_info.format_current_stack())
            print("Created at:")
            print(task_info.format_creation_stack())

Name Filtering

import re
from pyleak import no_task_leaks

# Filter by exact name
async with no_task_leaks(name_filter="background-worker"):
    pass

# Filter by regex pattern
async with no_task_leaks(name_filter=re.compile(r"worker-\d+")):
    pass

Action Modes

# Warn mode (default) - issues ResourceWarning
async with no_task_leaks(action="warn"):
    pass

# Log mode - writes to logger
async with no_task_leaks(action="log"):
    pass

# Cancel mode - automatically cancels leaked tasks
async with no_task_leaks(action="cancel"):
    pass

# Raise mode - raises TaskLeakError
async with no_task_leaks(action="raise"):
    pass

Decorator Usage

@no_task_leaks(action="raise")
async def my_async_function():
    # Any leaked tasks will cause TaskLeakError to be raised
    await some_async_operation()

Manual Task Cleanup

try:
    async with no_task_leaks(action="raise"):
        # Code that might leak tasks
        pass
except TaskLeakError as e:
    print(f"Found {e.task_count} leaked tasks")
    
    # Cancel leaked tasks manually
    for task_info in e.leaked_tasks:
        if task_info.task_ref and not task_info.task_ref.done():
            task_info.task_ref.cancel()
            try:
                await task_info.task_ref
            except asyncio.CancelledError:
                pass

Install with Tessl CLI

npx tessl i tessl/pypi-pyleak

docs

blocking-detection.md

index.md

pytest-integration.md

task-detection.md

thread-detection.md

tile.json