CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-locust

Developer-friendly load testing framework for HTTP and other protocols with distributed testing capabilities.

Pending
Overview
Eval results
Files

contrib.mddocs/

Protocol Extensions

Locust's contrib modules extend load testing beyond the default HTTP client: they add a faster HTTP client (FastHTTP) as well as support for databases, WebSockets, and other protocols. Each module provides specialized user classes and clients for comprehensive system testing.

Capabilities

FastHTTP Support

HTTP client built on geventhttpclient, intended to sustain higher throughput under highly concurrent load.

from locust.contrib.fasthttp import FastHttpUser

class FastHttpUser(User):
    """
    High-performance HTTP user using geventhttpclient.

    This is a signature stub for documentation purposes; the method bodies
    are intentionally omitted.

    Attributes:
        client (FastHttpSession): Fast HTTP client
        network_timeout (float): Network timeout in seconds
        connection_timeout (float): Connection timeout in seconds
        max_redirects (int): Maximum redirects to follow
        max_retries (int): Maximum retry attempts
        insecure (bool): Skip SSL verification
        default_headers (dict): Default HTTP headers
        concurrency (int): Connection concurrency level
        proxy_host (str): Proxy hostname
        proxy_port (int): Proxy port
    """

    def rest(self, method, url, **kwargs):
        """Make REST API request with JSON handling.

        Args:
            method (str): HTTP method, e.g. "GET" or "POST"
            url (str): Request URL or path
            **kwargs: Additional request options (e.g. ``json=payload``)

        NOTE(review): upstream Locust documents ``rest()`` as a context
        manager (``with self.rest(...) as resp:``) whose response exposes
        the parsed JSON body as ``resp.js`` -- it does not return the parsed
        data directly. Confirm against the installed Locust version.
        """

    def rest_(self, method, url, **kwargs):
        """Make REST API request with response context manager.

        Args:
            method (str): HTTP method, e.g. "GET" or "POST"
            url (str): Request URL or path
            **kwargs: Additional request options

        NOTE(review): upstream Locust describes ``rest_()`` as ``rest()``
        plus an automatically appended ``_=<timestamp>`` query parameter
        (cache busting); both variants are context managers. Confirm
        against the installed Locust version.
        """

MongoDB Support

Load testing for MongoDB databases with connection pooling and query operations.

from locust.contrib.mongodb import MongoDBUser, MongoDBClient

class MongoDBClient:
    """
    MongoDB client wrapper for load testing.

    Provides MongoDB operations with request event integration
    for statistics collection and performance monitoring.

    NOTE(review): documentation stub -- only the constructor signature is
    shown; confirm the available operations against the installed
    ``locust.contrib.mongodb`` module.
    """

    def __init__(self, host, port=27017, **kwargs):
        """
        Initialize MongoDB client.

        Args:
            host (str): MongoDB host
            port (int): MongoDB port (default: 27017)
            **kwargs: Additional MongoDB connection parameters

        NOTE(review): the usage example on this page sets ``host`` to a
        full ``mongodb://host:port`` URI rather than a bare hostname --
        confirm which form is expected.
        """

class MongoDBUser(User):
    """
    User class for MongoDB load testing.

    Provides MongoDB client with automatic statistics collection
    and connection management for load testing scenarios.

    Subclass it and define ``@task`` methods that operate on ``client``.

    Attributes:
        client (MongoDBClient): MongoDB client instance

    NOTE(review): documentation stub -- how ``client`` is constructed from
    the user's attributes is not shown here; verify against the installed
    module.
    """

PostgreSQL Support

Load testing for PostgreSQL databases with connection pooling and SQL query execution.

from locust.contrib.postgres import PostgresUser, PostgresClient

class PostgresClient:
    """
    PostgreSQL client wrapper for load testing.

    Provides SQL query execution with request event integration
    for performance monitoring and statistics collection.

    NOTE(review): documentation stub -- only the constructor signature is
    shown. The usage example on this page calls ``client.execute(query,
    params)``; confirm the method name and signature against the installed
    ``locust.contrib.postgres`` module.
    """

    def __init__(self, host, port=5432, database=None, user=None, password=None, **kwargs):
        """
        Initialize PostgreSQL client.

        Args:
            host (str): PostgreSQL host
            port (int): PostgreSQL port (default: 5432)
            database (str): Database name (default: None)
            user (str): Username (default: None)
            password (str): Password (default: None)
            **kwargs: Additional connection parameters
        """

