CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-connexion

OpenAPI/Swagger spec-first web framework for Python with automatic request validation and response serialization

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/security.md

Security & Authentication

Built-in security handlers for common authentication methods with support for Bearer tokens, Basic auth, API keys, and OAuth2. Connexion provides automatic security validation based on OpenAPI security schemes.

Capabilities

Built-in Security Handlers

Pre-implemented security handlers for common authentication patterns.

def bearer_auth(token: str) -> dict:
    """
    Authenticate a request using its Bearer token.

    Parameters:
    - token: The Bearer token taken from the Authorization header

    Returns:
    dict: Information about the token, or the user context it maps to

    Raises:
    UnauthorizedProblem: When the token is not valid
    """

def basic_auth(username: str, password: str) -> dict:
    """
    Authenticate a request using HTTP Basic credentials.

    Parameters:
    - username: The username taken from the Authorization header
    - password: The password taken from the Authorization header

    Returns:
    dict: Information about the user, or the authentication context

    Raises:
    UnauthorizedProblem: When the credentials are not valid
    """

def api_key_auth(api_key: str, required_scopes: list = None) -> dict:
    """
    Authenticate a request using an API key.

    Parameters:
    - api_key: The API key, supplied via header, query, or cookie
    - required_scopes: Scopes the operation requires (optional)

    Returns:
    dict: Information about the API key, or the user context it maps to

    Raises:
    UnauthorizedProblem: When the API key is not valid
    ForbiddenProblem: When the required scopes are not satisfied
    """

def oauth2_auth(token: str, required_scopes: list) -> dict:
    """
    Authenticate a request using an OAuth2 access token.

    Parameters:
    - token: The OAuth2 access token
    - required_scopes: OAuth2 scopes the operation requires

    Returns:
    dict: Information about the token, including user context

    Raises:
    UnauthorizedProblem: When the token is not valid
    ForbiddenProblem: When the required scopes are not satisfied
    """

Security Configuration

Configure security handlers for different authentication schemes.

# Lookup table from security scheme name to its handler function
SECURITY_HANDLERS = dict(
    bearerAuth=bearer_auth,
    basicAuth=basic_auth,
    apiKeyAuth=api_key_auth,
    oauth2=oauth2_auth,
)

def configure_security(app, security_map: dict = None):
    """
    Set up the security handlers used by an application.

    Parameters:
    - app: The Connexion application instance to configure
    - security_map: Optional custom mapping of security handlers
    """

Authentication Context

Access authentication information in endpoint functions.

from connexion.context import request

# Access authenticated user information
# Both values are looked up with .get() on the request context, so the lookup
# itself does not raise when a key is missing (assuming a dict-like context).
user_info = request.context.get('user')
token_info = request.context.get('token_info')

Security Exceptions

Exceptions for authentication and authorization failures.

class UnauthorizedProblem(ProblemException):
    """Problem with HTTP status 401: authentication is required or has failed."""

    def __init__(self, title: str = "Unauthorized", **kwargs):
        # The status is fixed at 401; all other keyword arguments are
        # forwarded to ProblemException unchanged.
        super().__init__(title=title, status=401, **kwargs)

class ForbiddenProblem(ProblemException):
    """Problem with HTTP status 403: the caller lacks sufficient permissions."""

    def __init__(self, title: str = "Forbidden", **kwargs):
        # The status is fixed at 403; all other keyword arguments are
        # forwarded to ProblemException unchanged.
        super().__init__(title=title, status=403, **kwargs)

Usage Examples

Bearer Token Authentication

def bearer_auth(token):
    """
    Validate a bearer token and return the matching user's info.

    Parameters:
    - token: Bearer token to verify (e.g. a JWT)

    Returns:
    dict: 'user_id', 'username' and 'roles' of the authenticated user

    Raises:
    UnauthorizedProblem: If token verification fails or no user matches
    """
    # Guard only the verification call, so that an unrelated failure in the
    # user lookup is not misreported as an invalid token.
    try:
        user_id = verify_jwt_token(token)
    except InvalidTokenError as exc:
        # Chain the original error so the root cause stays in the traceback.
        raise UnauthorizedProblem("Invalid or expired token") from exc

    user = get_user_by_id(user_id)
    if not user:
        raise UnauthorizedProblem("Invalid token")

    return {
        'user_id': user.id,
        'username': user.username,
        'roles': user.roles
    }

