CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-authlib

The ultimate Python library for building OAuth and OpenID Connect servers and clients.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/oauth2.md

OAuth 2.0 Implementation

Comprehensive OAuth 2.0 implementation following RFC 6749 and related RFCs. Supports all standard grant types: authorization code, implicit, resource owner password credentials, client credentials, and refresh token. Includes advanced features like PKCE (RFC 7636), device flow (RFC 8628), and bearer tokens (RFC 6750).

Capabilities

OAuth 2.0 Client

High-level OAuth 2.0 client implementation for performing authorization flows with automatic token management.

class OAuth2Client:
    """
    OAuth 2.0 client implementation.

    Groups the client-side operations of this reference: building the
    authorization URL, fetching / refreshing / revoking / introspecting
    tokens, dynamic client registration, and read access to the current
    token. Only signatures and docstrings are listed here.
    """
    
    def __init__(self, client_id: str, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
        """
        Initialize OAuth 2.0 client.
        
        Args:
            client_id: Client identifier (the only required argument)
            client_secret: Client secret (defaults to None)
            token_endpoint_auth_method: Token endpoint authentication method
                (defaults to 'client_secret_basic')
            revocation_endpoint_auth_method: Revocation endpoint authentication
                method (defaults to None)
            scope: Default scope
            redirect_uri: Default redirect URI
            token: Access token dictionary
            token_placement: Where to place token: 'header', 'body' or 'uri'
                (defaults to 'header')
            update_token: Callback for token updates
            **kwargs: Additional keyword arguments (not described in this reference)
        """

    def create_authorization_url(self, authorization_endpoint: str, state: str = None, code_challenge: str = None, code_challenge_method: str = None, **kwargs) -> tuple:
        """
        Create authorization URL for authorization code flow.
        
        Args:
            authorization_endpoint: Authorization server's authorization endpoint
            state: CSRF protection state parameter
            code_challenge: PKCE code challenge
            code_challenge_method: PKCE code challenge method
                (the PKCE usage example passes 'S256')
            **kwargs: Additional authorization parameters
            
        Returns:
            Tuple of (authorization_url, state)
        """

    def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
        """
        Fetch access token from authorization server.
        
        Args:
            token_endpoint: Token endpoint URL
            code: Authorization code
            authorization_response: Full authorization response URL
            body: Additional request body (defaults to an empty string)
            auth: HTTP basic auth tuple
            username: Username for password grant
            password: Password for password grant
            **kwargs: Additional token parameters
                (the PKCE usage example passes code_verifier this way)
            
        Returns:
            Token dictionary with access_token, token_type, etc.
        """

    def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """
        Refresh access token using refresh token.
        
        Args:
            token_endpoint: Token endpoint URL
            refresh_token: Refresh token (uses stored token if None)
            body: Additional request body (defaults to an empty string)
            auth: HTTP basic auth tuple
            **kwargs: Additional parameters
            
        Returns:
            New token dictionary
        """

    def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """
        Revoke access or refresh token.
        
        Args:
            revocation_endpoint: Revocation endpoint URL
            token: Token to revoke (uses stored token if None)
            token_type_hint: Hint about token type
            body: Additional request body (defaults to an empty string)
            auth: HTTP basic auth tuple
            **kwargs: Additional parameters
            
        Returns:
            Response dictionary
        """

    def introspect_token(self, introspection_endpoint: str, token: str = None, token_type_hint: str = None, **kwargs) -> dict:
        """
        Introspect token at authorization server.
        
        Args:
            introspection_endpoint: Introspection endpoint URL
            token: Token to introspect
            token_type_hint: Hint about token type
            **kwargs: Additional parameters
            
        Returns:
            Introspection response dictionary
        """

    def register_client(self, registration_endpoint: str, **kwargs) -> dict:
        """
        Register client dynamically.
        
        Args:
            registration_endpoint: Registration endpoint URL
            **kwargs: Client metadata
            
        Returns:
            Registration response with client credentials
        """

    @property
    def access_token(self) -> str:
        """Return the current access token string."""

    @property
    def token_type(self) -> str:
        """Return the type of the current token."""

OAuth 2.0 Server Components

Server-side components for implementing OAuth 2.0 authorization servers.

class AuthorizationServer:
    """
    OAuth 2.0 authorization server.

    Grants and endpoints are registered on the server; the ``create_*_response``
    methods each return a ``(status_code, body, headers)`` tuple.
    Only signatures and docstrings are listed here.
    """
    
    def __init__(self, query_client: callable, save_token: callable = None) -> None:
        """
        Initialize authorization server.
        
        Args:
            query_client: Function to query client by client_id
            save_token: Function to save issued tokens (defaults to None);
                the server usage example defines it as
                ``save_token(token, request, *args, **kwargs)``
        """

    def register_grant(self, grant_cls: type, extensions: list = None) -> None:
        """
        Register a grant type.
        
        Args:
            grant_cls: Grant class to register (the class itself, not an instance)
            extensions: List of grant extensions (defaults to None)
        """

    def register_endpoint(self, endpoint: object) -> None:
        """
        Register an endpoint.
        
        Args:
            endpoint: Endpoint instance
        """

    def validate_consent_request(self, request: OAuth2Request, end_user: object = None) -> None:
        """
        Validate consent request at authorization endpoint.

        The server usage example catches ``OAuth2Error`` around this call.
        
        Args:
            request: OAuth2Request object
            end_user: End user object (defaults to None)
        """

    def create_authorization_response(self, request: OAuth2Request, grant_user: callable) -> tuple:
        """
        Create authorization response.
        
        Args:
            request: OAuth2Request object
            grant_user: Function to grant authorization to user (the server
                usage example passes a zero-argument callable returning the
                current user)
            
        Returns:
            Tuple of (status_code, body, headers)
        """

    def create_token_response(self, request: OAuth2Request) -> tuple:
        """
        Create token response.
        
        Args:
            request: OAuth2Request object
            
        Returns:
            Tuple of (status_code, body, headers)
        """

    def create_revocation_response(self, request: OAuth2Request) -> tuple:
        """
        Create revocation response.
        
        Args:
            request: OAuth2Request object
            
        Returns:
            Tuple of (status_code, body, headers)
        """

class ResourceProtector:
    """
    OAuth 2.0 resource server protection.

    Token validators are registered on the protector, which then validates
    incoming requests against a list of required scopes.
    Only signatures and docstrings are listed here.
    """
    
    def __init__(self) -> None:
        """Initialize resource protector (takes no arguments)."""

    def register_token_validator(self, validator: 'TokenValidator') -> None:
        """
        Register token validator.
        
        Args:
            validator: Token validator instance
        """

    def validate_request(self, scopes: list, request: OAuth2Request) -> 'OAuth2Token':
        """
        Validate OAuth 2.0 request.
        
        Args:
            scopes: Required scopes (list)
            request: OAuth2Request object
            
        Returns:
            OAuth2Token object if valid
        """

    def acquire_token(self, scopes: list = None, request: OAuth2Request = None) -> 'OAuth2Token':
        """
        Acquire token from request.

        Unlike ``validate_request``, both arguments are optional here.
        
        Args:
            scopes: Required scopes (defaults to None)
            request: OAuth2Request object (defaults to None)
            
        Returns:
            OAuth2Token object
        """

Request and Token Objects

Core objects for representing OAuth 2.0 requests and tokens.

class OAuth2Request:
    """
    OAuth 2.0 request wrapper.

    Wraps an HTTP request (method, URI, body, headers) and exposes the common
    OAuth 2.0 request parameters as read-only properties.
    Only signatures and docstrings are listed here.
    """
    
    def __init__(self, method: str, uri: str, body: str = None, headers: dict = None) -> None:
        """
        Initialize OAuth 2.0 request.
        
        Args:
            method: HTTP method
            uri: Request URI
            body: Request body (defaults to None)
            headers: Request headers (defaults to None)
        """

    @property
    def client_id(self) -> str:
        """Return the ``client_id`` carried by the request."""

    @property
    def client_secret(self) -> str:
        """Return the ``client_secret`` carried by the request."""

    @property
    def redirect_uri(self) -> str:
        """Return the ``redirect_uri`` carried by the request."""

    @property
    def scope(self) -> str:
        """Return the ``scope`` carried by the request."""

    @property
    def state(self) -> str:
        """Return the ``state`` carried by the request."""

    @property
    def response_type(self) -> str:
        """Return the ``response_type`` carried by the request."""

    @property
    def grant_type(self) -> str:
        """Return the ``grant_type`` carried by the request."""

class OAuth2Token:
    """
    OAuth 2.0 token representation.

    Built from a dictionary of token parameters and exposing the usual token
    fields as read-only properties. Only signatures and docstrings are listed
    here.
    """
    
    def __init__(self, params: dict = None) -> None:
        """
        Initialize OAuth 2.0 token.
        
        Args:
            params: Token parameters dictionary (defaults to None)
        """

    @property
    def access_token(self) -> str:
        """Return the access token string."""

    @property
    def token_type(self) -> str:
        """Return the token type."""

    @property
    def refresh_token(self) -> str:
        """Return the refresh token string."""

    @property
    def expires_in(self) -> int:
        """
        Return the ``expires_in`` value of the token.

        NOTE(review): elsewhere in this reference ``expires_in`` is described
        as the token lifetime in seconds, not an absolute time; see
        ``get_expires_at`` for the timestamp.
        """

    @property
    def scope(self) -> str:
        """Return the token scope."""

    def is_expired(self) -> bool:
        """Return whether the token is expired."""

    def get_expires_at(self) -> int:
        """Return the expiration timestamp of the token."""

Grant Types

Implementation of standard OAuth 2.0 grant types.

class BaseGrant:
    """
    Base class for OAuth 2.0 grants.

    Every grant class in this reference derives from it. A grant is
    constructed with the current request and the authorization server.
    Only signatures and docstrings are listed here.
    """
    
    def __init__(self, request: OAuth2Request, server: AuthorizationServer) -> None:
        """
        Initialize grant.
        
        Args:
            request: OAuth2Request object
            server: AuthorizationServer instance
        """

    def validate_request(self) -> None:
        """Validate the grant request; returns nothing."""

    def create_authorization_response(self, redirect_uri: str, grant_user: callable) -> dict:
        """
        Create authorization response.

        Args:
            redirect_uri: Redirect URI for the response
            grant_user: Callable supplied by the caller to grant authorization

        Returns:
            Response dictionary
        """

    def create_token_response(self) -> dict:
        """
        Create token response.

        Returns:
            Response dictionary
        """

class AuthorizationCodeGrant(BaseGrant):
    """
    Authorization code grant implementation.

    The methods below cover the authorization code lifecycle: create, parse,
    delete, and resolve the user it belongs to. Only signatures and
    docstrings are listed here.
    """
    
    # grant_type value handled by this grant
    GRANT_TYPE: str = 'authorization_code'
    # response_type values handled by this grant
    RESPONSE_TYPES: list = ['code']
    
    def create_authorization_code(self, client: object, grant_user: callable, request: OAuth2Request) -> str:
        """
        Create authorization code.
        
        Args:
            client: Client object
            grant_user: Grant user function
            request: OAuth2Request object
            
        Returns:
            Authorization code string
        """

    def parse_authorization_code(self, code: str, client: object) -> object:
        """
        Parse and validate authorization code.
        
        Args:
            code: Authorization code string
            client: Client object
            
        Returns:
            Authorization code object
        """

    def delete_authorization_code(self, authorization_code: object) -> None:
        """
        Delete authorization code after use.
        
        Args:
            authorization_code: Authorization code object
        """

    def authenticate_user(self, authorization_code: object) -> object:
        """
        Authenticate user from authorization code.
        
        Args:
            authorization_code: Authorization code object
            
        Returns:
            User object
        """

class ImplicitGrant(BaseGrant):
    """Implicit grant implementation (adds no methods beyond BaseGrant in this listing)."""
    
    # No grant_type value is declared for this grant (it is None)
    GRANT_TYPE: str = None
    # response_type values handled by this grant
    RESPONSE_TYPES: list = ['token']

class ResourceOwnerPasswordCredentialsGrant(BaseGrant):
    """
    Resource owner password credentials grant implementation.

    Only signatures and docstrings are listed here.
    """
    
    # grant_type value handled by this grant
    GRANT_TYPE: str = 'password'
    
    def authenticate_user(self, username: str, password: str, client: object, request: OAuth2Request) -> object:
        """
        Authenticate user with username and password.
        
        Args:
            username: Username
            password: Password
            client: Client object
            request: OAuth2Request object
            
        Returns:
            User object if authenticated
        """

class ClientCredentialsGrant(BaseGrant):
    """Client credentials grant implementation (adds no methods beyond BaseGrant in this listing)."""
    
    # grant_type value handled by this grant
    GRANT_TYPE: str = 'client_credentials'

class RefreshTokenGrant(BaseGrant):
    """
    Refresh token grant implementation.

    The methods below resolve the presented refresh token, resolve the user it
    belongs to, and revoke the old credential. Only signatures and docstrings
    are listed here.
    """
    
    # grant_type value handled by this grant
    GRANT_TYPE: str = 'refresh_token'
    
    def authenticate_refresh_token(self, refresh_token: str, client: object, request: OAuth2Request) -> object:
        """
        Authenticate refresh token.
        
        Args:
            refresh_token: Refresh token string
            client: Client object
            request: OAuth2Request object
            
        Returns:
            Token object if valid
        """

    def authenticate_user(self, credential: object) -> object:
        """
        Authenticate user from refresh token credential.
        
        Args:
            credential: Token credential object
            
        Returns:
            User object
        """

    def revoke_old_credential(self, credential: object) -> None:
        """
        Revoke old refresh token.
        
        Args:
            credential: Token credential to revoke
        """

Bearer Token Support

RFC 6750 Bearer Token implementation.

class BearerTokenGenerator:
    """
    Generate bearer tokens.

    Configured with optional generator callables for the access token, the
    refresh token and the expiration time. Only signatures and docstrings are
    listed here.
    """
    
    def __init__(self, access_token_generator: callable = None, refresh_token_generator: callable = None, expires_generator: callable = None) -> None:
        """
        Initialize bearer token generator.

        All three generators default to None.
        
        Args:
            access_token_generator: Function to generate access tokens
            refresh_token_generator: Function to generate refresh tokens
            expires_generator: Function to generate expiration times
        """

    def generate(self, client: object, grant_type: str, user: object = None, scope: str = None, expires_in: int = None, include_refresh_token: bool = True) -> dict:
        """
        Generate bearer token.
        
        Args:
            client: Client object
            grant_type: Grant type used
            user: User object (defaults to None)
            scope: Token scope (defaults to None)
            expires_in: Token lifetime in seconds (defaults to None)
            include_refresh_token: Whether to include refresh token
                (defaults to True)
            
        Returns:
            Token dictionary
        """

class BearerTokenValidator:
    """
    Validate bearer tokens.

    Note that ``request_invalid`` and ``token_revoked`` are negative checks:
    they return True when validation should fail. Only signatures and
    docstrings are listed here.
    """
    
    def authenticate_token(self, token_string: str) -> object:
        """
        Authenticate bearer token.
        
        Args:
            token_string: Bearer token string
            
        Returns:
            Token object if valid
        """

    def request_invalid(self, request: OAuth2Request) -> bool:
        """
        Check if request is invalid.
        
        Args:
            request: OAuth2Request object
            
        Returns:
            True if request is invalid
        """

    def token_revoked(self, token: object) -> bool:
        """
        Check if token is revoked.
        
        Args:
            token: Token object
            
        Returns:
            True if token is revoked
        """

def add_bearer_token(uri: str, http_method: str, body: str, headers: dict, token: dict) -> tuple:
    """
    Add bearer token to request.
    
    Args:
        uri: Request URI
        http_method: HTTP method
        body: Request body
        headers: Request headers
        token: Token dictionary
        
    Returns:
        Tuple of (uri, headers, body). Note that the order differs from the
        parameter order: headers come before body, and http_method is not
        part of the result.
    """

PKCE Support

RFC 7636 Proof Key for Code Exchange implementation.

class CodeChallenge:
    """
    PKCE code challenge implementation.

    Built from a code verifier and a challenge method. Only signatures and
    docstrings are listed here.
    """
    
    def __init__(self, code_verifier: str, code_challenge_method: str = 'S256') -> None:
        """
        Initialize code challenge.
        
        Args:
            code_verifier: Code verifier string
            code_challenge_method: Challenge method, 'plain' or 'S256'
                (defaults to 'S256')
        """

    def get_code_challenge(self) -> str:
        """Return the code challenge string."""

    def get_code_challenge_method(self) -> str:
        """Return the code challenge method."""

    def verify_code_verifier(self, code_verifier: str) -> bool:
        """
        Verify code verifier against challenge.
        
        Args:
            code_verifier: Code verifier to verify
            
        Returns:
            True if verifier is valid
        """

def create_s256_code_challenge(code_verifier: str) -> str:
    """
    Create S256 code challenge from verifier.

    NOTE(review): RFC 7636 defines the S256 challenge as
    BASE64URL(SHA256(ASCII(code_verifier))); presumably this helper follows
    that definition — confirm against the implementation.
    
    Args:
        code_verifier: Code verifier string
        
    Returns:
        S256 code challenge string
    """

Device Authorization Flow

RFC 8628 Device Authorization Grant implementation.

class DeviceAuthorizationEndpoint:
    """
    Device authorization endpoint.

    Bound to an AuthorizationServer at construction time. Only signatures and
    docstrings are listed here.
    """
    
    def __init__(self, server: AuthorizationServer) -> None:
        """
        Initialize device authorization endpoint.
        
        Args:
            server: AuthorizationServer instance
        """

    def create_endpoint_response(self, request: OAuth2Request) -> tuple:
        """
        Create device authorization response.
        
        Args:
            request: OAuth2Request object
            
        Returns:
            Tuple of (status_code, body, headers)
        """

class DeviceCodeGrant(BaseGrant):
    """
    Device code grant implementation.

    Only signatures and docstrings are listed here.
    """
    
    # grant_type value handled by this grant
    GRANT_TYPE: str = 'urn:ietf:params:oauth:grant-type:device_code'
    
    def query_device_credential(self, device_code: str, client: object) -> object:
        """
        Query device credential by device code.
        
        Args:
            device_code: Device code
            client: Client object
            
        Returns:
            Device credential object
        """

    def should_slow_down(self, credential: object, now: int) -> bool:
        """
        Check if client should slow down polling.
        
        Args:
            credential: Device credential object
            now: Current timestamp (int)
            
        Returns:
            True if client should slow down
        """

# Grant type identifier for the device authorization flow (RFC 8628);
# the same string is declared as DeviceCodeGrant.GRANT_TYPE.
DEVICE_CODE_GRANT_TYPE: str = 'urn:ietf:params:oauth:grant-type:device_code'

# Device flow errors
class AuthorizationPendingError(OAuth2Error):
    """Device flow error: authorization is still pending user action."""
    # Error code carried by this error
    error: str = 'authorization_pending'

class SlowDownError(OAuth2Error):
    """Device flow error: the client is polling too frequently."""
    # Error code carried by this error
    error: str = 'slow_down'

Model Mixins

Mixins for database models to store OAuth 2.0 data.

class ClientMixin:
    """
    Mixin for OAuth 2.0 client model.

    Declares the attributes and the accessor / check methods of a stored
    client. Only signatures and docstrings are listed here.
    """
    
    client_id: str  # Client identifier
    client_secret: str  # Client secret (may be None for public clients)
    client_id_issued_at: int  # Timestamp at which the client ID was issued
    client_secret_expires_at: int  # Timestamp at which the client secret expires
    
    def get_client_id(self) -> str:
        """Return the client ID."""
    
    def get_default_redirect_uri(self) -> str:
        """Return the default redirect URI."""
    
    def get_allowed_scope(self, scope: str) -> str:
        """Return the allowed scope for this client, given a requested scope string."""
    
    def check_redirect_uri(self, redirect_uri: str) -> bool:
        """Return whether ``redirect_uri`` is allowed."""
    
    def has_client_secret(self) -> bool:
        """Return whether the client has a secret."""
    
    def check_client_secret(self, client_secret: str) -> bool:
        """Verify ``client_secret`` and return the result."""
    
    def check_token_endpoint_auth_method(self, method: str) -> bool:
        """Return whether the token endpoint auth method ``method`` is supported."""
    
    def check_response_type(self, response_type: str) -> bool:
        """Return whether ``response_type`` is supported."""
    
    def check_grant_type(self, grant_type: str) -> bool:
        """Return whether ``grant_type`` is supported."""

class AuthorizationCodeMixin:
    """
    Mixin for authorization code model.

    Declares the attributes of a stored authorization code, including the PKCE
    challenge fields, and their accessors. Only signatures and docstrings are
    listed here.
    """
    
    code: str  # Authorization code value
    client_id: str  # Client identifier
    redirect_uri: str  # Redirect URI
    scope: str  # Authorized scope
    user_id: str  # User identifier
    code_challenge: str  # PKCE code challenge
    code_challenge_method: str  # PKCE challenge method
    
    def is_expired(self) -> bool:
        """Return whether the authorization code is expired."""
    
    def get_redirect_uri(self) -> str:
        """Return the redirect URI."""
    
    def get_scope(self) -> str:
        """Return the authorized scope."""
    
    def get_user_id(self) -> str:
        """Return the user ID."""
    
    def get_code_challenge(self) -> str:
        """Return the PKCE code challenge."""
    
    def get_code_challenge_method(self) -> str:
        """Return the PKCE challenge method."""

class TokenMixin:
    """
    Mixin for access token model.

    Declares the attributes of a stored token and its state checks.
    Only signatures and docstrings are listed here.
    """
    
    access_token: str  # Access token value
    client_id: str  # Client identifier
    token_type: str  # Token type (usually 'Bearer')
    refresh_token: str  # Refresh token value
    scope: str  # Token scope
    user_id: str  # User identifier
    issued_at: int  # Timestamp at which the token was issued
    expires_in: int  # Token lifetime in seconds
    
    def get_scope(self) -> str:
        """Return the token scope."""
    
    def get_user_id(self) -> str:
        """Return the user ID."""
    
    def is_expired(self) -> bool:
        """
        Return whether the token is expired.

        NOTE(review): presumably derived from ``issued_at`` and ``expires_in``
        — confirm against the implementation.
        """
    
    def is_revoked(self) -> bool:
        """Return whether the token is revoked."""

Utility Functions

Helper functions for scope and parameter handling.

def scope_to_list(scope: str) -> list:
    """
    Convert scope string to list.

    Counterpart of ``list_to_scope``. Presumably 'read write' becomes
    ['read', 'write'] — confirm against the implementation.
    
    Args:
        scope: Space-separated scope string
        
    Returns:
        List of scope values
    """

def list_to_scope(scopes: list) -> str:
    """
    Convert scope list to string.

    Counterpart of ``scope_to_list``. Presumably ['read', 'write'] becomes
    'read write' — confirm against the implementation.
    
    Args:
        scopes: List of scope values
        
    Returns:
        Space-separated scope string
    """

Usage Examples

Authorization Code Flow

from authlib.oauth2 import OAuth2Client

# Endpoints of the authorization server used throughout this example
AUTHORIZE_ENDPOINT = 'https://provider.com/authorize'
TOKEN_ENDPOINT = 'https://provider.com/token'

# Create the client with its credentials and a default scope
client = OAuth2Client(client_id='your-client-id', client_secret='your-client-secret', scope='read write')

# Step 1: Build the URL the user must visit; the state value is returned
# together with the URL
authorization_url, state = client.create_authorization_url(AUTHORIZE_ENDPOINT, state='random-state-string')
print(f"Visit: {authorization_url}")

# Step 2: Exchange the authorization code received on the callback for a token
token = client.fetch_token(TOKEN_ENDPOINT, code='authorization-code-from-callback')
print(f"Access token: {token['access_token']}")

# Step 3: Refresh the token when needed (only if a refresh token was issued)
if 'refresh_token' in token:
    new_token = client.refresh_token(TOKEN_ENDPOINT, refresh_token=token['refresh_token'])

PKCE Flow

from authlib.oauth2 import OAuth2Client, create_s256_code_challenge
from authlib.common.security import generate_token

# Endpoints of the authorization server used throughout this example
AUTHORIZE_ENDPOINT = 'https://provider.com/authorize'
TOKEN_ENDPOINT = 'https://provider.com/token'

# PKCE parameters: a random verifier and the S256 challenge derived from it
code_verifier = generate_token(128)
code_challenge = create_s256_code_challenge(code_verifier)

# Public client (no secret): send the challenge with the authorization request
client = OAuth2Client(client_id='public-client-id')
auth_url, state = client.create_authorization_url(AUTHORIZE_ENDPOINT, code_challenge=code_challenge, code_challenge_method='S256')

# Exchange the code, proving possession of the original verifier
token = client.fetch_token(TOKEN_ENDPOINT, code='authorization-code', code_verifier=code_verifier)

Server Implementation

from authlib.oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant

# Define query functions
def query_client(client_id):
    """Return the client looked up by ``client_id`` via ``get_client_by_id``."""
    client = get_client_by_id(client_id)
    return client

def save_token(token, request, *args, **kwargs):
    """Store ``token`` via ``store_access_token``; the other arguments are unused."""
    store_access_token(token)
    return None

# Build the server from the two callbacks defined above
authorization_server = AuthorizationServer(query_client=query_client, save_token=save_token)

# Enable the authorization code grant on the server
authorization_server.register_grant(AuthorizationCodeGrant)

# Handle authorization endpoint
@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """
    Authorization endpoint.

    GET validates the consent request and renders the authorization form;
    an OAuth2Error is reported as a 400 response carrying its error code.
    Any other method (POST) grants authorization to the current user and
    returns the server's authorization response.
    """
    # Keep the wrapped request under its own name. Assigning to `request`
    # here would make `request` local to this function, so reading
    # `request.method` on the right-hand side would raise UnboundLocalError.
    oauth_request = OAuth2Request(request.method, request.url, request.form, request.headers)
    
    if request.method == 'GET':
        # Show authorization form
        try:
            authorization_server.validate_consent_request(oauth_request, current_user)
            return render_template('authorize.html')
        except OAuth2Error as error:
            return {'error': error.error}, 400
    
    # Handle authorization
    def grant_user():
        return current_user
    
    status_code, body, headers = authorization_server.create_authorization_response(
        oauth_request, grant_user
    )
    return Response(body, status=status_code, headers=headers)

# Handle token endpoint
@app.route('/token', methods=['POST'])
def issue_token():
    """
    Token endpoint.

    Wraps the incoming HTTP request and returns the authorization server's
    token response.
    """
    # Keep the wrapped request under its own name. Assigning to `request`
    # here would make `request` local to this function, so reading
    # `request.method` on the right-hand side would raise UnboundLocalError.
    oauth_request = OAuth2Request(request.method, request.url, request.form, request.headers)
    status_code, body, headers = authorization_server.create_token_response(oauth_request)
    return Response(body, status=status_code, headers=headers)

Install with Tessl CLI

npx tessl i tessl/pypi-authlib

docs

common-utilities.md

django-integration.md

flask-integration.md

http-clients.md

index.md

jose.md

oauth1.md

oauth2.md

oidc.md

tile.json