Developer-friendly load testing framework for HTTP and other protocols with distributed testing capabilities.
—
Locust's contrib modules extend load testing capabilities beyond HTTP to support databases, WebSockets, and other protocols. These modules provide specialized user classes and clients for comprehensive system testing.
HTTP client built on geventhttpclient, offering higher throughput than the default client under high-concurrency loads.
from locust.contrib.fasthttp import FastHttpUser
class FastHttpUser(User):
    """
    High-performance HTTP user using geventhttpclient.

    Attributes:
        client (FastHttpSession): Fast HTTP client
        network_timeout (float): Network timeout in seconds
        connection_timeout (float): Connection timeout in seconds
        max_redirects (int): Maximum redirects to follow
        max_retries (int): Maximum retry attempts
        insecure (bool): Skip SSL verification
        default_headers (dict): Default HTTP headers
        concurrency (int): Connection concurrency level
        proxy_host (str): Proxy hostname
        proxy_port (int): Proxy port
    """

    def rest(self, method, url, **kwargs):
        """Make REST API request with JSON handling.

        Args:
            method (str): HTTP method, e.g. "GET" or "POST"
            url (str): Request URL or path
            **kwargs: Additional request parameters

        NOTE(review): upstream Locust appears to expose ``rest`` as a
        context manager (used in a ``with`` block, parsed JSON available
        as ``response.js``) rather than a call returning the parsed body;
        confirm against the installed Locust version.
        """

    def rest_(self, method, url, **kwargs):
        """Make REST API request with response context manager.

        Args:
            method (str): HTTP method, e.g. "GET" or "POST"
            url (str): Request URL or path
            **kwargs: Additional request parameters
        """

Load testing for MongoDB databases with connection pooling and query operations.
from locust.contrib.mongodb import MongoDBUser, MongoDBClient
class MongoDBClient:
    """
    MongoDB client wrapper for load testing.

    Provides MongoDB operations with request event integration
    for statistics collection and performance monitoring.
    """

    def __init__(self, host, port=27017, **kwargs):
        """
        Initialize MongoDB client.

        Args:
            host (str): MongoDB host
            port (int): MongoDB port (default: 27017)
            **kwargs: Additional MongoDB connection parameters

        NOTE(review): the usage example in this document sets ``host`` to a
        full ``mongodb://`` URI; confirm whether a bare hostname or a
        connection string is expected here.
        """
class MongoDBUser(User):
    """
    User class for MongoDB load testing.

    Provides MongoDB client with automatic statistics collection
    and connection management for load testing scenarios.

    Attributes:
        client (MongoDBClient): MongoDB client instance
    """

Load testing for PostgreSQL databases with connection pooling and SQL query execution.
from locust.contrib.postgres import PostgresUser, PostgresClient
class PostgresClient:
    """
    PostgreSQL client wrapper for load testing.

    Provides SQL query execution with request event integration
    for performance monitoring and statistics collection.
    """

    def __init__(self, host, port=5432, database=None, user=None, password=None, **kwargs):
        """
        Initialize PostgreSQL client.

        Args:
            host (str): PostgreSQL host
            port (int): PostgreSQL port (default: 5432)
            database (str): Database name (default: None)
            user (str): Username (default: None)
            password (str): Password (default: None)
            **kwargs: Additional connection parameters
        """
class PostgresUser(User):
    """
    User class for PostgreSQL load testing.

    Provides PostgreSQL client with automatic connection management
    and statistics collection for database load testing.

    Attributes:
        client (PostgresClient): PostgreSQL client instance
    """

