CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asgiref

ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

testing.mddocs/

Testing Utilities

Tools for testing ASGI applications, providing controlled communication channels and application lifecycle management. These utilities enable comprehensive testing of ASGI applications without requiring actual server infrastructure.

Capabilities

Application Communicator

Test runner that provides controlled communication with ASGI applications, enabling testing of request/response cycles and connection handling.

class ApplicationCommunicator:
    """Test runner for ASGI applications."""
    
    def __init__(self, application, scope):
        """
        Initialize test communicator with application and scope.
        
        Parameters:
        - application: callable, ASGI application to test
        - scope: dict, ASGI scope for the test connection
        """
    
    async def wait(self, timeout=1):
        """
        Wait for application to complete execution.
        
        Parameters:
        - timeout: float, maximum time to wait in seconds (default 1)
        
        Returns:
        None
        
        Raises:
        asyncio.TimeoutError: If application doesn't complete within timeout
        """
    
    async def stop(self, exceptions=True):
        """
        Stop the running application.
        
        Parameters:
        - exceptions: bool, whether to raise exceptions if application failed (default True)
        
        Returns:
        None
        """
    
    async def send_input(self, message):
        """
        Send message to the application's receive channel.
        
        Parameters:
        - message: dict, ASGI message to send to application
        
        Returns:
        None
        """
    
    async def receive_output(self, timeout=1):
        """
        Receive message from application's send channel.
        
        Parameters:
        - timeout: float, maximum time to wait for message in seconds (default 1)
        
        Returns:
        dict: ASGI message sent by the application
        
        Raises:
        asyncio.TimeoutError: If no message received within timeout
        """
    
    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Verify that no messages are pending from the application.
        
        Parameters:
        - timeout: float, time to wait for messages (default 0.1)
        - interval: float, polling interval in seconds (default 0.01)
        
        Returns:
        None
        
        Raises:
        AssertionError: If unexpected messages are received
        """
    
    input_queue: asyncio.Queue    # Input queue for messages to application
    output_queue: asyncio.Queue   # Output queue for messages from application  
    future: asyncio.Future        # Future representing the running application

Usage Examples

Testing HTTP Applications

from asgiref.testing import ApplicationCommunicator
import asyncio

async def simple_http_app(scope, receive, send):
    """Simple HTTP application for testing."""
    assert scope['type'] == 'http'
    
    # Read request body
    body = b''
    while True:
        message = await receive()
        body += message.get('body', b'')
        if not message.get('more_body', False):
            break
    
    # Send response
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Echo: {body.decode()}'.encode(),
    })

async def test_http_application():
    """Test HTTP application with ApplicationCommunicator."""
    scope = {
        'type': 'http',
        'method': 'POST',
        'path': '/echo',
        'headers': [[b'content-type', b'text/plain']],
    }
    
    communicator = ApplicationCommunicator(simple_http_app, scope)
    
    try:
        # Send request body
        await communicator.send_input({
            'type': 'http.request',
            'body': b'Hello, World!',
        })
        
        # Receive response start
        response_start = await communicator.receive_output(timeout=1)
        assert response_start['type'] == 'http.response.start'
        assert response_start['status'] == 200
        
        # Receive response body
        response_body = await communicator.receive_output(timeout=1)
        assert response_body['type'] == 'http.response.body'
        assert response_body['body'] == b'Echo: Hello, World!'
        
        # Verify no more messages
        await communicator.receive_nothing()
        
        print("HTTP test passed!")
        
    finally:
        await communicator.stop()

# asyncio.run(test_http_application())

Testing WebSocket Applications

from asgiref.testing import ApplicationCommunicator
import asyncio

async def echo_websocket_app(scope, receive, send):
    """WebSocket echo application for testing."""
    assert scope['type'] == 'websocket'
    
    # Wait for connection
    message = await receive()
    assert message['type'] == 'websocket.connect'
    
    # Accept connection
    await send({'type': 'websocket.accept'})
    
    # Echo messages until disconnect
    while True:
        message = await receive()
        
        if message['type'] == 'websocket.disconnect':
            break
        elif message['type'] == 'websocket.receive':
            if 'text' in message:
                await send({
                    'type': 'websocket.send',
                    'text': f"Echo: {message['text']}"
                })

async def test_websocket_application():
    """Test WebSocket application."""
    scope = {
        'type': 'websocket',
        'path': '/ws',
    }
    
    communicator = ApplicationCommunicator(echo_websocket_app, scope)
    
    try:
        # Send connect event
        await communicator.send_input({'type': 'websocket.connect'})
        
        # Receive accept response
        accept_message = await communicator.receive_output()
        assert accept_message['type'] == 'websocket.accept'
        
        # Send text message
        await communicator.send_input({
            'type': 'websocket.receive',
            'text': 'Hello WebSocket!'
        })
        
        # Receive echo response
        echo_message = await communicator.receive_output()
        assert echo_message['type'] == 'websocket.send'
        assert echo_message['text'] == 'Echo: Hello WebSocket!'
        
        # Send disconnect
        await communicator.send_input({
            'type': 'websocket.disconnect',
            'code': 1000
        })
        
        # Wait for application to complete
        await communicator.wait()
        
        print("WebSocket test passed!")
        
    finally:
        await communicator.stop()

# asyncio.run(test_websocket_application())

Testing Application Lifecycle

from asgiref.testing import ApplicationCommunicator
import asyncio

async def lifespan_app(scope, receive, send):
    """Application with lifespan events."""
    if scope['type'] == 'lifespan':
        # Handle lifespan protocol
        while True:
            message = await receive()
            
            if message['type'] == 'lifespan.startup':
                try:
                    # Simulate startup tasks
                    await asyncio.sleep(0.1)
                    await send({'type': 'lifespan.startup.complete'})
                except Exception:
                    await send({'type': 'lifespan.startup.failed'})
                    
            elif message['type'] == 'lifespan.shutdown':
                try:
                    # Simulate cleanup tasks
                    await asyncio.sleep(0.1)
                    await send({'type': 'lifespan.shutdown.complete'})
                except Exception:
                    await send({'type': 'lifespan.shutdown.failed'})
                break

async def test_lifespan_application():
    """Test application lifespan events."""
    scope = {'type': 'lifespan'}
    
    communicator = ApplicationCommunicator(lifespan_app, scope)
    
    try:
        # Send startup event
        await communicator.send_input({'type': 'lifespan.startup'})
        
        # Receive startup complete
        startup_response = await communicator.receive_output()
        assert startup_response['type'] == 'lifespan.startup.complete'
        
        # Send shutdown event  
        await communicator.send_input({'type': 'lifespan.shutdown'})
        
        # Receive shutdown complete
        shutdown_response = await communicator.receive_output()
        assert shutdown_response['type'] == 'lifespan.shutdown.complete'
        
        # Wait for application to complete
        await communicator.wait()
        
        print("Lifespan test passed!")
        
    finally:
        await communicator.stop()

# asyncio.run(test_lifespan_application())

Testing Error Handling

from asgiref.testing import ApplicationCommunicator
import asyncio

async def error_app(scope, receive, send):
    """Application that demonstrates error handling."""
    if scope['path'] == '/error':
        raise ValueError("Intentional error for testing")
    
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Success',
    })

async def test_error_handling():
    """Test error handling in applications."""
    # Test successful request
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/success',
    }
    
    communicator = ApplicationCommunicator(error_app, scope)
    
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        
        response_start = await communicator.receive_output()
        assert response_start['status'] == 200
        
        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Success'
        
        await communicator.wait()
        print("Success case passed!")
        
    finally:
        await communicator.stop()
    
    # Test error case
    error_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/error',
    }
    
    error_communicator = ApplicationCommunicator(error_app, error_scope)
    
    try:
        await error_communicator.send_input({'type': 'http.request', 'body': b''})
        
        # Application should fail
        await error_communicator.wait(timeout=1)
        
    except Exception as e:
        print(f"Expected error caught: {e}")
        
    finally:
        await error_communicator.stop(exceptions=False)

# asyncio.run(test_error_handling())

Testing Middleware

from asgiref.testing import ApplicationCommunicator
import asyncio

class TestMiddleware:
    """Middleware for testing."""
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        # Add custom header to responses
        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                headers = list(message.get('headers', []))
                headers.append([b'x-test-middleware', b'active'])
                message = {**message, 'headers': headers}
            await send(message)
        
        await self.app(scope, receive, send_wrapper)

async def base_app(scope, receive, send):
    """Base application for middleware testing."""
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Base response',
    })

async def test_middleware():
    """Test middleware functionality."""
    # Wrap application with middleware
    app_with_middleware = TestMiddleware(base_app)
    
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/',
    }
    
    communicator = ApplicationCommunicator(app_with_middleware, scope)
    
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        
        response_start = await communicator.receive_output()
        assert response_start['type'] == 'http.response.start'
        
        # Check that middleware added the header
        headers = dict(response_start['headers'])
        assert headers[b'x-test-middleware'] == b'active'
        
        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Base response'
        
        await communicator.wait()
        print("Middleware test passed!")
        
    finally:
        await communicator.stop()

# asyncio.run(test_middleware())

Integration Test Suite

from asgiref.testing import ApplicationCommunicator
import asyncio
import pytest

class ASGITestSuite:
    """Reusable test suite for ASGI applications."""
    
    def __init__(self, application):
        self.application = application
    
    async def test_http_get(self, path='/', expected_status=200):
        """Test HTTP GET request."""
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': path,
        }
        
        communicator = ApplicationCommunicator(self.application, scope)
        
        try:
            await communicator.send_input({'type': 'http.request', 'body': b''})
            
            response_start = await communicator.receive_output()
            assert response_start['status'] == expected_status
            
            response_body = await communicator.receive_output()
            return response_body['body']
            
        finally:
            await communicator.stop()
    
    async def test_websocket_echo(self, path='/ws'):
        """Test WebSocket echo functionality."""
        scope = {
            'type': 'websocket',
            'path': path,
        }
        
        communicator = ApplicationCommunicator(self.application, scope)
        
        try:
            # Connect
            await communicator.send_input({'type': 'websocket.connect'})
            accept_msg = await communicator.receive_output()
            assert accept_msg['type'] == 'websocket.accept'
            
            # Send and receive message
            test_message = "Test message"
            await communicator.send_input({
                'type': 'websocket.receive',
                'text': test_message
            })
            
            echo_msg = await communicator.receive_output()
            return echo_msg.get('text', '')
            
        finally:
            await communicator.send_input({
                'type': 'websocket.disconnect',
                'code': 1000
            })
            await communicator.stop()

# Usage with any ASGI application
async def run_test_suite():
    """Run comprehensive test suite."""
    test_suite = ASGITestSuite(simple_http_app)
    
    # Test HTTP endpoints
    response = await test_suite.test_http_get('/')
    print(f"HTTP response: {response}")
    
    # Additional tests...
    print("All tests completed!")

# asyncio.run(run_test_suite())

Key Testing Features

The ApplicationCommunicator provides:

  • Controlled Communication: Direct access to application's receive/send channels
  • Timeout Management: Configurable timeouts for all operations
  • Error Handling: Clean exception handling and application lifecycle management
  • Message Verification: Tools to verify expected message patterns
  • Async/Await Support: Full asyncio integration for modern testing patterns

Install with Tessl CLI

npx tessl i tessl/pypi-asgiref

docs

compatibility.md

current-thread-executor.md

index.md

local-storage.md

server-base.md

sync-async.md

testing.md

timeout.md

type-definitions.md

wsgi-integration.md

tile.json