class PostgresUser(User):
    """
    User class for PostgreSQL load testing.

    Provides PostgreSQL client with automatic connection management
    and statistics collection for database load testing.

    Subclass it and define ``@task`` methods that run queries via ``client``.

    Attributes:
        client (PostgresClient): PostgreSQL client instance

    NOTE(review): documentation stub -- which class attributes feed the
    client's connection parameters is not shown here; verify against the
    installed module.
    """

WebSocket/SocketIO Support

Load testing for real-time WebSocket and Socket.IO applications.

from locust.contrib.socketio import SocketIOUser

class SocketIOUser(User):
    """
    User class for WebSocket/Socket.IO load testing.

    Supports real-time bi-directional communication testing
    with event handling and connection management.

    NOTE(review): documentation stub -- method bodies are omitted; confirm
    names and signatures against the installed ``locust.contrib.socketio``.
    """

    def connect(self):
        """
        Establish WebSocket/Socket.IO connection.

        Returns:
            Connection object
        """

    def send(self, data):
        """
        Send data over WebSocket connection.

        Args:
            data: Data to send
        """

    def emit(self, event, data=None):
        """
        Emit Socket.IO event.

        Args:
            event (str): Event name
            data: Event data (optional, default: None)
        """

    def call(self, event, data=None, timeout=60):
        """
        Emit event and wait for response.

        Args:
            event (str): Event name
            data: Event data (optional, default: None)
            timeout (int): Response timeout in seconds (default: 60)

        Returns:
            Response data
        """

    def on_message(self, message):
        """
        Handle incoming WebSocket message.

        Intended to be overridden by subclasses, as the usage example on
        this page does.

        Args:
            message: Received message
        """

Milvus Vector Database Support

Load testing for Milvus vector database operations including vector search and insertion.

from locust.contrib.milvus import MilvusUser, MilvusClient

class MilvusClient:
    """
    Milvus vector database client for load testing.

    Provides vector operations with request event integration
    for performance monitoring of vector database workloads.

    NOTE(review): documentation stub -- neither the constructor nor any
    operation is shown here; consult the installed
    ``locust.contrib.milvus`` module for the actual API.
    """

class MilvusUser(User):
    """
    User class for Milvus vector database load testing.

    Supports vector search, insertion, and other Milvus operations
    with automatic statistics collection and connection management.

    Attributes:
        client (MilvusClient): Milvus client instance

    NOTE(review): documentation stub -- no methods are shown here; verify
    the available operations against the installed module.
    """

OpenAI API Support

Load testing for OpenAI API endpoints including chat completions and embeddings.

from locust.contrib.oai import OpenAIUser, OpenAIClient

class OpenAIClient:
    """
    OpenAI API client for load testing.

    Provides OpenAI API operations with request event integration
    for performance monitoring of AI service workloads.

    NOTE(review): documentation stub -- neither the constructor nor any
    operation is shown here; consult the installed ``locust.contrib.oai``
    module for the actual API.
    """

class OpenAIUser(User):
    """
    User class for OpenAI API load testing.

    Supports chat completions, embeddings, and other OpenAI operations
    with automatic statistics collection and rate limiting handling.

    Attributes:
        client (OpenAIClient): OpenAI API client instance

    NOTE(review): documentation stub -- no methods are shown here; verify
    the available operations against the installed module.
    """

Usage Examples

MongoDB Load Testing

from datetime import datetime, timezone
import random

from locust import task, between
from locust.contrib.mongodb import MongoDBUser


