CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-vedro

Pragmatic Testing Framework for Python with BDD-style syntax and pluggable architecture

49

1.08x
Quality

Pending

Does it follow best practices?

Impact

49%

1.08x

Average score across 10 eval scenarios

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/context-cleanup.md

Context Management and Cleanup

Context providers and deferred cleanup actions for managing test state and resources.

Capabilities

Context Decorator

Mark functions as context providers for enhanced test organization and state management.

def context(fn: Callable) -> Callable:
    """
    Mark ``fn`` as a context provider.

    Context functions provide setup, state, or resources that can be
    shared across test steps or scenarios.

    This listing shows the signature only; the implementation is not
    part of this page.

    Args:
        fn: Function to mark as a context provider.

    Returns:
        The function with context metadata attached.
    """

Usage Example

from vedro import scenario, context, given, when, then, ensure

@scenario("Database operations with context")
def test_database_operations():
    """Example scenario: share a database connection and a test user across steps."""

    @context
    def database_connection():
        """Provides a database connection for the test."""
        conn = create_test_database_connection()
        try:
            yield conn
        finally:
            # Close the connection when the generator is finalized.
            conn.close()

    @context
    def test_user(database_connection):
        """Provides a test user in the database."""
        # The parameter is named after the ``database_connection`` provider,
        # the same way the other steps in these examples refer to providers.
        user = create_test_user(database_connection, {
            "username": "testuser",
            "email": "test@example.com"
        })
        return user

    @given("database with test user")
    def setup(database_connection, test_user):
        """Bundle the connection and the user for the later steps."""
        return {
            "connection": database_connection,
            "user": test_user
        }

    @when("user data is updated")
    def action(context):
        """Change the test user's e-mail address and return the updated user."""
        updated_user = update_user(
            context["connection"],
            context["user"].id,
            {"email": "updated@example.com"}
        )
        return updated_user

    @then("update is persisted")
    def verification(updated_user, context):
        """Re-read the user from the database and check the new e-mail."""
        # Verify in database
        db_user = get_user(context["connection"], updated_user.id)
        ensure(db_user.email).equals("updated@example.com")

Deferred Cleanup

Schedule cleanup actions to run either after the current scenario finishes (`defer`) or after the entire test run completes (`defer_global`).

def defer(fn: Callable, *args, **kwargs) -> None:
    """
    Schedule a cleanup function to be called after the current scenario completes.

    This listing shows the signature only; the implementation is not
    part of this page.

    NOTE(review): the order in which several deferred callbacks run is not
    stated here -- confirm against the vedro documentation before relying
    on it.

    Args:
        fn: Function to call for cleanup.
        *args: Positional arguments to pass to the cleanup function.
        **kwargs: Keyword arguments to pass to the cleanup function.
    """

def defer_global(fn: Callable, *args, **kwargs) -> None:
    """
    Schedule a cleanup function to be called after all tests complete.

    This listing shows the signature only; the implementation is not
    part of this page.

    Args:
        fn: Function to call for cleanup.
        *args: Positional arguments to pass to the cleanup function.
        **kwargs: Keyword arguments to pass to the cleanup function.
    """

Usage Example - Scenario Cleanup

from vedro import scenario, given, when, then, defer, ensure
import tempfile
import os

@scenario("File operations with cleanup")
def test_file_operations():
    """Example scenario: create temp files, process them, and defer their removal."""

    @given("temporary files")
    def setup():
        """Create three temporary input files and schedule each for deletion."""
        temp_files = []

        for i in range(3):
            fd, filepath = tempfile.mkstemp(suffix=f"_test_{i}.txt")
            os.close(fd)  # Close file descriptor

            # Register cleanup as soon as the file exists, so it is removed
            # even if writing the test content below fails.
            defer(os.unlink, filepath)

            # Write test content
            with open(filepath, 'w', encoding="utf-8") as f:
                f.write(f"Test content {i}")

            temp_files.append(filepath)

        return {"temp_files": temp_files}

    @when("files are processed")
    def action(context):
        """Upper-case every input file into a sibling ``.processed`` file."""
        results = []

        for filepath in context["temp_files"]:
            # Read and process file
            with open(filepath, 'r', encoding="utf-8") as f:
                content = f.read()

            processed_content = content.upper()

            # Create output file
            output_file = filepath + ".processed"
            with open(output_file, 'w', encoding="utf-8") as f:
                # The file exists once open() succeeds; schedule its removal
                # before writing so a failed write does not leak it.
                defer(os.unlink, output_file)
                f.write(processed_content)

            results.append({
                "input": filepath,
                "output": output_file,
                "content": processed_content
            })

        return results

    @then("processing completes successfully")
    def verification(results):
        """Check that all files still exist and hold the upper-cased content."""
        ensure(len(results)).equals(3)

        for result in results:
            # Verify files exist during test
            ensure(os.path.exists(result["input"])).is_true()
            ensure(os.path.exists(result["output"])).is_true()

            # Verify content
            ensure(result["content"]).contains("TEST CONTENT")

        # Files will be cleaned up automatically after scenario ends

