CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-authlib

The ultimate Python library in building OAuth and OpenID Connect servers and clients.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

http-clients.mddocs/

HTTP Client Integrations

Seamless integration with popular HTTP clients including Requests and HTTPX for both synchronous and asynchronous OAuth operations. Provides automatic token management, request signing, and support for all OAuth flows with minimal configuration.

Capabilities

Requests Integration

Complete OAuth 1.0 and OAuth 2.0 integration with the popular Requests library.

class OAuth1Session:
    """Requests OAuth 1.0 session."""
    
    def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, client_class=None, force_include_body: bool = False, **request_kwargs) -> None:
        """
        Initialize OAuth 1.0 session.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret
            resource_owner_key: Resource owner token
            resource_owner_secret: Resource owner token secret
            callback_uri: Callback URI for authorization
            signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT)
            signature_type: Signature type (AUTH_HEADER, QUERY, BODY)
            rsa_key: RSA private key for RSA-SHA1
            verifier: OAuth verifier
            client_class: Custom OAuth1Client class
            force_include_body: Force include body in signature
            **request_kwargs: Additional requests session arguments
        """

    def fetch_request_token(self, request_token_url: str, realm: str = None) -> dict:
        """
        Fetch request token from authorization server.
        
        Args:
            request_token_url: Request token endpoint URL
            realm: Authorization realm
            
        Returns:
            Dictionary with oauth_token and oauth_token_secret
        """

    def parse_authorization_response_url(self, authorization_response_url: str) -> dict:
        """
        Parse authorization callback response.
        
        Args:
            authorization_response_url: Full callback URL with parameters
            
        Returns:
            Dictionary with parsed parameters
        """

    def fetch_access_token(self, access_token_url: str, verifier: str = None) -> dict:
        """
        Exchange authorization verifier for access token.
        
        Args:
            access_token_url: Access token endpoint URL
            verifier: OAuth verifier from callback
            
        Returns:
            Dictionary with oauth_token and oauth_token_secret
        """

class OAuth1Auth:
    """Requests OAuth 1.0 authentication handler."""
    
    def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
        """
        Initialize OAuth 1.0 auth handler.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret
            resource_owner_key: Resource owner token
            resource_owner_secret: Resource owner token secret
            callback_uri: Callback URI
            signature_method: Signature method
            signature_type: Signature type
            rsa_key: RSA private key
            verifier: OAuth verifier
        """

    def __call__(self, request: PreparedRequest) -> PreparedRequest:
        """
        Apply OAuth 1.0 signature to request.
        
        Args:
            request: Prepared request object
            
        Returns:
            Request with OAuth signature applied
        """

