CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-authlib

The ultimate Python library in building OAuth and OpenID Connect servers and clients.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

oauth1.mddocs/

OAuth 1.0 Implementation

Complete OAuth 1.0 implementation following RFC 5849 with support for all signature methods (HMAC-SHA1, RSA-SHA1, PLAINTEXT) and signature types (header, query, body). Provides both client and server implementations with comprehensive support for temporary credentials and token credentials.

Capabilities

OAuth 1.0 Client

High-level OAuth 1.0 client implementation for performing the complete authorization flow with automatic signature generation and request signing.

class OAuth1Client:
    """OAuth 1.0 client implementation."""
    
    def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
        """
        Initialize OAuth 1.0 client.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret (not needed for RSA-SHA1)
            token: OAuth token
            token_secret: OAuth token secret
            redirect_uri: Callback URI for authorization
            rsa_key: RSA private key for RSA-SHA1 signature method
            verifier: OAuth verifier from authorization callback
            signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT)
            signature_type: Where to include signature (AUTH_HEADER, QUERY, BODY)
            force_include_body: Whether to always include body in signature
        """

    def fetch_request_token(self, uri: str, realm: str = None) -> dict:
        """
        Fetch temporary credentials (request token).
        
        Args:
            uri: Request token endpoint URI
            realm: Optional authorization realm
            
        Returns:
            Dictionary containing oauth_token and oauth_token_secret
        """

    def create_authorization_url(self, uri: str, **request_kwargs) -> str:
        """
        Create authorization URL for user to visit.
        
        Args:
            uri: Authorization endpoint URI
            **request_kwargs: Additional parameters for authorization
            
        Returns:
            Authorization URL with embedded token
        """

    def fetch_access_token(self, uri: str, verifier: str = None) -> dict:
        """
        Exchange authorization verifier for access token.
        
        Args:
            uri: Access token endpoint URI
            verifier: OAuth verifier from authorization callback
            
        Returns:
            Dictionary containing oauth_token and oauth_token_secret
        """

    def get_request_token_authorization_url(self, url: str, **request_kwargs) -> str:
        """Get authorization URL (alias for create_authorization_url)."""

    def parse_authorization_response(self, url: str) -> dict:
        """
        Parse authorization callback response.
        
        Args:
            url: Callback URL with parameters
            
        Returns:
            Dictionary with parsed parameters
        """

OAuth 1.0 Server Components

Server-side components for implementing OAuth 1.0 authorization and resource servers.

class AuthorizationServer:
    """OAuth 1.0 authorization server."""
    
    def __init__(self, query_client: callable, query_token: callable, save_request_token: callable, save_verifier: callable, save_access_token: callable) -> None:
        """
        Initialize authorization server.
        
        Args:
            query_client: Function to query client by client_key
            query_token: Function to query token by token value
            save_request_token: Function to save temporary credentials
            save_verifier: Function to save authorization verifier
            save_access_token: Function to save access token
        """

    def create_request_token_response(self, uri: str, http_method: str = 'POST', body: str = None, headers: dict = None) -> tuple:
        """
        Create response for request token endpoint.
        
        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            
        Returns:
            Tuple of (headers, body, status_code)
        """

    def create_authorization_response(self, uri: str, http_method: str = 'GET', body: str = None, headers: dict = None, credentials: dict = None) -> tuple:
        """
        Create response for authorization endpoint.
        
        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            credentials: User credentials dictionary
            
        Returns:
            Tuple of (headers, body, status_code)
        """

    def create_access_token_response(self, uri: str, http_method: str = 'POST', body: str = None, headers: dict = None, credentials: dict = None) -> tuple:
        """
        Create response for access token endpoint.
        
        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            credentials: Optional user credentials
            
        Returns:
            Tuple of (headers, body, status_code)
        """

    def validate_request_token_request(self, request: OAuth1Request) -> None:
        """Validate request token request."""

    def validate_authorization_request(self, request: OAuth1Request) -> None:
        """Validate authorization request."""

    def validate_access_token_request(self, request: OAuth1Request) -> None:
        """Validate access token request."""

