0
# Context Management and Cleanup
1
2
Context providers and deferred cleanup actions for managing test state and resources.
3
4
## Capabilities
5
6
### Context Decorator
7
8
Mark functions as context providers for enhanced test organization and state management.
9
10
```python { .api }
11
def context(fn: Callable) -> Callable:
12
"""
13
Decorator to mark functions as context providers.
14
15
Context functions provide setup, state, or resources that can be
16
shared across test steps or scenarios.
17
18
Args:
19
fn: Function to mark as a context provider
20
21
Returns:
22
The function with context metadata attached
23
"""
24
```
25
26
#### Usage Example
27
28
```python
29
from vedro import scenario, context, given, when, then, ensure
30
31
@scenario("Database operations with context")
32
def test_database_operations():
33
34
@context
35
def database_connection():
36
"""Provides a database connection for the test."""
37
conn = create_test_database_connection()
38
try:
39
yield conn
40
finally:
41
conn.close()
42
43
@context
44
def test_user(db_conn):
45
"""Provides a test user in the database."""
46
user = create_test_user(db_conn, {
47
"username": "testuser",
48
"email": "test@example.com"
49
})
50
return user
51
52
@given("database with test user")
53
def setup(database_connection, test_user):
54
return {
55
"connection": database_connection,
56
"user": test_user
57
}
58
59
@when("user data is updated")
60
def action(context):
61
updated_user = update_user(
62
context["connection"],
63
context["user"].id,
64
{"email": "updated@example.com"}
65
)
66
return updated_user
67
68
@then("update is persisted")
69
def verification(updated_user, context):
70
# Verify in database
71
db_user = get_user(context["connection"], updated_user.id)
72
ensure(db_user.email).equals("updated@example.com")
73
```
74
75
### Deferred Cleanup
76
77
Schedule cleanup actions to be executed after scenario or global completion.
78
79
```python { .api }
80
def defer(fn: Callable, *args, **kwargs) -> None:
81
"""
82
Schedule a cleanup function to be called after the current scenario completes.
83
84
Args:
85
fn: Function to call for cleanup
86
*args: Positional arguments to pass to the cleanup function
87
**kwargs: Keyword arguments to pass to the cleanup function
88
"""
89
90
def defer_global(fn: Callable, *args, **kwargs) -> None:
91
"""
92
Schedule a cleanup function to be called after all tests complete.
93
94
Args:
95
fn: Function to call for cleanup
96
*args: Positional arguments to pass to the cleanup function
97
**kwargs: Keyword arguments to pass to the cleanup function
98
"""
99
```
100
101
#### Usage Example - Scenario Cleanup
102
103
```python
104
from vedro import scenario, given, when, then, defer, ensure
105
import tempfile
106
import os
107
108
@scenario("File operations with cleanup")
109
def test_file_operations():
110
111
@given("temporary files")
112
def setup():
113
# Create temporary files for testing
114
temp_files = []
115
116
for i in range(3):
117
fd, filepath = tempfile.mkstemp(suffix=f"_test_{i}.txt")
118
os.close(fd) # Close file descriptor
119
120
# Write test content
121
with open(filepath, 'w') as f:
122
f.write(f"Test content {i}")
123
124
temp_files.append(filepath)
125
126
# Schedule cleanup for each file
127
defer(os.unlink, filepath)
128
129
return {"temp_files": temp_files}
130
131
@when("files are processed")
132
def action(context):
133
results = []
134
135
for filepath in context["temp_files"]:
136
# Read and process file
137
with open(filepath, 'r') as f:
138
content = f.read()
139
140
processed_content = content.upper()
141
142
# Create output file
143
output_file = filepath + ".processed"
144
with open(output_file, 'w') as f:
145
f.write(processed_content)
146
147
# Schedule cleanup for output file too
148
defer(os.unlink, output_file)
149
150
results.append({
151
"input": filepath,
152
"output": output_file,
153
"content": processed_content
154
})
155
156
return results
157
158
@then("processing completes successfully")
159
def verification(results):
160
ensure(len(results)).equals(3)
161
162
for result in results:
163
# Verify files exist during test
164
ensure(os.path.exists(result["input"])).is_true()
165
ensure(os.path.exists(result["output"])).is_true()
166
167
# Verify content
168
ensure(result["content"]).contains("TEST CONTENT")
169
170
# Files will be cleaned up automatically after scenario ends
171
```
172
173
#### Usage Example - Global Cleanup
174
175
```python
176
from vedro import scenario, defer_global, given, when, then, ensure
177
import subprocess
178
import signal
179
import time
180
181
@scenario("Service lifecycle management")
182
def test_service_lifecycle():
183
184
@given("test service is started")
185
def setup():
186
# Start a test service process
187
service_process = subprocess.Popen([
188
"python", "-m", "http.server", "8999"
189
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
190
191
# Schedule global cleanup to stop the service
192
# This will run after ALL tests complete
193
defer_global(terminate_process_safely, service_process)
194
195
# Wait for service to start
196
time.sleep(2)
197
198
return {"service_process": service_process}
199
200
@when("service is accessed")
201
def action(context):
202
import requests
203
204
# Access the test service
205
response = requests.get("http://localhost:8999")
206
207
return {
208
"response": response,
209
"service_pid": context["service_process"].pid
210
}
211
212
@then("service responds correctly")
213
def verification(result):
214
ensure(result["response"].status_code).equals(200)
215
ensure(result["service_pid"]).is_greater_than(0)
216
217
# Service will continue running for other tests
218
# and be cleaned up globally at the end
219
220
def terminate_process_safely(process):
221
"""Helper function for safe process termination."""
222
try:
223
process.terminate()
224
process.wait(timeout=5)
225
except subprocess.TimeoutExpired:
226
process.kill()
227
process.wait()
228
```
229
230
### Resource Management Patterns
231
232
Combine context providers with deferred cleanup for robust resource management.
233
234
```python
235
@scenario("Complex resource management")
236
def test_complex_resources():
237
238
@context
239
def database_pool():
240
"""Provides a database connection pool."""
241
pool = create_connection_pool(
242
host="localhost",
243
database="test_db",
244
min_connections=2,
245
max_connections=10
246
)
247
248
# Schedule cleanup
249
defer(pool.close_all_connections)
250
251
return pool
252
253
@context
254
def cache_client():
255
"""Provides a cache client (Redis, Memcached, etc.)."""
256
client = create_cache_client("localhost:6379")
257
258
# Clean up cache data and close connection
259
defer(client.flushdb) # Clear test data
260
defer(client.close) # Close connection
261
262
return client
263
264
@context
265
def message_queue():
266
"""Provides a message queue for testing."""
267
queue = create_message_queue("test_queue")
268
269
# Clean up queue
270
defer(queue.purge)
271
defer(queue.close)
272
273
return queue
274
275
@given("all services are available")
276
def setup(database_pool, cache_client, message_queue):
277
# Verify all resources are ready
278
db_conn = database_pool.get_connection()
279
ensure(db_conn.is_connected()).is_true()
280
database_pool.return_connection(db_conn)
281
282
cache_client.set("health_check", "ok")
283
ensure(cache_client.get("health_check")).equals("ok")
284
285
message_queue.publish("health_check", {"status": "ready"})
286
287
return {
288
"db_pool": database_pool,
289
"cache": cache_client,
290
"queue": message_queue
291
}
292
293
@when("complex operation is performed")
294
def action(context):
295
# Use all resources in a coordinated operation
296
db_conn = context["db_pool"].get_connection()
297
298
try:
299
# Database operation
300
user_data = {"id": 123, "name": "Test User", "email": "test@example.com"}
301
create_user(db_conn, user_data)
302
303
# Cache operation
304
context["cache"].set(f"user:{user_data['id']}", json.dumps(user_data))
305
306
# Queue operation
307
context["queue"].publish("user_created", user_data)
308
309
return user_data
310
311
finally:
312
context["db_pool"].return_connection(db_conn)
313
314
@then("operation completes successfully across all services")
315
def verification(result, context):
316
# Verify database
317
db_conn = context["db_pool"].get_connection()
318
try:
319
user = get_user(db_conn, result["id"])
320
ensure(user.name).equals("Test User")
321
finally:
322
context["db_pool"].return_connection(db_conn)
323
324
# Verify cache
325
cached_data = context["cache"].get(f"user:{result['id']}")
326
ensure(cached_data).is_not_none()
327
cached_user = json.loads(cached_data)
328
ensure(cached_user["name"]).equals("Test User")
329
330
# Verify queue (check message was processed)
331
messages = context["queue"].get_recent_messages("user_created")
332
ensure(len(messages)).is_greater_than(0)
333
334
# All cleanup will happen automatically via deferred functions
335
```
336
337
## Advanced Patterns
338
339
### Nested Context Management
340
341
Create hierarchical context providers for complex setups:
342
343
```python
344
@scenario("Nested context management")
345
def test_nested_contexts():
346
347
@context
348
def test_environment():
349
"""Top-level environment setup."""
350
env = {
351
"name": "test",
352
"isolated": True,
353
"resources": []
354
}
355
356
# Global environment cleanup
357
defer_global(cleanup_test_environment, env)
358
359
return env
360
361
@context
362
def application_server(test_environment):
363
"""Application server within the test environment."""
364
server_config = {
365
"host": "localhost",
366
"port": 8000,
367
"environment": test_environment["name"]
368
}
369
370
server = start_application_server(server_config)
371
test_environment["resources"].append(server)
372
373
# Server-specific cleanup
374
defer(stop_application_server, server)
375
376
return server
377
378
@context
379
def test_client(application_server):
380
"""Test client connected to the application server."""
381
client = create_test_client(
382
base_url=f"http://{application_server.host}:{application_server.port}"
383
)
384
385
# Client cleanup
386
defer(client.close)
387
388
return client
389
390
@given("fully configured test environment")
391
def setup(test_environment, application_server, test_client):
392
# Wait for everything to be ready
393
ensure(application_server.is_healthy()).is_true()
394
ensure(test_client.can_connect()).is_true()
395
396
return {
397
"environment": test_environment,
398
"server": application_server,
399
"client": test_client
400
}
401
402
@when("application is tested")
403
def action(context):
404
# Perform application tests using the client
405
response = context["client"].get("/api/health")
406
407
return {"health_response": response}
408
409
@then("application responds correctly")
410
def verification(result):
411
ensure(result["health_response"].status_code).equals(200)
412
ensure(result["health_response"].json()["status"]).equals("healthy")
413
414
def cleanup_test_environment(env):
415
"""Clean up test environment resources."""
416
for resource in env["resources"]:
417
try:
418
resource.cleanup()
419
except Exception as e:
420
print(f"Warning: Failed to cleanup resource {resource}: {e}")
421
```
422
423
### Conditional Cleanup
424
425
Perform cleanup only under certain conditions:
426
427
```python
428
@scenario("Conditional resource cleanup")
429
def test_conditional_cleanup():
430
431
@given("conditional resources")
432
def setup():
433
# Create resources based on conditions
434
resources = []
435
436
if os.environ.get("CREATE_DATABASE"):
437
db = create_test_database()
438
resources.append(("database", db))
439
440
# Only clean up database if we created it
441
defer(cleanup_database, db)
442
443
if os.environ.get("START_SERVICES"):
444
services = start_test_services()
445
resources.append(("services", services))
446
447
# Conditional cleanup based on success
448
def conditional_service_cleanup():
449
if hasattr(services, 'failed') and services.failed:
450
# Keep services running for debugging if they failed
451
print("Keeping failed services for debugging")
452
else:
453
stop_test_services(services)
454
455
defer(conditional_service_cleanup)
456
457
return {"resources": resources}
458
459
@when("tests run with available resources")
460
def action(context):
461
results = {}
462
463
for resource_type, resource in context["resources"]:
464
if resource_type == "database":
465
results["db_test"] = test_database_operations(resource)
466
elif resource_type == "services":
467
results["service_test"] = test_service_operations(resource)
468
469
return results
470
471
@then("tests complete successfully")
472
def verification(results):
473
for test_name, result in results.items():
474
ensure(result.success).is_true()
475
476
# Mark services as successful to allow normal cleanup
477
if "service" in test_name and hasattr(result, 'service_ref'):
478
delattr(result.service_ref, 'failed')
479
```
480
481
### Cleanup Error Handling
482
483
Handle cleanup failures gracefully:
484
485
```python
486
def safe_cleanup(cleanup_func, *args, **kwargs):
487
"""Wrapper for safe cleanup that logs but doesn't fail."""
488
try:
489
cleanup_func(*args, **kwargs)
490
except Exception as e:
491
import logging
492
logging.warning(f"Cleanup failed for {cleanup_func.__name__}: {e}")
493
494
@scenario("Robust cleanup handling")
495
def test_robust_cleanup():
496
497
@given("resources with potential cleanup issues")
498
def setup():
499
# Create multiple resources, some may fail to clean up
500
resources = []
501
502
for i in range(3):
503
resource = create_test_resource(f"resource_{i}")
504
resources.append(resource)
505
506
# Use safe cleanup wrapper
507
defer(safe_cleanup, cleanup_resource, resource)
508
509
# Also schedule a critical cleanup that must succeed
510
critical_resource = create_critical_resource()
511
512
def critical_cleanup():
513
try:
514
cleanup_critical_resource(critical_resource)
515
except Exception as e:
516
# Log error and try alternative cleanup
517
logging.error(f"Critical cleanup failed: {e}")
518
alternative_cleanup(critical_resource)
519
520
defer(critical_cleanup)
521
522
return {"resources": resources, "critical": critical_resource}
523
524
@when("operations are performed")
525
def action(context):
526
# Use all resources
527
results = []
528
529
for resource in context["resources"]:
530
result = use_resource(resource)
531
results.append(result)
532
533
critical_result = use_critical_resource(context["critical"])
534
535
return {"regular_results": results, "critical_result": critical_result}
536
537
@then("operations succeed despite potential cleanup issues")
538
def verification(results):
539
# Test should pass even if some cleanup fails
540
ensure(len(results["regular_results"])).equals(3)
541
ensure(results["critical_result"].success).is_true()
542
543
# Cleanup errors will be logged but won't fail the test
544
```