Async HTTP client/server framework (asyncio)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive testing infrastructure for aiohttp applications including test servers, test clients, pytest integration, and utilities for both client and server testing. Provides complete testing capabilities with proper async support and resource management.
Test server implementations for testing web applications and request handlers in controlled environments.
class BaseTestServer:
    """Common base class of the test servers listed here (TestServer, RawTestServer)."""

    def __init__(self, *, scheme='http', host='127.0.0.1', port=None, **kwargs):
        """
        Base test server.

        All arguments are keyword-only.

        Parameters:
        - scheme (str): URL scheme (http/https); defaults to 'http'
        - host (str): Server host; defaults to '127.0.0.1'
        - port (int): Server port (auto-assigned if None)
        - **kwargs: Extra keyword arguments; their meaning is not described in this reference
        """

    async def start_server(self, **kwargs):
        """
        Start test server (coroutine).

        Parameters:
        - **kwargs: Extra keyword arguments; their meaning is not described in this reference
        """

    async def close(self):
        """Close test server (coroutine)."""

    @property
    def host(self):
        """Server host."""

    @property
    def port(self):
        """Server port."""

    @property
    def scheme(self):
        """URL scheme."""

    def make_url(self, path):
        """
        Create URL for path.

        Parameters:
        - path: Path the URL is created for
        """
class TestServer(BaseTestServer):
    """Test server for web applications; subclass of BaseTestServer."""

    def __init__(self, app, *, port=None, **kwargs):
        """
        Test server for web applications.

        Parameters:
        - app: Web application instance
        - port (int): Server port (keyword-only; defaults to None)
        - **kwargs: Extra keyword arguments (presumably passed on to
          BaseTestServer -- TODO confirm)
        """
class RawTestServer(BaseTestServer):
    """Test server for bare request handlers; subclass of BaseTestServer."""

    def __init__(self, handler, *, port=None, **kwargs):
        """
        Raw test server for request handlers.

        Parameters:
        - handler: Request handler function
        - port (int): Server port (keyword-only; defaults to None)
        - **kwargs: Extra keyword arguments (presumably passed on to
          BaseTestServer -- TODO confirm)
        """

HTTP client specifically designed for testing with automatic server integration and simplified request methods.
class TestClient:
    """HTTP client bound to a test server, with one coroutine per HTTP method."""

    def __init__(self, server, *, cookie_jar=None, **kwargs):
        """
        Test HTTP client.

        Parameters:
        - server: Test server instance
        - cookie_jar: Cookie jar for client (keyword-only; defaults to None)
        - **kwargs: Extra keyword arguments; their meaning is not described in this reference
        """

    async def start_server(self):
        """Start associated test server (coroutine)."""

    async def close(self):
        """Close client and server (coroutine)."""

    def make_url(self, path):
        """
        Create URL for path.

        Parameters:
        - path: Path the URL is created for
        """

    async def request(self, method, path, **kwargs):
        """
        Make HTTP request to test server.

        Parameters:
        - method: HTTP method (the examples in this document pass e.g. 'GET')
        - path: Request path on the test server (e.g. '/')
        - **kwargs: Extra request options; not described in this reference
        """

    # The per-method helpers below share one shape: (path, **kwargs).

    async def get(self, path, **kwargs):
        """Make GET request."""

    async def post(self, path, **kwargs):
        """Make POST request."""

    async def put(self, path, **kwargs):
        """Make PUT request."""

    async def patch(self, path, **kwargs):
        """Make PATCH request."""

    async def delete(self, path, **kwargs):
        """Make DELETE request."""

    async def head(self, path, **kwargs):
        """Make HEAD request."""

    async def options(self, path, **kwargs):
        """Make OPTIONS request."""

    async def ws_connect(self, path, **kwargs):
        """Establish WebSocket connection."""

    @property
    def session(self):
        """Underlying HTTP session."""

    @property
    def host(self):
        """Server host."""

    @property
    def port(self):
        """Server port."""

Base test case class providing utilities and setup for aiohttp application testing.
class AioHTTPTestCase:
    """
    Base test case for aiohttp applications.

    Subclasses implement get_application(); the test case exposes the
    resulting objects through the client, server and app properties.
    """

    async def get_application(self):
        """
        Create application for testing.

        Must be implemented by subclasses.

        Returns:
        Application: Web application instance
        """

    async def setUpAsync(self):
        """Async setup method."""

    async def tearDownAsync(self):
        """Async teardown method."""

    async def get_server(self, app):
        """
        Create test server for application.

        Parameters:
        - app: Web application

        Returns:
        TestServer: Test server instance
        """

    async def get_client(self, server):
        """
        Create test client for server.

        Parameters:
        - server: Test server

        Returns:
        TestClient: Test client instance
        """

    @property
    def client(self):
        """Test client instance."""

    @property
    def server(self):
        """Test server instance."""

    @property
    def app(self):
        """Web application instance."""

