OpenAPI/Swagger spec-first web framework for Python with automatic request validation and response serialization
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Built-in security handlers for common authentication methods with support for Bearer tokens, Basic auth, API keys, and OAuth2. Connexion provides automatic security validation based on OpenAPI security schemes.
Pre-implemented security handlers for common authentication patterns.
def bearer_auth(token: str) -> dict:
    """
    Bearer token authentication handler.

    Parameters:
    - token: Bearer token from Authorization header

    Returns:
    dict: Token information or user context

    Raises:
    UnauthorizedProblem: If token is invalid
    """
def basic_auth(username: str, password: str) -> dict:
    """
    HTTP Basic authentication handler.

    Parameters:
    - username: Username from Authorization header
    - password: Password from Authorization header

    Returns:
    dict: User information or authentication context

    Raises:
    UnauthorizedProblem: If credentials are invalid
    """
def api_key_auth(api_key: str, required_scopes: list = None) -> dict:
    """
    API key authentication handler.

    Parameters:
    - api_key: API key from header, query, or cookie
    - required_scopes: List of required scopes for the operation
      (optional; defaults to None)

    Returns:
    dict: API key information or user context

    Raises:
    UnauthorizedProblem: If API key is invalid
    ForbiddenProblem: If required scopes are not met
    """
def oauth2_auth(token: str, required_scopes: list) -> dict:
    """
    OAuth2 authentication handler.

    Parameters:
    - token: OAuth2 access token
    - required_scopes: List of required OAuth2 scopes

    Returns:
    dict: Token information with user context

    Raises:
    UnauthorizedProblem: If token is invalid
    ForbiddenProblem: If required scopes are not met
    """

Configure security handlers for different authentication schemes.
# Security handler mapping: OpenAPI security scheme name -> handler function
SECURITY_HANDLERS = dict(
    bearerAuth=bearer_auth,
    basicAuth=basic_auth,
    apiKeyAuth=api_key_auth,
    oauth2=oauth2_auth,
)
def configure_security(app, security_map: dict = None):
    """
    Configure security handlers for the application.

    Parameters:
    - app: Connexion application instance
    - security_map: Custom security handler mapping
      (optional; defaults to None)
    """

Access authentication information in endpoint functions.
from connexion.context import request

# Access authenticated user information
# (.get returns None when the key is absent from the request context)
user_info = request.context.get('user')
token_info = request.context.get('token_info')

Exceptions for authentication and authorization failures.
class UnauthorizedProblem(ProblemException):
    """Problem reported with HTTP status 401 (authentication required or failed)."""

    def __init__(self, title: str = "Unauthorized", **kwargs):
        # The status is fixed at 401; any extra keyword arguments are
        # forwarded untouched to ProblemException.
        super().__init__(title=title, status=401, **kwargs)
class ForbiddenProblem(ProblemException):
    """Problem reported with HTTP status 403 (insufficient permissions)."""

    def __init__(self, title: str = "Forbidden", **kwargs):
        # The status is fixed at 403; any extra keyword arguments are
        # forwarded untouched to ProblemException.
        super().__init__(title=title, status=403, **kwargs)


def bearer_auth(token):
    """Resolve a bearer token to the user it identifies.

    Returns a dict with the user's ``user_id``, ``username`` and ``roles``.

    Raises:
    UnauthorizedProblem: If token verification raises InvalidTokenError,
        or if no user is found for the verified user id.
    """
    try:
        # Verify the token (e.g., JWT verification), then load its user.
        account = get_user_by_id(verify_jwt_token(token))
        if account:
            return {
                'user_id': account.id,
                'username': account.username,
                'roles': account.roles
            }
        raise UnauthorizedProblem("Invalid token")
    except InvalidTokenError:
        raise UnauthorizedProblem("Invalid or expired token")
# OpenAPI security scheme:
# securitySchemes:
# bearerAuth:
# type: http
# scheme: bearer
# bearerFormat: JWT
# Operation security:
# security:
# - bearerAuth: []
# Endpoint implementation
def get_protected_resource():
    """Return a greeting addressed to the authenticated user."""
    # The authenticated user is read from the request context.
    username = request.context['user']['username']
    return {"message": f"Hello {username}"}


def api_key_auth(api_key, required_scopes=None):
    """Look up an API key, then check its status, expiry and scopes.

    Returns a dict with the key's ``api_key_id``, ``scopes`` and
    ``client_name``.

    Raises:
    UnauthorizedProblem: If the key is unknown, inactive or expired.
    ForbiddenProblem: If required_scopes is non-empty and not fully
        contained in the key's scopes.
    """
    # Look up API key in database
    record = get_api_key_info(api_key)
    if not (record and record.active):
        raise UnauthorizedProblem("Invalid API key")

    # A falsy expires_at means the expiry check is skipped.
    expiry = record.expires_at
    if expiry and expiry < datetime.utcnow():
        raise UnauthorizedProblem("API key has expired")

    # Scopes are only enforced when the caller asks for some.
    if required_scopes and not set(required_scopes) <= set(record.scopes):
        raise ForbiddenProblem("Insufficient API key permissions")

    return {
        'api_key_id': record.id,
        'scopes': record.scopes,
        'client_name': record.client_name
    }
