CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-oauthlib

A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols

Overview
Eval results
Files

request-validation.mddocs/

Request Validation

Comprehensive request validation interfaces for OAuth 1.0 and OAuth 2.0 flows. Provides extensible validation framework for implementing custom authentication logic, security policies, and token management.

Capabilities

OAuth 2.0 Request Validator

Base request validation interface that must be implemented to provide custom authentication and authorization logic for OAuth 2.0 servers.

class RequestValidator:
    def __init__(self): ...
    
    # Client Authentication
    def client_authentication_required(
        self,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Determine if client authentication is required.
        
        Returns:
        True if client must authenticate, False otherwise
        """
    
    def authenticate_client(self, request, *args, **kwargs) -> bool:
        """
        Authenticate client credentials.
        
        Parameters:
        - request: OAuth request object with client credentials
        
        Returns:
        True if client is authenticated, False otherwise
        """
    
    def authenticate_client_id(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Authenticate client by ID only (for public clients).
        
        Parameters:
        - client_id: Client identifier
        - request: OAuth request object
        
        Returns:
        True if client ID is valid, False otherwise
        """
    
    # Client Validation
    def validate_client_id(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate client identifier.
        
        Parameters:
        - client_id: Client identifier to validate
        - request: OAuth request object
        
        Returns:
        True if client ID is valid, False otherwise
        """
    
    def validate_redirect_uri(
        self,
        client_id: str,
        redirect_uri: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate redirect URI for client.
        
        Parameters:
        - client_id: Client identifier
        - redirect_uri: Redirect URI to validate
        - request: OAuth request object
        
        Returns:
        True if redirect URI is valid for client, False otherwise
        """
    
    def get_default_redirect_uri(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> str:
        """
        Get default redirect URI for client.
        
        Parameters:
        - client_id: Client identifier
        - request: OAuth request object
        
        Returns:
        Default redirect URI for client
        """
    
    # Scope Validation
    def validate_scopes(
        self,
        client_id: str,
        scopes: list[str],
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate requested scopes for client.
        
        Parameters:
        - client_id: Client identifier
        - scopes: List of requested scopes
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if scopes are valid for client, False otherwise
        """
    
    def get_default_scopes(
        self,
        client_id: str,
        request,
        *args,
        **kwargs,
    ) -> list[str]:
        """
        Get default scopes for client.
        
        Parameters:
        - client_id: Client identifier
        - request: OAuth request object
        
        Returns:
        List of default scopes for client
        """
    
    # Response Type Validation
    def validate_response_type(
        self,
        client_id: str,
        response_type: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate response type for client.
        
        Parameters:
        - client_id: Client identifier
        - response_type: Response type (code, token)
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if response type is valid for client, False otherwise
        """
    
    # Authorization Code Validation
    def validate_code(
        self,
        client_id: str,
        code: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate authorization code.
        
        Parameters:
        - client_id: Client identifier
        - code: Authorization code to validate
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if code is valid, False otherwise
        """
    
    def confirm_redirect_uri(
        self,
        client_id: str,
        code: str,
        redirect_uri: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Confirm redirect URI matches original authorization request.
        
        Parameters:
        - client_id: Client identifier
        - code: Authorization code
        - redirect_uri: Redirect URI to confirm
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if redirect URI matches, False otherwise
        """
    
    def save_authorization_code(
        self,
        client_id: str,
        code: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store authorization code for later validation.
        
        Parameters:
        - client_id: Client identifier
        - code: Authorization code data
        - request: OAuth request object
        """
    
    def invalidate_authorization_code(
        self,
        client_id: str,
        code: str,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Invalidate used authorization code.
        
        Parameters:
        - client_id: Client identifier
        - code: Authorization code to invalidate
        - request: OAuth request object
        """
    
    # Grant Type Validation
    def validate_grant_type(
        self,
        client_id: str,
        grant_type: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate grant type for client.
        
        Parameters:
        - client_id: Client identifier
        - grant_type: Grant type to validate
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if grant type is valid for client, False otherwise
        """
    
    # User Credentials Validation (Password Grant)
    def validate_user(
        self,
        username: str,
        password: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate user credentials for password grant.
        
        Parameters:
        - username: User's username
        - password: User's password
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if credentials are valid, False otherwise
        """
    
    # Token Validation
    def validate_bearer_token(
        self,
        token: str,
        scopes: list[str],
        request,
    ) -> bool:
        """
        Validate bearer token and required scopes.
        
        Parameters:
        - token: Bearer token to validate
        - scopes: Required scopes for resource access
        - request: OAuth request object
        
        Returns:
        True if token is valid and has required scopes, False otherwise
        """
    
    def validate_refresh_token(
        self,
        refresh_token: str,
        client,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Validate refresh token.
        
        Parameters:
        - refresh_token: Refresh token to validate
        - client: Client object
        - request: OAuth request object
        
        Returns:
        True if refresh token is valid, False otherwise
        """
    
    def get_original_scopes(
        self,
        refresh_token: str,
        request,
        *args,
        **kwargs,
    ) -> list[str]:
        """
        Get original scopes associated with refresh token.
        
        Parameters:
        - refresh_token: Refresh token
        - request: OAuth request object
        
        Returns:
        List of original scopes
        """
    
    def is_within_original_scope(
        self,
        request_scopes: list[str],
        refresh_token: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Check if requested scopes are within original scope.
        
        Parameters:
        - request_scopes: Requested scopes
        - refresh_token: Refresh token
        - request: OAuth request object
        
        Returns:
        True if requested scopes are subset of original, False otherwise
        """
    
    # Token Storage
    def save_token(
        self,
        token: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store access token.
        
        Parameters:
        - token: Token data to store
        - request: OAuth request object
        """
    
    def save_bearer_token(
        self,
        token: dict,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Store bearer token.
        
        Parameters:
        - token: Bearer token data to store
        - request: OAuth request object
        """
    
    # Token Management
    def revoke_token(
        self,
        token: str,
        token_type_hint: str,
        request,
        *args,
        **kwargs,
    ) -> None:
        """
        Revoke access or refresh token.
        
        Parameters:
        - token: Token to revoke
        - token_type_hint: Type of token (access_token, refresh_token)
        - request: OAuth request object
        """
    
    def rotate_refresh_token(self, request) -> bool:
        """
        Determine if refresh token should be rotated.
        
        Parameters:
        - request: OAuth request object
        
        Returns:
        True if refresh token should be rotated, False otherwise
        """
    
    def introspect_token(
        self,
        token: str,
        token_type_hint: str,
        request,
        *args,
        **kwargs,
    ) -> dict | None:
        """
        Get token metadata for introspection.
        
        Parameters:
        - token: Token to introspect
        - token_type_hint: Type of token
        - request: OAuth request object
        
        Returns:
        Dictionary with token metadata or None if invalid
        """
    
    # PKCE Support
    def is_pkce_required(self, client_id: str, request) -> bool:
        """
        Determine if PKCE is required for client.
        
        Parameters:
        - client_id: Client identifier
        - request: OAuth request object
        
        Returns:
        True if PKCE is required, False otherwise
        """
    
    def get_code_challenge(self, code: str, request) -> str:
        """
        Get PKCE code challenge for authorization code.
        
        Parameters:
        - code: Authorization code
        - request: OAuth request object
        
        Returns:
        PKCE code challenge
        """
    
    def get_code_challenge_method(self, code: str, request) -> str:
        """
        Get PKCE code challenge method for authorization code.
        
        Parameters:
        - code: Authorization code
        - request: OAuth request object
        
        Returns:
        PKCE code challenge method (plain, S256)
        """
    
    # CORS Support
    def is_origin_allowed(
        self,
        client_id: str,
        origin: str,
        request,
        *args,
        **kwargs,
    ) -> bool:
        """
        Check if origin is allowed for CORS requests.
        
        Parameters:
        - client_id: Client identifier
        - origin: Request origin
        - request: OAuth request object
        
        Returns:
        True if origin is allowed, False otherwise
        """

Implementation Example

from oauthlib.oauth2 import RequestValidator
import hashlib
import secrets
from datetime import datetime, timedelta

class DatabaseRequestValidator(RequestValidator):
    """Complete request validator implementation using database storage."""
    
    def __init__(self, db_session):
        self.db = db_session
    
    # Client Management
    def validate_client_id(self, client_id, request, *args, **kwargs):
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        return client is not None and client.is_active
    
    def authenticate_client(self, request, *args, **kwargs):
        client_id = getattr(request, 'client_id', None)
        client_secret = getattr(request, 'client_secret', None)
        
        if not client_id or not client_secret:
            return False
        
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        if not client or not client.is_active:
            return False
        
        # Use constant-time comparison for security
        return secrets.compare_digest(client.client_secret, client_secret)
    
    def client_authentication_required(self, request, *args, **kwargs):
        # Require authentication for confidential clients
        client = self.db.query(Client).filter_by(
            client_id=request.client_id
        ).first()
        return client and client.client_type == 'confidential'
    
    # Redirect URI Validation
    def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        if not client:
            return False
        
        return redirect_uri in [uri.uri for uri in client.redirect_uris]
    
    def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        if client and client.redirect_uris:
            return client.redirect_uris[0].uri
        return None
    
    # Scope Management
    def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
        client_obj = self.db.query(Client).filter_by(client_id=client_id).first()
        if not client_obj:
            return False
        
        allowed_scopes = {scope.name for scope in client_obj.allowed_scopes}
        return all(scope in allowed_scopes for scope in scopes)
    
    def get_default_scopes(self, client_id, request, *args, **kwargs):
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        if client:
            return [scope.name for scope in client.default_scopes]
        return []
    
    # Authorization Code Management
    def save_authorization_code(self, client_id, code, request, *args, **kwargs):
        auth_code = AuthorizationCode(
            client_id=client_id,
            user_id=request.user.id,
            code=code['code'],
            redirect_uri=request.redirect_uri,
            scopes=' '.join(request.scopes),
            code_challenge=getattr(request, 'code_challenge', None),
            code_challenge_method=getattr(request, 'code_challenge_method', None),
            expires_at=datetime.utcnow() + timedelta(minutes=10)
        )
        self.db.add(auth_code)
        self.db.commit()
    
    def validate_code(self, client_id, code, client, request, *args, **kwargs):
        auth_code = self.db.query(AuthorizationCode).filter_by(
            client_id=client_id,
            code=code
        ).first()
        
        if not auth_code:
            return False
        
        if auth_code.expires_at < datetime.utcnow():
            return False
        
        # Validate PKCE if present
        if auth_code.code_challenge:
            code_verifier = getattr(request, 'code_verifier', None)
            if not code_verifier:
                return False
            
            if auth_code.code_challenge_method == 'S256':
                challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
            else:
                challenge = code_verifier
            
            if not secrets.compare_digest(auth_code.code_challenge, challenge):
                return False
        
        return True
    
    def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
        auth_code = self.db.query(AuthorizationCode).filter_by(
            client_id=client_id,
            code=code
        ).first()
        
        return auth_code and auth_code.redirect_uri == redirect_uri
    
    def invalidate_authorization_code(self, client_id, code, request, *args, **kwargs):
        auth_code = self.db.query(AuthorizationCode).filter_by(
            client_id=client_id,
            code=code
        ).first()
        
        if auth_code:
            self.db.delete(auth_code)
            self.db.commit()
    
    # Token Management
    def save_bearer_token(self, token, request, *args, **kwargs):
        access_token = AccessToken(
            client_id=request.client_id,
            user_id=getattr(request, 'user_id', None),
            token=token['access_token'],
            scopes=token.get('scope', ''),
            expires_at=datetime.utcnow() + timedelta(seconds=token['expires_in'])
        )
        self.db.add(access_token)
        
        if 'refresh_token' in token:
            refresh_token = RefreshToken(
                client_id=request.client_id,
                user_id=getattr(request, 'user_id', None),
                token=token['refresh_token'],
                scopes=token.get('scope', ''),
                access_token=access_token
            )
            self.db.add(refresh_token)
        
        self.db.commit()
    
    def validate_bearer_token(self, token, scopes, request):
        access_token = self.db.query(AccessToken).filter_by(
            token=token
        ).first()
        
        if not access_token:
            return False
        
        if access_token.expires_at < datetime.utcnow():
            return False
        
        token_scopes = access_token.scopes.split()
        if not all(scope in token_scopes for scope in scopes):
            return False
        
        # Add user info to request
        request.user_id = access_token.user_id
        request.client_id = access_token.client_id
        request.scopes = token_scopes
        
        return True
    
    def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
        token = self.db.query(RefreshToken).filter_by(
            token=refresh_token,
            client_id=request.client_id
        ).first()
        
        return token is not None
    
    # User Authentication (Password Grant)
    def validate_user(self, username, password, client, request, *args, **kwargs):
        user = self.db.query(User).filter_by(username=username).first()
        if not user or not user.is_active:
            return False
        
        # Use secure password verification
        if verify_password(password, user.password_hash):
            request.user_id = user.id
            return True
        
        return False
    
    # Grant Type Validation
    def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
        client_obj = self.db.query(Client).filter_by(client_id=client_id).first()
        if not client_obj:
            return False
        
        allowed_grants = {grant.grant_type for grant in client_obj.allowed_grants}
        return grant_type in allowed_grants
    
    # PKCE Support
    def is_pkce_required(self, client_id, request):
        client = self.db.query(Client).filter_by(client_id=client_id).first()
        return client and client.require_pkce
    
    # Token Introspection
    def introspect_token(self, token, token_type_hint, request, *args, **kwargs):
        if token_type_hint == 'refresh_token':
            token_obj = self.db.query(RefreshToken).filter_by(token=token).first()
        else:
            token_obj = self.db.query(AccessToken).filter_by(token=token).first()
        
        if not token_obj:
            return {'active': False}
        
        if hasattr(token_obj, 'expires_at') and token_obj.expires_at < datetime.utcnow():
            return {'active': False}
        
        return {
            'active': True,
            'client_id': token_obj.client_id,
            'username': token_obj.user.username if token_obj.user else None,
            'scope': token_obj.scopes,
            'exp': int(token_obj.expires_at.timestamp()) if hasattr(token_obj, 'expires_at') else None,
            'token_type': 'Bearer' if isinstance(token_obj, AccessToken) else 'refresh_token'
        }

Install with Tessl CLI

npx tessl i tessl/pypi-oauthlib

docs

common-utilities.md

device-flow.md

error-handling.md

index.md

oauth1.md

oauth2-clients.md

oauth2-servers.md

openid-connect.md

request-validation.md

token-management.md

tile.json