CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-oauthlib

A comprehensive Python library for implementing OAuth 1.0 and OAuth 2.0 authentication protocols

Overview
Eval results
Files

oauth2-servers.mddocs/

OAuth 2.0 Servers

Complete server-side OAuth 2.0 implementation providing authorization servers, resource servers, and token endpoints. Supports all standard grant types, token introspection, revocation, and comprehensive request validation.

Capabilities

Authorization Endpoint

OAuth 2.0 authorization endpoint handling authorization requests and user consent. Manages the authorization code flow by presenting authorization pages to users and issuing authorization codes.

class AuthorizationEndpoint:
    def __init__(self, default_response_type: str, default_token_type: str, response_types: dict[str, callable]): ...
    
    def create_authorization_response(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        scopes: list[str] | None = None,
        credentials: dict[str, str] | None = None,
    ) -> tuple[dict[str, str], str, int]:
        """
        Create authorization response.
        
        Parameters:
        - uri: Authorization request URI
        - http_method: HTTP method
        - body: Request body
        - headers: Request headers
        - scopes: Requested scopes
        - credentials: User credentials and consent info
        
        Returns:
        Tuple of (headers, body, status_code) for authorization response
        """
    
    def validate_authorization_request(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Validate authorization request parameters."""

Token Endpoint

OAuth 2.0 token endpoint handling token requests for all grant types. Issues access tokens, refresh tokens, and manages token exchange operations.

class TokenEndpoint:
    def __init__(
        self,
        default_grant_type=None,
        default_token_type=None,
        token_generator=None,
        refresh_token_generator=None,
        expires_in=None,
    ): ...
    
    def create_token_response(
        self,
        uri: str,
        http_method: str = "POST",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        credentials: dict[str, str] | None = None,
        grant_type_handler=None,
        **kwargs,
    ) -> tuple[dict[str, str], str, int]:
        """
        Create token response for any grant type.
        
        Parameters:
        - uri: Token request URI
        - http_method: HTTP method
        - body: Request body
        - headers: Request headers  
        - credentials: Client credentials
        - grant_type_handler: Custom grant type handler
        
        Returns:
        Tuple of (headers, body, status_code) for token response
        """
    
    def validate_token_request(self, request) -> None:
        """Validate token request parameters."""

Resource Endpoint

OAuth 2.0 resource endpoint for validating access tokens when accessing protected resources. Verifies token validity, scope, and expiration.

class ResourceEndpoint:
    def __init__(self, default_token=None, token_generator=None, expires_in=None): ...
    
    def validate_protected_resource_request(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
        scopes: list[str] | None = None,
    ) -> tuple[bool, dict]:
        """
        Validate protected resource request.
        
        Parameters:
        - uri: Resource request URI
        - http_method: HTTP method
        - body: Request body
        - headers: Request headers
        - scopes: Required scopes
        
        Returns:
        Tuple of (valid, request) where valid indicates if token is valid
        """

Revocation Endpoint

OAuth 2.0 token revocation endpoint (RFC 7009) for invalidating access and refresh tokens. Allows clients to revoke tokens when they're no longer needed.

class RevocationEndpoint:
    def __init__(self, request_validator, supported_token_types=None, enable_jsonp=False): ...
    
    def create_revocation_response(
        self,
        uri: str,
        http_method: str = "POST",
        body: str | None = None,
        headers: dict[str, str] | None = None,
    ) -> tuple[dict[str, str], str, int]:
        """
        Create token revocation response.
        
        Parameters:
        - uri: Revocation request URI
        - http_method: HTTP method
        - body: Request body containing token to revoke
        - headers: Request headers
        
        Returns:
        Tuple of (headers, body, status_code) for revocation response
        """
    
    def validate_revocation_request(self, request) -> None:
        """Validate token revocation request."""

Introspection Endpoint

OAuth 2.0 token introspection endpoint (RFC 7662) for checking token status and metadata. Allows resource servers to query token information.

class IntrospectEndpoint:
    def __init__(self, request_validator, supported_token_types=None): ...
    
    def create_introspect_response(
        self,
        uri: str,
        http_method: str = "POST",
        body: str | None = None,
        headers: dict[str, str] | None = None,
    ) -> tuple[dict[str, str], str, int]:
        """
        Create token introspection response.
        
        Parameters:
        - uri: Introspection request URI
        - http_method: HTTP method
        - body: Request body containing token to introspect
        - headers: Request headers
        
        Returns:
        Tuple of (headers, body, status_code) with token metadata
        """
    
    def validate_introspect_request(self, request) -> None:
        """Validate token introspection request."""

Metadata Endpoint

OAuth 2.0 authorization server metadata endpoint (RFC 8414) for publishing server capabilities and configuration. Enables automatic client configuration.

class MetadataEndpoint:
    def __init__(self, endpoints, claims=None, **kwargs): ...
    
    def create_metadata_response(
        self,
        uri: str,
        http_method: str = "GET",
        body: str | None = None,
        headers: dict[str, str] | None = None,
    ) -> tuple[dict[str, str], str, int]:
        """
        Create authorization server metadata response.
        
        Returns:
        Tuple of (headers, body, status_code) with server metadata JSON
        """

Pre-configured Servers

Complete OAuth 2.0 server implementations combining multiple endpoints for common deployment scenarios.

Web Application Server

class WebApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        Complete OAuth 2.0 server for web applications.
        
        Combines AuthorizationEndpoint, TokenEndpoint, and ResourceEndpoint
        to support the authorization code grant flow.
        """
    
    # Inherits methods from AuthorizationEndpoint, TokenEndpoint, ResourceEndPoint
    def create_authorization_response(self, uri, http_method="GET", body=None, headers=None, scopes=None, credentials=None): ...
    def create_token_response(self, uri, http_method="POST", body=None, headers=None, credentials=None, **kwargs): ...
    def validate_protected_resource_request(self, uri, http_method="GET", body=None, headers=None, scopes=None): ...

Mobile Application Server

class MobileApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for mobile applications.
        
        Supports the implicit grant flow for public clients
        that cannot securely store credentials.
        """

Legacy Application Server

class LegacyApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for legacy applications.
        
        Supports the password credentials grant flow for
        trusted first-party applications.
        """

Backend Application Server

class BackendApplicationServer:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        **kwargs,
    ):
        """
        OAuth 2.0 server for backend applications.
        
        Supports the client credentials grant flow for
        machine-to-machine authentication.
        """