Utility functions for creating mocked objects and managing test environments.
def unused_port():
    """
    Get unused TCP port.

    Takes no arguments.

    Returns:
    int: Available port number
    """
def make_mocked_request(
    method,
    path,
    headers=None,
    *,
    version=None,
    closing=False,
    app=None,
    writer=None,
    protocol=None,
    transport=None,
    payload=None,
    sslcontext=None,
    client_max_size=1024**2,
    loop=None,
    match_info=None
):
    """
    Create mocked request for testing.

    Only method, path and headers may be passed positionally; every
    parameter after headers is keyword-only.

    Parameters:
    - method (str): HTTP method
    - path (str): Request path
    - headers (dict): Request headers (defaults to None)
    - version: HTTP version (defaults to None)
    - closing (bool): Connection closing (defaults to False)
    - app: Application instance (defaults to None)
    - writer: Response writer (defaults to None)
    - protocol: Request protocol (defaults to None)
    - transport: Network transport (defaults to None)
    - payload: Request payload (defaults to None)
    - sslcontext: SSL context (defaults to None)
    - client_max_size (int): Max client payload size (defaults to 1024**2)
    - loop: Event loop (defaults to None)
    - match_info: URL match info (defaults to None)

    Returns:
    Request: Mocked request object
    """
def make_mocked_coro(return_value=None, raise_exception=None):
    """
    Create mocked coroutine.

    Parameters:
    - return_value: Value to return from coroutine (defaults to None)
    - raise_exception: Exception to raise from coroutine (defaults to None)

    Returns:
    Mock: Mocked coroutine function
    """
def setup_test_loop(loop_factory=None):
    """
    Setup event loop for testing.

    Parameters:
    - loop_factory: Factory function for creating loop (defaults to None)

    Returns:
    Event loop instance
    """
def teardown_test_loop(loop, fast=False):
    """
    Teardown test event loop.

    Parameters:
    - loop: Event loop to teardown
    - fast (bool): Fast teardown mode (defaults to False)
    """
def loop_context(loop_factory=None, fast=False):
    """
    Context manager for test event loop.

    Parameters:
    - loop_factory: Factory function for creating loop (defaults to None)
    - fast (bool): Fast teardown mode (defaults to False)

    Returns:
    Context manager yielding event loop
    """
def unittest_run_loop(func):
    """
    Decorator to run test function in event loop.

    The AioHTTPTestCase example in this document applies it to async
    test methods.

    Parameters:
    - func: Test function to run

    Returns:
    Wrapped test function
    """

Pytest fixtures and utilities for seamless integration with pytest testing framework.
# Pytest fixtures (available when using aiohttp.pytest_plugin)
def loop():
    """
    Event loop fixture.

    Yields:
    Event loop for test
    """
def unused_port():
    """
    Unused port fixture.

    Returns:
    int: Available port number
    """
def aiohttp_client():
    """
    Test client factory fixture.

    The examples in this document call the factory as
    ``await aiohttp_client(app)`` to obtain a client.

    Returns:
    Function: Factory function for creating test clients
    """
def aiohttp_server():
    """
    Test server factory fixture.

    Returns:
    Function: Factory function for creating test servers
    """
def aiohttp_raw_server():
    """
    Raw test server factory fixture.

    Returns:
    Function: Factory function for creating raw test servers
    """


import pytest
from aiohttp import web
import aiohttp
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
class TestWebApp(AioHTTPTestCase):
    """unittest-style checks for a tiny app with a text route and a JSON route."""

    # NOTE(review): the aiohttp docs describe unittest_run_loop as a deprecated
    # no-op from 3.8 onwards -- confirm the targeted version before dropping it.

    async def get_application(self):
        """Assemble the application under test."""

        async def hello(request):
            return web.Response(text='Hello, World!')

        async def json_handler(request):
            return web.json_response({'message': 'Hello, JSON!'})

        application = web.Application()
        router = application.router
        router.add_get('/', hello)
        router.add_get('/json', json_handler)
        return application

    @unittest_run_loop
    async def test_hello(self):
        """GET / answers 200 with the plain-text greeting."""
        response = await self.client.request('GET', '/')
        self.assertEqual(response.status, 200)
        body = await response.text()
        self.assertEqual(body, 'Hello, World!')

    @unittest_run_loop
    async def test_json(self):
        """GET /json answers 200 with the JSON greeting."""
        response = await self.client.request('GET', '/json')
        self.assertEqual(response.status, 200)
        payload = await response.json()
        self.assertEqual(payload['message'], 'Hello, JSON!')


