# Protocol Extensions


Locust's contrib modules extend load testing capabilities beyond HTTP to support databases, WebSockets, and other protocols. These modules provide specialized user classes and clients for comprehensive system testing.


## Capabilities


### FastHTTP Support


HTTP client built on geventhttpclient, intended for higher throughput than the default client under highly concurrent load.


```python { .api }
from locust.contrib.fasthttp import FastHttpUser

class FastHttpUser(User):
    """
    High-performance HTTP user using geventhttpclient.

    Attributes:
        client (FastHttpSession): Fast HTTP client
        network_timeout (float): Network timeout in seconds
        connection_timeout (float): Connection timeout in seconds
        max_redirects (int): Maximum redirects to follow
        max_retries (int): Maximum retry attempts
        insecure (bool): Skip SSL verification
        default_headers (dict): Default HTTP headers
        concurrency (int): Connection concurrency level
        proxy_host (str): Proxy hostname
        proxy_port (int): Proxy port
    """

    def rest(self, method, url, **kwargs):
        """Context manager for a JSON request; the parsed body is exposed as ``js`` on the response."""

    def rest_(self, method, url, **kwargs):
        """Like ``rest``, but appends a ``_=<timestamp>`` query parameter (cache busting)."""
```
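
Upstream Locust describes `rest_` as a variant of `rest` that appends a `_=<timestamp>` query parameter to get past caches. The idea can be sketched with the standard library alone. `add_cache_buster` is a hypothetical helper written for illustration, not Locust's own code, and the millisecond resolution is an assumption.

```python
import time
from typing import Optional


def add_cache_buster(url: str, now_ms: Optional[int] = None) -> str:
    """Return `url` with a `_=<timestamp>` query parameter appended."""
    if now_ms is None:
        # Assumption: a millisecond timestamp is unique enough to defeat caching.
        now_ms = int(time.time() * 1000)
    # Use "&" when the URL already carries a query string, "?" otherwise.
    separator = "&" if "?" in url else "?"
    return f"{url}{separator}_={now_ms}"


print(add_cache_buster("/api/data", now_ms=1700000000000))
print(add_cache_buster("/api/data?page=2", now_ms=1700000000000))
```

Passing `now_ms` explicitly keeps the helper deterministic in tests; in a load test you would normally let it default to the current time.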


### MongoDB Support


Load testing for MongoDB databases with connection pooling and query operations.


```python { .api }
from locust.contrib.mongodb import MongoDBUser, MongoDBClient

class MongoDBClient:
    """
    MongoDB client wrapper for load testing.

    Provides MongoDB operations with request event integration
    for statistics collection and performance monitoring.
    """

    def __init__(self, host, port=27017, **kwargs):
        """
        Initialize MongoDB client.

        Args:
            host (str): MongoDB host
            port (int): MongoDB port (default: 27017)
            **kwargs: Additional MongoDB connection parameters
        """

class MongoDBUser(User):
    """
    User class for MongoDB load testing.

    Provides MongoDB client with automatic statistics collection
    and connection management for load testing scenarios.

    Attributes:
        client (MongoDBClient): MongoDB client instance
    """
```


### PostgreSQL Support


Load testing for PostgreSQL databases with connection pooling and SQL query execution.


```python { .api }
from locust.contrib.postgres import PostgresUser, PostgresClient

class PostgresClient:
    """
    PostgreSQL client wrapper for load testing.

    Provides SQL query execution with request event integration
    for performance monitoring and statistics collection.
    """

    def __init__(self, host, port=5432, database=None, user=None, password=None, **kwargs):
        """
        Initialize PostgreSQL client.

        Args:
            host (str): PostgreSQL host
            port (int): PostgreSQL port (default: 5432)
            database (str): Database name
            user (str): Username
            password (str): Password
            **kwargs: Additional connection parameters
        """

class PostgresUser(User):
    """
    User class for PostgreSQL load testing.

    Provides PostgreSQL client with automatic connection management
    and statistics collection for database load testing.

    Attributes:
        client (PostgresClient): PostgreSQL client instance
    """
```


### WebSocket/SocketIO Support


Load testing for real-time WebSocket and Socket.IO applications.


```python { .api }
from locust.contrib.socketio import SocketIOUser

class SocketIOUser(User):
    """
    User class for WebSocket/Socket.IO load testing.

    Supports real-time bi-directional communication testing
    with event handling and connection management.
    """

    def connect(self):
        """
        Establish WebSocket/Socket.IO connection.

        Returns:
            Connection object
        """

    def send(self, data):
        """
        Send data over WebSocket connection.

        Args:
            data: Data to send
        """

    def emit(self, event, data=None):
        """
        Emit Socket.IO event.

        Args:
            event (str): Event name
            data: Event data (optional)
        """

    def call(self, event, data=None, timeout=60):
        """
        Emit event and wait for response.

        Args:
            event (str): Event name
            data: Event data (optional)
            timeout (int): Response timeout in seconds

        Returns:
            Response data
        """

    def on_message(self, message):
        """
        Handle incoming WebSocket message.

        Args:
            message: Received message
        """
```


### Milvus Vector Database Support


Load testing for Milvus vector database operations including vector search and insertion.


```python { .api }
from locust.contrib.milvus import MilvusUser, MilvusClient

class MilvusClient:
    """
    Milvus vector database client for load testing.

    Provides vector operations with request event integration
    for performance monitoring of vector database workloads.
    """

class MilvusUser(User):
    """
    User class for Milvus vector database load testing.

    Supports vector search, insertion, and other Milvus operations
    with automatic statistics collection and connection management.

    Attributes:
        client (MilvusClient): Milvus client instance
    """
```


### OpenAI API Support


Load testing for OpenAI API endpoints including chat completions and embeddings.


```python { .api }
from locust.contrib.oai import OpenAIUser, OpenAIClient

class OpenAIClient:
    """
    OpenAI API client for load testing.

    Provides OpenAI API operations with request event integration
    for performance monitoring of AI service workloads.
    """

class OpenAIUser(User):
    """
    User class for OpenAI API load testing.

    Supports chat completions, embeddings, and other OpenAI operations
    with automatic statistics collection and rate limiting handling.

    Attributes:
        client (OpenAIClient): OpenAI API client instance
    """
```


## Usage Examples


### MongoDB Load Testing


```python
from datetime import datetime, timezone
import random

from locust import task, between
from locust.contrib.mongodb import MongoDBUser

class MongoUser(MongoDBUser):
    wait_time = between(1, 3)
    host = "mongodb://localhost:27017"

    def on_start(self):
        # Connect to specific database and collection
        self.db = self.client.get_database("testdb")
        self.collection = self.db.get_collection("users")

    @task(3)
    def read_user(self):
        """Read random user document"""
        user_id = random.randint(1, 10000)
        user = self.collection.find_one({"user_id": user_id})

    @task(2)
    def find_users(self):
        """Find users by criteria"""
        age_limit = random.randint(18, 65)
        users = list(self.collection.find({"age": {"$gte": age_limit}}).limit(10))

    @task(1)
    def insert_user(self):
        """Insert new user document"""
        user_data = {
            "user_id": random.randint(10001, 99999),
            "name": f"User {random.randint(1, 1000)}",
            "age": random.randint(18, 80),
            "email": f"user{random.randint(1, 1000)}@example.com"
        }
        self.collection.insert_one(user_data)

    @task(1)
    def update_user(self):
        """Update existing user"""
        user_id = random.randint(1, 10000)
        self.collection.update_one(
            {"user_id": user_id},
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            {"$set": {"last_login": datetime.now(timezone.utc)}}
        )
```
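
The `@task` weights in the example above (3, 2, 1, 1) are relative: Locust picks the next task at random in proportion to its weight. The following standard-library sketch shows what that ratio works out to. It does not use Locust; `expected_share` and `simulate_picks` are hypothetical helpers for illustration.

```python
import random
from collections import Counter

# Weights copied from the @task decorators in the MongoDB example.
TASK_WEIGHTS = {"read_user": 3, "find_users": 2, "insert_user": 1, "update_user": 1}


def expected_share(weights):
    """Fraction of picks each task should receive in the long run."""
    total = sum(weights.values())
    return {name: weight / total for name, weight in weights.items()}


def simulate_picks(weights, n, seed=0):
    """Draw n weighted random picks, a stand-in for Locust's task selection."""
    rng = random.Random(seed)
    names = list(weights)
    picks = rng.choices(names, weights=[weights[name] for name in names], k=n)
    return Counter(picks)


shares = expected_share(TASK_WEIGHTS)
counts = simulate_picks(TASK_WEIGHTS, n=7000)
for name in TASK_WEIGHTS:
    print(f"{name}: expected {shares[name]:.1%}, simulated {counts[name] / 7000:.1%}")
```

With these weights roughly three of every seven operations are single-document reads, so the run is read-heavy; change the weights to model a different read/write mix.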


### PostgreSQL Load Testing


```python
from locust import task, between
from locust.contrib.postgres import PostgresUser
import random

# Named differently from the imported base class to avoid shadowing it
class PostgresLoadUser(PostgresUser):
    wait_time = between(0.5, 2)

    # Database connection parameters
    host = "localhost"
    port = 5432
    database = "testdb"
    user = "testuser"
    password = "testpass"

    @task(5)
    def select_query(self):
        """Execute SELECT query"""
        user_id = random.randint(1, 10000)
        query = "SELECT * FROM users WHERE id = %s"
        self.client.execute(query, (user_id,))

    @task(3)
    def aggregate_query(self):
        """Execute aggregate query"""
        query = """
            SELECT department, COUNT(*), AVG(salary)
            FROM employees
            GROUP BY department
            ORDER BY COUNT(*) DESC
        """
        self.client.execute(query)

    @task(2)
    def insert_record(self):
        """Insert new record"""
        query = """
            INSERT INTO users (name, email, created_at)
            VALUES (%s, %s, NOW())
        """
        name = f"User {random.randint(1, 1000)}"
        email = f"user{random.randint(1, 1000)}@example.com"
        self.client.execute(query, (name, email))

    @task(1)
    def complex_join(self):
        """Execute complex JOIN query"""
        query = """
            SELECT u.name, p.title, c.name as category
            FROM users u
            JOIN posts p ON u.id = p.user_id
            JOIN categories c ON p.category_id = c.id
            WHERE u.created_at > %s
            LIMIT 100
        """
        cutoff_date = "2023-01-01"
        self.client.execute(query, (cutoff_date,))
```


### WebSocket/SocketIO Load Testing


```python
from locust import task, between
from locust.contrib.socketio import SocketIOUser
import random
import json
import time

# Named differently from the imported base class to avoid shadowing it
class ChatUser(SocketIOUser):
    wait_time = between(1, 5)

    def on_start(self):
        """Connect to Socket.IO server"""
        # Illustrative ID for this simulated user, used in the payloads below
        self.user_id = random.randint(1, 100)
        self.connect()

        # Subscribe to channels
        self.emit("join_room", {"room": "general"})
        self.emit("subscribe", {"channel": "notifications"})

    @task(3)
    def send_message(self):
        """Send chat message"""
        message_data = {
            "room": "general",
            "message": f"Hello from user {self.user_id}",
            "timestamp": time.time()
        }
        self.emit("message", message_data)

    @task(2)
    def request_data(self):
        """Request data with response"""
        try:
            response = self.call("get_user_data", {"user_id": self.user_id}, timeout=10)
            if response:
                # Process response data
                self.process_user_data(response)
        except Exception as e:
            print(f"Request failed: {e}")

    @task(1)
    def send_notification(self):
        """Send notification to random user"""
        target_user = random.randint(1, 100)
        notification = {
            "target_user": target_user,
            "type": "friend_request",
            "from_user": self.user_id
        }
        self.emit("notification", notification)

    def on_message(self, message):
        """Handle incoming messages"""
        data = json.loads(message)
        message_type = data.get("type")

        if message_type == "chat":
            # Handle chat message
            self.handle_chat_message(data)
        elif message_type == "notification":
            # Handle notification
            self.handle_notification(data)

    def process_user_data(self, data):
        """Process received user data"""
        # Custom processing logic
        pass

    def handle_chat_message(self, message):
        """Handle incoming chat message"""
        # Simulate reading message
        pass

    def handle_notification(self, notification):
        """Handle incoming notification"""
        # Simulate processing notification
        pass
```
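
The `if`/`elif` chain in `on_message` grows with every new message type. One alternative is a dispatch table keyed on the `type` field, sketched below with the standard library only. `MessageRouter` is a hypothetical class for illustration and not part of Locust; the JSON payload shape is taken from the example above.

```python
import json
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class MessageRouter:
    """Route decoded JSON messages to handlers keyed by their "type" field."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self.unhandled: List[Any] = []  # messages no handler accepted

    def register(self, message_type: str, handler: Handler) -> None:
        self._handlers[message_type] = handler

    def dispatch(self, raw: str) -> bool:
        """Decode `raw` and call the matching handler; return True if handled."""
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            self.unhandled.append(raw)
            return False
        message_type = data.get("type") if isinstance(data, dict) else None
        handler = self._handlers.get(message_type) if isinstance(message_type, str) else None
        if handler is None:
            self.unhandled.append(data)
            return False
        handler(data)
        return True


chats: List[Dict[str, Any]] = []
notifications: List[Dict[str, Any]] = []
router = MessageRouter()
router.register("chat", chats.append)
router.register("notification", notifications.append)

router.dispatch('{"type": "chat", "message": "hi"}')
router.dispatch('{"type": "presence"}')  # no handler registered
router.dispatch("not json")              # malformed payload
print(len(chats), len(notifications), len(router.unhandled))
```

In a user class, `on_message` would reduce to a single `router.dispatch(message)` call, and malformed or unknown messages are collected instead of raising inside the receive path.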


### FastHTTP Performance Testing


```python
from locust import task, between
from locust.contrib.fasthttp import FastHttpUser
import random
import time

class HighPerformanceUser(FastHttpUser):
    wait_time = between(0.1, 0.5)  # Very fast requests

    # FastHTTP configuration for maximum performance
    connection_timeout = 60.0
    network_timeout = 60.0
    concurrency = 20
    insecure = True  # Skip SSL verification for performance

    @task(10)
    def fast_api_get(self):
        """High-frequency GET requests"""
        endpoint_id = random.randint(1, 1000)
        self.client.get(f"/api/fast/{endpoint_id}")

    @task(5)
    def batch_request(self):
        """Batch multiple requests quickly"""
        for i in range(5):
            self.client.get(f"/api/batch/{i}")

    @task(3)
    def rest_api_call(self):
        """REST API with JSON handling"""
        # rest() is a context manager; the parsed JSON body is exposed as resp.js
        with self.rest("GET", f"/api/data/{random.randint(1, 100)}") as resp:
            data = resp.js

        # Process response data
        if data and "id" in data:
            # Follow up with related request
            self.client.get(f"/api/related/{data['id']}")

    @task(2)
    def post_with_validation(self):
        """POST request with response validation"""
        payload = {
            "user_id": random.randint(1, 1000),
            "action": "fast_update",
            "timestamp": time.time()
        }

        with self.rest_("POST", "/api/update", json=payload) as response:
            if response.js.get("status") != "success":
                response.failure("Update failed")

            # Validate response time (request_meta["response_time"] is in milliseconds)
            if response.request_meta["response_time"] > 100:
                response.failure("Response too slow")
```
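
The checks in `post_with_validation` can be pulled into a plain function so they are testable without a running server. This is a minimal sketch: `check_update` is a hypothetical helper, and the 100 ms default budget is an assumption that mirrors the 0.1 s threshold in the example.

```python
from typing import Any, Mapping, Optional


def check_update(
    js: Optional[Mapping[str, Any]],
    response_time_ms: float,
    budget_ms: float = 100.0,
) -> Optional[str]:
    """Return a failure message for a bad response, or None if it passes."""
    if js is None:
        return "Response was not valid JSON"
    if js.get("status") != "success":
        return "Update failed"
    if response_time_ms > budget_ms:
        return "Response too slow"
    return None


print(check_update({"status": "success"}, response_time_ms=42.0))
print(check_update({"status": "error"}, response_time_ms=42.0))
print(check_update({"status": "success"}, response_time_ms=250.0))
```

Inside the task, the result would be passed to `response.failure(...)` when it is not `None`. Returning only the first failure keeps one message per sample; that is a design choice, not something the example requires.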


### Custom Protocol Implementation


```python
from locust import User, task, between, events
import random
import socket
import time

class CustomProtocolClient:
    """Custom protocol client implementation"""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.socket = None

    def connect(self):
        """Connect to custom protocol server"""
        start_time = time.time()
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))

            # Fire request event for statistics
            events.request.fire(
                request_type="CONNECT",
                name="connect",
                response_time=(time.time() - start_time) * 1000,
                response_length=0
            )
        except Exception as e:
            events.request.fire(
                request_type="CONNECT",
                name="connect",
                response_time=(time.time() - start_time) * 1000,
                response_length=0,
                exception=e
            )
            raise

    def send_command(self, command):
        """Send command over custom protocol"""
        start_time = time.time()
        try:
            # Send command (sendall keeps writing until the whole payload is sent)
            self.socket.sendall(command.encode())

            # Receive response
            response = self.socket.recv(1024)

            # Fire request event
            events.request.fire(
                request_type="CUSTOM",
                name=command,
                response_time=(time.time() - start_time) * 1000,
                response_length=len(response)
            )

            return response.decode()

        except Exception as e:
            events.request.fire(
                request_type="CUSTOM",
                name=command,
                response_time=(time.time() - start_time) * 1000,
                response_length=0,
                exception=e
            )
            raise

    def disconnect(self):
        """Disconnect from server"""
        if self.socket:
            self.socket.close()

class CustomProtocolUser(User):
    wait_time = between(1, 3)

    def on_start(self):
        """Initialize custom protocol client"""
        self.client = CustomProtocolClient("localhost", 9999)
        self.client.connect()

    @task(3)
    def get_data(self):
        """Get data command"""
        self.client.send_command("GET_DATA")

    @task(2)
    def set_data(self):
        """Set data command"""
        value = random.randint(1, 1000)
        self.client.send_command(f"SET_DATA:{value}")

    @task(1)
    def ping(self):
        """Ping command"""
        self.client.send_command("PING")

    def on_stop(self):
        """Cleanup connection"""
        self.client.disconnect()
```
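
`connect` and `send_command` above repeat the same pattern: start a timer, run the operation, then fire one request event on success or failure. That pattern can be factored into a context manager. The sketch below uses only the standard library; `timed_request` and `record` are hypothetical names, and `record` stands in for `events.request.fire` so the example runs without Locust.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_request(fire, request_type, name):
    """Time the with-block and report exactly one sample through `fire`."""
    meta = {
        "request_type": request_type,
        "name": name,
        "response_length": 0,
        "exception": None,
    }
    start = time.perf_counter()
    try:
        yield meta  # the caller may set meta["response_length"]
    except Exception as exc:
        meta["exception"] = exc
        raise
    finally:
        # Runs on both success and failure, so every attempt is counted once.
        meta["response_time"] = (time.perf_counter() - start) * 1000
        fire(**meta)


samples = []


def record(**sample):
    """Stand-in for events.request.fire: collect samples in a list."""
    samples.append(sample)


with timed_request(record, "CUSTOM", "PING") as meta:
    meta["response_length"] = len(b"PONG")

try:
    with timed_request(record, "CUSTOM", "BROKEN"):
        raise ConnectionError("server closed the connection")
except ConnectionError:
    pass

for sample in samples:
    print(sample["name"], sample["exception"], round(sample["response_time"], 3))
```

The exception is re-raised after being recorded, matching the `raise` in the original `except` branches, so the calling task still sees the failure.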


## Types


```python { .api }
from typing import Dict, Any, Optional, Callable, Union
import socket

# MongoDB types
MongoDocument = Dict[str, Any]
MongoQuery = Dict[str, Any]
MongoConnection = Any  # PyMongo connection object

# PostgreSQL types
PostgresQuery = str
PostgresParams = Union[tuple, list, Dict[str, Any]]
PostgresConnection = Any  # psycopg connection object

# WebSocket/SocketIO types
SocketMessage = Union[str, bytes, Dict[str, Any]]
SocketEvent = str
SocketCallback = Callable[[Any], None]
SocketConnection = Any  # WebSocket connection object

# Custom protocol types
ProtocolMessage = Union[str, bytes]
ProtocolResponse = Union[str, bytes, Dict[str, Any]]
CustomSocket = socket.socket
```
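
These aliases are plain `typing` shorthands, so they can annotate helper functions in a locustfile without importing anything from Locust. A small sketch follows; the alias definitions are repeated so the block is self-contained, and `users_at_least` and `user_by_id` are hypothetical helpers whose queries mirror the usage examples.

```python
from typing import Any, Dict, Tuple, Union

# Aliases repeated from the Types listing so this block runs on its own.
MongoQuery = Dict[str, Any]
PostgresQuery = str
PostgresParams = Union[tuple, list, Dict[str, Any]]


def users_at_least(age: int) -> MongoQuery:
    """Build the MongoDB filter used by the find_users example."""
    return {"age": {"$gte": age}}


def user_by_id(user_id: int) -> Tuple[PostgresQuery, PostgresParams]:
    """Build a parameterized SELECT; the value is bound, never interpolated."""
    return "SELECT * FROM users WHERE id = %s", (user_id,)


print(users_at_least(21))
query, params = user_by_id(42)
print(query, params)
```

Keeping the SQL text and its parameters separate, as `user_by_id` does, lets the database driver handle quoting.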