CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-authlib

The ultimate Python library in building OAuth and OpenID Connect servers and clients.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

django-integration.mddocs/

Django Integration

Django-specific OAuth 2.0 server implementation with seamless Django model integration, middleware support, and Django-specific request/response handling. Provides comprehensive support for Django's ORM, authentication system, and HTTP request/response cycle.

Capabilities

Django OAuth 2.0 Server

Django-specific OAuth 2.0 authorization server implementation optimized for Django applications.

class AuthorizationServer:
    """Django OAuth 2.0 authorization server."""
    
    def __init__(self, query_client: callable, save_token: callable) -> None:
        """
        Initialize Django authorization server.
        
        Args:
            query_client: Function to query client by client_id
            save_token: Function to save issued tokens
        """

    def register_grant(self, grant_cls: type, extensions: list = None) -> None:
        """
        Register a grant type.
        
        Args:
            grant_cls: Grant class to register
            extensions: List of grant extensions
        """

    def register_endpoint(self, endpoint: object) -> None:
        """
        Register an endpoint.
        
        Args:
            endpoint: Endpoint instance
        """

    def create_authorization_response(self, request: HttpRequest, grant_user: callable = None) -> HttpResponse:
        """
        Create authorization response for Django.
        
        Args:
            request: Django HttpRequest object
            grant_user: Function to grant authorization to user
            
        Returns:
            Django HttpResponse object
        """

    def create_token_response(self, request: HttpRequest) -> HttpResponse:
        """
        Create token response for Django.
        
        Args:
            request: Django HttpRequest object
            
        Returns:
            Django HttpResponse object
        """

    def create_revocation_response(self, request: HttpRequest) -> HttpResponse:
        """
        Create revocation response for Django.
        
        Args:
            request: Django HttpRequest object
            
        Returns:
            Django HttpResponse object
        """

    def create_introspection_response(self, request: HttpRequest) -> HttpResponse:
        """
        Create introspection response for Django.
        
        Args:
            request: Django HttpRequest object
            
        Returns:
            Django HttpResponse object
        """

    def validate_consent_request(self, request: HttpRequest, end_user=None) -> None:
        """
        Validate consent request at authorization endpoint.
        
        Args:
            request: Django HttpRequest object
            end_user: End user object (Django User model)
        """

Django Resource Protection

Django-specific OAuth 2.0 resource server protection with Django model integration.

class ResourceProtector:
    """Django OAuth 2.0 resource protector."""
    
    def __init__(self, require_oauth: callable = None) -> None:
        """
        Initialize Django resource protector.
        
        Args:
            require_oauth: Optional default OAuth requirement function
        """

    def register_token_validator(self, validator: 'BearerTokenValidator') -> None:
        """
        Register bearer token validator.
        
        Args:
            validator: Token validator instance
        """

    def __call__(self, scopes: list = None, optional: bool = False) -> callable:
        """
        Decorator for protecting Django views.
        
        Args:
            scopes: Required scopes
            optional: Whether protection is optional
            
        Returns:
            View decorator function
        """

    def acquire_token(self, request: HttpRequest, scopes: list = None, raise_error: bool = True) -> 'OAuth2Token':
        """
        Acquire token from Django request.
        
        Args:
            request: Django HttpRequest object
            scopes: Required scopes
            raise_error: Whether to raise error if token invalid
            
        Returns:
            OAuth2Token object if valid
        """

    def validate_request(self, request: HttpRequest, scopes: list = None) -> 'OAuth2Token':
        """
        Validate OAuth 2.0 request.
        
        Args:
            request: Django HttpRequest object
            scopes: Required scopes
            
        Returns:
            OAuth2Token object if valid
        """

Django Bearer Token Validator

Django-specific bearer token validator with ORM integration.