Base Server

class Server:
    def __init__(
        self,
        request_validator,
        token_expires_in=None,
        token_generator=None,
        refresh_token_generator=None,
        **kwargs,
    ):
        """
        Base OAuth 2.0 server supporting all grant types.
        
        Provides a flexible foundation for custom server implementations
        with support for authorization code, implicit, password credentials,
        and client credentials grants.
        """

Usage Examples

Authorization Code Flow Server

from oauthlib.oauth2 import WebApplicationServer, RequestValidator
from oauthlib.common import generate_token

class MyRequestValidator(RequestValidator):
    def validate_client_id(self, client_id, request, *args, **kwargs):
        # Check if client_id exists in your database
        return client_id in ['your-client-id', 'another-client']
    
    def authenticate_client(self, request, *args, **kwargs):
        # Authenticate client credentials
        client_id = getattr(request, 'client_id', None)
        client_secret = getattr(request, 'client_secret', None)
        return verify_client_credentials(client_id, client_secret)
    
    def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
        # Validate redirect URI is registered for this client
        return redirect_uri in get_registered_redirect_uris(client_id)
    
    def get_default_scopes(self, client_id, request, *args, **kwargs):
        # Return default scopes for client
        return ['read']
    
    def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
        # Validate requested scopes
        allowed_scopes = get_client_scopes(client_id)
        return all(scope in allowed_scopes for scope in scopes)
    
    def save_authorization_code(self, client_id, code, request, *args, **kwargs):
        # Store authorization code in database
        store_authorization_code(client_id, code, request)
    
    def validate_code(self, client_id, code, client, request, *args, **kwargs):
        # Validate authorization code
        return is_valid_authorization_code(client_id, code)
    
    def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
        # Confirm redirect URI matches original request
        return get_code_redirect_uri(code) == redirect_uri
    
    def save_token(self, token, request, *args, **kwargs):
        # Store access token in database
        store_access_token(token, request)
    
    def validate_bearer_token(self, token, scopes, request):
        # Validate bearer token and scopes
        return is_valid_bearer_token(token, scopes)
    
    def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
        # Return default redirect URI for client
        return get_client_default_redirect_uri(client_id)

# Create server
validator = MyRequestValidator()
server = WebApplicationServer(validator)

# Handle authorization request
def handle_authorization(request):
    try:
        # Extract request details
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        
        # Check if user is authenticated and consented
        if not user_authenticated(request):
            return redirect_to_login()
        
        if not user_consented(request):
            return show_consent_form()
        
        # Create authorization response
        headers, body, status = server.create_authorization_response(
            uri, http_method, body, headers,
            scopes=request.args.get('scope', '').split(),
            credentials={'user_id': get_user_id(request)}
        )
        
        return Response(body, status=status, headers=headers)
    
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code, 
                       headers={'Content-Type': 'application/json'})