Usage Example - Global Cleanup

from vedro import scenario, defer_global, given, when, then, ensure
import subprocess
import signal
import time

@scenario("Service lifecycle management")
def test_service_lifecycle():
    """Example scenario: start a helper HTTP server once and stop it after all tests."""

    @given("test service is started")
    def setup():
        """Launch ``http.server`` on port 8999 and register its global shutdown."""
        import sys

        # Start a test service process.
        # - sys.executable runs the server with the same interpreter as the
        #   tests instead of whatever "python" resolves to on PATH.
        # - Output is discarded: nothing reads it, and an unread PIPE can fill
        #   up and block the child process.
        service_process = subprocess.Popen(
            [sys.executable, "-m", "http.server", "8999"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # Schedule global cleanup to stop the service
        # This will run after ALL tests complete
        defer_global(terminate_process_safely, service_process)

        # Wait for service to start
        time.sleep(2)

        return {"service_process": service_process}

    @when("service is accessed")
    def action(context):
        """Request the server's root page and return the response with the PID."""
        import requests

        # Access the test service; the timeout keeps a dead server from
        # hanging the test run indefinitely.
        response = requests.get("http://localhost:8999", timeout=5)

        return {
            "response": response,
            "service_pid": context["service_process"].pid
        }

    @then("service responds correctly")
    def verification(result):
        """Check the HTTP status and that a real process ID was recorded."""
        ensure(result["response"].status_code).equals(200)
        ensure(result["service_pid"]).is_greater_than(0)

        # Service will continue running for other tests
        # and be cleaned up globally at the end

def terminate_process_safely(process):
    """Stop a child process and release its pipe handles.

    Asks the process to terminate, waits up to five seconds, and kills it
    if it has not exited by then. A process that has already exited is not
    signalled again. Any ``stdout``/``stderr`` pipe objects attached to the
    process are closed afterwards so their file descriptors are not leaked.

    Args:
        process: A ``subprocess.Popen``-like object.
    """
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Graceful shutdown took too long; force it and reap the child.
            process.kill()
            process.wait()

    # Pipes exist only when the process was started with PIPE; otherwise
    # these attributes are None.
    for stream in (process.stdout, process.stderr):
        if stream is not None:
            stream.close()

Resource Management Patterns

Combine context providers with deferred cleanup for robust resource management.

@scenario("Complex resource management")
def test_complex_resources():
    """Example scenario: coordinate a DB pool, a cache and a queue with deferred cleanup."""

    @context
    def database_pool():
        """Provides a database connection pool."""
        pool = create_connection_pool(
            host="localhost",
            database="test_db",
            min_connections=2,
            max_connections=10
        )

        # Schedule cleanup
        defer(pool.close_all_connections)

        return pool

    @context
    def cache_client():
        """Provides a cache client (Redis, Memcached, etc.)."""
        client = create_cache_client("localhost:6379")

        def release_cache():
            # Clear test data first, then close. Doing both in one deferred
            # callback keeps the sequence correct whatever order separate
            # deferred callbacks would run in.
            try:
                client.flushdb()
            finally:
                client.close()

        defer(release_cache)

        return client

    @context
    def message_queue():
        """Provides a message queue for testing."""
        queue = create_message_queue("test_queue")

        def release_queue():
            # Purge must happen while the queue is still open.
            try:
                queue.purge()
            finally:
                queue.close()

        defer(release_queue)

        return queue

    @given("all services are available")
    def setup(database_pool, cache_client, message_queue):
        """Health-check each resource and bundle them for the later steps."""
        # Verify all resources are ready
        db_conn = database_pool.get_connection()
        ensure(db_conn.is_connected()).is_true()
        database_pool.return_connection(db_conn)

        cache_client.set("health_check", "ok")
        ensure(cache_client.get("health_check")).equals("ok")

        message_queue.publish("health_check", {"status": "ready"})

        return {
            "db_pool": database_pool,
            "cache": cache_client,
            "queue": message_queue
        }

    @when("complex operation is performed")
    def action(context):
        """Create a user in the database, cache it, and publish an event."""
        import json  # local import: this snippet has no module-level imports

        # Use all resources in a coordinated operation
        db_conn = context["db_pool"].get_connection()

        try:
            # Database operation
            user_data = {"id": 123, "name": "Test User", "email": "test@example.com"}
            create_user(db_conn, user_data)

            # Cache operation
            context["cache"].set(f"user:{user_data['id']}", json.dumps(user_data))

            # Queue operation
            context["queue"].publish("user_created", user_data)

            return user_data

        finally:
            # Always hand the connection back to the pool.
            context["db_pool"].return_connection(db_conn)

    @then("operation completes successfully across all services")
    def verification(result, context):
        """Check that the user is visible in the database, cache and queue."""
        import json  # local import: this snippet has no module-level imports

        # Verify database
        db_conn = context["db_pool"].get_connection()
        try:
            user = get_user(db_conn, result["id"])
            ensure(user.name).equals("Test User")
        finally:
            context["db_pool"].return_connection(db_conn)

        # Verify cache
        cached_data = context["cache"].get(f"user:{result['id']}")
        ensure(cached_data).is_not_none()
        cached_user = json.loads(cached_data)
        ensure(cached_user["name"]).equals("Test User")

        # Verify queue (check message was processed)
        messages = context["queue"].get_recent_messages("user_created")
        ensure(len(messages)).is_greater_than(0)

        # All cleanup will happen automatically via deferred functions

Advanced Patterns

Nested Context Management

Create hierarchical context providers for complex setups:

@scenario("Nested context management")
def test_nested_contexts():
    """Example scenario: providers layered as environment -> server -> client."""

    @context
    def test_environment():
        """Build the top-level environment record and register its global teardown."""
        environment = dict(name="test", isolated=True, resources=[])

        # Torn down once, after the whole run.
        defer_global(cleanup_test_environment, environment)

        return environment

    @context
    def application_server(test_environment):
        """Start an application server that belongs to the test environment."""
        server = start_application_server({
            "host": "localhost",
            "port": 8000,
            "environment": test_environment["name"],
        })
        # Track the server on the environment so the global teardown sees it.
        test_environment["resources"].append(server)

        # Stopped at the end of this scenario.
        defer(stop_application_server, server)

        return server

    @context
    def test_client(application_server):
        """Create a client pointed at the application server."""
        server_url = f"http://{application_server.host}:{application_server.port}"
        client = create_test_client(base_url=server_url)

        # Closed at the end of this scenario.
        defer(client.close)

        return client

    @given("fully configured test environment")
    def setup(test_environment, application_server, test_client):
        """Confirm server and client are usable, then bundle all three objects."""
        ensure(application_server.is_healthy()).is_true()
        ensure(test_client.can_connect()).is_true()

        return dict(
            environment=test_environment,
            server=application_server,
            client=test_client,
        )

    @when("application is tested")
    def action(context):
        """Call the health endpoint through the test client."""
        return {"health_response": context["client"].get("/api/health")}

    @then("application responds correctly")
    def verification(result):
        """Expect HTTP 200 and a ``healthy`` status in the JSON body."""
        health = result["health_response"]
        ensure(health.status_code).equals(200)
        ensure(health.json()["status"]).equals("healthy")

def cleanup_test_environment(env):
    """Call ``cleanup()`` on every entry of ``env["resources"]``.

    A failure in one resource is reported on standard output and does not
    stop the remaining resources from being cleaned up.

    Args:
        env: Mapping with a ``"resources"`` iterable of objects exposing
            a ``cleanup()`` method.
    """
    def _release(item):
        # Best-effort: report the failure and carry on.
        try:
            item.cleanup()
        except Exception as err:
            print(f"Warning: Failed to cleanup resource {item}: {err}")

    for item in env["resources"]:
        _release(item)

Conditional Cleanup

Perform cleanup only under certain conditions:

@scenario("Conditional resource cleanup")
def test_conditional_cleanup():
    """Example scenario: create resources on demand and clean up only what was created."""

    @given("conditional resources")
    def setup():
        """Create the database and/or services requested via environment variables."""
        import os  # local import: this snippet has no module-level imports

        # Create resources based on conditions
        resources = []

        if os.environ.get("CREATE_DATABASE"):
            db = create_test_database()
            resources.append(("database", db))

            # Only clean up database if we created it
            defer(cleanup_database, db)

        if os.environ.get("START_SERVICES"):
            services = start_test_services()
            resources.append(("services", services))

            # Conditional cleanup based on success
            def conditional_service_cleanup():
                if hasattr(services, 'failed') and services.failed:
                    # Keep services running for debugging if they failed
                    print("Keeping failed services for debugging")
                else:
                    stop_test_services(services)

            defer(conditional_service_cleanup)

        return {"resources": resources}

    @when("tests run with available resources")
    def action(context):
        """Run the matching test routine for each resource that was created."""
        results = {}

        for resource_type, resource in context["resources"]:
            if resource_type == "database":
                results["db_test"] = test_database_operations(resource)
            elif resource_type == "services":
                results["service_test"] = test_service_operations(resource)

        return results

    @then("tests complete successfully")
    def verification(results):
        """Require every result to succeed and clear the services' failure marker."""
        for test_name, result in results.items():
            ensure(result.success).is_true()

            # Mark services as successful to allow normal cleanup
            if "service" in test_name and hasattr(result, 'service_ref'):
                # The marker may never have been set; an unconditional
                # delattr would raise AttributeError in that case.
                if hasattr(result.service_ref, 'failed'):
                    delattr(result.service_ref, 'failed')

Cleanup Error Handling

Handle cleanup failures gracefully:

def safe_cleanup(cleanup_func, *args, **kwargs):
    """Run a cleanup callable, logging any failure instead of raising it.

    Args:
        cleanup_func: Callable to invoke.
        *args: Positional arguments forwarded to ``cleanup_func``.
        **kwargs: Keyword arguments forwarded to ``cleanup_func``.

    Returns:
        None. The return value of ``cleanup_func`` is discarded.
    """
    import logging

    try:
        cleanup_func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (e.g. functools.partial objects or
        # callable instances); fall back to repr() so that reporting the
        # failure cannot itself raise.
        name = getattr(cleanup_func, "__name__", None) or repr(cleanup_func)
        logging.warning(f"Cleanup failed for {name}: {e}")

@scenario("Robust cleanup handling")
def test_robust_cleanup():
    """Example scenario: cleanup failures are logged and never fail the test."""

    @given("resources with potential cleanup issues")
    def setup():
        """Create three regular resources and one critical resource with deferred cleanup."""
        # Imported here because critical_cleanup below logs on failure and
        # this snippet has no module-level ``import logging``.
        import logging

        # Create multiple resources, some may fail to clean up
        resources = []

        for i in range(3):
            resource = create_test_resource(f"resource_{i}")
            resources.append(resource)

            # Use safe cleanup wrapper
            defer(safe_cleanup, cleanup_resource, resource)

        # Also schedule a critical cleanup that must succeed
        critical_resource = create_critical_resource()

        def critical_cleanup():
            try:
                cleanup_critical_resource(critical_resource)
            except Exception as e:
                # Log error and try alternative cleanup
                logging.error(f"Critical cleanup failed: {e}")
                alternative_cleanup(critical_resource)

        defer(critical_cleanup)

        return {"resources": resources, "critical": critical_resource}

    @when("operations are performed")
    def action(context):
        """Use every regular resource and the critical resource."""
        # Use all resources
        results = []

        for resource in context["resources"]:
            result = use_resource(resource)
            results.append(result)

        critical_result = use_critical_resource(context["critical"])

        return {"regular_results": results, "critical_result": critical_result}

    @then("operations succeed despite potential cleanup issues")
    def verification(results):
        """Check that all three regular results and the critical result are present."""
        # Test should pass even if some cleanup fails
        ensure(len(results["regular_results"])).equals(3)
        ensure(results["critical_result"].success).is_true()

        # Cleanup errors will be logged but won't fail the test

docs

artifacts-files.md

assertions.md

cli.md

configuration.md

context-cleanup.md

events.md

execution-control.md

index.md

parameterization.md

test-definition.md

tile.json