import pytest
from aiohttp import web
async def hello_handler(request):
    """Greet the ``name`` given in the query string, or ``World`` when absent."""
    who = request.query.get('name', 'World')
    greeting = f'Hello, {who}!'
    return web.Response(text=greeting)
async def create_app():
    """Build the application that serves ``GET /hello``."""
    application = web.Application()
    router = application.router
    router.add_get('/hello', hello_handler)
    return application
@pytest.fixture
async def client(aiohttp_client):
    """Provide a test client wired to a freshly built application."""
    application = await create_app()
    test_client = await aiohttp_client(application)
    return test_client
async def test_hello_default(client):
    """Without a ``name`` parameter the endpoint greets the world."""
    response = await client.get('/hello')
    assert response.status == 200
    assert await response.text() == 'Hello, World!'
async def test_hello_custom_name(client):
    """A ``name`` query parameter is echoed in the greeting."""
    response = await client.get('/hello?name=Alice')
    assert response.status == 200
    assert await response.text() == 'Hello, Alice!'
async def test_hello_post_method(client):
    """POST is not registered for /hello, so the server must refuse it."""
    response = await client.post('/hello')
    method_not_allowed = 405
    assert response.status == method_not_allowed


import pytest
from aiohttp import web, WSMsgType
import json
async def websocket_handler(request):
    """Serve a WebSocket endpoint that echoes ``echo`` messages.

    Every text frame is expected to carry a JSON object.  An object whose
    ``type`` is ``'echo'`` is answered with
    ``{'type': 'echo_response', 'message': <its message>}``; objects of any
    other type are ignored.  A frame that is not valid JSON is answered with
    ``{'type': 'error', 'message': 'Invalid JSON'}`` so that one bad frame
    from a client does not abort the whole connection.

    Parameters:
    - request: Incoming request to upgrade to a WebSocket

    Returns:
    The WebSocketResponse once the client has disconnected
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            try:
                data = json.loads(msg.data)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass.
                await ws.send_json({'type': 'error', 'message': 'Invalid JSON'})
                continue
            # Valid JSON need not be an object (e.g. a list or a number), and
            # an object need not carry the expected keys.
            if isinstance(data, dict) and data.get('type') == 'echo':
                await ws.send_json({
                    'type': 'echo_response',
                    'message': data.get('message'),
                })
        elif msg.type == WSMsgType.ERROR:
            print(f'WebSocket error: {ws.exception()}')

    return ws
@pytest.fixture
async def websocket_client(aiohttp_client):
    """Provide a test client for an app exposing the handler at ``/ws``."""
    application = web.Application()
    application.router.add_get('/ws', websocket_handler)
    return await aiohttp_client(application)
async def test_websocket_echo(websocket_client):
    """An ``echo`` message comes back as an ``echo_response`` with the same text."""
    outgoing = {
        'type': 'echo',
        'message': 'Hello, WebSocket!'
    }
    async with websocket_client.ws_connect('/ws') as socket:
        # Ask the server to echo.
        await socket.send_json(outgoing)

        # The reply must be a text frame holding the echoed payload.
        frame = await socket.receive()
        assert frame.type == WSMsgType.TEXT
        reply = json.loads(frame.data)
        assert reply['type'] == 'echo_response'
        assert reply['message'] == 'Hello, WebSocket!'


import pytest
from aiohttp import web
import aiohttp
import asyncio
import sqlite3
from contextlib import asynccontextmanager
class UserService:
    """Small SQLite-backed user store used by the example handlers.

    The methods are coroutines so handlers can ``await`` them, but the
    ``sqlite3`` calls inside are ordinary synchronous calls.
    """

    def __init__(self, db_path=':memory:'):
        """
        Parameters:
        - db_path: Database path handed to sqlite3.connect (defaults to ':memory:')
        """
        self.db_path = db_path
        self.connection = None  # set by initialize(), reset by close()

    def _require_connection(self):
        """Return the open connection or raise a clear error if there is none."""
        if self.connection is None:
            raise RuntimeError('UserService is not initialized; call initialize() first')
        return self.connection

    async def initialize(self):
        """Open the database and create the ``users`` table if it is missing."""
        # In real applications, use aiopg, aiomysql, etc.
        if self.connection is not None:
            # Re-initialising must not leak the previous connection.
            self.connection.close()
        self.connection = sqlite3.connect(self.db_path)
        # The connection context manager commits on success and rolls back
        # if the block raises.
        with self.connection:
            self.connection.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL
                )
            ''')

    async def create_user(self, name, email):
        """Insert a user and return its new row id.

        Raises:
        sqlite3.IntegrityError: if a constraint is violated (e.g. duplicate
        email); the pending transaction is rolled back before it propagates.
        """
        connection = self._require_connection()
        with connection:
            cursor = connection.execute(
                'INSERT INTO users (name, email) VALUES (?, ?)',
                (name, email)
            )
        return cursor.lastrowid

    async def get_user(self, user_id):
        """Return ``{'id', 'name', 'email'}`` for ``user_id``, or None if absent."""
        row = self._require_connection().execute(
            'SELECT id, name, email FROM users WHERE id = ?',
            (user_id,)
        ).fetchone()
        if row is None:
            return None
        return {'id': row[0], 'name': row[1], 'email': row[2]}

    async def close(self):
        """Close the database connection; safe to call more than once."""
        if self.connection is not None:
            self.connection.close()
            # Drop the reference so later calls fail clearly instead of
            # hitting a closed connection.
            self.connection = None
async def create_user_handler(request):
    """Create a new user from the JSON request body.

    The body must be a JSON object with string fields ``name`` and ``email``.

    Returns:
    201 JSON response with the new user's ``id``, ``name`` and ``email``

    Raises:
    web.HTTPBadRequest: body is not valid JSON, or a required field is
    missing or not a string (previously these surfaced as server errors)
    web.HTTPConflict: the store rejected the insert with an IntegrityError
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        raise web.HTTPBadRequest(text='Request body must be valid JSON')

    try:
        name, email = data['name'], data['email']
    except (KeyError, TypeError):
        # KeyError: field missing; TypeError: body is not a JSON object.
        raise web.HTTPBadRequest(text="Fields 'name' and 'email' are required")
    if not isinstance(name, str) or not isinstance(email, str):
        # Reject nulls here; otherwise the NOT NULL constraint would be
        # misreported below as a duplicate email.
        raise web.HTTPBadRequest(text="Fields 'name' and 'email' must be strings")

    user_service = request.app['user_service']
    try:
        user_id = await user_service.create_user(name, email)
    except sqlite3.IntegrityError:
        raise web.HTTPConflict(text='Email already exists')

    return web.json_response({
        'id': user_id,
        'name': name,
        'email': email
    }, status=201)
