CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-eventlet

Highly concurrent networking library

Pending
Overview
Eval results
Files

debugging.mddocs/

Debugging

Tools for debugging, introspection, and monitoring eventlet applications including tracing, hub inspection, and performance analysis. These utilities help diagnose issues in concurrent greenthread applications.

Capabilities

Execution Tracing

Tools for tracing greenthread execution and function calls.

def spew():
    """
    Install a detailed tracing hook that prints every function call.
    Useful for debugging greenthread execution flow.
    
    Returns:
    None
    
    Note:
    Produces very verbose output. Use unspew() to disable.
    """

def unspew():
    """
    Remove the tracing hook installed by spew().
    
    Returns:
    None
    """

Hub Inspection

Tools for inspecting the event hub's internal state.

def format_hub_listeners():
    """
    Format the current hub's file descriptor listeners as a string.
    
    Returns:
    str: formatted information about active listeners
    """

def format_hub_timers():
    """
    Format the current hub's timers as a string.
    
    Returns:
    str: formatted information about scheduled timers
    """

def hub_listener_stacks(enabled):
    """
    Toggle recording of stack traces for hub listeners.
    
    Parameters:
    - enabled: bool, whether to record listener stack traces
    
    Returns:
    None
    """

def hub_timer_stacks(enabled):
    """
    Toggle recording of stack traces for hub timers.
    
    Parameters:
    - enabled: bool, whether to record timer stack traces
    
    Returns:
    None
    """

def hub_exceptions(enabled):
    """
    Toggle printing of exceptions that occur in hub timers.
    
    Parameters:
    - enabled: bool, whether to print hub timer exceptions
    
    Returns:
    None
    """

def hub_prevent_multiple_readers(enabled):
    """
    Toggle prevention of multiple readers on the same file descriptor.
    
    Parameters:
    - enabled: bool, whether to prevent multiple readers
    
    Returns:
    None
    """

def hub_blocking_detection(enabled, resolution=1):
    """
    Toggle detection of blocking behavior in the hub.
    
    Parameters:
    - enabled: bool, whether to enable blocking detection
    - resolution: float, detection resolution in seconds
    
    Returns:
    None
    """

Thread Pool Debugging

Debug information for thread pool operations.

def tpool_exceptions(enabled):
    """
    Toggle printing of exceptions that occur in tpool operations.
    
    Parameters:
    - enabled: bool, whether to print tpool exceptions
    
    Returns:
    None
    """

System Information

Functions for gathering system and runtime information.

def format_asyncio_info():
    """
    Format information about asyncio event loop and tasks.
    
    Returns:
    str: formatted asyncio debugging information
    """

def format_threads_info():
    """
    Format information about active threads.
    
    Returns:
    str: formatted thread debugging information
    """

Interactive Debugging

Backdoor server for interactive debugging of running applications.

def backdoor_server(sock, locals=None):
    """
    Run a backdoor server on the given socket for interactive debugging.
    
    Parameters:
    - sock: listening socket for backdoor connections
    - locals: dict, local variables available in backdoor session
    
    Returns:
    None
    """

def backdoor(port, host='127.0.0.1', locals=None):
    """
    Set up an interactive console backdoor on the specified port.
    
    Parameters:
    - port: int, port number for backdoor server
    - host: str, host address to bind to
    - locals: dict, local variables available in backdoor session
    
    Returns:
    None
    """

Usage Examples

Basic Debugging Setup

import eventlet
import eventlet.debug
import time

def problematic_function():
    """Function that might have issues"""
    print("Starting problematic function")
    
    # Simulate some work
    eventlet.sleep(1.0)
    
    # Simulate a potential issue
    for i in range(1000000):
        if i % 100000 == 0:
            eventlet.sleep(0)  # Yield control
    
    print("Problematic function completed")