# OpenAPI security scheme:
# securitySchemes:
# apiKey:
# type: apiKey
# in: header
# name: X-API-Key
# Usage in endpoints
def create_resource():
    """Create a resource and report which API client created it.

    Returns a ``(body, 201)`` tuple; the body names the client taken from
    the ``token_info`` entry of the request context.
    """
    # Access API key info
    api_info = request.context['token_info']
    client_name = api_info['client_name']
    # Create resource...
    return {"created_by": client_name}, 201


def oauth2_auth(token, required_scopes):
    """Validate an OAuth2 token and check that it carries the required scopes.

    Parameters:
    - token: OAuth2 access token
    - required_scopes: Scopes the operation needs; ``None`` or an empty
      list means no scope is required

    Returns:
    dict: ``user_id`` (the ``sub`` claim), ``scopes`` and ``client_id``

    Raises:
    UnauthorizedProblem: If verify_oauth2_token raises TokenVerificationError
    ForbiddenProblem: If a required scope is missing from the token
    """
    # Only the verification call can raise TokenVerificationError, so it is
    # the only statement guarded by the try block.
    try:
        token_info = verify_oauth2_token(token)
    except TokenVerificationError as exc:
        raise UnauthorizedProblem("Invalid OAuth2 token") from exc

    # The 'scope' entry is a whitespace-separated string; absent means none.
    token_scopes = set(token_info.get('scope', '').split())

    # Treat None like an empty requirement instead of failing in set(None).
    required_scopes_set = set(required_scopes or ())
    if not required_scopes_set.issubset(token_scopes):
        raise ForbiddenProblem(
            "Insufficient scope",
            detail=f"Required: {required_scopes}, Available: {list(token_scopes)}"
        )

    return {
        'user_id': token_info['sub'],
        'scopes': list(token_scopes),
        'client_id': token_info['client_id']
    }
# OpenAPI security scheme:
# securitySchemes:
# oauth2:
# type: oauth2
# flows:
# authorizationCode:
# authorizationUrl: https://auth.example.com/oauth/authorize
# tokenUrl: https://auth.example.com/oauth/token
# scopes:
# read: Read access
# write: Write access
# admin: Admin access


def basic_auth(username, password):
    """Check a username/password pair and describe the matching user.

    Returns a dict with the user's ``user_id``, ``username`` and ``roles``.

    Raises:
    UnauthorizedProblem: If authenticate_user returns no user.
    ForbiddenProblem: If the user exists but is not active.
    """
    account = authenticate_user(username, password)
    if not account:
        raise UnauthorizedProblem("Invalid credentials")

    # Valid credentials on a disabled account are a 403, not a 401.
    if not account.active:
        raise ForbiddenProblem("Account disabled")

    return dict(
        user_id=account.id,
        username=account.username,
        roles=account.roles,
    )
def authenticate_user(username, password):
    """Return the user for *username* if *password* verifies, else None."""
    candidate = get_user_by_username(username)
    if not candidate:
        return None
    # The supplied password is checked against the stored password hash.
    if not verify_password(password, candidate.password_hash):
        return None
    return candidate
# OpenAPI security scheme:
# securitySchemes:
# basicAuth:
# type: http
# scheme: basic


def custom_jwt_auth(token):
    """Custom JWT authentication with additional claims.

    Parameters:
    - token: Encoded JWT, signed with JWT_SECRET using HS256

    Returns:
    dict: ``user_id`` (the ``sub`` claim), the user's ``permissions`` and
    the decoded ``token_claims``

    Raises:
    UnauthorizedProblem: If the token is expired, malformed, lacks an
        ``exp`` or ``sub`` claim, or was not issued by 'your-app'
    """
    try:
        # 'verify_exp' only validates the claim when it is present, so 'exp'
        # is also listed as required; otherwise a token without it would
        # never expire. Requiring 'sub' turns a missing subject into an
        # InvalidTokenError (handled below) instead of a KeyError.
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=['HS256'],
            options={'verify_exp': True, 'require': ['exp', 'sub']}
        )
        # Check custom claims
        if payload.get('iss') != 'your-app':
            raise UnauthorizedProblem("Invalid token issuer")
        # Check user permissions
        user_id = payload['sub']
        permissions = get_user_permissions(user_id)
        return {
            'user_id': user_id,
            'permissions': permissions,
            'token_claims': payload
        }
    # ExpiredSignatureError must be handled before the more general
    # InvalidTokenError so that the specific message is used.
    except jwt.ExpiredSignatureError:
        raise UnauthorizedProblem("Token has expired")
    except jwt.InvalidTokenError:
        raise UnauthorizedProblem("Invalid token")