class OAuth2Session(requests.Session):
    """Requests OAuth 2.0 session."""
    
    def __init__(self, client_id: str = None, client: OAuth2Client = None, auto_refresh_url: str = None, auto_refresh_kwargs: dict = None, token_updater: callable = None, **kwargs) -> None:
        """
        Initialize OAuth 2.0 session.
        
        Args:
            client_id: Client identifier
            client: OAuth2Client instance
            auto_refresh_url: URL for automatic token refresh
            auto_refresh_kwargs: Arguments for token refresh
            token_updater: Callback for token updates
            **kwargs: Additional session arguments
        """

    def new_state(self) -> str:
        """
        Generate new state parameter for CSRF protection.
        
        Returns:
            Random state string
        """

    def authorization_url(self, authorization_endpoint: str, state: str = None, **kwargs) -> tuple:
        """
        Generate authorization URL.
        
        Args:
            authorization_endpoint: Authorization server's authorization endpoint
            state: CSRF protection state (generated if None)
            **kwargs: Additional authorization parameters
            
        Returns:
            Tuple of (authorization_url, state)
        """

    def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, method: str = 'POST', timeout: int = None, headers: dict = None, verify: bool = True, proxies: dict = None, include_client_id: bool = None, **kwargs) -> dict:
        """
        Fetch access token from authorization server.
        
        Args:
            token_endpoint: Token endpoint URL
            code: Authorization code
            authorization_response: Full authorization response URL
            body: Additional request body
            auth: HTTP basic auth tuple
            username: Username for password grant
            password: Password for password grant
            method: HTTP method
            timeout: Request timeout
            headers: HTTP headers
            verify: SSL verification
            proxies: HTTP proxies
            include_client_id: Include client ID in request
            **kwargs: Additional token parameters
            
        Returns:
            Token dictionary
        """

    def token_from_fragment(self, authorization_response_url: str) -> dict:
        """
        Extract token from URL fragment (implicit flow).
        
        Args:
            authorization_response_url: Authorization response URL with fragment
            
        Returns:
            Token dictionary
        """

    def refresh_token(self, token_url: str, refresh_token: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
        """
        Refresh access token.
        
        Args:
            token_url: Token endpoint URL
            refresh_token: Refresh token (uses stored if None)
            body: Additional request body
            auth: HTTP basic auth tuple
            timeout: Request timeout
            headers: HTTP headers
            verify: SSL verification
            **kwargs: Additional parameters
            
        Returns:
            New token dictionary
        """

    def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
        """
        Revoke access or refresh token.
        
        Args:
            revocation_endpoint: Revocation endpoint URL
            token: Token to revoke
            token_type_hint: Hint about token type
            body: Additional request body
            auth: HTTP basic auth tuple
            timeout: Request timeout
            headers: HTTP headers
            verify: SSL verification
            **kwargs: Additional parameters
            
        Returns:
            Response dictionary
        """

class OAuth2Auth:
    """Requests OAuth 2.0 authentication handler."""
    
    def __init__(self, client_id: str = None, client: OAuth2Client = None, token: dict = None) -> None:
        """
        Initialize OAuth 2.0 auth handler.
        
        Args:
            client_id: Client identifier
            client: OAuth2Client instance
            token: Access token dictionary
        """

    def __call__(self, request: PreparedRequest) -> PreparedRequest:
        """
        Apply OAuth 2.0 token to request.
        
        Args:
            request: Prepared request object
            
        Returns:
            Request with OAuth token applied
        """

class AssertionSession:
    """JWT assertion session for OAuth 2.0."""
    
    def __init__(self, grant_type: str, assertion: str, issuer: str, audience: str, subject: str = None, **kwargs) -> None:
        """
        Initialize JWT assertion session.
        
        Args:
            grant_type: Assertion grant type
            assertion: JWT assertion string
            issuer: Token issuer
            audience: Token audience
            subject: Token subject
            **kwargs: Additional session arguments
        """

    def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
        """
        Fetch token using JWT assertion.
        
        Args:
            token_endpoint: Token endpoint URL
            **kwargs: Additional parameters
            
        Returns:
            Token dictionary
        """

HTTPX Integration

Complete OAuth 1.0 and OAuth 2.0 integration with HTTPX for both sync and async operations.

class OAuth1Client(httpx.Client):
    """HTTPX OAuth 1.0 client."""
    
    def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
        """
        Initialize HTTPX OAuth 1.0 client.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret
            token: OAuth token
            token_secret: OAuth token secret
            redirect_uri: Redirect URI
            rsa_key: RSA private key
            verifier: OAuth verifier
            signature_method: Signature method
            signature_type: Signature type
            force_include_body: Force include body in signature
            **kwargs: Additional HTTPX client arguments
        """

    def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
        """
        Fetch request token.
        
        Args:
            url: Request token endpoint URL
            realm: Authorization realm
            **kwargs: Additional request arguments
            
        Returns:
            Dictionary with oauth_token and oauth_token_secret
        """

    def create_authorization_url(self, url: str, **kwargs) -> str:
        """
        Create authorization URL.
        
        Args:
            url: Authorization endpoint URL
            **kwargs: Additional authorization parameters
            
        Returns:
            Authorization URL
        """

    def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
        """
        Fetch access token.
        
        Args:
            url: Access token endpoint URL
            verifier: OAuth verifier
            **kwargs: Additional request arguments
            
        Returns:
            Dictionary with oauth_token and oauth_token_secret
        """

class AsyncOAuth1Client(httpx.AsyncClient):
    """HTTPX async OAuth 1.0 client."""
    
    def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
        """Initialize async OAuth 1.0 client with same parameters as sync version."""

    async def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
        """Async version of fetch_request_token."""

    async def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
        """Async version of fetch_access_token."""

class OAuth1Auth:
    """HTTPX OAuth 1.0 authentication handler."""
    
    def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
        """
        Initialize OAuth 1.0 auth handler.
        
        Args:
            client_key: Client identifier
            client_secret: Client secret
            token: OAuth token
            token_secret: OAuth token secret
            signature_method: Signature method
            signature_type: Signature type
            rsa_key: RSA private key
            verifier: OAuth verifier
        """

    def auth_flow(self, request: httpx.Request) -> httpx.Request:
        """
        Apply OAuth 1.0 signature to HTTPX request.
        
        Args:
            request: HTTPX request object
            
        Returns:
            Request with OAuth signature applied
        """

class OAuth2Client(httpx.Client):
    """HTTPX OAuth 2.0 client."""
    
    def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
        """
        Initialize HTTPX OAuth 2.0 client.
        
        Args:
            client_id: Client identifier
            client_secret: Client secret
            token_endpoint_auth_method: Token endpoint auth method
            revocation_endpoint_auth_method: Revocation endpoint auth method
            scope: Default scope
            redirect_uri: Default redirect URI
            token: Access token dictionary
            token_placement: Token placement (header, body, uri)
            update_token: Token update callback
            **kwargs: Additional HTTPX client arguments
        """

    def create_authorization_url(self, authorization_endpoint: str, state: str = None, code_challenge: str = None, code_challenge_method: str = None, **kwargs) -> tuple:
        """
        Create authorization URL.
        
        Args:
            authorization_endpoint: Authorization endpoint URL
            state: CSRF state parameter
            code_challenge: PKCE code challenge
            code_challenge_method: PKCE challenge method
            **kwargs: Additional authorization parameters
            
        Returns:
            Tuple of (authorization_url, state)
        """

    def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
        """
        Fetch access token.
        
        Args:
            token_endpoint: Token endpoint URL
            code: Authorization code
            authorization_response: Authorization response URL
            body: Additional request body
            auth: HTTP basic auth tuple
            username: Username for password grant
            password: Password for password grant
            **kwargs: Additional token parameters
            
        Returns:
            Token dictionary
        """

    def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """
        Refresh access token.
        
        Args:
            token_endpoint: Token endpoint URL
            refresh_token: Refresh token
            body: Additional request body
            auth: HTTP basic auth tuple
            **kwargs: Additional parameters
            
        Returns:
            New token dictionary
        """

    def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """
        Revoke token.
        
        Args:
            revocation_endpoint: Revocation endpoint URL
            token: Token to revoke
            token_type_hint: Token type hint
            body: Additional request body
            auth: HTTP basic auth tuple
            **kwargs: Additional parameters
            
        Returns:
            Response dictionary
        """

class AsyncOAuth2Client(httpx.AsyncClient):
    """HTTPX async OAuth 2.0 client."""
    
    def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
        """Initialize async OAuth 2.0 client with same parameters as sync version."""

    async def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
        """Async version of fetch_token."""

    async def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """Async version of refresh_token."""

    async def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
        """Async version of revoke_token."""

class OAuth2Auth:
    """HTTPX OAuth 2.0 authentication handler."""
    
    def __init__(self, client_id: str = None, client_secret: str = None, token: dict = None, token_placement: str = 'header', **kwargs) -> None:
        """
        Initialize OAuth 2.0 auth handler.
        
        Args:
            client_id: Client identifier
            client_secret: Client secret
            token: Access token dictionary
            token_placement: Token placement (header, body, uri)
        """

    def auth_flow(self, request: httpx.Request) -> httpx.Request:
        """
        Apply OAuth 2.0 token to HTTPX request.
        
        Args:
            request: HTTPX request object
            
        Returns:
            Request with OAuth token applied
        """

class OAuth2ClientAuth:
    """HTTPX OAuth 2.0 client authentication."""
    
    def __init__(self, client_id: str, client_secret: str = None, method: str = 'client_secret_basic') -> None:
        """
        Initialize client authentication.
        
        Args:
            client_id: Client identifier
            client_secret: Client secret
            method: Authentication method
        """

    def auth_flow(self, request: httpx.Request) -> httpx.Request:
        """
        Apply client authentication to request.
        
        Args:
            request: HTTPX request object
            
        Returns:
            Request with client authentication applied
        """

class AssertionClient(httpx.Client):
    """HTTPX JWT assertion client."""
    
    def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
        """
        Initialize JWT assertion client.
        
        Args:
            grant_type: Assertion grant type
            assertion: JWT assertion string
            **kwargs: Additional client arguments
        """

    def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
        """
        Fetch token using JWT assertion.
        
        Args:
            token_endpoint: Token endpoint URL
            **kwargs: Additional parameters
            
        Returns:
            Token dictionary
        """

class AsyncAssertionClient(httpx.AsyncClient):
    """HTTPX async JWT assertion client."""
    
    def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
        """Initialize async assertion client with same parameters as sync version."""

    async def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
        """Async version of fetch_token."""

Usage Examples

Requests OAuth 2.0 Client

from authlib.integrations.requests_client import OAuth2Session
import requests

# Basic OAuth 2.0 flow
client = OAuth2Session(
    client_id='your-client-id',
    redirect_uri='https://your-app.com/callback'
)

# Step 1: Get authorization URL
authorization_url, state = client.authorization_url(
    'https://provider.com/authorize',
    scope='read write'
)
print(f'Please visit: {authorization_url}')

# Step 2: Exchange authorization code for token
authorization_response = input('Paste the full redirect URL here:')
token = client.fetch_token(
    'https://provider.com/token',
    authorization_response=authorization_response,
    client_secret='your-client-secret'
)

# Step 3: Make authenticated requests
response = client.get('https://api.provider.com/user')
user_data = response.json()

# Automatic token refresh
def token_saver(token):
    # Save token to database or file
    save_token_to_storage(token)

client = OAuth2Session(
    client_id='your-client-id',
    token=stored_token,
    auto_refresh_url='https://provider.com/token',
    auto_refresh_kwargs={'client_id': 'your-client-id', 'client_secret': 'your-client-secret'},
    token_updater=token_saver
)

# Token will be automatically refreshed if expired
response = client.get('https://api.provider.com/user')

Requests OAuth 1.0 Client

from authlib.integrations.requests_client import OAuth1Session

# OAuth 1.0 flow
client = OAuth1Session(
    client_key='your-client-key',
    client_secret='your-client-secret'
)

# Step 1: Fetch request token
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')

# Step 2: Get authorization URL
authorization_url = f"https://provider.com/oauth/authorize?oauth_token={request_token['oauth_token']}"
print(f'Please visit: {authorization_url}')

# Step 3: Get verifier from user
verifier = input('Please enter the verifier:')
client.verifier = verifier

# Step 4: Fetch access token
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')

# Step 5: Make authenticated requests
response = client.get('https://api.provider.com/user')
user_data = response.json()

HTTPX OAuth 2.0 Client

from authlib.integrations.httpx_client import OAuth2Client
import httpx

# Synchronous OAuth 2.0 client
with OAuth2Client(
    client_id='your-client-id',
    client_secret='your-client-secret'
) as client:
    
    # Authorization code flow  
    authorization_url, state = client.create_authorization_url(
        'https://provider.com/authorize',
        scope='read write'
    )
    
    # After user authorization
    token = client.fetch_token(
        'https://provider.com/token',
        code='authorization-code-from-callback'
    )
    
    # Make authenticated requests
    response = client.get('https://api.provider.com/user')
    user_data = response.json()

# Asynchronous OAuth 2.0 client
import asyncio

async def oauth_flow():
    async with AsyncOAuth2Client(
        client_id='your-client-id',
        client_secret='your-client-secret'
    ) as client:
        
        # Fetch token using client credentials
        token = await client.fetch_token(
            'https://provider.com/token',
            grant_type='client_credentials',
            scope='api:read'
        )
        
        # Make authenticated requests
        response = await client.get('https://api.provider.com/data')
        data = response.json()
        return data

# Run async function
data = asyncio.run(oauth_flow())

HTTPX OAuth 1.0 Client

from authlib.integrations.httpx_client import OAuth1Client, AsyncOAuth1Client

# Synchronous OAuth 1.0
with OAuth1Client(
    client_key='your-client-key',
    client_secret='your-client-secret'
) as client:
    
    # Complete OAuth 1.0 flow
    request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
    
    auth_url = client.create_authorization_url(
        'https://provider.com/oauth/authorize'
    )
    
    # After user authorization
    client.verifier = 'user-provided-verifier'
    access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
    
    # Make authenticated requests
    response = client.get('https://api.provider.com/user')

# Asynchronous OAuth 1.0
async def async_oauth1_flow():
    async with AsyncOAuth1Client(
        client_key='your-client-key',
        client_secret='your-client-secret'
    ) as client:
        
        request_token = await client.fetch_request_token(
            'https://provider.com/oauth/request_token'
        )
        
        # Handle authorization...
        
        access_token = await client.fetch_access_token(
            'https://provider.com/oauth/access_token',
            verifier='user-verifier'
        )
        
        response = await client.get('https://api.provider.com/user')
        return response.json()

user_data = asyncio.run(async_oauth1_flow())

Custom Authentication Handlers

import requests
from authlib.integrations.requests_client import OAuth2Auth

# Using OAuth2Auth with regular requests
auth = OAuth2Auth(token={'access_token': 'your-access-token', 'token_type': 'Bearer'})

response = requests.get('https://api.provider.com/user', auth=auth)

# Custom token placement
auth = OAuth2Auth(
    token={'access_token': 'your-access-token', 'token_type': 'Bearer'},
    token_placement='uri'  # Add token as query parameter
)

response = requests.get('https://api.provider.com/user', auth=auth)

JWT Assertion Flow

from authlib.integrations.requests_client import AssertionSession
from authlib.jose import JsonWebToken
import time

# Create JWT assertion
jwt = JsonWebToken(['RS256'])
header = {'alg': 'RS256'}
payload = {
    'iss': 'your-client-id',
    'sub': 'your-client-id', 
    'aud': 'https://provider.com/token',
    'iat': int(time.time()),
    'exp': int(time.time()) + 3600
}
assertion = jwt.encode(header, payload, private_key)

# Use assertion to get token
session = AssertionSession(
    grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
    assertion=assertion,
    issuer='your-client-id',
    audience='https://provider.com/token'
)

token = session.fetch_token('https://provider.com/token')

# Make authenticated requests
response = session.get('https://api.provider.com/user')

Error Handling

from authlib.integrations.requests_client import OAuth2Session
from authlib.common.errors import AuthlibHTTPError
from requests.exceptions import HTTPError

client = OAuth2Session(client_id='your-client-id')

try:
    token = client.fetch_token(
        'https://provider.com/token',
        code='invalid-code',
        client_secret='your-client-secret'
    )
except AuthlibHTTPError as error:
    print(f'OAuth error: {error.error}')
    print(f'Description: {error.description}')
    print(f'Status code: {error.status_code}')
except HTTPError as error:
    print(f'HTTP error: {error.response.status_code}')

Token Management

import json
from authlib.integrations.requests_client import OAuth2Session

def load_token():
    try:
        with open('token.json', 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None

def save_token(token):
    with open('token.json', 'w') as f:
        json.dump(token, f)

# Load existing token
token = load_token()

client = OAuth2Session(
    client_id='your-client-id',
    token=token,
    auto_refresh_url='https://provider.com/token',
    auto_refresh_kwargs={'client_secret': 'your-client-secret'},
    token_updater=save_token
)

# Token will be automatically saved when refreshed
response = client.get('https://api.provider.com/user')

Install with Tessl CLI

npx tessl i tessl/pypi-authlib

docs

common-utilities.md

django-integration.md

flask-integration.md

http-clients.md

index.md

jose.md

oauth1.md

oauth2.md

oidc.md

tile.json