class MongoUser(MongoDBUser):
    """Simulated user exercising a MongoDB ``users`` collection.

    Tasks are weighted 3:2:1:1 (point read, filtered find, insert, update)
    and separated by a 1-3 second wait.
    """

    wait_time = between(1, 3)
    host = "mongodb://localhost:27017"

    def on_start(self):
        """Resolve the database and collection handles once per user."""
        self.db = self.client.get_database("testdb")
        self.collection = self.db.get_collection("users")

    @task(3)
    def read_user(self):
        """Read random user document"""
        user_id = random.randint(1, 10000)
        self.collection.find_one({"user_id": user_id})

    @task(2)
    def find_users(self):
        """Find users by criteria"""
        age_limit = random.randint(18, 65)
        cursor = self.collection.find({"age": {"$gte": age_limit}}).limit(10)
        # Materialize the cursor so the documents are actually fetched;
        # the result itself is not needed.
        list(cursor)

    @task(1)
    def insert_user(self):
        """Insert new user document"""
        user_data = {
            "user_id": random.randint(10001, 99999),
            "name": f"User {random.randint(1, 1000)}",
            "age": random.randint(18, 80),
            "email": f"user{random.randint(1, 1000)}@example.com",
        }
        self.collection.insert_one(user_data)

    @task(1)
    def update_user(self):
        """Update existing user"""
        user_id = random.randint(1, 10000)
        self.collection.update_one(
            {"user_id": user_id},
            # Timezone-aware UTC timestamp; datetime.utcnow() returns a naive
            # value and is deprecated since Python 3.12.
            {"$set": {"last_login": datetime.now(timezone.utc)}},
        )

PostgreSQL Load Testing

import random

from locust import task, between
# Imported under an alias so that the example class below does not rebind
# (shadow) the name of its own base class.
from locust.contrib.postgres import PostgresUser as BasePostgresUser


class PostgresUser(BasePostgresUser):
    """Simulated user running a mix of SQL statements against PostgreSQL.

    Tasks are weighted 5:3:2:1 (point select, aggregate, insert, join) and
    separated by a 0.5-2 second wait. All values are passed as query
    parameters, never interpolated into the SQL text.
    """

    wait_time = between(0.5, 2)

    # Database connection parameters
    # NOTE(review): presumably consumed by the base class when it builds
    # ``self.client`` -- verify against the installed module.
    host = "localhost"
    port = 5432
    database = "testdb"
    user = "testuser"
    password = "testpass"

    @task(5)
    def select_query(self):
        """Execute SELECT query"""
        user_id = random.randint(1, 10000)
        query = "SELECT * FROM users WHERE id = %s"
        self.client.execute(query, (user_id,))

    @task(3)
    def aggregate_query(self):
        """Execute aggregate query"""
        query = """
        SELECT department, COUNT(*), AVG(salary) 
        FROM employees 
        GROUP BY department 
        ORDER BY COUNT(*) DESC
        """
        self.client.execute(query)

    @task(2)
    def insert_record(self):
        """Insert new record"""
        query = """
        INSERT INTO users (name, email, created_at) 
        VALUES (%s, %s, NOW())
        """
        name = f"User {random.randint(1, 1000)}"
        email = f"user{random.randint(1, 1000)}@example.com"
        self.client.execute(query, (name, email))

    @task(1)
    def complex_join(self):
        """Execute complex JOIN query"""
        query = """
        SELECT u.name, p.title, c.name as category
        FROM users u
        JOIN posts p ON u.id = p.user_id
        JOIN categories c ON p.category_id = c.id
        WHERE u.created_at > %s
        LIMIT 100
        """
        cutoff_date = "2023-01-01"
        self.client.execute(query, (cutoff_date,))

WebSocket/SocketIO Load Testing

import json
import random
import time

from locust import task, between
# Imported under an alias so that the example class below does not rebind
# (shadow) the name of its own base class.
from locust.contrib.socketio import SocketIOUser as BaseSocketIOUser