def debug_basic_example():
    """Basic debugging example"""
    
    print("=== Basic Debugging Example ===")
    
    # Enable various debugging features
    print("Enabling hub exception printing...")
    eventlet.debug.hub_exceptions(True)
    
    print("Enabling tpool exception printing...")
    eventlet.debug.tpool_exceptions(True)
    
    print("Enabling hub blocking detection...")
    eventlet.debug.hub_blocking_detection(True, resolution=0.1)
    
    # Run some potentially problematic code
    def worker(worker_id):
        """Worker that might block"""
        try:
            print(f"Worker {worker_id} starting")
            
            # This might cause blocking detection to trigger
            time.sleep(0.2)  # Intentional blocking call
            
            # Some eventlet operations
            eventlet.sleep(0.5)
            
            print(f"Worker {worker_id} completed")
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")
    
    # Start workers
    greenthreads = []
    for i in range(3):
        gt = eventlet.spawn(worker, i+1)
        greenthreads.append(gt)
    
    # Wait for completion
    for gt in greenthreads:
        gt.wait()
    
    print("Basic debugging example completed")

if __name__ == "__main__":
    debug_basic_example()

Hub Inspection and Monitoring

import eventlet
import eventlet.debug
import time

def hub_monitoring_example():
    """Example of monitoring hub state"""
    
    print("=== Hub Monitoring Example ===")
    
    # Enable stack trace recording
    eventlet.debug.hub_listener_stacks(True)
    eventlet.debug.hub_timer_stacks(True)
    
    def network_worker(worker_id, host, port):
        """Worker that creates network listeners"""
        try:
            print(f"Network worker {worker_id} connecting to {host}:{port}")
            
            # This creates a listener in the hub
            sock = eventlet.connect((host, port))
            
            # Send some data
            sock.send(b"Hello from worker " + str(worker_id).encode())
            
            # Receive response
            response = sock.recv(1024)
            print(f"Worker {worker_id} received: {response}")
            
            sock.close()
            
        except Exception as e:
            print(f"Network worker {worker_id} error: {e}")
    
    def timer_worker(worker_id, delays):
        """Worker that creates timers"""
        print(f"Timer worker {worker_id} starting")
        
        for delay in delays:
            print(f"Timer worker {worker_id} sleeping for {delay}s")
            eventlet.sleep(delay)
        
        print(f"Timer worker {worker_id} completed")
    
    def monitor_hub():
        """Monitor hub state periodically"""
        for i in range(10):
            eventlet.sleep(0.5)
            
            print(f"\n--- Hub Status (iteration {i+1}) ---")
            
            # Show hub listeners
            listeners_info = eventlet.debug.format_hub_listeners()
            if listeners_info.strip():
                print("Active Listeners:")
                print(listeners_info)
            else:
                print("No active listeners")
            
            # Show hub timers
            timers_info = eventlet.debug.format_hub_timers()
            if timers_info.strip():
                print("Active Timers:")
                print(timers_info)
            else:
                print("No active timers")
    
    # Start monitoring
    eventlet.spawn(monitor_hub)
    
    # Start some network workers (these will likely fail but create listeners)
    for i in range(2):
        eventlet.spawn(network_worker, i+1, 'httpbin.org', 80)
    
    # Start timer workers
    timer_delays = [[1.0, 0.5], [0.3, 1.2, 0.8]]
    for i, delays in enumerate(timer_delays):
        eventlet.spawn(timer_worker, i+1, delays)
    
    # Let everything run
    eventlet.sleep(6)
    
    print("\nHub monitoring completed")

if __name__ == "__main__":
    hub_monitoring_example()

Performance Analysis with Tracing

import eventlet
import eventlet.debug
import time