class BearerTokenValidator:
    """Django bearer token validator."""
    
    def __init__(self, token_model: type = None, realm: str = None) -> None:
        """
        Initialize Django bearer token validator.
        
        Args:
            token_model: Django model class for tokens
            realm: OAuth realm for WWW-Authenticate header
        """

    def authenticate_token(self, token_string: str) -> object:
        """
        Authenticate bearer token using Django ORM.
        
        Args:
            token_string: Bearer token string
            
        Returns:
            Token model instance if valid
        """

    def request_invalid(self, request: HttpRequest) -> bool:
        """
        Check if Django request is invalid.
        
        Args:
            request: Django HttpRequest object
            
        Returns:
            True if request is invalid
        """

    def token_revoked(self, token: object) -> bool:
        """
        Check if token is revoked.
        
        Args:
            token: Token model instance
            
        Returns:
            True if token is revoked
        """

    def token_expired(self, token: object) -> bool:
        """
        Check if token is expired.
        
        Args:
            token: Token model instance
            
        Returns:
            True if token is expired
        """

    def get_token_scopes(self, token: object) -> list:
        """
        Get token scopes from Django model.
        
        Args:
            token: Token model instance
            
        Returns:
            List of scope strings
        """

Django Revocation Endpoint

OAuth 2.0 token revocation endpoint for Django.

class RevocationEndpoint:
    """Django OAuth 2.0 revocation endpoint."""
    
    def __init__(self, query_token: callable, revoke_token: callable) -> None:
        """
        Initialize Django revocation endpoint.
        
        Args:
            query_token: Function to query token by value
            revoke_token: Function to revoke token
        """

    def create_revocation_response(self, request: HttpRequest) -> HttpResponse:
        """
        Create revocation response for Django.
        
        Args:
            request: Django HttpRequest object
            
        Returns:
            Django HttpResponse object
        """

    def query_token(self, token: str, token_type_hint: str = None, client: object = None) -> object:
        """
        Query token by value.
        
        Args:
            token: Token string
            token_type_hint: Hint about token type
            client: Client object
            
        Returns:
            Token object if found
        """

    def revoke_token(self, token: object, client: object = None) -> None:
        """
        Revoke token.
        
        Args:
            token: Token object to revoke
            client: Client object
        """

Django Model Mixins

Enhanced model mixins for Django ORM integration with OAuth 2.0.

class ClientMixin:
    """Mixin for Django OAuth 2.0 client model."""
    
    client_id: str  # Client identifier field
    client_secret: str  # Client secret field (may be None)
    client_id_issued_at: int  # Client ID issued timestamp
    client_secret_expires_at: int  # Client secret expiration
    
    def get_client_id(self) -> str:
        """Get client ID."""
        return self.client_id
    
    def get_default_redirect_uri(self) -> str:
        """Get default redirect URI for this client."""
        return getattr(self, 'default_redirect_uri', '')
    
    def get_allowed_scope(self, scope: str) -> str:
        """
        Get allowed scope for client.
        
        Args:
            scope: Requested scope
            
        Returns:
            Allowed scope string
        """
        allowed = getattr(self, 'allowed_scopes', '')
        if not scope:
            return allowed
        scopes = scope.split()
        allowed_scopes = allowed.split()
        return ' '.join([s for s in scopes if s in allowed_scopes])
    
    def check_redirect_uri(self, redirect_uri: str) -> bool:
        """
        Check if redirect URI is allowed for this client.
        
        Args:
            redirect_uri: Redirect URI to check
            
        Returns:
            True if redirect URI is allowed
        """
        return redirect_uri in self.get_allowed_redirect_uris()
    
    def has_client_secret(self) -> bool:
        """Check if client has a secret."""
        return bool(self.client_secret)
    
    def check_client_secret(self, client_secret: str) -> bool:
        """
        Verify client secret.
        
        Args:
            client_secret: Secret to verify
            
        Returns:
            True if secret is valid
        """
        return self.client_secret == client_secret
    
    def check_token_endpoint_auth_method(self, method: str) -> bool:
        """
        Check if token endpoint auth method is supported.
        
        Args:
            method: Authentication method
            
        Returns:
            True if method is supported
        """
        return method in getattr(self, 'token_endpoint_auth_methods', ['client_secret_basic'])
    
    def check_response_type(self, response_type: str) -> bool:
        """
        Check if response type is supported.
        
        Args:
            response_type: Response type to check
            
        Returns:
            True if response type is supported
        """
        return response_type in getattr(self, 'response_types', ['code'])
    
    def check_grant_type(self, grant_type: str) -> bool:
        """
        Check if grant type is supported.
        
        Args:
            grant_type: Grant type to check
            
        Returns:
            True if grant type is supported
        """
        return grant_type in getattr(self, 'grant_types', ['authorization_code'])

