Pragmatic Testing Framework for Python with BDD-style syntax and pluggable architecture
49
Pending
Does it follow best practices?
Impact
49%
1.08x — Average score across 10 eval scenarios
Pending
The risk profile of this skill
Context providers and deferred cleanup actions for managing test state and resources.
Mark functions as context providers for enhanced test organization and state management.
def context(fn: Callable) -> Callable:
    """
    Decorator to mark functions as context providers.

    Context functions provide setup, state, or resources that can be
    shared across test steps or scenarios.

    Args:
        fn: Function to mark as a context provider

    Returns:
        The function with context metadata attached
    """
    # Tag the function so the framework can discover it as a context
    # provider; the original stub fell through and returned None, which
    # would replace any decorated function with None.
    fn.__vedro_context__ = True
    return fn
from vedro import scenario, context, given, when, then, ensure


@scenario("Database operations with context")
def test_database_operations():
    # Context providers are declared inside the scenario and injected into
    # steps by parameter name.
    @context
    def database_connection():
        """Provides a database connection for the test."""
        conn = create_test_database_connection()
        try:
            yield conn
        finally:
            # Always close the connection, even if the test fails.
            conn.close()

    @context
    def test_user(db_conn):
        """Provides a test user in the database."""
        user = create_test_user(db_conn, {
            "username": "testuser",
            "email": "test@example.com"
        })
        return user

    @given("database with test user")
    def setup(database_connection, test_user):
        return {
            "connection": database_connection,
            "user": test_user
        }

    @when("user data is updated")
    def action(context):
        updated_user = update_user(
            context["connection"],
            context["user"].id,
            {"email": "updated@example.com"}
        )
        return updated_user

    @then("update is persisted")
    def verification(updated_user, context):
        # Verify in database
        db_user = get_user(context["connection"], updated_user.id)
        ensure(db_user.email).equals("updated@example.com")

# Schedule cleanup actions to be executed after scenario or global completion.
def defer(fn: Callable, *args, **kwargs) -> None:
    """
    Schedule a cleanup function to be called after the current scenario completes.

    Args:
        fn: Function to call for cleanup
        *args: Positional arguments to pass to the cleanup function
        **kwargs: Keyword arguments to pass to the cleanup function
    """
    # Documentation stub: the real registration is performed by the
    # framework; signature and return value (None) are the contract.
def defer_global(fn: Callable, *args, **kwargs) -> None:
    """
    Schedule a cleanup function to be called after all tests complete.

    Args:
        fn: Function to call for cleanup
        *args: Positional arguments to pass to the cleanup function
        **kwargs: Keyword arguments to pass to the cleanup function
    """
    # Documentation stub: the real registration is performed by the
    # framework; signature and return value (None) are the contract.
from vedro import scenario, given, when, then, defer, ensure
import tempfile
import os


@scenario("File operations with cleanup")
def test_file_operations():
    @given("temporary files")
    def setup():
        # Create temporary files for testing
        temp_files = []
        for i in range(3):
            fd, filepath = tempfile.mkstemp(suffix=f"_test_{i}.txt")
            os.close(fd)  # Close file descriptor
            # Write test content
            with open(filepath, 'w') as f:
                f.write(f"Test content {i}")
            temp_files.append(filepath)
            # Schedule cleanup for each file
            defer(os.unlink, filepath)
        return {"temp_files": temp_files}

    @when("files are processed")
    def action(context):
        results = []
        for filepath in context["temp_files"]:
            # Read and process file
            with open(filepath, 'r') as f:
                content = f.read()
            processed_content = content.upper()
            # Create output file
            output_file = filepath + ".processed"
            with open(output_file, 'w') as f:
                f.write(processed_content)
            # Schedule cleanup for output file too
            defer(os.unlink, output_file)
            results.append({
                "input": filepath,
                "output": output_file,
                "content": processed_content
            })
        return results

    @then("processing completes successfully")
    def verification(results):
        ensure(len(results)).equals(3)
        for result in results:
            # Verify files exist during test
            ensure(os.path.exists(result["input"])).is_true()
            ensure(os.path.exists(result["output"])).is_true()
            # Verify content
            ensure(result["content"]).contains("TEST CONTENT")
        # Files will be cleaned up automatically after scenario ends
from vedro import scenario, defer_global, given, when, then, ensure
import subprocess
import signal
import time