def performance_analysis_example():
    """Example of performance analysis with tracing"""
    
    print("=== Performance Analysis Example ===")
    
    def slow_function(duration):
        """Function that takes some time"""
        print(f"Starting slow function ({duration}s)")
        start = time.time()
        
        # Mix of blocking and non-blocking operations
        eventlet.sleep(duration / 2)  # Non-blocking
        
        # Simulate CPU work (this might show up in traces)
        for i in range(int(duration * 100000)):
            if i % 10000 == 0:
                eventlet.sleep(0)  # Yield occasionally
        
        elapsed = time.time() - start
        print(f"Slow function completed in {elapsed:.2f}s")
        return elapsed
    
    def traced_worker(worker_id):
        """Worker that we'll trace"""
        print(f"Traced worker {worker_id} starting")
        
        # Do some work that we want to trace
        result1 = slow_function(0.5)
        result2 = slow_function(0.3)
        
        total = result1 + result2
        print(f"Traced worker {worker_id} total time: {total:.2f}s")
        
        return total
    
    def untraced_worker(worker_id):
        """Worker that runs without tracing"""
        print(f"Untraced worker {worker_id} starting")
        
        result1 = slow_function(0.4)
        result2 = slow_function(0.2)
        
        total = result1 + result2
        print(f"Untraced worker {worker_id} total time: {total:.2f}s")
        
        return total
    
    # Run without tracing first
    print("Running workers without tracing...")
    start_time = time.time()
    
    untraced_gts = []
    for i in range(2):
        gt = eventlet.spawn(untraced_worker, i+1)
        untraced_gts.append(gt)
    
    for gt in untraced_gts:
        gt.wait()
    
    untraced_time = time.time() - start_time
    print(f"Untraced execution took {untraced_time:.2f}s")
    
    print("\n" + "="*50)
    print("Now running with tracing enabled...")
    print("WARNING: This will produce very verbose output!")
    
    # Enable tracing for detailed analysis
    eventlet.debug.spew()
    
    start_time = time.time()
    
    traced_gts = []
    for i in range(1):  # Fewer workers due to verbose output
        gt = eventlet.spawn(traced_worker, i+1)
        traced_gts.append(gt)
    
    for gt in traced_gts:
        gt.wait()
    
    # Disable tracing
    eventlet.debug.unspew()
    
    traced_time = time.time() - start_time
    print(f"Traced execution took {traced_time:.2f}s")
    
    print("Performance analysis completed")

if __name__ == "__main__":
    performance_analysis_example()

Interactive Debugging Backdoor

import eventlet
import eventlet.debug
import threading
import time

# Global application state for debugging
app_state = {
    'active_connections': 0,
    'total_requests': 0,
    'errors': [],
    'start_time': time.time()
}

def web_server_simulation():
    """Simulate a web server for debugging"""
    
    def handle_request(request_id):
        """Simulate handling a web request"""
        app_state['active_connections'] += 1
        app_state['total_requests'] += 1
        
        try:
            # Simulate request processing
            processing_time = eventlet.random.uniform(0.1, 2.0)
            eventlet.sleep(processing_time)
            
            # Occasionally simulate an error
            if eventlet.random.random() < 0.1:
                raise ValueError(f"Simulated error in request {request_id}")
            
            print(f"Request {request_id} completed in {processing_time:.2f}s")
            
        except Exception as e:
            error_info = {
                'request_id': request_id,
                'error': str(e),
                'timestamp': time.time()
            }
            app_state['errors'].append(error_info)
            print(f"Request {request_id} failed: {e}")
        
        finally:
            app_state['active_connections'] -= 1
    
    # Simulate incoming requests
    request_id = 0
    while True:
        request_id += 1
        eventlet.spawn(handle_request, request_id)
        
        # Random delay between requests
        eventlet.sleep(eventlet.random.uniform(0.1, 0.5))