class AuthorizationCodeMixin:
    """Mixin for Django authorization code model."""
    
    code: str  # Authorization code field
    client_id: str  # Client identifier field
    redirect_uri: str  # Redirect URI field
    scope: str  # Authorized scope field
    user_id: str  # User identifier field
    code_challenge: str  # PKCE code challenge field
    code_challenge_method: str  # PKCE challenge method field
    
    def is_expired(self) -> bool:
        """
        Check if authorization code is expired.
        
        Returns:
            True if code is expired
        """
        from django.utils import timezone
        expires_at = getattr(self, 'expires_at', None)
        if not expires_at:
            return False
        return timezone.now() > expires_at
    
    def get_redirect_uri(self) -> str:
        """Get redirect URI."""
        return self.redirect_uri
    
    def get_scope(self) -> str:
        """Get authorized scope."""
        return self.scope
    
    def get_user_id(self) -> str:
        """Get user ID."""
        return str(self.user_id)
    
    def get_code_challenge(self) -> str:
        """Get PKCE code challenge."""
        return getattr(self, 'code_challenge', '')
    
    def get_code_challenge_method(self) -> str:
        """Get PKCE challenge method."""
        return getattr(self, 'code_challenge_method', '')

class TokenMixin:
    """Mixin for Django access token model."""
    
    access_token: str  # Access token field
    client_id: str  # Client identifier field
    token_type: str  # Token type field
    refresh_token: str  # Refresh token field
    scope: str  # Token scope field
    user_id: str  # User identifier field
    issued_at: int  # Token issued timestamp
    expires_in: int  # Token lifetime in seconds
    
    def get_scope(self) -> str:
        """Get token scope."""
        return self.scope or ''
    
    def get_user_id(self) -> str:
        """Get user ID."""
        return str(self.user_id)
    
    def is_expired(self) -> bool:
        """
        Check if token is expired.
        
        Returns:
            True if token is expired
        """
        from django.utils import timezone
        if not self.expires_in:
            return False
        expires_at = self.issued_at + self.expires_in
        return timezone.now().timestamp() > expires_at
    
    def is_revoked(self) -> bool:
        """
        Check if token is revoked.
        
        Returns:
            True if token is revoked
        """
        return getattr(self, 'revoked', False)
    
    def get_expires_at(self) -> int:
        """
        Get expiration timestamp.
        
        Returns:
            Expiration timestamp
        """
        if not self.expires_in:
            return 0
        return self.issued_at + self.expires_in

Django Signals

Django signals for OAuth 2.0 events.

import django.dispatch

# OAuth 2.0 server signals
client_authenticated = django.dispatch.Signal()  # providing_args=['client']
token_authenticated = django.dispatch.Signal()   # providing_args=['token'] 
token_revoked = django.dispatch.Signal()        # providing_args=['token']

Usage Examples

Django OAuth 2.0 Server Setup

# models.py
from django.db import models
from django.contrib.auth.models import User
from authlib.integrations.django_oauth2 import ClientMixin, AuthorizationCodeMixin, TokenMixin

class Client(models.Model, ClientMixin):
    client_id = models.CharField(max_length=40, unique=True)
    client_secret = models.CharField(max_length=55, blank=True)
    name = models.CharField(max_length=100)
    redirect_uris = models.TextField()
    allowed_scopes = models.TextField(default='')
    response_types = models.TextField(default='code')
    grant_types = models.TextField(default='authorization_code refresh_token')
    
    def get_allowed_redirect_uris(self):
        return self.redirect_uris.split()

