or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifacts-files.mdassertions.mdcli.mdconfiguration.mdcontext-cleanup.mdevents.mdexecution-control.mdindex.mdparameterization.mdtest-definition.md

context-cleanup.mddocs/

0

# Context Management and Cleanup

1

2

Context providers and deferred cleanup actions for managing test state and resources.

3

4

## Capabilities

5

6

### Context Decorator

7

8

Mark functions as context providers for enhanced test organization and state management.

9

10

```python { .api }

11

def context(fn: Callable) -> Callable:

12

"""

13

Decorator to mark functions as context providers.

14

15

Context functions provide setup, state, or resources that can be

16

shared across test steps or scenarios.

17

18

Args:

19

fn: Function to mark as a context provider

20

21

Returns:

22

The function with context metadata attached

23

"""

24

```

25

26

#### Usage Example

27

28

```python

29

from vedro import scenario, context, given, when, then, ensure

30

31

@scenario("Database operations with context")

32

def test_database_operations():

33

34

@context

35

def database_connection():

36

"""Provides a database connection for the test."""

37

conn = create_test_database_connection()

38

try:

39

yield conn

40

finally:

41

conn.close()

42

43

@context

44

def test_user(db_conn):

45

"""Provides a test user in the database."""

46

user = create_test_user(db_conn, {

47

"username": "testuser",

48

"email": "test@example.com"

49

})

50

return user

51

52

@given("database with test user")

53

def setup(database_connection, test_user):

54

return {

55

"connection": database_connection,

56

"user": test_user

57

}

58

59

@when("user data is updated")

60

def action(context):

61

updated_user = update_user(

62

context["connection"],

63

context["user"].id,

64

{"email": "updated@example.com"}

65

)

66

return updated_user

67

68

@then("update is persisted")

69

def verification(updated_user, context):

70

# Verify in database

71

db_user = get_user(context["connection"], updated_user.id)

72

ensure(db_user.email).equals("updated@example.com")

73

```

74

75

### Deferred Cleanup

76

77

Schedule cleanup actions to be executed after scenario or global completion.

78

79

```python { .api }

80

def defer(fn: Callable, *args, **kwargs) -> None:

81

"""

82

Schedule a cleanup function to be called after the current scenario completes.

83

84

Args:

85

fn: Function to call for cleanup

86

*args: Positional arguments to pass to the cleanup function

87

**kwargs: Keyword arguments to pass to the cleanup function

88

"""

89

90

def defer_global(fn: Callable, *args, **kwargs) -> None:

91

"""

92

Schedule a cleanup function to be called after all tests complete.

93

94

Args:

95

fn: Function to call for cleanup

96

*args: Positional arguments to pass to the cleanup function

97

**kwargs: Keyword arguments to pass to the cleanup function

98

"""

99

```

100

101

#### Usage Example - Scenario Cleanup

102

103

```python

104

from vedro import scenario, given, when, then, defer, ensure

105

import tempfile

106

import os

107

108

@scenario("File operations with cleanup")

109

def test_file_operations():

110

111

@given("temporary files")

112

def setup():

113

# Create temporary files for testing

114

temp_files = []

115

116

for i in range(3):

117

fd, filepath = tempfile.mkstemp(suffix=f"_test_{i}.txt")

118

os.close(fd) # Close file descriptor

119

120

# Write test content

121

with open(filepath, 'w') as f:

122

f.write(f"Test content {i}")

123

124

temp_files.append(filepath)

125

126

# Schedule cleanup for each file

127

defer(os.unlink, filepath)

128

129

return {"temp_files": temp_files}

130

131

@when("files are processed")

132

def action(context):

133

results = []

134

135

for filepath in context["temp_files"]:

136

# Read and process file

137

with open(filepath, 'r') as f:

138

content = f.read()

139

140

processed_content = content.upper()

141

142

# Create output file

143

output_file = filepath + ".processed"

144

with open(output_file, 'w') as f:

145

f.write(processed_content)

146

147

# Schedule cleanup for output file too

148

defer(os.unlink, output_file)

149

150

results.append({

151

"input": filepath,

152

"output": output_file,

153

"content": processed_content

154

})

155

156

return results

157

158

@then("processing completes successfully")

159

def verification(results):

160

ensure(len(results)).equals(3)

161

162

for result in results:

163

# Verify files exist during test

164

ensure(os.path.exists(result["input"])).is_true()

165

ensure(os.path.exists(result["output"])).is_true()

166

167

# Verify content

168

ensure(result["content"]).contains("TEST CONTENT")

169

170

# Files will be cleaned up automatically after scenario ends

171

```

172

173

#### Usage Example - Global Cleanup

174

175

