Highly concurrent networking library
—
Tools for debugging, introspection, and monitoring eventlet applications including tracing, hub inspection, and performance analysis. These utilities help diagnose issues in concurrent greenthread applications.
Tools for tracing greenthread execution and function calls.
def spew():
"""
Install a detailed tracing hook that prints every function call.
Useful for debugging greenthread execution flow.
Returns:
None
Note:
Produces very verbose output. Use unspew() to disable.
"""
def unspew():
"""
Remove the tracing hook installed by spew().
Returns:
None
"""Tools for inspecting the event hub's internal state.
def format_hub_listeners():
"""
Format the current hub's file descriptor listeners as a string.
Returns:
str: formatted information about active listeners
"""
def format_hub_timers():
"""
Format the current hub's timers as a string.
Returns:
str: formatted information about scheduled timers
"""
def hub_listener_stacks(enabled):
"""
Toggle recording of stack traces for hub listeners.
Parameters:
- enabled: bool, whether to record listener stack traces
Returns:
None
"""
def hub_timer_stacks(enabled):
"""
Toggle recording of stack traces for hub timers.
Parameters:
- enabled: bool, whether to record timer stack traces
Returns:
None
"""
def hub_exceptions(enabled):
"""
Toggle printing of exceptions that occur in hub timers.
Parameters:
- enabled: bool, whether to print hub timer exceptions
Returns:
None
"""
def hub_prevent_multiple_readers(enabled):
"""
Toggle prevention of multiple readers on the same file descriptor.
Parameters:
- enabled: bool, whether to prevent multiple readers
Returns:
None
"""
def hub_blocking_detection(enabled, resolution=1):
"""
Toggle detection of blocking behavior in the hub.
Parameters:
- enabled: bool, whether to enable blocking detection
- resolution: float, detection resolution in seconds
Returns:
None
"""Debug information for thread pool operations.
def tpool_exceptions(enabled):
"""
Toggle printing of exceptions that occur in tpool operations.
Parameters:
- enabled: bool, whether to print tpool exceptions
Returns:
None
"""Functions for gathering system and runtime information.
def format_asyncio_info():
"""
Format information about asyncio event loop and tasks.
Returns:
str: formatted asyncio debugging information
"""
def format_threads_info():
"""
Format information about active threads.
Returns:
str: formatted thread debugging information
"""Backdoor server for interactive debugging of running applications.
def backdoor_server(sock, locals=None):
"""
Run a backdoor server on the given socket for interactive debugging.
Parameters:
- sock: listening socket for backdoor connections
- locals: dict, local variables available in backdoor session
Returns:
None
"""
def backdoor(port, host='127.0.0.1', locals=None):
"""
Set up an interactive console backdoor on the specified port.
Parameters:
- port: int, port number for backdoor server
- host: str, host address to bind to
- locals: dict, local variables available in backdoor session
Returns:
None
"""import eventlet
import eventlet.debug
import time
def problematic_function():
"""Function that might have issues"""
print("Starting problematic function")
# Simulate some work
eventlet.sleep(1.0)
# Simulate a potential issue
for i in range(1000000):
if i % 100000 == 0:
eventlet.sleep(0) # Yield control
print("Problematic function completed")
def debug_basic_example():
"""Basic debugging example"""
print("=== Basic Debugging Example ===")
# Enable various debugging features
print("Enabling hub exception printing...")
eventlet.debug.hub_exceptions(True)
print("Enabling tpool exception printing...")
eventlet.debug.tpool_exceptions(True)
print("Enabling hub blocking detection...")
eventlet.debug.hub_blocking_detection(True, resolution=0.1)
# Run some potentially problematic code
def worker(worker_id):
"""Worker that might block"""
try:
print(f"Worker {worker_id} starting")
# This might cause blocking detection to trigger
time.sleep(0.2) # Intentional blocking call
# Some eventlet operations
eventlet.sleep(0.5)
print(f"Worker {worker_id} completed")
except Exception as e:
print(f"Worker {worker_id} error: {e}")
# Start workers
greenthreads = []
for i in range(3):
gt = eventlet.spawn(worker, i+1)
greenthreads.append(gt)
# Wait for completion
for gt in greenthreads:
gt.wait()
print("Basic debugging example completed")
if __name__ == "__main__":
debug_basic_example()import eventlet
import eventlet.debug
import time
def hub_monitoring_example():
"""Example of monitoring hub state"""
print("=== Hub Monitoring Example ===")
# Enable stack trace recording
eventlet.debug.hub_listener_stacks(True)
eventlet.debug.hub_timer_stacks(True)
def network_worker(worker_id, host, port):
"""Worker that creates network listeners"""
try:
print(f"Network worker {worker_id} connecting to {host}:{port}")
# This creates a listener in the hub
sock = eventlet.connect((host, port))
# Send some data
sock.send(b"Hello from worker " + str(worker_id).encode())
# Receive response
response = sock.recv(1024)
print(f"Worker {worker_id} received: {response}")
sock.close()
except Exception as e:
print(f"Network worker {worker_id} error: {e}")
def timer_worker(worker_id, delays):
"""Worker that creates timers"""
print(f"Timer worker {worker_id} starting")
for delay in delays:
print(f"Timer worker {worker_id} sleeping for {delay}s")
eventlet.sleep(delay)
print(f"Timer worker {worker_id} completed")
def monitor_hub():
"""Monitor hub state periodically"""
for i in range(10):
eventlet.sleep(0.5)
print(f"\n--- Hub Status (iteration {i+1}) ---")
# Show hub listeners
listeners_info = eventlet.debug.format_hub_listeners()
if listeners_info.strip():
print("Active Listeners:")
print(listeners_info)
else:
print("No active listeners")
# Show hub timers
timers_info = eventlet.debug.format_hub_timers()
if timers_info.strip():
print("Active Timers:")
print(timers_info)
else:
print("No active timers")
# Start monitoring
eventlet.spawn(monitor_hub)
# Start some network workers (these will likely fail but create listeners)
for i in range(2):
eventlet.spawn(network_worker, i+1, 'httpbin.org', 80)
# Start timer workers
timer_delays = [[1.0, 0.5], [0.3, 1.2, 0.8]]
for i, delays in enumerate(timer_delays):
eventlet.spawn(timer_worker, i+1, delays)
# Let everything run
eventlet.sleep(6)
print("\nHub monitoring completed")
if __name__ == "__main__":
hub_monitoring_example()import eventlet
import eventlet.debug
import time
def performance_analysis_example():
"""Example of performance analysis with tracing"""
print("=== Performance Analysis Example ===")
def slow_function(duration):
"""Function that takes some time"""
print(f"Starting slow function ({duration}s)")
start = time.time()
# Mix of blocking and non-blocking operations
eventlet.sleep(duration / 2) # Non-blocking
# Simulate CPU work (this might show up in traces)
for i in range(int(duration * 100000)):
if i % 10000 == 0:
eventlet.sleep(0) # Yield occasionally
elapsed = time.time() - start
print(f"Slow function completed in {elapsed:.2f}s")
return elapsed
def traced_worker(worker_id):
"""Worker that we'll trace"""
print(f"Traced worker {worker_id} starting")
# Do some work that we want to trace
result1 = slow_function(0.5)
result2 = slow_function(0.3)
total = result1 + result2
print(f"Traced worker {worker_id} total time: {total:.2f}s")
return total
def untraced_worker(worker_id):
"""Worker that runs without tracing"""
print(f"Untraced worker {worker_id} starting")
result1 = slow_function(0.4)
result2 = slow_function(0.2)
total = result1 + result2
print(f"Untraced worker {worker_id} total time: {total:.2f}s")
return total
# Run without tracing first
print("Running workers without tracing...")
start_time = time.time()
untraced_gts = []
for i in range(2):
gt = eventlet.spawn(untraced_worker, i+1)
untraced_gts.append(gt)
for gt in untraced_gts:
gt.wait()
untraced_time = time.time() - start_time
print(f"Untraced execution took {untraced_time:.2f}s")
print("\n" + "="*50)
print("Now running with tracing enabled...")
print("WARNING: This will produce very verbose output!")
# Enable tracing for detailed analysis
eventlet.debug.spew()
start_time = time.time()
traced_gts = []
for i in range(1): # Fewer workers due to verbose output
gt = eventlet.spawn(traced_worker, i+1)
traced_gts.append(gt)
for gt in traced_gts:
gt.wait()
# Disable tracing
eventlet.debug.unspew()
traced_time = time.time() - start_time
print(f"Traced execution took {traced_time:.2f}s")
print("Performance analysis completed")
if __name__ == "__main__":
performance_analysis_example()import eventlet
import eventlet.debug
import threading
import time
# Global application state for debugging
app_state = {
'active_connections': 0,
'total_requests': 0,
'errors': [],
'start_time': time.time()
}
def web_server_simulation():
"""Simulate a web server for debugging"""
def handle_request(request_id):
"""Simulate handling a web request"""
app_state['active_connections'] += 1
app_state['total_requests'] += 1
try:
# Simulate request processing
processing_time = eventlet.random.uniform(0.1, 2.0)
eventlet.sleep(processing_time)
# Occasionally simulate an error
if eventlet.random.random() < 0.1:
raise ValueError(f"Simulated error in request {request_id}")
print(f"Request {request_id} completed in {processing_time:.2f}s")
except Exception as e:
error_info = {
'request_id': request_id,
'error': str(e),
'timestamp': time.time()
}
app_state['errors'].append(error_info)
print(f"Request {request_id} failed: {e}")
finally:
app_state['active_connections'] -= 1
# Simulate incoming requests
request_id = 0
while True:
request_id += 1
eventlet.spawn(handle_request, request_id)
# Random delay between requests
eventlet.sleep(eventlet.random.uniform(0.1, 0.5))
def backdoor_debugging_example():
"""Example using backdoor for interactive debugging"""
print("=== Interactive Debugging Backdoor Example ===")
print("Starting web server simulation...")
# Start the simulated web server
eventlet.spawn(web_server_simulation)
# Set up debugging backdoor
backdoor_port = 9999
backdoor_locals = {
'app_state': app_state,
'eventlet': eventlet,
'debug': eventlet.debug,
'time': time
}
print(f"Starting debugging backdoor on port {backdoor_port}")
print("Connect with: telnet localhost 9999")
print("\nUseful debugging commands to try in the backdoor:")
print(" app_state # View application state")
print(" app_state['total_requests'] # Get request count")
print(" len(app_state['errors']) # Get error count")
print(" debug.format_hub_timers() # View hub timers")
print(" debug.format_hub_listeners() # View hub listeners")
print(" [error['error'] for error in app_state['errors'][-5:]] # Recent errors")
print("\nPress Ctrl+C to stop the server")
try:
# Start backdoor server
eventlet.debug.backdoor(backdoor_port, locals=backdoor_locals)
except KeyboardInterrupt:
print("\nShutting down server...")
# Print final statistics
uptime = time.time() - app_state['start_time']
print(f"\nFinal Statistics:")
print(f" Uptime: {uptime:.1f} seconds")
print(f" Total requests: {app_state['total_requests']}")
print(f" Active connections: {app_state['active_connections']}")
print(f" Total errors: {len(app_state['errors'])}")
if __name__ == "__main__":
# Note: This example requires manual interaction via telnet
# Run: python script.py
# Then in another terminal: telnet localhost 9999
backdoor_debugging_example()import eventlet
import eventlet.debug
import threading
import asyncio
import time
def system_diagnostics_example():
"""Example of gathering system diagnostic information"""
print("=== System Diagnostics Example ===")
def background_worker(worker_type, worker_id):
"""Background worker for diagnostics"""
if worker_type == 'greenthread':
for i in range(5):
print(f"Greenthread worker {worker_id} iteration {i+1}")
eventlet.sleep(1.0)
elif worker_type == 'thread':
for i in range(3):
print(f"Thread worker {worker_id} iteration {i+1}")
time.sleep(1.0) # Blocking sleep
def async_worker():
"""Asyncio worker (if available)"""
try:
import asyncio
async def async_task():
for i in range(3):
print(f"Async worker iteration {i+1}")
await asyncio.sleep(0.5)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_task())
loop.close()
except Exception as e:
print(f"Async worker error: {e}")
def collect_diagnostics():
"""Collect comprehensive diagnostic information"""
print("\n--- Collecting System Diagnostics ---")
# Eventlet hub information
print("\n1. Eventlet Hub Status:")
listeners = eventlet.debug.format_hub_listeners()
timers = eventlet.debug.format_hub_timers()
if listeners.strip():
print("Hub Listeners:")
print(listeners)
else:
print("No active hub listeners")
if timers.strip():
print("Hub Timers:")
print(timers)
else:
print("No active hub timers")
# Thread information
print("\n2. Thread Information:")
thread_info = eventlet.debug.format_threads_info()
if thread_info.strip():
print(thread_info)
else:
print("No additional thread information available")
# Asyncio information (if applicable)
print("\n3. Asyncio Information:")
try:
asyncio_info = eventlet.debug.format_asyncio_info()
if asyncio_info.strip():
print(asyncio_info)
else:
print("No asyncio information available")
except Exception as e:
print(f"Error getting asyncio info: {e}")
# Python threading information
print("\n4. Python Threading:")
active_threads = threading.active_count()
thread_names = [t.name for t in threading.enumerate()]
print(f"Active threads: {active_threads}")
print(f"Thread names: {thread_names}")
# Greenlet information
print("\n5. Greenlet Information:")
try:
import greenlet
current = greenlet.getcurrent()
print(f"Current greenlet: {current}")
print(f"Greenlet parent: {current.parent}")
except Exception as e:
print(f"Error getting greenlet info: {e}")
# Start various types of workers
print("Starting background workers for diagnostics...")
# Greenthread workers
for i in range(2):
eventlet.spawn(background_worker, 'greenthread', i+1)
# Regular thread workers
for i in range(1):
thread = threading.Thread(target=background_worker, args=('thread', i+1))
thread.start()
# Asyncio worker in separate thread
async_thread = threading.Thread(target=async_worker)
async_thread.start()
# Collect diagnostics periodically
for i in range(3):
eventlet.sleep(2.0)
collect_diagnostics()
print("\n" + "="*60)
print("System diagnostics completed")
def memory_leak_detection():
"""Example of detecting potential memory leaks"""
print("=== Memory Leak Detection Example ===")
import gc
import sys
def leaky_function():
"""Function that might create memory leaks"""
# Create some objects that might not be cleaned up
data = list(range(10000))
def closure():
return sum(data) # Closure captures data
# Store closure somewhere it might persist
leaky_function.closures = getattr(leaky_function, 'closures', [])
leaky_function.closures.append(closure)
return closure()
def monitor_memory():
"""Monitor memory usage and object counts"""
print("Monitoring memory usage...")
initial_objects = len(gc.get_objects())
print(f"Initial object count: {initial_objects}")
for iteration in range(5):
# Run potentially leaky code
for i in range(100):
result = leaky_function()
eventlet.sleep(0.01)
# Force garbage collection
collected = gc.collect()
# Check object counts
current_objects = len(gc.get_objects())
object_growth = current_objects - initial_objects
print(f"Iteration {iteration+1}:")
print(f" Objects: {current_objects} (+{object_growth})")
print(f" Collected: {collected}")
if hasattr(leaky_function, 'closures'):
print(f" Closures: {len(leaky_function.closures)}")
eventlet.sleep(1.0)
# Show object types that have grown
print("\nObject type analysis:")
type_counts = {}
for obj in gc.get_objects():
obj_type = type(obj).__name__
type_counts[obj_type] = type_counts.get(obj_type, 0) + 1
# Show top object types
top_types = sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:10]
for obj_type, count in top_types:
print(f" {obj_type}: {count}")
monitor_memory()
if __name__ == "__main__":
print("Choose example to run:")
print("1. System Diagnostics")
print("2. Memory Leak Detection")
choice = input("Enter choice (1 or 2): ").strip()
if choice == "1":
system_diagnostics_example()
elif choice == "2":
memory_leak_detection()
else:
print("Running system diagnostics by default...")
system_diagnostics_example()import eventlet
import eventlet.debug
import time
import functools
class EventletProfiler:
"""Custom profiler for eventlet applications"""
def __init__(self):
self.call_counts = {}
self.call_times = {}
self.start_times = {}
def profile_function(self, func):
"""Decorator to profile function calls"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = f"{func.__module__}.{func.__name__}"
# Update call count
self.call_counts[func_name] = self.call_counts.get(func_name, 0) + 1
# Record start time
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
# Record total time
elapsed = time.time() - start_time
if func_name not in self.call_times:
self.call_times[func_name] = 0
self.call_times[func_name] += elapsed
return wrapper
def get_stats(self):
"""Get profiling statistics"""
stats = []
for func_name in self.call_counts:
stats.append({
'function': func_name,
'calls': self.call_counts[func_name],
'total_time': self.call_times.get(func_name, 0),
'avg_time': self.call_times.get(func_name, 0) / self.call_counts[func_name]
})
# Sort by total time
stats.sort(key=lambda x: x['total_time'], reverse=True)
return stats
def print_stats(self):
"""Print profiling statistics"""
stats = self.get_stats()
print("\n=== Profiling Statistics ===")
print(f"{'Function':<40} {'Calls':<8} {'Total':<10} {'Avg':<10}")
print("-" * 70)
for stat in stats[:10]: # Top 10
print(f"{stat['function']:<40} {stat['calls']:<8} "
f"{stat['total_time']:<10.3f} {stat['avg_time']:<10.3f}")
def custom_debug_example():
"""Example of custom debugging utilities"""
print("=== Custom Debug Utilities Example ===")
profiler = EventletProfiler()
@profiler.profile_function
def fast_operation():
"""Fast operation"""
eventlet.sleep(0.1)
return "fast result"
@profiler.profile_function
def slow_operation():
"""Slow operation"""
eventlet.sleep(0.5)
return "slow result"
@profiler.profile_function
def cpu_operation():
"""CPU-intensive operation"""
result = 0
for i in range(100000):
result += i
if i % 10000 == 0:
eventlet.sleep(0) # Yield
return result
def worker(worker_id, operations):
"""Worker that performs various operations"""
print(f"Worker {worker_id} starting")
for op_type in operations:
if op_type == 'fast':
result = fast_operation()
elif op_type == 'slow':
result = slow_operation()
elif op_type == 'cpu':
result = cpu_operation()
print(f"Worker {worker_id} completed {op_type} operation")
print(f"Worker {worker_id} finished")
# Define work for each worker
worker_operations = [
['fast', 'slow', 'cpu'],
['fast', 'fast', 'slow'],
['cpu', 'fast', 'cpu'],
['slow', 'cpu', 'fast']
]
print("Starting profiled workers...")
# Start workers
greenthreads = []
for i, operations in enumerate(worker_operations):
gt = eventlet.spawn(worker, i+1, operations)
greenthreads.append(gt)
# Wait for completion
for gt in greenthreads:
gt.wait()
# Print profiling results
profiler.print_stats()
if __name__ == "__main__":
custom_debug_example()import eventlet.debug
# Enable debugging at application startup
def setup_debugging():
"""Configure debugging for production use"""
# Enable exception printing (low overhead)
eventlet.debug.hub_exceptions(True)
eventlet.debug.tpool_exceptions(True)
# Enable blocking detection in development
if os.environ.get('DEBUG', '').lower() == 'true':
eventlet.debug.hub_blocking_detection(True, resolution=0.1)
eventlet.debug.hub_listener_stacks(True)
eventlet.debug.hub_timer_stacks(True)
# Call early in application initialization
setup_debugging()import eventlet.debug
import os
def debug_worker(worker_func, *args, **kwargs):
"""Run worker with conditional tracing"""
enable_tracing = os.environ.get('EVENTLET_TRACE', '').lower() == 'true'
if enable_tracing:
print("Enabling tracing for worker")
eventlet.debug.spew()
try:
return worker_func(*args, **kwargs)
finally:
if enable_tracing:
eventlet.debug.unspew()import eventlet.debug
import os
def setup_backdoor():
"""Setup backdoor only in development"""
if os.environ.get('EVENTLET_BACKDOOR') == 'true':
backdoor_port = int(os.environ.get('EVENTLET_BACKDOOR_PORT', '9999'))
# Only bind to localhost for security
eventlet.spawn(eventlet.debug.backdoor, backdoor_port, host='127.0.0.1')
print(f"Debug backdoor available on localhost:{backdoor_port}")Install with Tessl CLI
npx tessl i tessl/pypi-eventlet