class SocketIOUser(BaseSocketIOUser):
    """Simulated chat participant talking to a Socket.IO server.

    Tasks are weighted 3:2:1 (send chat message, request data, send
    notification) and separated by a 1-5 second wait.
    """

    wait_time = between(1, 5)

    def on_start(self):
        """Connect to Socket.IO server"""
        # Application-level identity embedded in the payloads below. It must
        # be assigned before any task runs; the range matches the target
        # range used by send_notification.
        self.user_id = random.randint(1, 100)

        self.connect()

        # Subscribe to channels
        self.emit("join_room", {"room": "general"})
        self.emit("subscribe", {"channel": "notifications"})

    @task(3)
    def send_message(self):
        """Send chat message"""
        message_data = {
            "room": "general",
            "message": f"Hello from user {self.user_id}",
            "timestamp": time.time(),
        }
        self.emit("message", message_data)

    @task(2)
    def request_data(self):
        """Request data with response"""
        try:
            response = self.call("get_user_data", {"user_id": self.user_id}, timeout=10)
            if response:
                # Process response data
                self.process_user_data(response)
        except Exception as e:
            # Best-effort task: report and carry on with the next task.
            print(f"Request failed: {e}")

    @task(1)
    def send_notification(self):
        """Send notification to random user"""
        target_user = random.randint(1, 100)
        notification = {
            "target_user": target_user,
            "type": "friend_request",
            "from_user": self.user_id,
        }
        self.emit("notification", notification)

    def on_message(self, message):
        """Handle incoming messages

        Frames that are not JSON, or whose JSON is not an object, are
        ignored so that one malformed frame cannot break message handling.
        """
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # TypeError: not str/bytes; ValueError: invalid JSON
            # (json.JSONDecodeError is a ValueError subclass).
            return
        if not isinstance(data, dict):
            return

        message_type = data.get("type")
        if message_type == "chat":
            # Handle chat message
            self.handle_chat_message(data)
        elif message_type == "notification":
            # Handle notification
            self.handle_notification(data)

    def process_user_data(self, data):
        """Process received user data"""
        # Custom processing logic
        pass

    def handle_chat_message(self, message):
        """Handle incoming chat message"""
        # Simulate reading message
        pass

    def handle_notification(self, notification):
        """Handle incoming notification"""
        # Simulate processing notification
        pass

FastHTTP Performance Testing

import random
import time

from locust import task, between
from locust.contrib.fasthttp import FastHttpUser


class HighPerformanceUser(FastHttpUser):
    """High-throughput user built on the FastHTTP client.

    Tasks are weighted 10:5:3:2 (single GET, batch of GETs, REST GET with
    follow-up, validated POST) and separated by a 0.1-0.5 second wait.
    """

    wait_time = between(0.1, 0.5)  # Very fast requests

    # FastHTTP configuration for maximum performance
    connection_timeout = 60.0
    network_timeout = 60.0
    concurrency = 20
    insecure = True  # Skip TLS certificate verification (test environments only)

    # Responses slower than this are marked as failed by post_with_validation.
    max_response_time_ms = 100

    @task(10)
    def fast_api_get(self):
        """High-frequency GET requests"""
        endpoint_id = random.randint(1, 1000)
        self.client.get(f"/api/fast/{endpoint_id}")

    @task(5)
    def batch_request(self):
        """Batch multiple requests quickly"""
        for i in range(5):
            self.client.get(f"/api/batch/{i}")

    @task(3)
    def rest_api_call(self):
        """REST API with JSON handling"""
        # rest() is a context manager; the parsed JSON body is exposed as
        # ``response.js`` (None when the body was not valid JSON).
        with self.rest("GET", f"/api/data/{random.randint(1, 100)}") as response:
            data = response.js

        # Process response data
        if isinstance(data, dict) and "id" in data:
            # Follow up with related request
            self.client.get(f"/api/related/{data['id']}")

    @task(2)
    def post_with_validation(self):
        """POST request with response validation"""
        payload = {
            "user_id": random.randint(1, 1000),
            "action": "fast_update",
            "timestamp": time.time(),
        }

        with self.rest("POST", "/api/update", json=payload) as response:
            if response.js is None:
                # Body was not valid JSON; rest() has already marked the
                # request as failed, nothing more to validate.
                pass
            elif response.js.get("status") != "success":
                response.failure("Update failed")
            # Validate response time
            # NOTE(review): FastHTTP responses do not offer the requests-style
            # ``elapsed`` attribute; the measured time in milliseconds is read
            # from ``request_meta`` -- confirm against the installed Locust.
            elif response.request_meta["response_time"] > self.max_response_time_ms:
                response.failure("Response too slow")

Custom Protocol Implementation

import random
import socket
import time

from locust import User, task, between, events