```python

176

from vedro import scenario, defer_global, given, when, then, ensure

177

import subprocess

178

import signal

179

import time

180

181

@scenario("Service lifecycle management")

182

def test_service_lifecycle():

183

184

@given("test service is started")

185

def setup():

186

# Start a test service process

187

service_process = subprocess.Popen([

188

"python", "-m", "http.server", "8999"

189

], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

190

191

# Schedule global cleanup to stop the service

192

# This will run after ALL tests complete

193

defer_global(terminate_process_safely, service_process)

194

195

# Wait for service to start

196

time.sleep(2)

197

198

return {"service_process": service_process}

199

200

@when("service is accessed")

201

def action(context):

202

import requests

203

204

# Access the test service

205

response = requests.get("http://localhost:8999")

206

207

return {

208

"response": response,

209

"service_pid": context["service_process"].pid

210

}

211

212

@then("service responds correctly")

213

def verification(result):

214

ensure(result["response"].status_code).equals(200)

215

ensure(result["service_pid"]).is_greater_than(0)

216

217

# Service will continue running for other tests

218

# and be cleaned up globally at the end

219

220

def terminate_process_safely(process):

221

"""Helper function for safe process termination."""

222

try:

223

process.terminate()

224

process.wait(timeout=5)

225

except subprocess.TimeoutExpired:

226

process.kill()

227

process.wait()

228

```

229

230

### Resource Management Patterns

231

232

Combine context providers with deferred cleanup for robust resource management.

233

234

```python

235

@scenario("Complex resource management")

236

def test_complex_resources():

237

238

@context

239

def database_pool():

240

"""Provides a database connection pool."""

241

pool = create_connection_pool(

242

host="localhost",

243

database="test_db",

244

min_connections=2,

245

max_connections=10

246

)

247

248

# Schedule cleanup

249

defer(pool.close_all_connections)

250

251

return pool

252

253

@context

254

def cache_client():

255

"""Provides a cache client (Redis, Memcached, etc.)."""

256

client = create_cache_client("localhost:6379")

257

258

# Clean up cache data and close connection

259

defer(client.flushdb) # Clear test data

260

defer(client.close) # Close connection

261

262

return client

263

264

@context

265

def message_queue():

266

"""Provides a message queue for testing."""

267

queue = create_message_queue("test_queue")

268

269

# Clean up queue

270

defer(queue.purge)

271

defer(queue.close)

272

273

return queue

274

275

@given("all services are available")

276

def setup(database_pool, cache_client, message_queue):

277

# Verify all resources are ready

278

db_conn = database_pool.get_connection()

279

ensure(db_conn.is_connected()).is_true()

280

database_pool.return_connection(db_conn)

281

282

cache_client.set("health_check", "ok")

283

ensure(cache_client.get("health_check")).equals("ok")

284

285

message_queue.publish("health_check", {"status": "ready"})

286

287

return {

288

"db_pool": database_pool,

289

"cache": cache_client,

290

"queue": message_queue

291

}

292

293

@when("complex operation is performed")

294

def action(context):

295

# Use all resources in a coordinated operation

296

db_conn = context["db_pool"].get_connection()

297

298

try:

299

# Database operation

300

user_data = {"id": 123, "name": "Test User", "email": "test@example.com"}

301

create_user(db_conn, user_data)

302

303

# Cache operation

304

context["cache"].set(f"user:{user_data['id']}", json.dumps(user_data))

305

306

# Queue operation

307

context["queue"].publish("user_created", user_data)

308

309

return user_data

310

311

finally:

312

context["db_pool"].return_connection(db_conn)

313

314

@then("operation completes successfully across all services")

315

def verification(result, context):

316

# Verify database

317

db_conn = context["db_pool"].get_connection()

318

try:

319

user = get_user(db_conn, result["id"])

320

ensure(user.name).equals("Test User")

321

finally:

322

context["db_pool"].return_connection(db_conn)

323

324

# Verify cache

325

cached_data = context["cache"].get(f"user:{result['id']}")

326

ensure(cached_data).is_not_none()

327

cached_user = json.loads(cached_data)

328

ensure(cached_user["name"]).equals("Test User")

329

330

# Verify queue (check message was processed)

331

messages = context["queue"].get_recent_messages("user_created")

332

ensure(len(messages)).is_greater_than(0)

333

334

# All cleanup will happen automatically via deferred functions

335

```

336

337

## Advanced Patterns

338

339

### Nested Context Management

340

341

Create hierarchical context providers for complex setups:

342

343