@scenario("Service lifecycle management")
def test_service_lifecycle():
    @given("test service is started")
    def setup():
        # Start a test service process
        service_process = subprocess.Popen([
            "python", "-m", "http.server", "8999"
        ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Schedule global cleanup to stop the service
        # This will run after ALL tests complete
        defer_global(terminate_process_safely, service_process)
        # Wait for service to start
        time.sleep(2)
        return {"service_process": service_process}

    @when("service is accessed")
    def action(context):
        import requests
        # Access the test service
        response = requests.get("http://localhost:8999")
        return {
            "response": response,
            "service_pid": context["service_process"].pid
        }

    @then("service responds correctly")
    def verification(result):
        ensure(result["response"].status_code).equals(200)
        ensure(result["service_pid"]).is_greater_than(0)
        # Service will continue running for other tests
        # and be cleaned up globally at the end
def terminate_process_safely(process):
    """
    Helper function for safe process termination.

    Sends SIGTERM first and waits up to 5 seconds; if the process does
    not exit in time, escalates to SIGKILL and waits for it to die.

    Args:
        process: A subprocess.Popen instance to shut down.
    """
    try:
        process.terminate()
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown timed out: force-kill and reap the process.
        process.kill()
        process.wait()

# Combine context providers with deferred cleanup for robust resource management.
import json


@scenario("Complex resource management")
def test_complex_resources():
    @context
    def database_pool():
        """Provides a database connection pool."""
        pool = create_connection_pool(
            host="localhost",
            database="test_db",
            min_connections=2,
            max_connections=10
        )
        # Schedule cleanup
        defer(pool.close_all_connections)
        return pool

    @context
    def cache_client():
        """Provides a cache client (Redis, Memcached, etc.)."""
        client = create_cache_client("localhost:6379")
        # Clean up cache data and close connection
        defer(client.flushdb)  # Clear test data
        defer(client.close)  # Close connection
        return client

    @context
    def message_queue():
        """Provides a message queue for testing."""
        queue = create_message_queue("test_queue")
        # Clean up queue
        defer(queue.purge)
        defer(queue.close)
        return queue

    @given("all services are available")
    def setup(database_pool, cache_client, message_queue):
        # Verify all resources are ready
        db_conn = database_pool.get_connection()
        ensure(db_conn.is_connected()).is_true()
        database_pool.return_connection(db_conn)
        cache_client.set("health_check", "ok")
        ensure(cache_client.get("health_check")).equals("ok")
        message_queue.publish("health_check", {"status": "ready"})
        return {
            "db_pool": database_pool,
            "cache": cache_client,
            "queue": message_queue
        }

    @when("complex operation is performed")
    def action(context):
        # Use all resources in a coordinated operation
        db_conn = context["db_pool"].get_connection()
        try:
            # Database operation
            user_data = {"id": 123, "name": "Test User", "email": "test@example.com"}
            create_user(db_conn, user_data)
            # Cache operation
            context["cache"].set(f"user:{user_data['id']}", json.dumps(user_data))
            # Queue operation
            context["queue"].publish("user_created", user_data)
            return user_data
        finally:
            # Return the connection to the pool even on failure.
            context["db_pool"].return_connection(db_conn)

    @then("operation completes successfully across all services")
    def verification(result, context):
        # Verify database
        db_conn = context["db_pool"].get_connection()
        try:
            user = get_user(db_conn, result["id"])
            ensure(user.name).equals("Test User")
        finally:
            context["db_pool"].return_connection(db_conn)
        # Verify cache
        cached_data = context["cache"].get(f"user:{result['id']}")
        ensure(cached_data).is_not_none()
        cached_user = json.loads(cached_data)
        ensure(cached_user["name"]).equals("Test User")
        # Verify queue (check message was processed)
        messages = context["queue"].get_recent_messages("user_created")
        ensure(len(messages)).is_greater_than(0)
        # All cleanup will happen automatically via deferred functions

# Create hierarchical context providers for complex setups:
@scenario("Nested context management")
def test_nested_contexts():
    # Contexts form a dependency chain: environment -> server -> client.
    @context
    def test_environment():
        """Top-level environment setup."""
        env = {
            "name": "test",
            "isolated": True,
            "resources": []
        }
        # Global environment cleanup
        defer_global(cleanup_test_environment, env)
        return env

    @context
    def application_server(test_environment):
        """Application server within the test environment."""
        server_config = {
            "host": "localhost",
            "port": 8000,
            "environment": test_environment["name"]
        }
        server = start_application_server(server_config)
        test_environment["resources"].append(server)
        # Server-specific cleanup
        defer(stop_application_server, server)
        return server

    @context
    def test_client(application_server):
        """Test client connected to the application server."""
        client = create_test_client(
            base_url=f"http://{application_server.host}:{application_server.port}"
        )
        # Client cleanup
        defer(client.close)
        return client

    @given("fully configured test environment")
    def setup(test_environment, application_server, test_client):
        # Wait for everything to be ready
        ensure(application_server.is_healthy()).is_true()
        ensure(test_client.can_connect()).is_true()
        return {
            "environment": test_environment,
            "server": application_server,
            "client": test_client
        }

    @when("application is tested")
    def action(context):
        # Perform application tests using the client
        response = context["client"].get("/api/health")
        return {"health_response": response}

    @then("application responds correctly")
    def verification(result):
        ensure(result["health_response"].status_code).equals(200)
        ensure(result["health_response"].json()["status"]).equals("healthy")
def cleanup_test_environment(env):
    """
    Clean up test environment resources.

    Calls ``cleanup()`` on every entry in ``env["resources"]``; a failure
    of one resource is logged to stdout and does not stop the cleanup of
    the remaining resources.

    Args:
        env: Environment dict with a "resources" list of objects that
            expose a ``cleanup()`` method.
    """
    for resource in env["resources"]:
        try:
            resource.cleanup()
        except Exception as e:
            print(f"Warning: Failed to cleanup resource {resource}: {e}")

# Perform cleanup only under certain conditions:
import os


@scenario("Conditional resource cleanup")
def test_conditional_cleanup():
    @given("conditional resources")
    def setup():
        # Create resources based on conditions
        resources = []
        if os.environ.get("CREATE_DATABASE"):
            db = create_test_database()
            resources.append(("database", db))
            # Only clean up database if we created it
            defer(cleanup_database, db)
        if os.environ.get("START_SERVICES"):
            services = start_test_services()
            resources.append(("services", services))

            # Conditional cleanup based on success
            def conditional_service_cleanup():
                if hasattr(services, 'failed') and services.failed:
                    # Keep services running for debugging if they failed
                    print("Keeping failed services for debugging")
                else:
                    stop_test_services(services)

            defer(conditional_service_cleanup)
        return {"resources": resources}

    @when("tests run with available resources")
    def action(context):
        results = {}
        for resource_type, resource in context["resources"]:
            if resource_type == "database":
                results["db_test"] = test_database_operations(resource)
            elif resource_type == "services":
                results["service_test"] = test_service_operations(resource)
        return results

    @then("tests complete successfully")
    def verification(results):
        for test_name, result in results.items():
            ensure(result.success).is_true()
            # Mark services as successful to allow normal cleanup
            if "service" in test_name and hasattr(result, 'service_ref'):
                delattr(result.service_ref, 'failed')

# Handle cleanup failures gracefully:
def safe_cleanup(cleanup_func, *args, **kwargs):
    """
    Wrapper for safe cleanup that logs but doesn't fail.

    Args:
        cleanup_func: Cleanup callable to invoke.
        *args: Positional arguments forwarded to ``cleanup_func``.
        **kwargs: Keyword arguments forwarded to ``cleanup_func``.
    """
    try:
        cleanup_func(*args, **kwargs)
    except Exception as e:
        import logging
        # Not every callable has __name__ (e.g. functools.partial); fall
        # back to repr() so this never-fail path cannot itself raise.
        name = getattr(cleanup_func, "__name__", repr(cleanup_func))
        logging.warning(f"Cleanup failed for {name}: {e}")
import logging


@scenario("Robust cleanup handling")
def test_robust_cleanup():
    @given("resources with potential cleanup issues")
    def setup():
        # Create multiple resources, some may fail to clean up
        resources = []
        for i in range(3):
            resource = create_test_resource(f"resource_{i}")
            resources.append(resource)
            # Use safe cleanup wrapper
            defer(safe_cleanup, cleanup_resource, resource)

        # Also schedule a critical cleanup that must succeed
        critical_resource = create_critical_resource()

        def critical_cleanup():
            try:
                cleanup_critical_resource(critical_resource)
            except Exception as e:
                # Log error and try alternative cleanup
                logging.error(f"Critical cleanup failed: {e}")
                alternative_cleanup(critical_resource)

        defer(critical_cleanup)
        return {"resources": resources, "critical": critical_resource}

    @when("operations are performed")
    def action(context):
        # Use all resources
        results = []
        for resource in context["resources"]:
            result = use_resource(resource)
            results.append(result)
        critical_result = use_critical_resource(context["critical"])
        return {"regular_results": results, "critical_result": critical_result}

    @then("operations succeed despite potential cleanup issues")
    def verification(results):
        # Test should pass even if some cleanup fails
        ensure(len(results["regular_results"])).equals(3)
        ensure(results["critical_result"].success).is_true()
        # Cleanup errors will be logged but won't fail the test
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10