# Handle token request
def handle_token(request):
    try:
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        
        headers, body, status = server.create_token_response(
            uri, http_method, body, headers
        )
        
        return Response(body, status=status, headers=headers)
    
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code,
                       headers={'Content-Type': 'application/json'})

Protected Resource Validation

from oauthlib.oauth2 import ResourceEndpoint
from functools import wraps

# Create resource endpoint
resource_endpoint = ResourceEndpoint()

def require_oauth(*required_scopes):
    """Decorator to protect API endpoints with OAuth."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Extract request details
            uri = request.url
            http_method = request.method
            body = request.body
            headers = dict(request.headers)
            
            try:
                # Validate token and scopes
                valid, oauth_request = resource_endpoint.validate_protected_resource_request(
                    uri, http_method, body, headers, required_scopes
                )
                
                if not valid:
                    return Response('Unauthorized', status=401)
                
                # Add OAuth info to request context
                request.oauth = oauth_request
                
                return f(*args, **kwargs)
            
            except OAuth2Error as e:
                return Response(e.json, status=e.status_code,
                               headers={'Content-Type': 'application/json'})
        
        return decorated_function
    return decorator

# Usage in API endpoints
@app.route('/api/user/profile')
@require_oauth('profile:read')
def get_user_profile():
    user_id = request.oauth.user_id
    return jsonify(get_user_data(user_id))

@app.route('/api/user/settings', methods=['POST'])
@require_oauth('profile:write')
def update_user_settings():
    user_id = request.oauth.user_id
    update_user_data(user_id, request.json)
    return jsonify({'status': 'updated'})

Token Introspection

from oauthlib.oauth2 import IntrospectEndpoint

# Create introspection endpoint
introspect_endpoint = IntrospectEndpoint(validator)

def handle_introspect(request):
    """Handle token introspection requests from resource servers."""
    try:
        uri = request.url
        http_method = request.method
        body = request.body
        headers = dict(request.headers)
        
        headers, body, status = introspect_endpoint.create_introspect_response(
            uri, http_method, body, headers
        )
        
        return Response(body, status=status, headers=headers)
    
    except OAuth2Error as e:
        return Response(e.json, status=e.status_code,
                       headers={'Content-Type': 'application/json'})

Authorization Server Metadata

from oauthlib.oauth2 import MetadataEndpoint

# Create metadata endpoint
metadata = MetadataEndpoint([
    ('authorization_endpoint', 'https://auth.example.com/authorize'),
    ('token_endpoint', 'https://auth.example.com/token'),
    ('revocation_endpoint', 'https://auth.example.com/revoke'),
    ('introspection_endpoint', 'https://auth.example.com/introspect'),
], claims=[
    ('issuer', 'https://auth.example.com'),
    ('response_types_supported', ['code', 'token']),
    ('grant_types_supported', ['authorization_code', 'implicit', 'client_credentials']),
    ('token_endpoint_auth_methods_supported', ['client_secret_basic', 'client_secret_post']),
    ('scopes_supported', ['read', 'write', 'admin']),
])

def handle_metadata(request):
    """Serve authorization server metadata."""
    headers, body, status = metadata.create_metadata_response(
        request.url, request.method
    )
    return Response(body, status=status, headers=headers)

Server Configuration

Token Expiration

# Configure token expiration times
server = WebApplicationServer(
    validator,
    token_expires_in=3600,  # Access tokens expire in 1 hour
)

# Or use a function for dynamic expiration
def token_expires_in(request):
    if 'long_lived' in request.scopes:
        return 86400  # 24 hours for long-lived tokens
    return 3600  # 1 hour for regular tokens

server = WebApplicationServer(validator, token_expires_in=token_expires_in)

Custom Token Generators

from oauthlib.common import generate_token
import jwt

def custom_token_generator(request):
    """Generate JWT access tokens."""
    payload = {
        'user_id': request.user_id,
        'client_id': request.client_id,
        'scopes': request.scopes,
        'exp': datetime.utcnow() + timedelta(hours=1)
    }
    return jwt.encode(payload, 'secret-key', algorithm='HS256')

server = WebApplicationServer(
    validator,
    token_generator=custom_token_generator,
    refresh_token_generator=generate_token
)

Install with Tessl CLI

npx tessl i tessl/pypi-oauthlib

docs

common-utilities.md

device-flow.md

error-handling.md

index.md

oauth1.md

oauth2-clients.md

oauth2-servers.md

openid-connect.md

request-validation.md

token-management.md

tile.json