Detect leaked asyncio tasks, threads, and event loop blocking in Python, inspired by Go's goleak package
—
Comprehensive detection of asyncio task leaks with detailed stack trace information. Task leak detection helps identify unfinished tasks that could cause memory leaks, prevent graceful shutdown, or make debugging difficult.
The primary function for detecting asyncio task leaks within a specific scope.
def no_task_leaks(
action: Union[LeakAction, str] = LeakAction.WARN,
name_filter: Optional[Union[str, re.Pattern]] = None,
logger: Optional[logging.Logger] = None,
*,
enable_creation_tracking: bool = False,
):
"""
Context manager/decorator that detects task leaks within its scope.
Args:
action: Action to take when leaks are detected ("warn", "log", "cancel", "raise")
name_filter: Optional filter for task names (string or regex)
logger: Optional logger instance
enable_creation_tracking: Whether to enable automatic task creation tracking
Returns:
_AsyncTaskLeakContextManager: Context manager that can also be used as decorator
Example:
# As context manager
async with no_task_leaks():
await some_async_function()
# As decorator
@no_task_leaks()
async def my_function():
await some_async_function()
# Handle exceptions with full stack traces
try:
async with no_task_leaks(action="raise"):
# Code that leaks tasks
pass
except TaskLeakError as e:
print(f"Found {e.task_count} leaked tasks")
for task_info in e.leaked_tasks:
if task_info.task_ref and not task_info.task_ref.done():
task_info.task_ref.cancel()
"""

Exception class for task leak errors with detailed information about leaked tasks.
class TaskLeakError(LeakError):
"""
Raised when task leaks are detected and action is set to RAISE.
Attributes:
leaked_tasks: List of LeakedTask objects with detailed information
task_count: Number of leaked tasks detected
"""
def __init__(self, message: str, leaked_tasks: List[LeakedTask])
leaked_tasks: List[LeakedTask]
task_count: int
def get_stack_summary(self) -> str:
"""Get a summary of all stack traces."""
def __str__(self) -> str:
"""String representation including stack traces."""

Data class containing detailed information about leaked tasks.
class LeakedTask:
"""
Information about a leaked asyncio task.
Attributes:
task_id: Unique identifier for the task
name: Task name (from task.get_name())
state: Current task state (RUNNING, CANCELLED, DONE)
current_stack: Stack trace showing where task is currently executing
creation_stack: Stack trace showing where task was created (if tracking enabled)
task_ref: Reference to the actual asyncio.Task object
"""
task_id: int
name: str
state: TaskState
current_stack: Optional[List[traceback.FrameSummary]] = None
creation_stack: Optional[List[traceback.FrameSummary]] = None
task_ref: Optional[asyncio.Task] = None
@classmethod
def from_task(cls, task: asyncio.Task) -> "LeakedTask":
"""Create a LeakedTask object from an asyncio.Task."""
def format_current_stack(self) -> str:
"""Format the current stack trace as a string."""
def format_creation_stack(self) -> str:
"""Format the creation stack trace as a string."""
def __str__(self) -> str:
"""String representation of the leaked task."""

Enumeration of possible asyncio task states.
class TaskState(str, Enum):
"""State of an asyncio task."""
RUNNING = "running"
CANCELLED = "cancelled"
DONE = "done"

import asyncio
from pyleak import no_task_leaks
async def main():
async with no_task_leaks():
# This task will be detected as leaked
asyncio.create_task(asyncio.sleep(10))
await asyncio.sleep(0.1)

import asyncio
from pyleak import TaskLeakError, no_task_leaks
async def leaky_function():
async def background_task():
print("background task started")
await asyncio.sleep(10)
print("creating a long running task")
asyncio.create_task(background_task())
async def main():
try:
async with no_task_leaks(action="raise"):
await leaky_function()
except TaskLeakError as e:
print(f"Found {e.task_count} leaked tasks")
print(e.get_stack_summary())

async def main():
try:
async with no_task_leaks(action="raise", enable_creation_tracking=True):
await leaky_function()
except TaskLeakError as e:
for task_info in e.leaked_tasks:
print(f"Task: {task_info.name}")
print("Currently executing:")
print(task_info.format_current_stack())
print("Created at:")
print(task_info.format_creation_stack())

import re
from pyleak import no_task_leaks
# Filter by exact name
async with no_task_leaks(name_filter="background-worker"):
pass
# Filter by regex pattern
async with no_task_leaks(name_filter=re.compile(r"worker-\d+")):
pass

# Warn mode (default) - issues ResourceWarning
async with no_task_leaks(action="warn"):
pass
# Log mode - writes to logger
async with no_task_leaks(action="log"):
pass
# Cancel mode - automatically cancels leaked tasks
async with no_task_leaks(action="cancel"):
pass
# Raise mode - raises TaskLeakError
async with no_task_leaks(action="raise"):
pass

@no_task_leaks(action="raise")
async def my_async_function():
# Any leaked tasks will cause TaskLeakError to be raised
await some_async_operation()

try:
async with no_task_leaks(action="raise"):
# Code that might leak tasks
pass
except TaskLeakError as e:
print(f"Found {e.task_count} leaked tasks")
# Cancel leaked tasks manually
for task_info in e.leaked_tasks:
if task_info.task_ref and not task_info.task_ref.done():
task_info.task_ref.cancel()
try:
await task_info.task_ref
except asyncio.CancelledError:
pass

Install with Tessl CLI
npx tessl i tessl/pypi-pyleak