def backdoor_debugging_example():
    """Example using backdoor for interactive debugging"""
    
    print("=== Interactive Debugging Backdoor Example ===")
    print("Starting web server simulation...")
    
    # Start the simulated web server
    eventlet.spawn(web_server_simulation)
    
    # Set up debugging backdoor
    backdoor_port = 9999
    backdoor_locals = {
        'app_state': app_state,
        'eventlet': eventlet,
        'debug': eventlet.debug,
        'time': time
    }
    
    print(f"Starting debugging backdoor on port {backdoor_port}")
    print("Connect with: telnet localhost 9999")
    print("\nUseful debugging commands to try in the backdoor:")
    print("  app_state                    # View application state")
    print("  app_state['total_requests']  # Get request count") 
    print("  len(app_state['errors'])     # Get error count")
    print("  debug.format_hub_timers()    # View hub timers")
    print("  debug.format_hub_listeners() # View hub listeners")
    print("  [error['error'] for error in app_state['errors'][-5:]]  # Recent errors")
    print("\nPress Ctrl+C to stop the server")
    
    try:
        # Start backdoor server
        eventlet.debug.backdoor(backdoor_port, locals=backdoor_locals)
    
    except KeyboardInterrupt:
        print("\nShutting down server...")
    
    # Print final statistics
    uptime = time.time() - app_state['start_time']
    print(f"\nFinal Statistics:")
    print(f"  Uptime: {uptime:.1f} seconds")
    print(f"  Total requests: {app_state['total_requests']}")
    print(f"  Active connections: {app_state['active_connections']}")
    print(f"  Total errors: {len(app_state['errors'])}")

if __name__ == "__main__":
    # Note: This example requires manual interaction via telnet
    # Run: python script.py
    # Then in another terminal: telnet localhost 9999
    backdoor_debugging_example()

System Information and Diagnostics

import eventlet
import eventlet.debug
import threading
import asyncio
import time

def system_diagnostics_example():
    """Example of gathering system diagnostic information"""
    
    print("=== System Diagnostics Example ===")
    
    def background_worker(worker_type, worker_id):
        """Background worker for diagnostics"""
        if worker_type == 'greenthread':
            for i in range(5):
                print(f"Greenthread worker {worker_id} iteration {i+1}")
                eventlet.sleep(1.0)
        
        elif worker_type == 'thread':
            for i in range(3):
                print(f"Thread worker {worker_id} iteration {i+1}")
                time.sleep(1.0)  # Blocking sleep
    
    def async_worker():
        """Asyncio worker (if available)"""
        try:
            import asyncio
            
            async def async_task():
                for i in range(3):
                    print(f"Async worker iteration {i+1}")
                    await asyncio.sleep(0.5)
            
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(async_task())
            loop.close()
            
        except Exception as e:
            print(f"Async worker error: {e}")
    
    def collect_diagnostics():
        """Collect comprehensive diagnostic information"""
        print("\n--- Collecting System Diagnostics ---")
        
        # Eventlet hub information
        print("\n1. Eventlet Hub Status:")
        listeners = eventlet.debug.format_hub_listeners()
        timers = eventlet.debug.format_hub_timers()
        
        if listeners.strip():
            print("Hub Listeners:")
            print(listeners)
        else:
            print("No active hub listeners")
        
        if timers.strip():
            print("Hub Timers:")
            print(timers)
        else:
            print("No active hub timers")
        
        # Thread information
        print("\n2. Thread Information:")
        thread_info = eventlet.debug.format_threads_info()
        if thread_info.strip():
            print(thread_info)
        else:
            print("No additional thread information available")
        
        # Asyncio information (if applicable)
        print("\n3. Asyncio Information:")
        try:
            asyncio_info = eventlet.debug.format_asyncio_info()
            if asyncio_info.strip():
                print(asyncio_info)
            else:
                print("No asyncio information available")
        except Exception as e:
            print(f"Error getting asyncio info: {e}")
        
        # Python threading information
        print("\n4. Python Threading:")
        active_threads = threading.active_count()
        thread_names = [t.name for t in threading.enumerate()]
        print(f"Active threads: {active_threads}")
        print(f"Thread names: {thread_names}")
        
        # Greenlet information
        print("\n5. Greenlet Information:")
        try:
            import greenlet
            current = greenlet.getcurrent()
            print(f"Current greenlet: {current}")
            print(f"Greenlet parent: {current.parent}")
        except Exception as e:
            print(f"Error getting greenlet info: {e}")
    
    # Start various types of workers
    print("Starting background workers for diagnostics...")
    
    # Greenthread workers
    for i in range(2):
        eventlet.spawn(background_worker, 'greenthread', i+1)
    
    # Regular thread workers
    for i in range(1):
        thread = threading.Thread(target=background_worker, args=('thread', i+1))
        thread.start()
    
    # Asyncio worker in separate thread
    async_thread = threading.Thread(target=async_worker)
    async_thread.start()
    
    # Collect diagnostics periodically
    for i in range(3):
        eventlet.sleep(2.0)
        collect_diagnostics()
        print("\n" + "="*60)
    
    print("System diagnostics completed")