# OpenAPI security scheme:
# securitySchemes:
#   bearerAuth:
#     type: http
#     scheme: bearer
#     bearerFormat: JWT

# Operation security:
# security:
#   - bearerAuth: []

# Endpoint implementation
def get_protected_resource():
    """Greet the authenticated user stored in the request context."""
    username = request.context['user']['username']
    return {"message": f"Hello {username}"}

API Key Authentication

def api_key_auth(api_key, required_scopes=None):
    """
    Check an API key and, when scopes are requested, its permissions.

    Parameters:
    - api_key: API key to look up
    - required_scopes: Scopes the key must carry; skipped when empty or None

    Returns:
    dict: 'api_key_id', 'scopes' and 'client_name' of the key

    Raises:
    UnauthorizedProblem: If the key is unknown, inactive, or expired
    ForbiddenProblem: If the key lacks any of the required scopes
    """
    record = get_api_key_info(api_key)

    # Unknown and deactivated keys are rejected the same way
    if not (record and record.active):
        raise UnauthorizedProblem("Invalid API key")

    # A key without an expiry date never expires
    if record.expires_at and record.expires_at < datetime.utcnow():
        raise UnauthorizedProblem("API key has expired")

    # Every requested scope must be present on the key
    if required_scopes and not set(required_scopes) <= set(record.scopes):
        raise ForbiddenProblem("Insufficient API key permissions")

    return {
        'api_key_id': record.id,
        'scopes': record.scopes,
        'client_name': record.client_name,
    }

# OpenAPI security scheme:
# securitySchemes:
#   apiKey:
#     type: apiKey
#     in: header
#     name: X-API-Key

# Usage in endpoints
def create_resource():
    """Create a resource and report which API client created it."""
    # The API key info is read from the request context under 'token_info'
    creator = request.context['token_info']['client_name']

    # Create resource...
    body = {"created_by": creator}
    return body, 201

OAuth2 Authentication

def oauth2_auth(token, required_scopes):
    """
    Validate an OAuth2 token and check that it grants the required scopes.

    Parameters:
    - token: OAuth2 access token to verify
    - required_scopes: Scopes the operation requires; None or an empty list
      means no scope is required

    Returns:
    dict: 'user_id' (the 'sub' claim), 'scopes' (sorted list) and 'client_id'

    Raises:
    UnauthorizedProblem: If the token cannot be verified
    ForbiddenProblem: If the token lacks any of the required scopes
    """
    # Guard only the provider call, so that later errors are not mistaken
    # for a verification failure.
    try:
        token_info = verify_oauth2_token(token)
    except TokenVerificationError as exc:
        raise UnauthorizedProblem("Invalid OAuth2 token") from exc

    # 'scope' is a space-separated string; treat a missing or None value
    # as "no scopes granted" instead of crashing on .split().
    token_scopes = set((token_info.get('scope') or '').split())
    # Accept None like api_key_auth does: no scopes required.
    required_scopes_set = set(required_scopes or ())

    # Sorted so the error detail and the returned list are deterministic.
    available = sorted(token_scopes)

    if not required_scopes_set.issubset(token_scopes):
        raise ForbiddenProblem(
            "Insufficient scope",
            detail=f"Required: {required_scopes}, Available: {available}"
        )

    return {
        'user_id': token_info['sub'],
        'scopes': available,
        'client_id': token_info['client_id']
    }

# OpenAPI security scheme:
# securitySchemes:
#   oauth2:
#     type: oauth2
#     flows:
#       authorizationCode:
#         authorizationUrl: https://auth.example.com/oauth/authorize
#         tokenUrl: https://auth.example.com/oauth/token
#         scopes:
#           read: Read access
#           write: Write access
#           admin: Admin access

