Detect leaked asyncio tasks, threads, and event loop blocking in Python, inspired by Go's goleak package
—
Comprehensive detection of event loop blocking operations with detailed stack trace information. Event loop blocking detection monitors asyncio event loop responsiveness to catch synchronous operations that destroy performance and can cause timeouts.
The primary function for detecting event loop blocking within a specific scope.
def no_event_loop_blocking(
action: LeakAction = LeakAction.WARN,
logger: Optional[logging.Logger] = None,
*,
threshold: float = 0.2,
check_interval: float = 0.05,
caller_context: CallerContext | None = None,
):
"""
Context manager/decorator that detects event loop blocking within its scope.
Args:
action: Action to take when blocking is detected (LeakAction enum or string)
logger: Optional logger instance
threshold: Minimum blocking duration to report (seconds)
check_interval: How often to check for blocks (seconds)
caller_context: Context information for filtering stack traces
Returns:
_EventLoopBlockContextManager: Context manager that can also be used as decorator
Example:
# Basic usage
async def main():
with no_event_loop_blocking(threshold=0.05):
time.sleep(0.1) # This will be detected with stack trace
# Handle blocking with detailed stack information
try:
with no_event_loop_blocking(action="raise"):
requests.get("https://httpbin.org/delay/1") # Synchronous HTTP call
except EventLoopBlockError as e:
print(f"Event loop blocked {e.block_count} times")
print(e.get_block_summary())
# As decorator
@no_event_loop_blocking(action="raise")
async def my_async_function():
requests.get("https://example.com") # Synchronous HTTP call
"""

Exception class for event loop blocking errors with detailed information about blocking events.
class EventLoopBlockError(LeakError):
"""
Raised when event loop blocking is detected and action is set to RAISE.
Attributes:
blocking_events: List of EventLoopBlock objects with detailed information
block_count: Number of blocking events detected
"""
def __init__(self, message: str, blocking_events: list[EventLoopBlock])
blocking_events: list[EventLoopBlock]
block_count: int
def get_block_summary(self) -> str:
"""Get a summary of all blocking events."""
def __str__(self) -> str:
"""String representation including blocking event details."""

Data class containing detailed information about blocking events.
class EventLoopBlock:
"""
Information about an event loop blocking event.
Attributes:
block_id: Unique identifier for the blocking event
duration: How long the event loop was blocked (seconds)
threshold: The threshold that was exceeded (seconds)
timestamp: When the blocking occurred
blocking_stack: Stack trace showing what code caused the blocking
"""
block_id: int
duration: float
threshold: float
timestamp: float
blocking_stack: list[traceback.FrameSummary] | None = None
def format_blocking_stack(self) -> str:
"""Format the blocking stack trace as a string."""
def __str__(self) -> str:
"""String representation of the blocking event."""

Data class for filtering stack traces to relevant code.
class CallerContext:
"""
Context information about a caller for stack trace filtering.
Attributes:
filename: File where the caller is located
name: Function or method name
lineno: Line number (optional)
"""
filename: str
name: str
lineno: int | None = None
def __str__(self) -> str:
"""String representation: filename:name:lineno"""

import asyncio
import time
from pyleak import no_event_loop_blocking
async def main():
    """Basic usage: run two synchronous sleeps under a 0.1s blocking threshold."""
    with no_event_loop_blocking(threshold=0.1):
        # This will be detected as blocking
        time.sleep(0.2)

        # This will not be detected (below threshold)
        time.sleep(0.05)


import asyncio
import time
from pyleak import EventLoopBlockError, no_event_loop_blocking
async def some_function_with_blocking_code():
    """Coroutine that calls the synchronous ``time.sleep`` between two prints."""
    print("starting")
    # Blocking operation: a synchronous sleep inside a coroutine.
    time.sleep(1)
    print("done")
async def main():
    """Run the blocking coroutine in "raise" mode and print each reported block."""
    try:
        with no_event_loop_blocking(action="raise", threshold=0.1):
            await some_function_with_blocking_code()
    except EventLoopBlockError as exc:
        print(f"Found {exc.block_count} blocking events")
        for blocked in exc.blocking_events:
            print(f"Block {blocked.block_id}: {blocked.duration:.3f}s")
            print("Stack trace:")
            print(blocked.format_blocking_stack())


import asyncio
import requests
import httpx
from pyleak import no_event_loop_blocking
async def test_sync_vs_async_http():
    """Contrast a synchronous HTTP request with an awaited one under the detector."""
    # This will detect blocking
    with no_event_loop_blocking(action="warn"):
        response = requests.get("https://httpbin.org/delay/1")  # Synchronous!

    # This will not detect blocking
    with no_event_loop_blocking(action="warn"):
        async with httpx.AsyncClient() as http_client:
            response = await http_client.get("https://httpbin.org/delay/1")  # Asynchronous!


import asyncio
from pyleak import EventLoopBlockError, no_event_loop_blocking
async def process_user_data(user_id: int):
    """CPU intensive work that blocks the event loop.

    Prints a progress line for ``user_id`` and returns the sum of squares of
    all integers below 100,000,000, computed synchronously.
    """
    print(f"Processing user {user_id}...")
    squares = (n * n for n in range(100_000_000))
    return sum(squares)  # Blocking computation