class ResourceProtector:
    """OAuth 1.0 resource server protection."""
    
    def __init__(self, query_client: callable, query_token: callable) -> None:
        """
        Initialize resource protector.
        
        Args:
            query_client: Function to query client by client_key
            query_token: Function to query token by token value
        """

    def validate_request(self, uri: str, http_method: str, body: str = None, headers: dict = None) -> OAuth1Request:
        """
        Validate OAuth 1.0 signed request.
        
        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
            
        Returns:
            Validated OAuth1Request object
        """

Request Wrapper

OAuth 1.0 request representation with signature validation capabilities.

class OAuth1Request:
    """OAuth 1.0 request wrapper."""
    
    def __init__(self, uri: str, http_method: str = 'GET', body: str = None, headers: dict = None) -> None:
        """
        Initialize OAuth 1.0 request.
        
        Args:
            uri: Request URI
            http_method: HTTP method
            body: Request body
            headers: Request headers
        """

    @property
    def client_key(self) -> str:
        """Get client key from request."""

    @property
    def signature(self) -> str:
        """Get signature from request."""

    @property
    def signature_method(self) -> str:
        """Get signature method from request."""

    @property
    def token(self) -> str:
        """Get token from request."""

    def validate_timestamp_and_nonce(self, timestamp_lifetime: int = 600) -> None:
        """
        Validate timestamp and nonce.
        
        Args:
            timestamp_lifetime: Maximum age of timestamp in seconds
        """

    def validate_signature(self, client_secret: str, token_secret: str = '', rsa_key=None) -> None:
        """
        Validate request signature.
        
        Args:
            client_secret: Client secret
            token_secret: Token secret
            rsa_key: RSA public key for RSA-SHA1 verification
        """

Client Authentication

OAuth 1.0 client authentication for use with HTTP libraries.

class ClientAuth:
    """OAuth 1.0 client authentication."""
    
    def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
        """
        Initialize client authentication.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret
            token: OAuth token
            token_secret: OAuth token secret
            signature_method: Signature method
            signature_type: Signature type
            rsa_key: RSA private key
            verifier: OAuth verifier
        """

    def __call__(self, request) -> object:
        """
        Apply OAuth 1.0 signature to request.
        
        Args:
            request: HTTP request object
            
        Returns:
            Modified request with OAuth signature
        """

Model Mixins

Mixins for database models to store OAuth 1.0 credentials.

class ClientMixin:
    """Mixin for OAuth 1.0 client model."""
    
    client_key: str  # Client identifier
    client_secret: str  # Client secret (may be None for RSA-SHA1)
    default_redirect_uri: str  # Default callback URI
    default_realms: list  # Default authorization realms
    
    def get_default_redirect_uri(self) -> str:
        """Get default redirect URI for this client."""
    
    def get_default_realms(self) -> list:
        """Get default realms for this client."""
    
    def validate_redirect_uri(self, redirect_uri: str) -> bool:
        """Validate if redirect URI is allowed for this client."""

class TemporaryCredentialMixin:
    """Mixin for temporary credential (request token) model."""
    
    client_key: str  # Client identifier
    oauth_token: str  # Temporary token
    oauth_token_secret: str  # Temporary token secret
    oauth_callback: str  # Callback URI
    oauth_verifier: str  # Authorization verifier
    
    def get_redirect_uri(self) -> str:
        """Get callback URI for this temporary credential."""
    
    def get_oauth_verifier(self) -> str:
        """Get OAuth verifier for this temporary credential."""

class TokenCredentialMixin:
    """Mixin for token credential (access token) model."""
    
    client_key: str  # Client identifier
    oauth_token: str  # Access token
    oauth_token_secret: str  # Access token secret
    user_id: str  # User identifier
    
    def get_user_id(self) -> str:
        """Get user ID associated with this token."""