Load testing for real-time WebSocket and Socket.IO applications.
from locust.contrib.socketio import SocketIOUser
class SocketIOUser(User):
    """
    User class for WebSocket/Socket.IO load testing.

    Supports real-time bi-directional communication testing
    with event handling and connection management.
    """

    def connect(self):
        """
        Establish WebSocket/Socket.IO connection.

        Returns:
            Connection object
        """

    def send(self, data):
        """
        Send data over WebSocket connection.

        Args:
            data: Data to send
        """

    def emit(self, event, data=None):
        """
        Emit Socket.IO event.

        Args:
            event (str): Event name
            data: Event data (optional, default: None)
        """

    def call(self, event, data=None, timeout=60):
        """
        Emit event and wait for response.

        Args:
            event (str): Event name
            data: Event data (optional, default: None)
            timeout (int): Response timeout in seconds (default: 60)

        Returns:
            Response data
        """

    def on_message(self, message):
        """
        Handle incoming WebSocket message.

        Args:
            message: Received message
        """

Load testing for Milvus vector database operations including vector search and insertion.
from locust.contrib.milvus import MilvusUser, MilvusClient
class MilvusClient:
    """
    Milvus vector database client for load testing.

    Provides vector operations with request event integration
    for performance monitoring of vector database workloads.
    """
class MilvusUser(User):
    """
    User class for Milvus vector database load testing.

    Supports vector search, insertion, and other Milvus operations
    with automatic statistics collection and connection management.

    Attributes:
        client (MilvusClient): Milvus client instance
    """

Load testing for OpenAI API endpoints including chat completions and embeddings.
from locust.contrib.oai import OpenAIUser, OpenAIClient
class OpenAIClient:
    """
    OpenAI API client for load testing.

    Provides OpenAI API operations with request event integration
    for performance monitoring of AI service workloads.
    """
class OpenAIUser(User):
    """
    User class for OpenAI API load testing.

    Supports chat completions, embeddings, and other OpenAI operations
    with automatic statistics collection and rate limiting handling.

    Attributes:
        client (OpenAIClient): OpenAI API client instance
    """

from locust import task, between
from locust.contrib.mongodb import MongoDBUser
import random
class MongoUser(MongoDBUser):
    """Example user issuing a weighted mix of MongoDB reads and writes.

    Task weights are 3 (point read) : 2 (range query) : 1 (insert) :
    1 (update). All operations target the ``users`` collection of the
    ``testdb`` database.
    """

    wait_time = between(1, 3)
    host = "mongodb://localhost:27017"

    def on_start(self):
        """Resolve the database and collection handles once per user."""
        # Connect to specific database and collection
        self.db = self.client.get_database("testdb")
        self.collection = self.db.get_collection("users")

    @task(3)
    def read_user(self):
        """Read random user document."""
        user_id = random.randint(1, 10000)
        # The result is intentionally discarded; only the request matters.
        self.collection.find_one({"user_id": user_id})

    @task(2)
    def find_users(self):
        """Find up to 10 users at or above a random age."""
        age_limit = random.randint(18, 65)
        # list() consumes the returned iterable so the results are fetched.
        list(self.collection.find({"age": {"$gte": age_limit}}).limit(10))

    @task(1)
    def insert_user(self):
        """Insert new user document."""
        user_data = {
            "user_id": random.randint(10001, 99999),
            "name": f"User {random.randint(1, 1000)}",
            "age": random.randint(18, 80),
            "email": f"user{random.randint(1, 1000)}@example.com",
        }
        self.collection.insert_one(user_data)

    @task(1)
    def update_user(self):
        """Stamp a random existing user with the current UTC time."""
        # Imported locally because this snippet's import block does not
        # bring in ``datetime``; without it this task raises NameError.
        from datetime import datetime, timezone

        user_id = random.randint(1, 10000)
        self.collection.update_one(
            {"user_id": user_id},
            # Timezone-aware replacement for the deprecated utcnow().
            {"$set": {"last_login": datetime.now(timezone.utc)}},
        )