def memory_leak_detection():
    """Example of detecting potential memory leaks"""
    
    print("=== Memory Leak Detection Example ===")
    
    import gc
    import sys
    
    def leaky_function():
        """Function that might create memory leaks"""
        # Create some objects that might not be cleaned up
        data = list(range(10000))
        
        def closure():
            return sum(data)  # Closure captures data
        
        # Store closure somewhere it might persist
        leaky_function.closures = getattr(leaky_function, 'closures', [])
        leaky_function.closures.append(closure)
        
        return closure()
    
    def monitor_memory():
        """Monitor memory usage and object counts"""
        print("Monitoring memory usage...")
        
        initial_objects = len(gc.get_objects())
        print(f"Initial object count: {initial_objects}")
        
        for iteration in range(5):
            # Run potentially leaky code
            for i in range(100):
                result = leaky_function()
                eventlet.sleep(0.01)
            
            # Force garbage collection
            collected = gc.collect()
            
            # Check object counts
            current_objects = len(gc.get_objects())
            object_growth = current_objects - initial_objects
            
            print(f"Iteration {iteration+1}:")
            print(f"  Objects: {current_objects} (+{object_growth})")
            print(f"  Collected: {collected}")
            
            if hasattr(leaky_function, 'closures'):
                print(f"  Closures: {len(leaky_function.closures)}")
            
            eventlet.sleep(1.0)
        
        # Show object types that have grown
        print("\nObject type analysis:")
        type_counts = {}
        for obj in gc.get_objects():
            obj_type = type(obj).__name__
            type_counts[obj_type] = type_counts.get(obj_type, 0) + 1
        
        # Show top object types
        top_types = sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        for obj_type, count in top_types:
            print(f"  {obj_type}: {count}")
    
    monitor_memory()

if __name__ == "__main__":
    print("Choose example to run:")
    print("1. System Diagnostics")
    print("2. Memory Leak Detection")  
    
    choice = input("Enter choice (1 or 2): ").strip()
    
    if choice == "1":
        system_diagnostics_example()
    elif choice == "2":
        memory_leak_detection()
    else:
        print("Running system diagnostics by default...")
        system_diagnostics_example()

Custom Debug Utilities

import eventlet
import eventlet.debug
import time
import functools