```python

344

@scenario("Nested context management")

345

def test_nested_contexts():

346

347

@context

348

def test_environment():

349

"""Top-level environment setup."""

350

env = {

351

"name": "test",

352

"isolated": True,

353

"resources": []

354

}

355

356

# Global environment cleanup

357

defer_global(cleanup_test_environment, env)

358

359

return env

360

361

@context

362

def application_server(test_environment):

363

"""Application server within the test environment."""

364

server_config = {

365

"host": "localhost",

366

"port": 8000,

367

"environment": test_environment["name"]

368

}

369

370

server = start_application_server(server_config)

371

test_environment["resources"].append(server)

372

373

# Server-specific cleanup

374

defer(stop_application_server, server)

375

376

return server

377

378

@context

379

def test_client(application_server):

380

"""Test client connected to the application server."""

381

client = create_test_client(

382

base_url=f"http://{application_server.host}:{application_server.port}"

383

)

384

385

# Client cleanup

386

defer(client.close)

387

388

return client

389

390

@given("fully configured test environment")

391

def setup(test_environment, application_server, test_client):

392

# Wait for everything to be ready

393

ensure(application_server.is_healthy()).is_true()

394

ensure(test_client.can_connect()).is_true()

395

396

return {

397

"environment": test_environment,

398

"server": application_server,

399

"client": test_client

400

}

401

402

@when("application is tested")

403

def action(context):

404

# Perform application tests using the client

405

response = context["client"].get("/api/health")

406

407

return {"health_response": response}

408

409

@then("application responds correctly")

410

def verification(result):

411

ensure(result["health_response"].status_code).equals(200)

412

ensure(result["health_response"].json()["status"]).equals("healthy")

413

414

def cleanup_test_environment(env):

415

"""Clean up test environment resources."""

416

for resource in env["resources"]:

417

try:

418

resource.cleanup()

419

except Exception as e:

420

print(f"Warning: Failed to cleanup resource {resource}: {e}")

421

```

422

423

### Conditional Cleanup

424

425

Perform cleanup only under certain conditions:

426

427

```python

428

@scenario("Conditional resource cleanup")

429

def test_conditional_cleanup():

430

431

@given("conditional resources")

432

def setup():

433

# Create resources based on conditions

434

resources = []

435

436

if os.environ.get("CREATE_DATABASE"):

437

db = create_test_database()

438

resources.append(("database", db))

439

440

# Only clean up database if we created it

441

defer(cleanup_database, db)

442

443

if os.environ.get("START_SERVICES"):

444

services = start_test_services()

445

resources.append(("services", services))

446

447

# Conditional cleanup based on success

448

def conditional_service_cleanup():

449

if hasattr(services, 'failed') and services.failed:

450

# Keep services running for debugging if they failed

451

print("Keeping failed services for debugging")

452

else:

453

stop_test_services(services)

454

455

defer(conditional_service_cleanup)

456

457

return {"resources": resources}

458

459

@when("tests run with available resources")

460

def action(context):

461

results = {}

462

463

for resource_type, resource in context["resources"]:

464

if resource_type == "database":

465

results["db_test"] = test_database_operations(resource)

466

elif resource_type == "services":

467

results["service_test"] = test_service_operations(resource)

468

469

return results

470

471

@then("tests complete successfully")

472

def verification(results):

473

for test_name, result in results.items():

474

ensure(result.success).is_true()

475

476

# Mark services as successful to allow normal cleanup

477

if "service" in test_name and hasattr(result, 'service_ref'):

478

delattr(result.service_ref, 'failed')

479

```

480

481

### Cleanup Error Handling

482

483

Handle cleanup failures gracefully:

484

485

```python

486

def safe_cleanup(cleanup_func, *args, **kwargs):

487

"""Wrapper for safe cleanup that logs but doesn't fail."""

488

try:

489

cleanup_func(*args, **kwargs)

490

except Exception as e:

491

import logging

492

logging.warning(f"Cleanup failed for {cleanup_func.__name__}: {e}")

493

494

@scenario("Robust cleanup handling")

495

def test_robust_cleanup():

496

497

@given("resources with potential cleanup issues")

498

def setup():

499

# Create multiple resources, some may fail to clean up

500

resources = []

501

502

for i in range(3):

503

resource = create_test_resource(f"resource_{i}")

504

resources.append(resource)

505

506

# Use safe cleanup wrapper

507

defer(safe_cleanup, cleanup_resource, resource)

508

509

# Also schedule a critical cleanup that must succeed

510

critical_resource = create_critical_resource()

511

512

def critical_cleanup():

513

try:

514

cleanup_critical_resource(critical_resource)

515

except Exception as e:

516

# Log error and try alternative cleanup

517

logging.error(f"Critical cleanup failed: {e}")

518

alternative_cleanup(critical_resource)

519

520

defer(critical_cleanup)

521

522

return {"resources": resources, "critical": critical_resource}

523

524

@when("operations are performed")

525

def action(context):

526

# Use all resources

527

results = []

528

529

for resource in context["resources"]:

530

result = use_resource(resource)

531

results.append(result)

532

533

critical_result = use_critical_resource(context["critical"])

534

535

return {"regular_results": results, "critical_result": critical_result}

536

537

@then("operations succeed despite potential cleanup issues")

538

def verification(results):

539

# Test should pass even if some cleanup fails

540

ensure(len(results["regular_results"])).equals(3)

541

ensure(results["critical_result"].success).is_true()

542

543

# Cleanup errors will be logged but won't fail the test

544

```