from locust import task, between
from locust.contrib.postgres import PostgresUser
import random
class PostgresUser(PostgresUser):
    """Example load profile against PostgreSQL.

    Mixes point lookups, an aggregate, inserts and a multi-table join
    with relative task weights 5 : 3 : 2 : 1. Every statement is sent
    through ``self.client.execute``.
    """

    wait_time = between(0.5, 2)

    # Database connection parameters
    host = "localhost"
    port = 5432
    database = "testdb"
    user = "testuser"
    password = "testpass"

    @task(5)
    def select_query(self):
        """Fetch a single user row by a random id."""
        params = (random.randint(1, 10000),)
        self.client.execute("SELECT * FROM users WHERE id = %s", params)

    @task(3)
    def aggregate_query(self):
        """Run a per-department count/average aggregate."""
        # Pieces are concatenated so the statement text (including its
        # leading and trailing newline) stays exactly as sent before.
        sql = (
            "\n"
            "SELECT department, COUNT(*), AVG(salary)\n"
            "FROM employees\n"
            "GROUP BY department\n"
            "ORDER BY COUNT(*) DESC\n"
        )
        self.client.execute(sql)

    @task(2)
    def insert_record(self):
        """Insert one user row with a random name and email."""
        sql = (
            "\n"
            "INSERT INTO users (name, email, created_at)\n"
            "VALUES (%s, %s, NOW())\n"
        )
        # Tuple elements evaluate left to right: name first, then email.
        row = (
            f"User {random.randint(1, 1000)}",
            f"user{random.randint(1, 1000)}@example.com",
        )
        self.client.execute(sql, row)

    @task(1)
    def complex_join(self):
        """Join users, posts and categories for recently created users."""
        sql = (
            "\n"
            "SELECT u.name, p.title, c.name as category\n"
            "FROM users u\n"
            "JOIN posts p ON u.id = p.user_id\n"
            "JOIN categories c ON p.category_id = c.id\n"
            "WHERE u.created_at > %s\n"
            "LIMIT 100\n"
        )
        self.client.execute(sql, ("2023-01-01",))

from locust import task, between
from locust.contrib.socketio import SocketIOUser
import random
import json
class SocketIOUser(SocketIOUser):
    """Example chat-style user for a Socket.IO server.

    On start the user connects, joins the ``general`` room and subscribes
    to ``notifications``; tasks then send chat messages, request user
    data and send notifications with weights 3 : 2 : 1.

    NOTE(review): this subclass reuses the name of the base class it
    imports, which shadows the import once the class is defined; consider
    giving it a distinct name.
    """

    wait_time = between(1, 5)

    def on_start(self):
        """Connect to Socket.IO server and subscribe to channels."""
        # The tasks below read self.user_id, which nothing else in this
        # example assigns. Pick one in the same 1-100 range used for
        # notification targets unless the base class already provides it.
        if not hasattr(self, "user_id"):
            self.user_id = random.randint(1, 100)
        self.connect()
        # Subscribe to channels
        self.emit("join_room", {"room": "general"})
        self.emit("subscribe", {"channel": "notifications"})

    @task(3)
    def send_message(self):
        """Send chat message."""
        # Imported locally because this snippet's import block does not
        # bring in ``time``; without it this task raises NameError.
        import time

        message_data = {
            "room": "general",
            "message": f"Hello from user {self.user_id}",
            "timestamp": time.time(),
        }
        self.emit("message", message_data)

    @task(2)
    def request_data(self):
        """Request data and process the response if one arrives."""
        try:
            response = self.call("get_user_data", {"user_id": self.user_id}, timeout=10)
            if response:
                # Process response data
                self.process_user_data(response)
        except Exception as e:
            # Best-effort: report and keep the user running.
            print(f"Request failed: {e}")

    @task(1)
    def send_notification(self):
        """Send notification to random user."""
        target_user = random.randint(1, 100)
        notification = {
            "target_user": target_user,
            "type": "friend_request",
            "from_user": self.user_id,
        }
        self.emit("notification", notification)

    def on_message(self, message):
        """Dispatch an incoming message by its ``type`` field.

        Args:
            message: Received message; either an already-decoded dict or
                a JSON document as ``str``/``bytes``.

        Payloads that are not valid JSON, or that do not decode to a
        dict, are ignored.
        """
        if isinstance(message, dict):
            data = message
        else:
            try:
                data = json.loads(message)
            except (TypeError, ValueError):
                return
            if not isinstance(data, dict):
                return

        message_type = data.get("type")
        if message_type == "chat":
            # Handle chat message
            self.handle_chat_message(data)
        elif message_type == "notification":
            # Handle notification
            self.handle_notification(data)

    def process_user_data(self, data):
        """Process received user data."""
        # Custom processing logic
        pass

    def handle_chat_message(self, message):
        """Handle incoming chat message."""
        # Simulate reading message
        pass

    def handle_notification(self, notification):
        """Handle incoming notification."""
        # Simulate processing notification
        pass