class AuthorizationCode(models.Model, AuthorizationCodeMixin):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    client = models.ForeignKey(Client, on_delete=models.CASCADE)
    code = models.CharField(max_length=120, unique=True)
    redirect_uri = models.TextField()
    scope = models.TextField(default='')
    created_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField()
    code_challenge = models.TextField(blank=True)
    code_challenge_method = models.CharField(max_length=10, blank=True)
    
    @property
    def client_id(self):
        return self.client.client_id
    
    @property
    def user_id(self):
        return self.user.id

class Token(models.Model, TokenMixin):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    client = models.ForeignKey(Client, on_delete=models.CASCADE)
    access_token = models.CharField(max_length=255, unique=True)
    refresh_token = models.CharField(max_length=255, unique=True, blank=True)
    token_type = models.CharField(max_length=20, default='Bearer')
    scope = models.TextField(default='')
    created_at = models.DateTimeField(auto_now_add=True)
    expires_in = models.IntegerField(default=3600)
    revoked = models.BooleanField(default=False)
    
    @property
    def client_id(self):
        return self.client.client_id
    
    @property
    def user_id(self):
        return self.user.id
    
    @property
    def issued_at(self):
        return int(self.created_at.timestamp())

# views.py
from django.http import HttpResponse
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
from authlib.integrations.django_oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant, RefreshTokenGrant

def query_client(client_id):
    try:
        return Client.objects.get(client_id=client_id)
    except Client.DoesNotExist:
        return None

def save_token(token, request, *args, **kwargs):
    if request.grant_type == 'authorization_code':
        code = request.credential
        Token.objects.create(
            client=request.client,
            user_id=code.user_id,
            access_token=token['access_token'],
            refresh_token=token.get('refresh_token', ''),
            scope=token.get('scope', ''),
            expires_in=token.get('expires_in', 3600)
        )

# Initialize authorization server
authorization_server = AuthorizationServer(
    query_client=query_client,
    save_token=save_token
)
authorization_server.register_grant(AuthorizationCodeGrant)
authorization_server.register_grant(RefreshTokenGrant)

@login_required
def authorize(request):
    if request.method == 'GET':
        try:
            grant = authorization_server.validate_consent_request(request, end_user=request.user)
            return render(request, 'oauth2/authorize.html', {
                'grant': grant,
                'user': request.user
            })
        except OAuth2Error as error:
            return HttpResponse(f'Error: {error.error}', status=400)
    
    if request.POST.get('confirm'):
        grant_user = request.user
    else:
        grant_user = None
    
    return authorization_server.create_authorization_response(request, grant_user)

@csrf_exempt
def issue_token(request):
    return authorization_server.create_token_response(request)

@csrf_exempt
def revoke_token(request):
    return authorization_server.create_revocation_response(request)

Resource Protection

# api/views.py
from django.http import JsonResponse
from django.contrib.auth.models import User
from authlib.integrations.django_oauth2 import ResourceProtector, BearerTokenValidator

class MyBearerTokenValidator(BearerTokenValidator):
    def authenticate_token(self, token_string):
        try:
            return Token.objects.get(access_token=token_string, revoked=False)
        except Token.DoesNotExist:
            return None
    
    def token_expired(self, token):
        return token.is_expired()
    
    def get_token_scopes(self, token):
        return token.scope.split() if token.scope else []

# Initialize resource protector
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyBearerTokenValidator())

@require_oauth('profile')
def api_user(request):
    token = require_oauth.acquire_token(request)
    user = User.objects.get(id=token.user_id)
    return JsonResponse({
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'first_name': user.first_name,
        'last_name': user.last_name
    })

@require_oauth('read')
def api_posts(request):
    token = require_oauth.acquire_token(request)
    # Get posts for authenticated user
    posts = Post.objects.filter(author_id=token.user_id)
    return JsonResponse({
        'posts': [{
            'id': post.id,
            'title': post.title,
            'content': post.content,
            'created_at': post.created_at.isoformat()
        } for post in posts]
    })