async def get_user_handler(request):
    """Return the user identified by the ``user_id`` path segment as JSON.

    Raises:
    web.HTTPBadRequest: ``user_id`` is not an integer (previously this
    surfaced as an unhandled ValueError)
    web.HTTPNotFound: no user has that id
    """
    try:
        user_id = int(request.match_info['user_id'])
    except ValueError:
        raise web.HTTPBadRequest(text='user_id must be an integer')

    user_service = request.app['user_service']
    user = await user_service.get_user(user_id)
    if not user:
        raise web.HTTPNotFound()
    return web.json_response(user)
async def create_app():
    """Build the users application around an initialised UserService."""

    async def cleanup_user_service(app):
        # Runs on application cleanup to release the database connection.
        await app['user_service'].close()

    application = web.Application()

    # Service setup
    service = UserService()
    await service.initialize()
    application['user_service'] = service

    # Routing
    router = application.router
    router.add_post('/users', create_user_handler)
    router.add_get('/users/{user_id}', get_user_handler)

    application.on_cleanup.append(cleanup_user_service)
    return application
@pytest.fixture
async def client(aiohttp_client):
    """Provide a test client for a database-backed application instance."""
    application = await create_app()
    test_client = await aiohttp_client(application)
    return test_client
async def test_create_user(client):
    """POST /users answers 201 and echoes the stored fields plus an id."""
    payload = {'name': 'John Doe', 'email': 'john@example.com'}
    response = await client.post('/users', json=payload)
    assert response.status == 201

    body = await response.json()
    assert body['name'] == 'John Doe'
    assert body['email'] == 'john@example.com'
    assert 'id' in body
async def test_get_user(client):
    """A user created through the API can be read back by its id."""
    # Seed one user through the public endpoint.
    payload = {'name': 'Jane Doe', 'email': 'jane@example.com'}
    creation = await client.post('/users', json=payload)
    created = await creation.json()
    new_id = created['id']

    # Fetch the same user.
    lookup = await client.get(f'/users/{new_id}')
    assert lookup.status == 200
    fetched = await lookup.json()
    assert fetched['id'] == new_id
    assert fetched['name'] == 'Jane Doe'
    assert fetched['email'] == 'jane@example.com'