# Register custom handler
security_map = {
    'customJWT': custom_jwt_auth
}
app.add_api('api.yaml', security_map=security_map)


def require_role(required_role):
    """Decorator for role-based access control.

    Parameters:
    - required_role: Role name that must appear in the authenticated
      user's ``roles`` list

    Returns:
    A decorator; the decorated function runs only when the ``user`` entry
    of the request context lists *required_role*.

    Raises (when the decorated function is called):
    ForbiddenProblem: If the role is missing or there is no user
    """
    import functools

    def decorator(func):
        # functools.wraps keeps the endpoint's name, docstring and signature
        # visible on the wrapper.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 'or {}' also covers a 'user' entry that is present but None.
            user = request.context.get('user') or {}
            user_roles = user.get('roles', [])
            if required_role not in user_roles:
                raise ForbiddenProblem(
                    f"Role '{required_role}' required"
                )
            return func(*args, **kwargs)
        return wrapper
    return decorator
# Usage in endpoints
@require_role('admin')
def delete_user(user_id):
    """Delete the user with *user_id* and answer 204 with no body.

    The require_role decorator restricts this endpoint to 'admin' users.
    """
    delete_user_by_id(user_id)
    response = (NoContent, 204)
    return response
# Alternative: Check in function
def update_user(user_id):
    """Update a user profile; allowed for its owner or for an admin.

    Raises:
    ForbiddenProblem: If the authenticated user is neither the profile
        owner nor an admin.
    """
    current = request.context['user']
    # Users can only update their own profile unless they're admin
    if not (current['user_id'] == user_id or 'admin' in current['roles']):
        raise ForbiddenProblem("Can only update your own profile")
    # Update user...
    return {"status": "updated"}
# Support multiple authentication methods in OpenAPI
# security:
# - bearerAuth: []
# - apiKey: []
# - basicAuth: []
def flexible_auth_handler():
    """Handle multiple authentication methods.

    Dispatches on the request headers: an ``Authorization: Bearer ...``
    header goes to bearer_auth, ``Authorization: Basic ...`` to basic_auth,
    and an ``X-API-Key`` header to api_key_auth.

    Returns:
    dict: Whatever the selected handler returns

    Raises:
    UnauthorizedProblem: If no supported credentials are present, or if
        the Basic credentials cannot be decoded
    """
    import base64

    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        token = auth_header[7:]
        return bearer_auth(token)

    if auth_header.startswith('Basic '):
        # Extract credentials. Bad base64 (binascii.Error), non-UTF-8 bytes
        # (UnicodeDecodeError) and a missing ':' separator (unpacking
        # failure) are all ValueError subclasses; report them as an
        # authentication failure rather than an unhandled error.
        try:
            credentials = base64.b64decode(auth_header[6:]).decode()
            username, password = credentials.split(':', 1)
        except ValueError:
            raise UnauthorizedProblem("Invalid credentials") from None
        return basic_auth(username, password)

    if 'X-API-Key' in request.headers:
        api_key = request.headers['X-API-Key']
        return api_key_auth(api_key)

    raise UnauthorizedProblem("Authentication required")
# Configure multiple security handlers, keyed by security scheme name
security_map = dict(
    bearerAuth=bearer_auth,
    apiKey=api_key_auth,
    basicAuth=basic_auth,
    flexibleAuth=flexible_auth_handler,
)
app.add_api('api.yaml', security_map=security_map)

from connexion.middleware import SecurityMiddleware
class CustomSecurityMiddleware(SecurityMiddleware):
    """Custom security middleware that adds security headers to responses.

    Every HTTP response start message gets ``x-frame-options: DENY`` and
    ``x-content-type-options: nosniff``; all other messages pass through
    unchanged.
    """

    async def __call__(self, scope, receive, send):
        # Headers to enforce, as (name, value) byte pairs.
        security_headers = [
            (b'x-frame-options', b'DENY'),
            (b'x-content-type-options', b'nosniff'),
        ]
        enforced_names = {name for name, _ in security_headers}

        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                # Keep the headers as a list of pairs: converting them to a
                # dict would collapse repeated headers (e.g. several
                # set-cookie entries) into one. Only existing copies of the
                # enforced headers are replaced.
                kept = [
                    (name, value)
                    for name, value in message.get('headers', [])
                    if name.lower() not in enforced_names
                ]
                message['headers'] = kept + security_headers
            await send(message)

        await super().__call__(scope, receive, send_wrapper)
# Use custom middleware
# NOTE(review): this passes a middleware *instance* built with no arguments.
# Confirm against the Connexion version in use whether `middlewares` expects
# classes instead, and whether this list replaces the default middleware stack.
app = AsyncApp(
    __name__,
    middlewares=[CustomSecurityMiddleware()]
)

Install with Tessl CLI
npx tessl i tessl/pypi-connexion