from locust import task, between
from locust.contrib.fasthttp import FastHttpUser
import random
class HighPerformanceUser(FastHttpUser):
    """Example high-throughput user built on ``FastHttpUser``.

    Tasks are weighted 10 (single GET) : 5 (burst of five GETs) :
    3 (REST read with follow-up) : 2 (validated POST).
    """

    wait_time = between(0.1, 0.5)  # Very fast requests

    # FastHTTP configuration for maximum performance
    connection_timeout = 60.0
    network_timeout = 60.0
    concurrency = 20
    insecure = True  # Skip SSL verification for performance

    @task(10)
    def fast_api_get(self):
        """High-frequency GET requests."""
        endpoint_id = random.randint(1, 1000)
        self.client.get(f"/api/fast/{endpoint_id}")

    @task(5)
    def batch_request(self):
        """Issue five GET requests back to back."""
        for i in range(5):
            self.client.get(f"/api/batch/{i}")

    @task(3)
    def rest_api_call(self):
        """REST read; follow up on the related resource when an id is returned."""
        # rest() is a context manager: the request is only issued inside
        # the with-block, and the parsed JSON body is exposed as ``.js``.
        with self.rest("GET", f"/api/data/{random.randint(1, 100)}") as response:
            data = response.js
        # Process response data
        if isinstance(data, dict) and "id" in data:
            # Follow up with related request
            self.client.get(f"/api/related/{data['id']}")

    @task(2)
    def post_with_validation(self):
        """POST request with response validation."""
        # Imported locally because this snippet's import block does not
        # bring in ``time``; without it this task raises NameError.
        import time

        payload = {
            "user_id": random.randint(1, 1000),
            "action": "fast_update",
            "timestamp": time.time(),
        }
        with self.rest_("POST", "/api/update", json=payload) as response:
            body = response.js
            # A body that is missing or not a JSON object cannot carry a
            # success status, so treat it as a failed update.
            if not isinstance(body, dict) or body.get("status") != "success":
                response.failure("Update failed")
            # Validate response time
            # NOTE(review): confirm the fast-HTTP response object exposes
            # ``elapsed``; it is an attribute of requests-style responses.
            if response.elapsed.total_seconds() > 0.1:
                response.failure("Response too slow")