class TemporaryCredential:
    """Temporary credential representation."""
    
    def __init__(self, client_key: str, oauth_token: str, oauth_token_secret: str, oauth_callback: str = None) -> None:
        """
        Initialize temporary credential.
        
        Args:
            client_key: Client identifier
            oauth_token: Temporary token
            oauth_token_secret: Temporary token secret
            oauth_callback: Callback URI
        """

Constants

OAuth 1.0 signature methods and types.

# Signature Methods
SIGNATURE_HMAC_SHA1: str = 'HMAC-SHA1'
SIGNATURE_RSA_SHA1: str = 'RSA-SHA1'  
SIGNATURE_PLAINTEXT: str = 'PLAINTEXT'

# Signature Types
SIGNATURE_TYPE_HEADER: str = 'AUTH_HEADER'
SIGNATURE_TYPE_QUERY: str = 'QUERY'
SIGNATURE_TYPE_BODY: str = 'BODY'

Usage Examples

Client Flow

from authlib.oauth1 import OAuth1Client

# Step 1: Initialize client
client = OAuth1Client(
    client_key='your-client-key',
    client_secret='your-client-secret',
    signature_method='HMAC-SHA1'
)

# Step 2: Get request token
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
print(f"Request token: {request_token['oauth_token']}")

# Step 3: Get authorization URL
client.token = request_token['oauth_token']
client.token_secret = request_token['oauth_token_secret']
auth_url = client.create_authorization_url('https://provider.com/oauth/authorize')
print(f"Visit: {auth_url}")

# Step 4: After user authorization, exchange for access token
client.verifier = 'verifier-from-callback'
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
print(f"Access token: {access_token['oauth_token']}")

# Step 5: Make authenticated requests
client.token = access_token['oauth_token']
client.token_secret = access_token['oauth_token_secret']
# Use client with requests library or make manual signed requests

Server Implementation

from authlib.oauth1 import AuthorizationServer, ResourceProtector

# Define callback functions
def query_client(client_key):
    # Return client object or None
    return get_client_by_key(client_key)

def query_token(token):
    # Return token object or None
    return get_token_by_value(token)

def save_request_token(token, request):
    # Save temporary credential
    store_request_token(token)

def save_verifier(token, verifier, request):
    # Save authorization verifier
    update_token_verifier(token, verifier)

def save_access_token(token, request):
    # Save access token
    store_access_token(token)

# Create authorization server
authorization_server = AuthorizationServer(
    query_client=query_client,
    query_token=query_token,
    save_request_token=save_request_token,
    save_verifier=save_verifier,
    save_access_token=save_access_token
)

# Handle request token endpoint
headers, body, status = authorization_server.create_request_token_response(
    uri='/oauth/request_token',
    http_method='POST',
    headers=request.headers
)

# Handle authorization endpoint
headers, body, status = authorization_server.create_authorization_response(
    uri='/oauth/authorize',
    http_method='GET',
    credentials={'user_id': current_user.id}
)

# Handle access token endpoint
headers, body, status = authorization_server.create_access_token_response(
    uri='/oauth/access_token',
    http_method='POST',
    headers=request.headers
)

Resource Protection

from authlib.oauth1 import ResourceProtector

# Create resource protector
resource_protector = ResourceProtector(
    query_client=query_client,
    query_token=query_token
)

# Protect API endpoints
def protected_resource():
    try:
        # Validate OAuth 1.0 request
        oauth_request = resource_protector.validate_request(
            uri=request.url,
            http_method=request.method,
            body=request.get_data(),
            headers=request.headers
        )
        
        # Access token is valid, process request
        token = oauth_request.token
        return f"Hello, token: {token}"
        
    except OAuth1Error as error:
        return {'error': error.error}, 401

Install with Tessl CLI

npx tessl i tessl/pypi-authlib

docs

common-utilities.md

django-integration.md

flask-integration.md

http-clients.md

index.md

jose.md

oauth1.md

oauth2.md

oidc.md

tile.json