Basic Authentication

def basic_auth(username, password):
    """
    Check a username/password pair and return the account's details.

    Parameters:
    - username: Username to authenticate
    - password: Password to authenticate

    Returns:
    dict: 'user_id', 'username' and 'roles' of the account

    Raises:
    UnauthorizedProblem: If the credentials do not match an account
    ForbiddenProblem: If the account exists but is not active
    """
    account = authenticate_user(username, password)

    # Bad credentials: no account came back
    if not account:
        raise UnauthorizedProblem("Invalid credentials")

    # Correct credentials, but the account has been switched off
    if not account.active:
        raise ForbiddenProblem("Account disabled")

    return dict(
        user_id=account.id,
        username=account.username,
        roles=account.roles,
    )

def authenticate_user(username, password):
    """Return the user matching the credentials, or None when they do not match."""
    candidate = get_user_by_username(username)
    # No such user: nothing to verify
    if not candidate:
        return None
    # User exists but the password does not match the stored hash
    if not verify_password(password, candidate.password_hash):
        return None
    return candidate

# OpenAPI security scheme:
# securitySchemes:
#   basicAuth:
#     type: http
#     scheme: basic

Custom Security Handler

def custom_jwt_auth(token):
    """
    Custom JWT authentication with additional claim checks.

    Parameters:
    - token: Encoded JWT to decode and validate

    Returns:
    dict: 'user_id' (the 'sub' claim), 'permissions' and 'token_claims'

    Raises:
    UnauthorizedProblem: If the token is expired or otherwise invalid, has an
      unexpected issuer, or carries no 'sub' claim
    """
    # Guard only the decode step, so that failures in the permission lookup
    # below are not misreported as token errors.
    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=['HS256'],
            options={'verify_exp': True}
        )
    except jwt.ExpiredSignatureError as exc:
        raise UnauthorizedProblem("Token has expired") from exc
    except jwt.InvalidTokenError as exc:
        raise UnauthorizedProblem("Invalid token") from exc

    # Check custom claims
    if payload.get('iss') != 'your-app':
        raise UnauthorizedProblem("Invalid token issuer")

    # A token without a subject cannot be tied to a user; answer 401 rather
    # than letting a KeyError surface as a server error.
    if 'sub' not in payload:
        raise UnauthorizedProblem("Token is missing the 'sub' claim")

    # Check user permissions
    user_id = payload['sub']
    permissions = get_user_permissions(user_id)

    return {
        'user_id': user_id,
        'permissions': permissions,
        'token_claims': payload
    }

# Register the custom handler under the 'customJWT' name
security_map = dict(customJWT=custom_jwt_auth)

app.add_api('api.yaml', security_map=security_map)

Role-Based Access Control

def require_role(required_role):
    """
    Build a decorator that restricts an endpoint to users holding a role.

    Parameters:
    - required_role: Role that must appear in the authenticated user's 'roles'

    Returns:
    A decorator; the decorated function raises ForbiddenProblem when the
    role is missing and otherwise calls through to the original function.
    """
    # Imported locally so the snippet needs no extra module-level import.
    from functools import wraps

    def decorator(func):
        # wraps() keeps the endpoint's name, docstring and signature, so
        # anything that inspects the handler still sees the original function.
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Treat a missing or None user/roles entry as "no roles".
            user = request.context.get('user') or {}
            user_roles = user.get('roles') or []

            if required_role not in user_roles:
                raise ForbiddenProblem(
                    f"Role '{required_role}' required"
                )

            return func(*args, **kwargs)
        return wrapper
    return decorator

# Usage in endpoints
@require_role('admin')
def delete_user(user_id):
    """Delete the given user; the decorator limits this endpoint to the 'admin' role."""
    delete_user_by_id(user_id)
    # Empty body with a 204 status
    return (NoContent, 204)

