Developer-friendly load testing framework for HTTP and other protocols with distributed testing capabilities.
—
Locust's event system provides extensibility hooks for custom metrics, logging, integrations, and test lifecycle management. The global events instance offers access to all built-in events for extending Locust functionality.
The primary interface for accessing Locust's event system, available as a global instance.
from locust import events
# Global events instance providing access to all event hooks
events: EventsEvents fired during HTTP request lifecycle for statistics collection and custom processing.
# Request completion event
events.request.add_listener(handler)
def request_handler(request_type, name, response_time, response_length, exception=None, **kwargs):
"""
Handle request completion events.
Args:
request_type (str): HTTP method or custom request type
name (str): Request name for statistics grouping
response_time (float): Response time in milliseconds
response_length (int): Response body length in bytes
exception (Exception): Exception if request failed
**kwargs: Additional request metadata
"""
# User error event
events.user_error.add_listener(handler)
def user_error_handler(user_instance, exception, tb, **kwargs):
"""
Handle user execution errors.
Args:
user_instance: User instance that encountered error
exception (Exception): Exception that occurred
tb: Traceback object
**kwargs: Additional error context
"""Events for hooking into test execution phases and managing test lifecycle.
# Test initialization
events.init.add_listener(handler)
def init_handler(environment, **kwargs):
"""
Handle test initialization.
Args:
environment: Locust environment instance
**kwargs: Additional initialization context
"""
# Test start
events.test_start.add_listener(handler)
def test_start_handler(environment, **kwargs):
"""
Handle test start event.
Args:
environment: Locust environment instance
**kwargs: Additional test start context
"""
# Test stopping (before cleanup)
events.test_stopping.add_listener(handler)
def test_stopping_handler(environment, **kwargs):
"""
Handle test stopping event (before cleanup).
Args:
environment: Locust environment instance
**kwargs: Additional context
"""
# Test stop (after cleanup)
events.test_stop.add_listener(handler)
def test_stop_handler(environment, **kwargs):
"""
Handle test stop event (after cleanup).
Args:
environment: Locust environment instance
**kwargs: Additional context
"""
# Test quit (final cleanup)
events.quit.add_listener(handler)
events.quitting.add_listener(handler)
def quit_handler(exit_code, **kwargs):
"""
Handle test quit events.
Args:
exit_code (int): Exit code for test run
**kwargs: Additional quit context
"""Events related to user spawning and management during test execution.
# User spawning complete
events.spawning_complete.add_listener(handler)
def spawning_complete_handler(user_count, **kwargs):
"""
Handle user spawning completion.
Args:
user_count (int): Total number of spawned users
**kwargs: Additional spawning context
"""Events for custom statistics handling and processing.
# Statistics reset
events.reset_stats.add_listener(handler)
def reset_stats_handler(**kwargs):
"""
Handle statistics reset event.
Args:
**kwargs: Reset context
"""Events for master/worker coordination in distributed testing scenarios.
# Worker reporting to master
events.report_to_master.add_listener(handler)
def report_to_master_handler(client_id, data, **kwargs):
"""
Handle worker report to master.
Args:
client_id (str): Worker client identifier
data (dict): Report data from worker
**kwargs: Additional report context
"""
# Master receiving worker report
events.worker_report.add_listener(handler)
def worker_report_handler(client_id, data, **kwargs):
"""
Handle worker report on master.
Args:
client_id (str): Worker client identifier
data (dict): Report data from worker
**kwargs: Additional report context
"""
# Worker connect to master
events.worker_connect.add_listener(handler)
def worker_connect_handler(client_id, **kwargs):
"""
Handle worker connection to master.
Args:
client_id (str): Worker client identifier
**kwargs: Connection context
"""Events for system resource monitoring and alerts.
# CPU usage warning
events.cpu_warning.add_listener(handler)
def cpu_warning_handler(usage_percent, **kwargs):
"""
Handle CPU usage warnings.
Args:
usage_percent (float): CPU usage percentage
**kwargs: Additional system context
"""
# Heartbeat events (distributed testing)
events.heartbeat_sent.add_listener(handler)
events.heartbeat_received.add_listener(handler)
def heartbeat_handler(client_id, **kwargs):
"""
Handle heartbeat events.
Args:
client_id (str): Client identifier
**kwargs: Heartbeat context
"""
# Usage monitoring
events.usage_monitor.add_listener(handler)
def usage_monitor_handler(stats, **kwargs):
"""
Handle usage monitoring events.
Args:
stats (dict): Usage statistics
**kwargs: Monitoring context
"""Events for customizing command line argument parsing.
# Command line parser initialization
events.init_command_line_parser.add_listener(handler)
def init_parser_handler(parser, **kwargs):
"""
Handle command line parser initialization.
Args:
parser: ArgumentParser instance
**kwargs: Parser initialization context
"""Main events manager class for creating custom event systems.
class Events:
"""
Event system manager providing event hooks.
Attributes:
request: Request completion event hook
user_error: User error event hook
report_to_master: Worker to master reporting hook
worker_report: Master receiving worker report hook
worker_connect: Worker connection hook
spawning_complete: User spawning completion hook
quitting: Test quitting hook
quit: Test quit hook
init: Test initialization hook
init_command_line_parser: CLI parser init hook
test_start: Test start hook
test_stopping: Test stopping hook
test_stop: Test stop hook
reset_stats: Statistics reset hook
cpu_warning: CPU usage warning hook
heartbeat_sent: Heartbeat sent hook
heartbeat_received: Heartbeat received hook
usage_monitor: Usage monitoring hook
"""Individual event hook implementation for subscribing and firing events.
class EventHook:
"""
Individual event hook for managing event listeners.
"""
def add_listener(self, func):
"""
Add event listener function.
Args:
func (callable): Event handler function
"""
def remove_listener(self, func):
"""
Remove event listener function.
Args:
func (callable): Event handler function to remove
"""
def fire(self, **kwargs):
"""
Fire event to all registered listeners.
Args:
**kwargs: Event data to pass to listeners
"""from locust import HttpUser, task, between, events
import time
import json
# Custom metrics storage
custom_metrics = {
"api_calls": 0,
"error_count": 0,
"response_times": []
}
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
"""Collect custom metrics on each request"""
custom_metrics["api_calls"] += 1
custom_metrics["response_times"].append(response_time)
if exception:
custom_metrics["error_count"] += 1
# Log slow requests
if response_time > 5000: # 5 seconds
print(f"SLOW REQUEST: {name} took {response_time}ms")
def on_test_stop(environment, **kwargs):
"""Report custom metrics at test end"""
total_calls = custom_metrics["api_calls"]
avg_response_time = sum(custom_metrics["response_times"]) / len(custom_metrics["response_times"])
error_rate = custom_metrics["error_count"] / total_calls * 100
print(f"\n--- Custom Metrics ---")
print(f"Total API calls: {total_calls}")
print(f"Average response time: {avg_response_time:.2f}ms")
print(f"Error rate: {error_rate:.2f}%")
# Save to file
with open("custom_metrics.json", "w") as f:
json.dump(custom_metrics, f, indent=2)
# Register event listeners
events.request.add_listener(on_request)
events.test_stop.add_listener(on_test_stop)
class APIUser(HttpUser):
wait_time = between(1, 3)
@task
def api_call(self):
self.client.get("/api/data")from locust import HttpUser, task, events
import requests
import json
class ExternalReporter:
def __init__(self, webhook_url):
self.webhook_url = webhook_url
self.test_start_time = None
self.request_count = 0
def on_test_start(self, environment, **kwargs):
"""Notify external system of test start"""
self.test_start_time = time.time()
payload = {
"event": "test_start",
"timestamp": self.test_start_time,
"users": environment.runner.target_user_count if environment.runner else 0
}
self.send_webhook(payload)
def on_request(self, request_type, name, response_time, response_length, exception, **kwargs):
"""Track request metrics"""
self.request_count += 1
# Send alerts for errors
if exception:
payload = {
"event": "error",
"timestamp": time.time(),
"request_type": request_type,
"name": name,
"error": str(exception)
}
self.send_webhook(payload)
def on_test_stop(self, environment, **kwargs):
"""Send final test report"""
duration = time.time() - self.test_start_time
payload = {
"event": "test_complete",
"timestamp": time.time(),
"duration": duration,
"total_requests": self.request_count,
"stats": environment.stats.serialize_stats() if environment.stats else {}
}
self.send_webhook(payload)
def send_webhook(self, payload):
"""Send webhook notification"""
try:
requests.post(self.webhook_url, json=payload, timeout=5)
except Exception as e:
print(f"Webhook failed: {e}")
# Initialize external reporter
reporter = ExternalReporter("https://monitoring.example.com/webhook")
# Register event listeners
events.test_start.add_listener(reporter.on_test_start)
events.request.add_listener(reporter.on_request)
events.test_stop.add_listener(reporter.on_test_stop)
class WebUser(HttpUser):
wait_time = between(1, 2)
@task
def browse(self):
self.client.get("/")from locust import HttpUser, task, events
import csv
import time
from collections import defaultdict
class DetailedStats:
def __init__(self):
self.start_time = time.time()
self.requests_by_minute = defaultdict(list)
self.errors_by_type = defaultdict(int)
self.response_time_buckets = defaultdict(int)
def on_request(self, request_type, name, response_time, response_length, exception, **kwargs):
"""Collect detailed request statistics"""
minute = int((time.time() - self.start_time) // 60)
# Track requests per minute
self.requests_by_minute[minute].append({
"type": request_type,
"name": name,
"response_time": response_time,
"length": response_length,
"success": exception is None
})
# Track errors by type
if exception:
error_type = type(exception).__name__
self.errors_by_type[error_type] += 1
# Response time buckets
if response_time < 100:
self.response_time_buckets["<100ms"] += 1
elif response_time < 500:
self.response_time_buckets["100-500ms"] += 1
elif response_time < 1000:
self.response_time_buckets["500ms-1s"] += 1
elif response_time < 5000:
self.response_time_buckets["1-5s"] += 1
else:
self.response_time_buckets[">5s"] += 1
def on_test_stop(self, environment, **kwargs):
"""Generate detailed CSV report"""
with open("detailed_stats.csv", "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Minute", "Request_Type", "Name", "Response_Time", "Length", "Success"])
for minute, requests in self.requests_by_minute.items():
for req in requests:
writer.writerow([
minute, req["type"], req["name"],
req["response_time"], req["length"], req["success"]
])
# Print summary
print("\n--- Response Time Distribution ---")
for bucket, count in self.response_time_buckets.items():
print(f"{bucket}: {count} requests")
print("\n--- Error Summary ---")
for error_type, count in self.errors_by_type.items():
print(f"{error_type}: {count} occurrences")
# Initialize detailed stats collector
stats_collector = DetailedStats()
events.request.add_listener(stats_collector.on_request)
events.test_stop.add_listener(stats_collector.on_test_stop)
class AnalyticsUser(HttpUser):
wait_time = between(0.5, 2)
@task(3)
def page_view(self):
self.client.get("/page")
@task(1)
def api_call(self):
self.client.get("/api/data")from locust import HttpUser, task, events
import os
def setup_environment_specific_handlers():
"""Setup different event handlers based on environment"""
environment = os.getenv("TEST_ENV", "development")
if environment == "production":
# Production: minimal logging, external monitoring
def prod_error_handler(user_instance, exception, tb, **kwargs):
# Send to external monitoring service
send_to_monitoring_service(str(exception))
events.user_error.add_listener(prod_error_handler)
elif environment == "staging":
# Staging: detailed logging for debugging
def staging_request_handler(request_type, name, response_time, response_length, exception, **kwargs):
if exception or response_time > 2000:
print(f"STAGING ALERT: {name} - {response_time}ms - {exception}")
events.request.add_listener(staging_request_handler)
else:
# Development: verbose logging
def dev_request_handler(**kwargs):
print(f"DEV REQUEST: {kwargs}")
events.request.add_listener(dev_request_handler)
# Setup environment-specific handling
setup_environment_specific_handlers()
class EnvironmentUser(HttpUser):
wait_time = between(1, 3)
@task
def environment_test(self):
self.client.get("/api/environment")from typing import Callable, Any, Dict, Optional
from locust.env import Environment
# Event handler function types
RequestHandler = Callable[[str, str, float, int, Optional[Exception]], None]
UserErrorHandler = Callable[[Any, Exception, Any], None]
LifecycleHandler = Callable[[Environment], None]
StatisticsHandler = Callable[[Dict[str, Any]], None]
SystemHandler = Callable[[float], None] # For CPU warnings, etc.
# Generic event handler
EventHandler = Callable[..., None]
# Event hook interface
class EventHook:
def add_listener(self, func: EventHandler) -> None: ...
def remove_listener(self, func: EventHandler) -> None: ...
def fire(self, **kwargs: Any) -> None: ...Install with Tessl CLI
npx tessl i tessl/pypi-locust