class CustomProtocolClient:
    """Custom protocol client implementation

    Wraps a blocking TCP socket and reports every operation to Locust via
    ``events.request`` so that it shows up in the statistics.
    """

    def __init__(self, host, port):
        """Store the target address; no connection is opened yet."""
        self.host = host
        self.port = port
        self.socket = None

    def _report(self, request_type, name, started, response_length=0, exception=None):
        """Fire one request event; ``started`` is a ``time.perf_counter()`` reading."""
        fields = {
            "request_type": request_type,
            "name": name,
            "response_time": (time.perf_counter() - started) * 1000,
            "response_length": response_length,
        }
        # The ``exception`` keyword is only passed for failed requests.
        if exception is not None:
            fields["exception"] = exception
        events.request.fire(**fields)

    def connect(self):
        """Connect to custom protocol server

        Raises:
            Exception: whatever the socket layer raised; the failure is
                reported to Locust before being re-raised.
        """
        # perf_counter is monotonic, unlike time.time(), so the measured
        # duration cannot be skewed by wall-clock adjustments.
        started = time.perf_counter()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
        except Exception as e:
            # Do not leak the half-open socket when the connection fails.
            if sock is not None:
                sock.close()
            self._report("CONNECT", "connect", started, exception=e)
            raise

        # Release any previous connection before replacing it.
        self.disconnect()
        self.socket = sock
        # Fire request event for statistics
        self._report("CONNECT", "connect", started)

    def send_command(self, command):
        """Send command over custom protocol

        Args:
            command (str): Command text; it is also used as the request name.

        Returns:
            str: The decoded response (at most 1024 bytes are read).

        Raises:
            ConnectionError: if the peer closed the connection without
                sending a response.
            Exception: any other error from the socket layer or decoding;
                the failure is reported to Locust before being re-raised.
        """
        started = time.perf_counter()
        try:
            # sendall() keeps writing until the whole command is transmitted;
            # send() may transmit only part of it.
            self.socket.sendall(command.encode())

            # Receive response
            raw = self.socket.recv(1024)
            if not raw:
                # An empty read means the peer closed the connection.
                raise ConnectionError("connection closed by peer")

            # Decode inside the try block so that a decoding error produces
            # exactly one (failure) event rather than success plus failure.
            text = raw.decode()
        except Exception as e:
            self._report("CUSTOM", command, started, exception=e)
            raise

        # Fire request event
        self._report("CUSTOM", command, started, response_length=len(raw))
        return text

    def disconnect(self):
        """Disconnect from server

        Safe to call repeatedly or before ``connect()``.
        """
        if self.socket is not None:
            try:
                self.socket.close()
            finally:
                # Drop the reference so a closed socket is never reused.
                self.socket = None

class CustomProtocolUser(User):
    """Simulated user speaking the custom protocol.

    Tasks are weighted 3:2:1 (GET_DATA, SET_DATA, PING) and separated by a
    1-3 second wait. ``set_data`` relies on the module-level ``random``
    import.
    """

    wait_time = between(1, 3)

    # Target server; override in a subclass to point at another endpoint.
    server_host = "localhost"
    server_port = 9999

    def on_start(self):
        """Initialize custom protocol client"""
        self.client = CustomProtocolClient(self.server_host, self.server_port)
        self.client.connect()

    @task(3)
    def get_data(self):
        """Get data command"""
        self.client.send_command("GET_DATA")

    @task(2)
    def set_data(self):
        """Set data command"""
        value = random.randint(1, 1000)
        self.client.send_command(f"SET_DATA:{value}")

    @task(1)
    def ping(self):
        """Ping command"""
        self.client.send_command("PING")

    def on_stop(self):
        """Cleanup connection"""
        self.client.disconnect()

Types

from typing import Dict, Any, Optional, Callable, Union
import socket

# Shape shared by several aliases below: a mapping with string keys and
# arbitrary values.
_StringKeyed = Dict[str, Any]

# --- MongoDB ---
MongoDocument = _StringKeyed
MongoQuery = _StringKeyed
MongoConnection = Any  # PyMongo connection object

# --- PostgreSQL ---
PostgresQuery = str
PostgresParams = Union[tuple, list, _StringKeyed]
PostgresConnection = Any  # psycopg connection object

# --- WebSocket / Socket.IO ---
SocketMessage = Union[str, bytes, _StringKeyed]
SocketEvent = str
SocketCallback = Callable[[Any], None]
SocketConnection = Any  # WebSocket connection object

# --- Custom protocol ---
ProtocolMessage = Union[str, bytes]
ProtocolResponse = Union[str, bytes, _StringKeyed]
CustomSocket = socket.socket

Install with Tessl CLI

npx tessl i tessl/pypi-locust

docs

contrib.md

debugging.md

events.md

exceptions.md

index.md

load-shapes.md

tasksets.md

user-classes.md

wait-time.md

tile.json