# Alternative: Check in function
def update_user(user_id):
    """
    Update a user profile, allowing only the owner or an admin to do so.

    Raises:
    ForbiddenProblem: If the caller is neither the profile owner nor an admin
    """
    caller = request.context['user']

    # Owners may edit themselves; anyone else needs the 'admin' role
    editing_someone_else = caller['user_id'] != user_id
    if editing_someone_else and 'admin' not in caller['roles']:
        raise ForbiddenProblem("Can only update your own profile")

    # Update user...
    return {"status": "updated"}

Multiple Authentication Methods

# Support multiple authentication methods in OpenAPI
# security:
#   - bearerAuth: []
#   - apiKey: []
#   - basicAuth: []

def flexible_auth_handler():
    """
    Authenticate the current request with whichever credentials it carries.

    Tries, in order: a Bearer token, HTTP Basic credentials, then an
    'X-API-Key' header, and delegates to the matching handler.

    Returns:
    dict: Whatever the selected handler returns

    Raises:
    UnauthorizedProblem: If no supported credentials are present, or the
      Basic credentials cannot be decoded
    """
    auth_header = request.headers.get('Authorization', '')

    if auth_header.startswith('Bearer '):
        return bearer_auth(auth_header[len('Bearer '):])

    if auth_header.startswith('Basic '):
        import base64

        # Malformed base64 (binascii.Error) and non-UTF-8 bytes
        # (UnicodeDecodeError) are both ValueError subclasses; a client that
        # sends garbage should get a 401 rather than an unhandled exception.
        try:
            credentials = base64.b64decode(auth_header[len('Basic '):]).decode()
        except ValueError as exc:
            raise UnauthorizedProblem("Invalid Basic authentication header") from exc

        # Split on the first colon only, so passwords may contain ':'.
        username, separator, password = credentials.partition(':')
        if not separator:
            raise UnauthorizedProblem("Invalid Basic authentication header")
        return basic_auth(username, password)

    if 'X-API-Key' in request.headers:
        return api_key_auth(request.headers['X-API-Key'])

    raise UnauthorizedProblem("Authentication required")

# Register one handler per security scheme name, plus the combined handler
security_map = dict(
    bearerAuth=bearer_auth,
    apiKey=api_key_auth,
    basicAuth=basic_auth,
    flexibleAuth=flexible_auth_handler,
)

app.add_api('api.yaml', security_map=security_map)

Security Middleware

from connexion.middleware import SecurityMiddleware

class CustomSecurityMiddleware(SecurityMiddleware):
    """Security middleware that also adds hardening headers to HTTP responses."""

    # Headers forced onto every HTTP response (lower-case byte names).
    _EXTRA_HEADERS = (
        (b'x-frame-options', b'DENY'),
        (b'x-content-type-options', b'nosniff'),
    )

    async def __call__(self, scope, receive, send):
        """Delegate to the parent middleware with a send wrapper that adds the headers."""
        forced_names = {name for name, _ in self._EXTRA_HEADERS}

        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                # Keep the headers as a list of pairs: a dict would silently
                # drop repeated headers such as multiple 'set-cookie' entries.
                kept = [
                    (name, value)
                    for name, value in message.get('headers', [])
                    if name.lower() not in forced_names
                ]
                message['headers'] = kept + list(self._EXTRA_HEADERS)
            await send(message)

        await super().__call__(scope, receive, send_wrapper)

# Use custom middleware
# NOTE(review): the constructor of SecurityMiddleware is not visible here;
# confirm that it can be instantiated without arguments and whether
# `middlewares` expects middleware classes or instances. Also confirm whether
# passing this list replaces the application's default middlewares.
app = AsyncApp(
    __name__,
    middlewares=[CustomSecurityMiddleware()]
)

Install with Tessl CLI

npx tessl i tessl/pypi-connexion

docs

applications.md

cli.md

exceptions.md

index.md

operation-resolution.md

request-response.md

security.md

validation.md

tile.json