0
# Protocol Extensions
1
2
Locust's contrib modules extend load testing capabilities beyond HTTP to support databases, WebSockets, and other protocols. These modules provide specialized user classes and clients for comprehensive system testing.
3
4
## Capabilities
5
6
### FastHTTP Support
7
8
High-performance HTTP client using geventhttpclient for better performance under high concurrency loads.
9
10
```python { .api }
11
from locust.contrib.fasthttp import FastHttpUser
12
13
class FastHttpUser(User):
14
"""
15
High-performance HTTP user using geventhttpclient.
16
17
Attributes:
18
client (FastHttpSession): Fast HTTP client
19
network_timeout (float): Network timeout in seconds
20
connection_timeout (float): Connection timeout in seconds
21
max_redirects (int): Maximum redirects to follow
22
max_retries (int): Maximum retry attempts
23
insecure (bool): Skip SSL verification
24
default_headers (dict): Default HTTP headers
25
concurrency (int): Connection concurrency level
26
proxy_host (str): Proxy hostname
27
proxy_port (int): Proxy port
28
"""
29
30
def rest(self, method, url, **kwargs):
31
"""Make REST API request with JSON handling."""
32
33
def rest_(self, method, url, **kwargs):
34
"""Make REST API request with response context manager."""
35
```
36
37
### MongoDB Support
38
39
Load testing for MongoDB databases with connection pooling and query operations.
40
41
```python { .api }
42
from locust.contrib.mongodb import MongoDBUser, MongoDBClient
43
44
class MongoDBClient:
45
"""
46
MongoDB client wrapper for load testing.
47
48
Provides MongoDB operations with request event integration
49
for statistics collection and performance monitoring.
50
"""
51
52
def __init__(self, host, port=27017, **kwargs):
53
"""
54
Initialize MongoDB client.
55
56
Args:
57
host (str): MongoDB host
58
port (int): MongoDB port (default: 27017)
59
**kwargs: Additional MongoDB connection parameters
60
"""
61
62
class MongoDBUser(User):
63
"""
64
User class for MongoDB load testing.
65
66
Provides MongoDB client with automatic statistics collection
67
and connection management for load testing scenarios.
68
69
Attributes:
70
client (MongoDBClient): MongoDB client instance
71
"""
72
```
73
74
### PostgreSQL Support
75
76
Load testing for PostgreSQL databases with connection pooling and SQL query execution.
77
78
```python { .api }
79
from locust.contrib.postgres import PostgresUser, PostgresClient
80
81
class PostgresClient:
82
"""
83
PostgreSQL client wrapper for load testing.
84
85
Provides SQL query execution with request event integration
86
for performance monitoring and statistics collection.
87
"""
88
89
def __init__(self, host, port=5432, database=None, user=None, password=None, **kwargs):
90
"""
91
Initialize PostgreSQL client.
92
93
Args:
94
host (str): PostgreSQL host
95
port (int): PostgreSQL port (default: 5432)
96
database (str): Database name
97
user (str): Username
98
password (str): Password
99
**kwargs: Additional connection parameters
100
"""
101
102
class PostgresUser(User):
103
"""
104
User class for PostgreSQL load testing.
105
106
Provides PostgreSQL client with automatic connection management
107
and statistics collection for database load testing.
108
109
Attributes:
110
client (PostgresClient): PostgreSQL client instance
111
"""
112
```
113
114
### WebSocket/SocketIO Support
115
116
Load testing for real-time WebSocket and Socket.IO applications.
117
118
```python { .api }
119
from locust.contrib.socketio import SocketIOUser
120
121
class SocketIOUser(User):
122
"""
123
User class for WebSocket/Socket.IO load testing.
124
125
Supports real-time bi-directional communication testing
126
with event handling and connection management.
127
"""
128
129
def connect(self):
130
"""
131
Establish WebSocket/Socket.IO connection.
132
133
Returns:
134
Connection object
135
"""
136
137
def send(self, data):
138
"""
139
Send data over WebSocket connection.
140
141
Args:
142
data: Data to send
143
"""
144
145
def emit(self, event, data=None):
146
"""
147
Emit Socket.IO event.
148
149
Args:
150
event (str): Event name
151
data: Event data (optional)
152
"""
153
154
def call(self, event, data=None, timeout=60):
155
"""
156
Emit event and wait for response.
157
158
Args:
159
event (str): Event name
160
data: Event data (optional)
161
timeout (int): Response timeout in seconds
162
163
Returns:
164
Response data
165
"""
166
167
def on_message(self, message):
168
"""
169
Handle incoming WebSocket message.
170
171
Args:
172
message: Received message
173
"""
174
```
175
176
### Milvus Vector Database Support
177
178
Load testing for Milvus vector database operations including vector search and insertion.
179
180
```python { .api }
181
from locust.contrib.milvus import MilvusUser, MilvusClient
182
183
class MilvusClient:
184
"""
185
Milvus vector database client for load testing.
186
187
Provides vector operations with request event integration
188
for performance monitoring of vector database workloads.
189
"""
190
191
class MilvusUser(User):
192
"""
193
User class for Milvus vector database load testing.
194
195
Supports vector search, insertion, and other Milvus operations
196
with automatic statistics collection and connection management.
197
198
Attributes:
199
client (MilvusClient): Milvus client instance
200
"""
201
```
202
203
### OpenAI API Support
204
205
Load testing for OpenAI API endpoints including chat completions and embeddings.
206
207
```python { .api }
208
from locust.contrib.oai import OpenAIUser, OpenAIClient
209
210
class OpenAIClient:
211
"""
212
OpenAI API client for load testing.
213
214
Provides OpenAI API operations with request event integration
215
for performance monitoring of AI service workloads.
216
"""
217
218
class OpenAIUser(User):
219
"""
220
User class for OpenAI API load testing.
221
222
Supports chat completions, embeddings, and other OpenAI operations
223
with automatic statistics collection and rate limiting handling.
224
225
Attributes:
226
client (OpenAIClient): OpenAI API client instance
227
"""
228
```
229
230
## Usage Examples
231
232
### MongoDB Load Testing
233
234
```python
235
from locust import task, between
236
from locust.contrib.mongodb import MongoDBUser
237
import random
238
239
class MongoUser(MongoDBUser):
240
wait_time = between(1, 3)
241
host = "mongodb://localhost:27017"
242
243
def on_start(self):
244
# Connect to specific database and collection
245
self.db = self.client.get_database("testdb")
246
self.collection = self.db.get_collection("users")
247
248
@task(3)
249
def read_user(self):
250
"""Read random user document"""
251
user_id = random.randint(1, 10000)
252
user = self.collection.find_one({"user_id": user_id})
253
254
@task(2)
255
def find_users(self):
256
"""Find users by criteria"""
257
age_limit = random.randint(18, 65)
258
users = list(self.collection.find({"age": {"$gte": age_limit}}).limit(10))
259
260
@task(1)
261
def insert_user(self):
262
"""Insert new user document"""
263
user_data = {
264
"user_id": random.randint(10001, 99999),
265
"name": f"User {random.randint(1, 1000)}",
266
"age": random.randint(18, 80),
267
"email": f"user{random.randint(1, 1000)}@example.com"
268
}
269
self.collection.insert_one(user_data)
270
271
@task(1)
272
def update_user(self):
273
"""Update existing user"""
274
user_id = random.randint(1, 10000)
275
self.collection.update_one(
276
{"user_id": user_id},
277
{"$set": {"last_login": datetime.utcnow()}}
278
)
279
```
280
281
### PostgreSQL Load Testing
282
283
```python
284
from locust import task, between
285
from locust.contrib.postgres import PostgresUser
286
import random
287
288
class PostgresUser(PostgresUser):
289
wait_time = between(0.5, 2)
290
291
# Database connection parameters
292
host = "localhost"
293
port = 5432
294
database = "testdb"
295
user = "testuser"
296
password = "testpass"
297
298
@task(5)
299
def select_query(self):
300
"""Execute SELECT query"""
301
user_id = random.randint(1, 10000)
302
query = "SELECT * FROM users WHERE id = %s"
303
self.client.execute(query, (user_id,))
304
305
@task(3)
306
def aggregate_query(self):
307
"""Execute aggregate query"""
308
query = """
309
SELECT department, COUNT(*), AVG(salary)
310
FROM employees
311
GROUP BY department
312
ORDER BY COUNT(*) DESC
313
"""
314
self.client.execute(query)
315
316
@task(2)
317
def insert_record(self):
318
"""Insert new record"""
319
query = """
320
INSERT INTO users (name, email, created_at)
321
VALUES (%s, %s, NOW())
322
"""
323
name = f"User {random.randint(1, 1000)}"
324
email = f"user{random.randint(1, 1000)}@example.com"
325
self.client.execute(query, (name, email))
326
327
@task(1)
328
def complex_join(self):
329
"""Execute complex JOIN query"""
330
query = """
331
SELECT u.name, p.title, c.name as category
332
FROM users u
333
JOIN posts p ON u.id = p.user_id
334
JOIN categories c ON p.category_id = c.id
335
WHERE u.created_at > %s
336
LIMIT 100
337
"""
338
cutoff_date = "2023-01-01"
339
self.client.execute(query, (cutoff_date,))
340
```
341
342
### WebSocket/SocketIO Load Testing
343
344
```python
345
from locust import task, between
346
from locust.contrib.socketio import SocketIOUser
347
import random
348
import json
349
350
class SocketIOUser(SocketIOUser):
351
wait_time = between(1, 5)
352
353
def on_start(self):
354
"""Connect to Socket.IO server"""
355
self.connect()
356
357
# Subscribe to channels
358
self.emit("join_room", {"room": "general"})
359
self.emit("subscribe", {"channel": "notifications"})
360
361
@task(3)
362
def send_message(self):
363
"""Send chat message"""
364
message_data = {
365
"room": "general",
366
"message": f"Hello from user {self.user_id}",
367
"timestamp": time.time()
368
}
369
self.emit("message", message_data)
370
371
@task(2)
372
def request_data(self):
373
"""Request data with response"""
374
try:
375
response = self.call("get_user_data", {"user_id": self.user_id}, timeout=10)
376
if response:
377
# Process response data
378
self.process_user_data(response)
379
except Exception as e:
380
print(f"Request failed: {e}")
381
382
@task(1)
383
def send_notification(self):
384
"""Send notification to random user"""
385
target_user = random.randint(1, 100)
386
notification = {
387
"target_user": target_user,
388
"type": "friend_request",
389
"from_user": self.user_id
390
}
391
self.emit("notification", notification)
392
393
def on_message(self, message):
394
"""Handle incoming messages"""
395
data = json.loads(message)
396
message_type = data.get("type")
397
398
if message_type == "chat":
399
# Handle chat message
400
self.handle_chat_message(data)
401
elif message_type == "notification":
402
# Handle notification
403
self.handle_notification(data)
404
405
def process_user_data(self, data):
406
"""Process received user data"""
407
# Custom processing logic
408
pass
409
410
def handle_chat_message(self, message):
411
"""Handle incoming chat message"""
412
# Simulate reading message
413
pass
414
415
def handle_notification(self, notification):
416
"""Handle incoming notification"""
417
# Simulate processing notification
418
pass
419
```
420
421
### FastHTTP Performance Testing
422
423
```python
424
from locust import task, between
425
from locust.contrib.fasthttp import FastHttpUser
426
import random
427
428
class HighPerformanceUser(FastHttpUser):
429
wait_time = between(0.1, 0.5) # Very fast requests
430
431
# FastHTTP configuration for maximum performance
432
connection_timeout = 60.0
433
network_timeout = 60.0
434
concurrency = 20
435
insecure = True # Skip SSL verification for performance
436
437
@task(10)
438
def fast_api_get(self):
439
"""High-frequency GET requests"""
440
endpoint_id = random.randint(1, 1000)
441
self.client.get(f"/api/fast/{endpoint_id}")
442
443
@task(5)
444
def batch_request(self):
445
"""Batch multiple requests quickly"""
446
for i in range(5):
447
self.client.get(f"/api/batch/{i}")
448
449
@task(3)
450
def rest_api_call(self):
451
"""REST API with JSON handling"""
452
data = self.rest("GET", f"/api/data/{random.randint(1, 100)}")
453
454
# Process response data
455
if data and "id" in data:
456
# Follow up with related request
457
self.client.get(f"/api/related/{data['id']}")
458
459
@task(2)
460
def post_with_validation(self):
461
"""POST request with response validation"""
462
payload = {
463
"user_id": random.randint(1, 1000),
464
"action": "fast_update",
465
"timestamp": time.time()
466
}
467
468
with self.rest_("POST", "/api/update", json=payload) as response:
469
if response.js.get("status") != "success":
470
response.failure("Update failed")
471
472
# Validate response time
473
if response.elapsed.total_seconds() > 0.1:
474
response.failure("Response too slow")
475
```
476
477
### Custom Protocol Implementation
478
479
```python
480
from locust import User, task, between, events
481
import socket
482
import time
483
484
class CustomProtocolClient:
485
"""Custom protocol client implementation"""
486
487
def __init__(self, host, port):
488
self.host = host
489
self.port = port
490
self.socket = None
491
492
def connect(self):
493
"""Connect to custom protocol server"""
494
start_time = time.time()
495
try:
496
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
497
self.socket.connect((self.host, self.port))
498
499
# Fire request event for statistics
500
events.request.fire(
501
request_type="CONNECT",
502
name="connect",
503
response_time=(time.time() - start_time) * 1000,
504
response_length=0
505
)
506
except Exception as e:
507
events.request.fire(
508
request_type="CONNECT",
509
name="connect",
510
response_time=(time.time() - start_time) * 1000,
511
response_length=0,
512
exception=e
513
)
514
raise
515
516
def send_command(self, command):
517
"""Send command over custom protocol"""
518
start_time = time.time()
519
try:
520
# Send command
521
self.socket.send(command.encode())
522
523
# Receive response
524
response = self.socket.recv(1024)
525
526
# Fire request event
527
events.request.fire(
528
request_type="CUSTOM",
529
name=command,
530
response_time=(time.time() - start_time) * 1000,
531
response_length=len(response)
532
)
533
534
return response.decode()
535
536
except Exception as e:
537
events.request.fire(
538
request_type="CUSTOM",
539
name=command,
540
response_time=(time.time() - start_time) * 1000,
541
response_length=0,
542
exception=e
543
)
544
raise
545
546
def disconnect(self):
547
"""Disconnect from server"""
548
if self.socket:
549
self.socket.close()
550
551
class CustomProtocolUser(User):
552
wait_time = between(1, 3)
553
554
def on_start(self):
555
"""Initialize custom protocol client"""
556
self.client = CustomProtocolClient("localhost", 9999)
557
self.client.connect()
558
559
@task(3)
560
def get_data(self):
561
"""Get data command"""
562
self.client.send_command("GET_DATA")
563
564
@task(2)
565
def set_data(self):
566
"""Set data command"""
567
value = random.randint(1, 1000)
568
self.client.send_command(f"SET_DATA:{value}")
569
570
@task(1)
571
def ping(self):
572
"""Ping command"""
573
self.client.send_command("PING")
574
575
def on_stop(self):
576
"""Cleanup connection"""
577
self.client.disconnect()
578
```
579
580
## Types
581
582
```python { .api }
583
from typing import Dict, Any, Optional, Callable, Union
584
import socket
585
586
# MongoDB types
587
MongoDocument = Dict[str, Any]
588
MongoQuery = Dict[str, Any]
589
MongoConnection = Any # PyMongo connection object
590
591
# PostgreSQL types
592
PostgresQuery = str
593
PostgresParams = Union[tuple, list, Dict[str, Any]]
594
PostgresConnection = Any # psycopg connection object
595
596
# WebSocket/SocketIO types
597
SocketMessage = Union[str, bytes, Dict[str, Any]]
598
SocketEvent = str
599
SocketCallback = Callable[[Any], None]
600
SocketConnection = Any # WebSocket connection object
601
602
# Custom protocol types
603
ProtocolMessage = Union[str, bytes]
604
ProtocolResponse = Union[str, bytes, Dict[str, Any]]
605
CustomSocket = socket.socket
606
```