async def main():
    """Process two users in "raise" mode (0.5s threshold) and report any blocks."""
    try:
        with no_event_loop_blocking(action="raise", threshold=0.5):
            first_result = await process_user_data(1)
            second_result = await process_user_data(2)
    except EventLoopBlockError as exc:
        print(f"Found {exc.block_count} blocking events")
        print("Consider using asyncio.to_thread() or concurrent.futures for CPU-intensive work")


# Warn mode (default) - issues ResourceWarning
with no_event_loop_blocking(action="warn"):
    time.sleep(0.3)

# Log mode - writes to logger
with no_event_loop_blocking(action="log"):
    time.sleep(0.3)

# Cancel mode - warns that blocking can't be cancelled
with no_event_loop_blocking(action="cancel"):
    time.sleep(0.3)  # Will warn about inability to cancel blocking

# Raise mode - raises EventLoopBlockError
with no_event_loop_blocking(action="raise"):
    time.sleep(0.3)


from pyleak import no_event_loop_blocking
import time  # every statement below calls time.sleep; the snippet only imported pyleak

# Very sensitive detection (low threshold, frequent checks)
with no_event_loop_blocking(threshold=0.01, check_interval=0.001):
    time.sleep(0.02)  # Will be detected

# Less sensitive detection (higher threshold, less frequent checks)
with no_event_loop_blocking(threshold=1.0, check_interval=0.1):
    time.sleep(0.5)  # Will not be detected

# Default settings (good for most use cases)
with no_event_loop_blocking():  # threshold=0.2, check_interval=0.05
    time.sleep(0.3)  # Will be detected


@no_event_loop_blocking(action="raise", threshold=0.1)
async def my_async_function():
    """Decorator form: the detector wraps the whole coroutine body."""
    # Any blocking operations will cause EventLoopBlockError to be raised
    time.sleep(0.2)  # This will raise an exception


import pytest
import time
from pyleak import no_event_loop_blocking, EventLoopBlockError
@pytest.mark.asyncio
async def test_no_blocking():
    """Test that a synchronous sleep above the threshold raises EventLoopBlockError."""
    with pytest.raises(EventLoopBlockError), no_event_loop_blocking(action="raise", threshold=0.1):
        time.sleep(0.2)  # This should be detected
@pytest.mark.asyncio
async def test_proper_async_usage():
    """Test that properly async code doesn't trigger blocking."""
    # Imported locally: this example's import block (pytest, time, pyleak)
    # does not bring asyncio into scope, so asyncio.sleep would be a NameError.
    import asyncio

    # This should not raise an exception
    with no_event_loop_blocking(action="raise", threshold=0.1):
        await asyncio.sleep(0.2)  # Proper async operation


import asyncio
import time
from pyleak import EventLoopBlockError, no_event_loop_blocking
async def debug_blocking():
    """Example showing how to debug complex blocking scenarios.

    Runs one CPU-bound helper and one sleeping helper under "raise" mode and,
    if EventLoopBlockError is raised, prints duration, timestamp and stack
    for every reported blocking event.
    """

    def cpu_intensive_work():
        squares = (n * n for n in range(1_000_000))
        return sum(squares)

    def io_blocking_work():
        time.sleep(0.1)
        return "done"

    try:
        with no_event_loop_blocking(action="raise", threshold=0.05):
            # Multiple different types of blocking
            result1 = cpu_intensive_work()
            result2 = io_blocking_work()
    except EventLoopBlockError as exc:
        print(f"Found {exc.block_count} blocking events:")
        for position, blocked in enumerate(exc.blocking_events, start=1):
            print(f"\nBlocking Event {position}:")
            print(f" Duration: {blocked.duration:.3f}s")
            print(f" Timestamp: {blocked.timestamp}")
            print(" Caused by:")
            # Prefix every line of the formatted stack with one space.
            stack_lines = blocked.format_blocking_stack().strip().split("\n")
            print(" " + "\n ".join(stack_lines))


if __name__ == "__main__":
    asyncio.run(debug_blocking())


import asyncio
import concurrent.futures
from pyleak import no_event_loop_blocking
async def good_async_patterns():
    """Examples of proper async patterns that won't block."""
    # Imported locally: this example's import block (asyncio,
    # concurrent.futures, pyleak) omits both names used below.
    import time

    import httpx

    with no_event_loop_blocking(threshold=0.1):
        # Use asyncio.sleep instead of time.sleep
        await asyncio.sleep(0.5)

        # Use asyncio.to_thread for CPU-intensive work
        result = await asyncio.to_thread(lambda: sum(i * i for i in range(1_000_000)))

        # Use thread pool executor for blocking I/O.
        # get_running_loop() is the documented way to obtain the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            result = await loop.run_in_executor(executor, time.sleep, 0.5)

        # Use async libraries for HTTP requests
        async with httpx.AsyncClient() as client:
            response = await client.get("https://example.com")


# Install with Tessl CLI
npx tessl i tessl/pypi-pyleak