from locust import User, task, between, events
import socket
import time
class CustomProtocolClient:
    """Minimal TCP client that reports each operation as a Locust request.

    Every ``connect`` and ``send_command`` call fires exactly one
    ``events.request`` event carrying the elapsed time in milliseconds,
    and the exception if the operation failed.

    Attributes:
        host: Server hostname passed to the constructor.
        port: Server port passed to the constructor.
        socket: The connected socket, or ``None`` when not connected.
    """

    def __init__(self, host, port):
        """Store the target address; no connection is opened yet."""
        self.host = host
        self.port = port
        self.socket = None

    def _report(self, request_type, name, started, response_length=0, exception=None):
        """Fire one request event timed from *started* (a perf_counter value)."""
        fields = {
            "request_type": request_type,
            "name": name,
            "response_time": (time.perf_counter() - started) * 1000,
            "response_length": response_length,
        }
        # Only failures carry the ``exception`` key, as before.
        if exception is not None:
            fields["exception"] = exception
        events.request.fire(**fields)

    def connect(self):
        """Connect to custom protocol server.

        Raises:
            Exception: Whatever the socket layer raised; the failure is
                reported to Locust first and the socket is closed.
        """
        started = time.perf_counter()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
        except Exception as e:
            # Do not leak the half-open socket when the connect fails.
            if sock is not None:
                sock.close()
            self._report("CONNECT", "connect", started, exception=e)
            raise
        self.socket = sock
        # Fire request event for statistics
        self._report("CONNECT", "connect", started)

    def send_command(self, command):
        """Send command over custom protocol and return the decoded reply.

        Args:
            command (str): Command text; also used as the request name.

        Returns:
            str: Up to 1024 bytes of the reply, decoded as text.

        Raises:
            Exception: Any send/receive/decode error, after it has been
                reported to Locust as a failed request.
        """
        started = time.perf_counter()
        try:
            # sendall() keeps writing until the whole command is sent;
            # plain send() may transmit only part of it.
            self.socket.sendall(command.encode())
            # Receive response
            response = self.socket.recv(1024)
            # Decode inside the try so a bad reply is reported once, as a
            # failure, instead of as a success followed by a failure.
            text = response.decode()
        except Exception as e:
            self._report("CUSTOM", command, started, exception=e)
            raise
        self._report("CUSTOM", command, started, response_length=len(response))
        return text

    def disconnect(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.socket is not None:
            self.socket.close()
            # Drop the closed socket so later calls see "not connected".
            self.socket = None
class CustomProtocolUser(User):
    """Example user driving ``CustomProtocolClient`` against localhost:9999.

    Tasks are weighted 3 (GET_DATA) : 2 (SET_DATA) : 1 (PING). The
    connection is opened in ``on_start`` and closed in ``on_stop``.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize custom protocol client."""
        self.client = CustomProtocolClient("localhost", 9999)
        self.client.connect()

    @task(3)
    def get_data(self):
        """Get data command."""
        self.client.send_command("GET_DATA")

    @task(2)
    def set_data(self):
        """Set data command with a random value between 1 and 1000."""
        # Imported locally because this snippet's import block does not
        # bring in ``random``; without it this task raises NameError.
        import random

        value = random.randint(1, 1000)
        self.client.send_command(f"SET_DATA:{value}")

    @task(1)
    def ping(self):
        """Ping command."""
        self.client.send_command("PING")

    def on_stop(self):
        """Cleanup connection."""
        self.client.disconnect()

from typing import Dict, Any, Optional, Callable, Union
import socket
# Type aliases used to describe the values exchanged with each contrib
# client. They are plain aliases and introduce no new runtime types.

# MongoDB types
MongoDocument = Dict[str, Any]  # a document, keyed by field name
MongoQuery = Dict[str, Any]  # a query/filter expression
MongoConnection = Any  # PyMongo connection object
# PostgreSQL types
PostgresQuery = str  # SQL statement text
PostgresParams = Union[tuple, list, Dict[str, Any]]  # positional or named parameters
PostgresConnection = Any  # psycopg connection object
# WebSocket/SocketIO types
SocketMessage = Union[str, bytes, Dict[str, Any]]  # text, binary or already-decoded payload
SocketEvent = str  # event name
SocketCallback = Callable[[Any], None]  # takes one argument, returns nothing
SocketConnection = Any  # WebSocket connection object
# Custom protocol types
ProtocolMessage = Union[str, bytes]
ProtocolResponse = Union[str, bytes, Dict[str, Any]]
CustomSocket = socket.socket

Install with Tessl CLI
npx tessl i tessl/pypi-locust