async def test_get_nonexistent_user(client):
    """Asking for an id that was never created yields 404."""
    response = await client.get('/users/999')
    not_found = 404
    assert response.status == not_found
async def test_duplicate_email(client):
    """A second user with an already-used email is rejected with 409."""
    first = {'name': 'User One', 'email': 'duplicate@example.com'}
    # Same email, different name.
    second = {**first, 'name': 'User Two'}

    accepted = await client.post('/users', json=first)
    assert accepted.status == 201

    rejected = await client.post('/users', json=second)
    assert rejected.status == 409  # Conflict


import asyncio
import aiohttp
import time
from concurrent.futures import ThreadPoolExecutor
def _summarize_load_test(results, num_requests, total_duration):
    """Fold per-request result dicts into the aggregate statistics dict."""
    durations = [r['duration'] for r in results if r['success']]
    succeeded = len(durations)
    return {
        'total_requests': num_requests,
        'successful_requests': succeeded,
        'failed_requests': len(results) - succeeded,
        # Guard both divisions so an empty or instantaneous run reports 0
        # instead of raising ZeroDivisionError.
        'success_rate': succeeded / num_requests * 100 if num_requests else 0.0,
        'total_duration': total_duration,
        'requests_per_second': (
            num_requests / total_duration if total_duration > 0 else 0.0
        ),
        'avg_response_time': sum(durations) / succeeded if succeeded else 0,
        'min_response_time': min(durations, default=0),
        'max_response_time': max(durations, default=0),
    }


async def load_test_endpoint(url, num_requests=100, concurrency=10):
    """
    Perform load testing on an endpoint.

    Issues ``num_requests`` GET requests against ``url`` with at most
    ``concurrency`` in flight at once.  A request counts as successful when
    its status is 2xx; response-time statistics cover successful requests only.

    Parameters:
    - url (str): Endpoint URL to test
    - num_requests (int): Total number of requests; 0 returns an all-zero
      summary without opening a session
    - concurrency (int): Concurrent request limit (must be at least 1)

    Returns:
    dict: total/successful/failed counts, success_rate (percent),
    total_duration, requests_per_second and avg/min/max response time
    (durations in seconds)

    Raises:
    ValueError: if concurrency is below 1 or num_requests is negative
    """
    if concurrency < 1:
        # Semaphore(0) would block every request forever.
        raise ValueError('concurrency must be at least 1')
    if num_requests < 0:
        raise ValueError('num_requests must not be negative')
    if num_requests == 0:
        return _summarize_load_test([], 0, 0.0)

    semaphore = asyncio.Semaphore(concurrency)

    async def make_request(session, request_id):
        async with semaphore:
            # perf_counter is monotonic, so durations cannot go negative when
            # the wall clock is adjusted mid-run.
            started = time.perf_counter()
            try:
                async with session.get(url) as response:
                    status = response.status
                    # Consume response to complete request
                    await response.read()
            except Exception as e:
                # Deliberately broad: any failure is recorded as a failed
                # request rather than aborting the whole run.
                return {
                    'request_id': request_id,
                    'status': 0,
                    'duration': time.perf_counter() - started,
                    'success': False,
                    'error': str(e)
                }
            return {
                'request_id': request_id,
                'status': status,
                'duration': time.perf_counter() - started,
                'success': 200 <= status < 300
            }

    # Perform load test
    run_started = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(make_request(session, i) for i in range(num_requests))
        )
    total_duration = time.perf_counter() - run_started

    return _summarize_load_test(results, num_requests, total_duration)
# Usage example
async def test_api_performance():
    """Run 1000 requests (50 at a time) against the local API and print a report."""
    stats = await load_test_endpoint(
        'http://localhost:8080/api/test',
        num_requests=1000,
        concurrency=50
    )
    report = (
        "Load Test Results:",
        f"Total Requests: {stats['total_requests']}",
        f"Successful: {stats['successful_requests']}",
        f"Failed: {stats['failed_requests']}",
        f"Success Rate: {stats['success_rate']:.2f}%",
        f"Requests/sec: {stats['requests_per_second']:.2f}",
        f"Avg Response Time: {stats['avg_response_time']:.3f}s",
        f"Min Response Time: {stats['min_response_time']:.3f}s",
        f"Max Response Time: {stats['max_response_time']:.3f}s",
    )
    for line in report:
        print(line)
# Run load test
asyncio.run(test_api_performance())Install with Tessl CLI
npx tessl i tessl/pypi-aiohttp