ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Tools for testing ASGI applications, providing controlled communication channels and application lifecycle management. These utilities enable comprehensive testing of ASGI applications without requiring actual server infrastructure.
Test runner that provides controlled communication with ASGI applications, enabling testing of request/response cycles and connection handling.
class ApplicationCommunicator:
    """Test runner for ASGI applications (API reference stub).

    Lists the public surface of ``asgiref.testing.ApplicationCommunicator``;
    the methods here carry documentation only, no implementation.
    """

    def __init__(self, application, scope):
        """
        Initialize test communicator with application and scope.

        Parameters:
        - application: callable, ASGI application to test
        - scope: dict, ASGI scope for the test connection
        """

    async def wait(self, timeout=1):
        """
        Wait for application to complete execution.

        Parameters:
        - timeout: float, maximum time to wait in seconds (default 1)

        Returns:
        None

        Raises:
        asyncio.TimeoutError: If application doesn't complete within timeout
        """

    def stop(self, exceptions=True):
        """
        Stop the running application.

        This is a regular method, not a coroutine: call it as
        ``communicator.stop()`` without ``await``.

        Parameters:
        - exceptions: bool, whether to raise exceptions if application failed (default True)

        Returns:
        None
        """

    async def send_input(self, message):
        """
        Send message to the application's receive channel.

        Parameters:
        - message: dict, ASGI message to send to application

        Returns:
        None
        """

    async def receive_output(self, timeout=1):
        """
        Receive message from application's send channel.

        Parameters:
        - timeout: float, maximum time to wait for message in seconds (default 1)

        Returns:
        dict: ASGI message sent by the application

        Raises:
        asyncio.TimeoutError: If no message received within timeout
        """

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Check that no messages are pending from the application.

        The outcome is reported through the return value; nothing is raised
        when a message is pending, so callers should assert the result.

        Parameters:
        - timeout: float, time to wait for messages (default 0.1)
        - interval: float, polling interval in seconds (default 0.01)

        Returns:
        bool: True if no message arrived within the timeout, False otherwise
        """

    # Annotations are quoted so this stub can be read before ``asyncio`` is imported.
    input_queue: "asyncio.Queue"  # Input queue for messages to application
    output_queue: "asyncio.Queue"  # Output queue for messages from application
    future: "asyncio.Future"  # Future representing the running application


from asgiref.testing import ApplicationCommunicator
import asyncio
async def simple_http_app(scope, receive, send):
    """Simple HTTP application for testing.

    Reads the whole request body and answers ``200`` with the body echoed
    back as plain text, prefixed by ``Echo: ``.

    Parameters:
    - scope: dict, ASGI connection scope; its 'type' must be 'http'
    - receive: awaitable callable returning the next ASGI event
    - send: awaitable callable accepting an ASGI event

    Returns:
    None
    """
    assert scope['type'] == 'http'

    # Read request body
    chunks = []
    while True:
        message = await receive()
        if message.get('type') == 'http.disconnect':
            # The client went away mid-request; nobody is left to answer.
            return
        chunks.append(message.get('body', b''))
        if not message.get('more_body', False):
            break
    # Join once instead of repeatedly concatenating bytes in the loop.
    body = b''.join(chunks)

    # Send response
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Echo: {body.decode()}'.encode(),
    })
async def test_http_application():
    """Test HTTP application with ApplicationCommunicator.

    Posts a single-chunk body to ``simple_http_app`` and checks the response
    start event, the echoed body, and that nothing else is sent afterwards.

    Raises:
    AssertionError: If the application's output differs from what is expected
    """
    scope = {
        'type': 'http',
        'method': 'POST',
        'path': '/echo',
        'headers': [[b'content-type', b'text/plain']],
    }
    communicator = ApplicationCommunicator(simple_http_app, scope)
    try:
        # Send request body
        await communicator.send_input({
            'type': 'http.request',
            'body': b'Hello, World!',
        })

        # Receive response start
        response_start = await communicator.receive_output(timeout=1)
        assert response_start['type'] == 'http.response.start'
        assert response_start['status'] == 200

        # Receive response body
        response_body = await communicator.receive_output(timeout=1)
        assert response_body['type'] == 'http.response.body'
        assert response_body['body'] == b'Echo: Hello, World!'

        # Verify no more messages. receive_nothing() reports its result as a
        # bool instead of raising, so it must be asserted to have any effect.
        assert await communicator.receive_nothing()
        print("HTTP test passed!")
    finally:
        # stop() is a plain method, not a coroutine; awaiting its None
        # return value would raise TypeError.
        communicator.stop()
# asyncio.run(test_http_application())
from asgiref.testing import ApplicationCommunicator
import asyncio
async def echo_websocket_app(scope, receive, send):
    """WebSocket echo application for testing.

    Accepts the connection, then answers every text frame with the same text
    prefixed by ``Echo: `` until a disconnect event arrives. Frames without
    text (binary frames) are ignored.

    Parameters:
    - scope: dict, ASGI connection scope; its 'type' must be 'websocket'
    - receive: awaitable callable returning the next ASGI event
    - send: awaitable callable accepting an ASGI event

    Returns:
    None
    """
    assert scope['type'] == 'websocket'

    # Wait for connection
    message = await receive()
    assert message['type'] == 'websocket.connect'

    # Accept connection
    await send({'type': 'websocket.accept'})

    # Echo messages until disconnect
    while True:
        message = await receive()
        if message['type'] == 'websocket.disconnect':
            break
        elif message['type'] == 'websocket.receive':
            # A binary frame may carry 'text': None alongside 'bytes', so
            # check the value rather than the mere presence of the key.
            text = message.get('text')
            if text is not None:
                await send({
                    'type': 'websocket.send',
                    'text': f"Echo: {text}"
                })
async def test_websocket_application():
    """Test WebSocket application.

    Drives ``echo_websocket_app`` through connect, one text echo, and
    disconnect, then waits for the application to finish.

    Raises:
    AssertionError: If the application's output differs from what is expected
    """
    scope = {
        'type': 'websocket',
        'path': '/ws',
    }
    communicator = ApplicationCommunicator(echo_websocket_app, scope)
    try:
        # Send connect event
        await communicator.send_input({'type': 'websocket.connect'})

        # Receive accept response
        accept_message = await communicator.receive_output()
        assert accept_message['type'] == 'websocket.accept'

        # Send text message
        await communicator.send_input({
            'type': 'websocket.receive',
            'text': 'Hello WebSocket!'
        })

        # Receive echo response
        echo_message = await communicator.receive_output()
        assert echo_message['type'] == 'websocket.send'
        assert echo_message['text'] == 'Echo: Hello WebSocket!'

        # Send disconnect
        await communicator.send_input({
            'type': 'websocket.disconnect',
            'code': 1000
        })

        # Wait for application to complete
        await communicator.wait()
        print("WebSocket test passed!")
    finally:
        # stop() is a plain method, not a coroutine; awaiting its None
        # return value would raise TypeError.
        communicator.stop()
# asyncio.run(test_websocket_application())
from asgiref.testing import ApplicationCommunicator
import asyncio
async def lifespan_app(scope, receive, send):
    """Application with lifespan events.

    For a 'lifespan' scope, acknowledges startup and shutdown events after a
    short simulated delay and returns once shutdown has been handled. Any
    other scope type is ignored and the coroutine returns immediately.

    Parameters:
    - scope: dict, ASGI connection scope
    - receive: awaitable callable returning the next ASGI event
    - send: awaitable callable accepting an ASGI event

    Returns:
    None
    """
    if scope['type'] != 'lifespan':
        return

    # Handle lifespan protocol
    while True:
        event = await receive()
        kind = event['type']

        if kind == 'lifespan.startup':
            try:
                # Simulate startup tasks
                await asyncio.sleep(0.1)
                await send({'type': 'lifespan.startup.complete'})
            except Exception:
                await send({'type': 'lifespan.startup.failed'})
            continue

        if kind == 'lifespan.shutdown':
            try:
                # Simulate cleanup tasks
                await asyncio.sleep(0.1)
                await send({'type': 'lifespan.shutdown.complete'})
            except Exception:
                await send({'type': 'lifespan.shutdown.failed'})
            # Shutdown is the last event of the protocol.
            return
async def test_lifespan_application():
    """Test application lifespan events.

    Sends startup and shutdown events to ``lifespan_app``, checks that each
    is acknowledged, then waits for the application to finish.

    Raises:
    AssertionError: If the application's output differs from what is expected
    """
    scope = {'type': 'lifespan'}
    communicator = ApplicationCommunicator(lifespan_app, scope)
    try:
        # Send startup event
        await communicator.send_input({'type': 'lifespan.startup'})

        # Receive startup complete
        startup_response = await communicator.receive_output()
        assert startup_response['type'] == 'lifespan.startup.complete'

        # Send shutdown event
        await communicator.send_input({'type': 'lifespan.shutdown'})

        # Receive shutdown complete
        shutdown_response = await communicator.receive_output()
        assert shutdown_response['type'] == 'lifespan.shutdown.complete'

        # Wait for application to complete
        await communicator.wait()
        print("Lifespan test passed!")
    finally:
        # stop() is a plain method, not a coroutine; awaiting its None
        # return value would raise TypeError.
        communicator.stop()
# asyncio.run(test_lifespan_application())
from asgiref.testing import ApplicationCommunicator
import asyncio
async def error_app(scope, receive, send):
    """Application that demonstrates error handling.

    Raises for the '/error' path; for any other path it sends a 200
    plain-text response with the body ``Success``. ``receive`` is never used.

    Parameters:
    - scope: dict, ASGI connection scope; must contain 'path'
    - receive: awaitable callable returning the next ASGI event (unused)
    - send: awaitable callable accepting an ASGI event

    Returns:
    None

    Raises:
    ValueError: If the request path is '/error'
    """
    if scope['path'] == '/error':
        raise ValueError("Intentional error for testing")

    response_start = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    }
    response_body = {
        'type': 'http.response.body',
        'body': b'Success',
    }
    for event in (response_start, response_body):
        await send(event)
async def test_error_handling():
    """Test error handling in applications.

    Runs ``error_app`` twice: once on a path that succeeds, and once on
    '/error', where the application is required to fail with ValueError.

    Raises:
    AssertionError: If the success response is wrong, or if the error case
        finishes without raising
    """
    # Test successful request
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/success',
    }
    communicator = ApplicationCommunicator(error_app, scope)
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        response_start = await communicator.receive_output()
        assert response_start['status'] == 200
        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Success'
        await communicator.wait()
        print("Success case passed!")
    finally:
        # stop() is a plain method, not a coroutine; awaiting its None
        # return value would raise TypeError.
        communicator.stop()

    # Test error case
    error_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/error',
    }
    error_communicator = ApplicationCommunicator(error_app, error_scope)
    try:
        await error_communicator.send_input({'type': 'http.request', 'body': b''})
        # Application should fail
        await error_communicator.wait(timeout=1)
    except ValueError as e:
        print(f"Expected error caught: {e}")
    else:
        # Without this branch the test would pass even if the app never failed.
        raise AssertionError("error_app did not raise for '/error'")
    finally:
        # The failure was already observed above, so do not re-raise it here.
        error_communicator.stop(exceptions=False)
# asyncio.run(test_error_handling())
from asgiref.testing import ApplicationCommunicator
import asyncio
class TestMiddleware:
    """Middleware for testing.

    Wraps an ASGI application and appends an ``x-test-middleware: active``
    header to every ``http.response.start`` event the application sends.
    All other events pass through unchanged.
    """

    # The name matches pytest's default ``Test*`` class pattern; opt out so
    # pytest does not try to collect this helper (and warn about __init__).
    __test__ = False

    def __init__(self, app):
        """
        Parameters:
        - app: callable, the ASGI application to wrap
        """
        self.app = app

    async def __call__(self, scope, receive, send):
        """
        Run the wrapped application with an intercepting ``send``.

        Parameters:
        - scope: dict, ASGI connection scope, passed through untouched
        - receive: awaitable callable, passed through untouched
        - send: awaitable callable that receives the (possibly amended) events

        Returns:
        None
        """
        # Add custom header to responses
        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                headers = list(message.get('headers', []))
                headers.append([b'x-test-middleware', b'active'])
                # Build a new dict so the application's own event is not mutated.
                message = {**message, 'headers': headers}
            await send(message)

        await self.app(scope, receive, send_wrapper)
async def base_app(scope, receive, send):
    """Base application for middleware testing.

    Ignores ``scope`` and ``receive`` and always sends a 200 plain-text
    response whose body is ``Base response``.

    Parameters:
    - scope: dict, ASGI connection scope (unused)
    - receive: awaitable callable returning the next ASGI event (unused)
    - send: awaitable callable accepting an ASGI event

    Returns:
    None
    """
    response_start = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    }
    response_body = {
        'type': 'http.response.body',
        'body': b'Base response',
    }
    for event in (response_start, response_body):
        await send(event)
async def test_middleware():
    """Test middleware functionality.

    Wraps ``base_app`` in ``TestMiddleware`` and checks that the response
    start event carries the middleware's header and that the body is intact.

    Raises:
    AssertionError: If the application's output differs from what is expected
    """
    # Wrap application with middleware
    app_with_middleware = TestMiddleware(base_app)
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/',
    }
    communicator = ApplicationCommunicator(app_with_middleware, scope)
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        response_start = await communicator.receive_output()
        assert response_start['type'] == 'http.response.start'

        # Check that middleware added the header
        headers = dict(response_start['headers'])
        assert headers[b'x-test-middleware'] == b'active'

        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Base response'
        await communicator.wait()
        print("Middleware test passed!")
    finally:
        # stop() is a plain method, not a coroutine; awaiting its None
        # return value would raise TypeError.
        communicator.stop()
# asyncio.run(test_middleware())
from asgiref.testing import ApplicationCommunicator
import asyncio
import pytest
class ASGITestSuite:
    """Reusable test suite for ASGI applications.

    Each helper starts a fresh ApplicationCommunicator for the wrapped
    application, drives one exchange, and stops the communicator afterwards.
    """

    def __init__(self, application):
        """
        Parameters:
        - application: callable, ASGI application under test
        """
        self.application = application

    async def test_http_get(self, path='/', expected_status=200):
        """Test HTTP GET request.

        Parameters:
        - path: str, request path placed in the scope (default '/')
        - expected_status: int, status the response must carry (default 200)

        Returns:
        bytes: The 'body' of the first response body event

        Raises:
        AssertionError: If the response status differs from expected_status
        """
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': path,
        }
        communicator = ApplicationCommunicator(self.application, scope)
        try:
            await communicator.send_input({'type': 'http.request', 'body': b''})
            response_start = await communicator.receive_output()
            assert response_start['status'] == expected_status
            response_body = await communicator.receive_output()
            return response_body['body']
        finally:
            # stop() is a plain method, not a coroutine; awaiting its None
            # return value would raise TypeError.
            communicator.stop()

    async def test_websocket_echo(self, path='/ws'):
        """Test WebSocket echo functionality.

        Parameters:
        - path: str, connection path placed in the scope (default '/ws')

        Returns:
        str: The 'text' of the application's reply, or '' if it has none

        Raises:
        AssertionError: If the application does not accept the connection
        """
        scope = {
            'type': 'websocket',
            'path': path,
        }
        communicator = ApplicationCommunicator(self.application, scope)
        try:
            # Connect
            await communicator.send_input({'type': 'websocket.connect'})
            accept_msg = await communicator.receive_output()
            assert accept_msg['type'] == 'websocket.accept'

            # Send and receive message
            test_message = "Test message"
            await communicator.send_input({
                'type': 'websocket.receive',
                'text': test_message
            })
            echo_msg = await communicator.receive_output()
            return echo_msg.get('text', '')
        finally:
            await communicator.send_input({
                'type': 'websocket.disconnect',
                'code': 1000
            })
            # stop() is a plain method, not a coroutine; awaiting its None
            # return value would raise TypeError.
            communicator.stop()
# Usage with any ASGI application
async def run_test_suite():
    """Run comprehensive test suite.

    Builds an ASGITestSuite around ``simple_http_app``, issues one GET to
    '/', and prints the response body followed by a completion message.

    Returns:
    None
    """
    suite = ASGITestSuite(simple_http_app)

    # Test HTTP endpoints
    http_body = await suite.test_http_get('/')
    print(f"HTTP response: {http_body}")

    # Additional tests...
    print("All tests completed!")
# asyncio.run(run_test_suite())
The ApplicationCommunicator provides:
Install with Tessl CLI
npx tessl i tessl/pypi-asgiref