@require_oauth('write')
def api_create_post(request):
    token = require_oauth.acquire_token(request)
    import json
    data = json.loads(request.body)
    
    post = Post.objects.create(
        author_id=token.user_id,
        title=data['title'],
        content=data['content']
    )
    
    return JsonResponse({'id': post.id}, status=201)

# Optional protection
@require_oauth(optional=True)
def api_public_posts(request):
    token = require_oauth.acquire_token(request, raise_error=False)
    
    if token:
        # Show personalized content for authenticated users
        posts = Post.objects.filter(public=True).order_by('-created_at')
        message = f'Hello {User.objects.get(id=token.user_id).username}'
    else:
        # Show limited content for anonymous users  
        posts = Post.objects.filter(public=True, featured=True).order_by('-created_at')
        message = 'Hello anonymous user'
    
    return JsonResponse({
        'message': message,
        'posts': [{'id': p.id, 'title': p.title} for p in posts[:10]]
    })

URL Configuration

# urls.py
from django.urls import path
from . import views

urlpatterns = [
    # OAuth 2.0 endpoints
    path('authorize/', views.authorize, name='oauth2_authorize'),
    path('token/', views.issue_token, name='oauth2_token'),
    path('revoke/', views.revoke_token, name='oauth2_revoke'),
    
    # API endpoints
    path('api/user/', views.api_user, name='api_user'),
    path('api/posts/', views.api_posts, name='api_posts'),
    path('api/posts/create/', views.api_create_post, name='api_create_post'),
    path('api/public/', views.api_public_posts, name='api_public_posts'),
]

Using Django Signals

# signals.py
from django.dispatch import receiver
from authlib.integrations.django_oauth2 import client_authenticated, token_authenticated, token_revoked
import logging

logger = logging.getLogger(__name__)

@receiver(client_authenticated)
def on_client_authenticated(sender, client=None, **kwargs):
    logger.info(f'Client {client.client_id} authenticated')
    # Update client statistics, log authentication, etc.

@receiver(token_authenticated)
def on_token_authenticated(sender, token=None, **kwargs):
    logger.info(f'Token for user {token.user_id} authenticated')
    # Update user last activity, log API usage, etc.
    User.objects.filter(id=token.user_id).update(last_api_access=timezone.now())

@receiver(token_revoked)
def on_token_revoked(sender, token=None, **kwargs):
    logger.info(f'Token {token.access_token} revoked')
    # Clean up related resources, notify user, etc.
    # Send push notification about token revocation

Management Commands

# management/commands/create_oauth_client.py
from django.core.management.base import BaseCommand
from myapp.models import Client
import secrets

class Command(BaseCommand):
    help = 'Create OAuth 2.0 client'
    
    def add_arguments(self, parser):
        parser.add_argument('name', type=str, help='Client name')
        parser.add_argument('--redirect-uris', required=True, help='Redirect URIs (space-separated)')
        parser.add_argument('--scopes', default='read write', help='Allowed scopes')
    
    def handle(self, *args, **options):
        client = Client.objects.create(
            client_id=secrets.token_urlsafe(32),
            client_secret=secrets.token_urlsafe(48),
            name=options['name'],
            redirect_uris=options['redirect_uris'],
            allowed_scopes=options['scopes']
        )
        
        self.stdout.write(self.style.SUCCESS(f'Created client: {client.client_id}'))
        self.stdout.write(f'Client Secret: {client.client_secret}')

Middleware Integration

# middleware.py
class OAuthMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # Add OAuth token to request if available
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if auth_header.startswith('Bearer '):
            token_string = auth_header[7:]
            try:
                token = Token.objects.get(access_token=token_string, revoked=False)
                if not token.is_expired():
                    request.oauth_token = token
                    request.user = User.objects.get(id=token.user_id)
            except Token.DoesNotExist:
                pass
        
        response = self.get_response(request)
        return response

Install with Tessl CLI

npx tessl i tessl/pypi-authlib

docs

common-utilities.md

django-integration.md

flask-integration.md

http-clients.md

index.md

jose.md

oauth1.md

oauth2.md

oidc.md

tile.json