class EventletProfiler:
    """Custom profiler for eventlet applications"""
    
    def __init__(self):
        self.call_counts = {}
        self.call_times = {}
        self.start_times = {}
    
    def profile_function(self, func):
        """Decorator to profile function calls"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            func_name = f"{func.__module__}.{func.__name__}"
            
            # Update call count
            self.call_counts[func_name] = self.call_counts.get(func_name, 0) + 1
            
            # Record start time
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                # Record total time
                elapsed = time.time() - start_time
                if func_name not in self.call_times:
                    self.call_times[func_name] = 0
                self.call_times[func_name] += elapsed
        
        return wrapper
    
    def get_stats(self):
        """Get profiling statistics"""
        stats = []
        for func_name in self.call_counts:
            stats.append({
                'function': func_name,
                'calls': self.call_counts[func_name],
                'total_time': self.call_times.get(func_name, 0),
                'avg_time': self.call_times.get(func_name, 0) / self.call_counts[func_name]
            })
        
        # Sort by total time
        stats.sort(key=lambda x: x['total_time'], reverse=True)
        return stats
    
    def print_stats(self):
        """Print profiling statistics"""
        stats = self.get_stats()
        
        print("\n=== Profiling Statistics ===")
        print(f"{'Function':<40} {'Calls':<8} {'Total':<10} {'Avg':<10}")
        print("-" * 70)
        
        for stat in stats[:10]:  # Top 10
            print(f"{stat['function']:<40} {stat['calls']:<8} "
                  f"{stat['total_time']:<10.3f} {stat['avg_time']:<10.3f}")

def custom_debug_example():
    """Example of custom debugging utilities"""
    
    print("=== Custom Debug Utilities Example ===")
    
    profiler = EventletProfiler()
    
    @profiler.profile_function
    def fast_operation():
        """Fast operation"""
        eventlet.sleep(0.1)
        return "fast result"
    
    @profiler.profile_function  
    def slow_operation():
        """Slow operation"""
        eventlet.sleep(0.5)
        return "slow result"
    
    @profiler.profile_function
    def cpu_operation():
        """CPU-intensive operation"""
        result = 0
        for i in range(100000):
            result += i
            if i % 10000 == 0:
                eventlet.sleep(0)  # Yield
        return result
    
    def worker(worker_id, operations):
        """Worker that performs various operations"""
        print(f"Worker {worker_id} starting")
        
        for op_type in operations:
            if op_type == 'fast':
                result = fast_operation()
            elif op_type == 'slow':
                result = slow_operation()
            elif op_type == 'cpu':
                result = cpu_operation()
            
            print(f"Worker {worker_id} completed {op_type} operation")
        
        print(f"Worker {worker_id} finished")
    
    # Define work for each worker
    worker_operations = [
        ['fast', 'slow', 'cpu'],
        ['fast', 'fast', 'slow'],
        ['cpu', 'fast', 'cpu'],
        ['slow', 'cpu', 'fast']
    ]
    
    print("Starting profiled workers...")
    
    # Start workers
    greenthreads = []
    for i, operations in enumerate(worker_operations):
        gt = eventlet.spawn(worker, i+1, operations)
        greenthreads.append(gt)
    
    # Wait for completion
    for gt in greenthreads:
        gt.wait()
    
    # Print profiling results
    profiler.print_stats()

if __name__ == "__main__":
    custom_debug_example()

Debugging Best Practices

Enable Debugging Early

import eventlet.debug

# Enable debugging at application startup
def setup_debugging():
    """Configure debugging for production use"""
    
    # Enable exception printing (low overhead)
    eventlet.debug.hub_exceptions(True)
    eventlet.debug.tpool_exceptions(True)
    
    # Enable blocking detection in development
    if os.environ.get('DEBUG', '').lower() == 'true':
        eventlet.debug.hub_blocking_detection(True, resolution=0.1)
        eventlet.debug.hub_listener_stacks(True)
        eventlet.debug.hub_timer_stacks(True)

# Call early in application initialization
setup_debugging()

Conditional Tracing

import eventlet.debug
import os

def debug_worker(worker_func, *args, **kwargs):
    """Run worker with conditional tracing"""
    
    enable_tracing = os.environ.get('EVENTLET_TRACE', '').lower() == 'true'
    
    if enable_tracing:
        print("Enabling tracing for worker")
        eventlet.debug.spew()
    
    try:
        return worker_func(*args, **kwargs)
    finally:
        if enable_tracing:
            eventlet.debug.unspew()

Safe Backdoor Setup

import eventlet.debug
import os

def setup_backdoor():
    """Setup backdoor only in development"""
    
    if os.environ.get('EVENTLET_BACKDOOR') == 'true':
        backdoor_port = int(os.environ.get('EVENTLET_BACKDOOR_PORT', '9999'))
        
        # Only bind to localhost for security
        eventlet.spawn(eventlet.debug.backdoor, backdoor_port, host='127.0.0.1')
        print(f"Debug backdoor available on localhost:{backdoor_port}")

Install with Tessl CLI

npx tessl i tessl/pypi-eventlet

docs

core-concurrency.md

debugging.md

green-stdlib.md

index.md

monkey-patching.md

networking.md

resource-pooling.md

synchronization.md

thread-pools.md

web-server.md

tile.json