The ultimate Python library in building OAuth and OpenID Connect servers and clients.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Django-specific OAuth 2.0 server implementation with seamless Django model integration, middleware support, and Django-specific request/response handling. Provides comprehensive support for Django's ORM, authentication system, and HTTP request/response cycle.
Django-specific OAuth 2.0 authorization server implementation optimized for Django applications.
class AuthorizationServer:
"""Django OAuth 2.0 authorization server."""
def __init__(self, query_client: callable, save_token: callable) -> None:
"""
Initialize Django authorization server.
Args:
query_client: Function to query client by client_id
save_token: Function to save issued tokens
"""
def register_grant(self, grant_cls: type, extensions: list = None) -> None:
"""
Register a grant type.
Args:
grant_cls: Grant class to register
extensions: List of grant extensions
"""
def register_endpoint(self, endpoint: object) -> None:
"""
Register an endpoint.
Args:
endpoint: Endpoint instance
"""
def create_authorization_response(self, request: HttpRequest, grant_user: callable = None) -> HttpResponse:
"""
Create authorization response for Django.
Args:
request: Django HttpRequest object
grant_user: Function to grant authorization to user
Returns:
Django HttpResponse object
"""
def create_token_response(self, request: HttpRequest) -> HttpResponse:
"""
Create token response for Django.
Args:
request: Django HttpRequest object
Returns:
Django HttpResponse object
"""
def create_revocation_response(self, request: HttpRequest) -> HttpResponse:
"""
Create revocation response for Django.
Args:
request: Django HttpRequest object
Returns:
Django HttpResponse object
"""
def create_introspection_response(self, request: HttpRequest) -> HttpResponse:
"""
Create introspection response for Django.
Args:
request: Django HttpRequest object
Returns:
Django HttpResponse object
"""
def validate_consent_request(self, request: HttpRequest, end_user=None) -> None:
"""
Validate consent request at authorization endpoint.
Args:
request: Django HttpRequest object
end_user: End user object (Django User model)
"""Django-specific OAuth 2.0 resource server protection with Django model integration.
class ResourceProtector:
"""Django OAuth 2.0 resource protector."""
def __init__(self, require_oauth: callable = None) -> None:
"""
Initialize Django resource protector.
Args:
require_oauth: Optional default OAuth requirement function
"""
def register_token_validator(self, validator: 'BearerTokenValidator') -> None:
"""
Register bearer token validator.
Args:
validator: Token validator instance
"""
def __call__(self, scopes: list = None, optional: bool = False) -> callable:
"""
Decorator for protecting Django views.
Args:
scopes: Required scopes
optional: Whether protection is optional
Returns:
View decorator function
"""
def acquire_token(self, request: HttpRequest, scopes: list = None, raise_error: bool = True) -> 'OAuth2Token':
"""
Acquire token from Django request.
Args:
request: Django HttpRequest object
scopes: Required scopes
raise_error: Whether to raise error if token invalid
Returns:
OAuth2Token object if valid
"""
def validate_request(self, request: HttpRequest, scopes: list = None) -> 'OAuth2Token':
"""
Validate OAuth 2.0 request.
Args:
request: Django HttpRequest object
scopes: Required scopes
Returns:
OAuth2Token object if valid
"""Django-specific bearer token validator with ORM integration.
class BearerTokenValidator:
"""Django bearer token validator."""
def __init__(self, token_model: type = None, realm: str = None) -> None:
"""
Initialize Django bearer token validator.
Args:
token_model: Django model class for tokens
realm: OAuth realm for WWW-Authenticate header
"""
def authenticate_token(self, token_string: str) -> object:
"""
Authenticate bearer token using Django ORM.
Args:
token_string: Bearer token string
Returns:
Token model instance if valid
"""
def request_invalid(self, request: HttpRequest) -> bool:
"""
Check if Django request is invalid.
Args:
request: Django HttpRequest object
Returns:
True if request is invalid
"""
def token_revoked(self, token: object) -> bool:
"""
Check if token is revoked.
Args:
token: Token model instance
Returns:
True if token is revoked
"""
def token_expired(self, token: object) -> bool:
"""
Check if token is expired.
Args:
token: Token model instance
Returns:
True if token is expired
"""
def get_token_scopes(self, token: object) -> list:
"""
Get token scopes from Django model.
Args:
token: Token model instance
Returns:
List of scope strings
"""OAuth 2.0 token revocation endpoint for Django.
class RevocationEndpoint:
"""Django OAuth 2.0 revocation endpoint."""
def __init__(self, query_token: callable, revoke_token: callable) -> None:
"""
Initialize Django revocation endpoint.
Args:
query_token: Function to query token by value
revoke_token: Function to revoke token
"""
def create_revocation_response(self, request: HttpRequest) -> HttpResponse:
"""
Create revocation response for Django.
Args:
request: Django HttpRequest object
Returns:
Django HttpResponse object
"""
def query_token(self, token: str, token_type_hint: str = None, client: object = None) -> object:
"""
Query token by value.
Args:
token: Token string
token_type_hint: Hint about token type
client: Client object
Returns:
Token object if found
"""
def revoke_token(self, token: object, client: object = None) -> None:
"""
Revoke token.
Args:
token: Token object to revoke
client: Client object
"""Enhanced model mixins for Django ORM integration with OAuth 2.0.
class ClientMixin:
"""Mixin for Django OAuth 2.0 client model."""
client_id: str # Client identifier field
client_secret: str # Client secret field (may be None)
client_id_issued_at: int # Client ID issued timestamp
client_secret_expires_at: int # Client secret expiration
def get_client_id(self) -> str:
"""Get client ID."""
return self.client_id
def get_default_redirect_uri(self) -> str:
"""Get default redirect URI for this client."""
return getattr(self, 'default_redirect_uri', '')
def get_allowed_scope(self, scope: str) -> str:
"""
Get allowed scope for client.
Args:
scope: Requested scope
Returns:
Allowed scope string
"""
allowed = getattr(self, 'allowed_scopes', '')
if not scope:
return allowed
scopes = scope.split()
allowed_scopes = allowed.split()
return ' '.join([s for s in scopes if s in allowed_scopes])
def check_redirect_uri(self, redirect_uri: str) -> bool:
"""
Check if redirect URI is allowed for this client.
Args:
redirect_uri: Redirect URI to check
Returns:
True if redirect URI is allowed
"""
return redirect_uri in self.get_allowed_redirect_uris()
def has_client_secret(self) -> bool:
"""Check if client has a secret."""
return bool(self.client_secret)
def check_client_secret(self, client_secret: str) -> bool:
"""
Verify client secret.
Args:
client_secret: Secret to verify
Returns:
True if secret is valid
"""
return self.client_secret == client_secret
def check_token_endpoint_auth_method(self, method: str) -> bool:
"""
Check if token endpoint auth method is supported.
Args:
method: Authentication method
Returns:
True if method is supported
"""
return method in getattr(self, 'token_endpoint_auth_methods', ['client_secret_basic'])
def check_response_type(self, response_type: str) -> bool:
"""
Check if response type is supported.
Args:
response_type: Response type to check
Returns:
True if response type is supported
"""
return response_type in getattr(self, 'response_types', ['code'])
def check_grant_type(self, grant_type: str) -> bool:
"""
Check if grant type is supported.
Args:
grant_type: Grant type to check
Returns:
True if grant type is supported
"""
return grant_type in getattr(self, 'grant_types', ['authorization_code'])
class AuthorizationCodeMixin:
"""Mixin for Django authorization code model."""
code: str # Authorization code field
client_id: str # Client identifier field
redirect_uri: str # Redirect URI field
scope: str # Authorized scope field
user_id: str # User identifier field
code_challenge: str # PKCE code challenge field
code_challenge_method: str # PKCE challenge method field
def is_expired(self) -> bool:
"""
Check if authorization code is expired.
Returns:
True if code is expired
"""
from django.utils import timezone
expires_at = getattr(self, 'expires_at', None)
if not expires_at:
return False
return timezone.now() > expires_at
def get_redirect_uri(self) -> str:
"""Get redirect URI."""
return self.redirect_uri
def get_scope(self) -> str:
"""Get authorized scope."""
return self.scope
def get_user_id(self) -> str:
"""Get user ID."""
return str(self.user_id)
def get_code_challenge(self) -> str:
"""Get PKCE code challenge."""
return getattr(self, 'code_challenge', '')
def get_code_challenge_method(self) -> str:
"""Get PKCE challenge method."""
return getattr(self, 'code_challenge_method', '')
class TokenMixin:
"""Mixin for Django access token model."""
access_token: str # Access token field
client_id: str # Client identifier field
token_type: str # Token type field
refresh_token: str # Refresh token field
scope: str # Token scope field
user_id: str # User identifier field
issued_at: int # Token issued timestamp
expires_in: int # Token lifetime in seconds
def get_scope(self) -> str:
"""Get token scope."""
return self.scope or ''
def get_user_id(self) -> str:
"""Get user ID."""
return str(self.user_id)
def is_expired(self) -> bool:
"""
Check if token is expired.
Returns:
True if token is expired
"""
from django.utils import timezone
if not self.expires_in:
return False
expires_at = self.issued_at + self.expires_in
return timezone.now().timestamp() > expires_at
def is_revoked(self) -> bool:
"""
Check if token is revoked.
Returns:
True if token is revoked
"""
return getattr(self, 'revoked', False)
def get_expires_at(self) -> int:
"""
Get expiration timestamp.
Returns:
Expiration timestamp
"""
if not self.expires_in:
return 0
return self.issued_at + self.expires_inDjango signals for OAuth 2.0 events.
import django.dispatch
# OAuth 2.0 server signals
client_authenticated = django.dispatch.Signal() # providing_args=['client']
token_authenticated = django.dispatch.Signal() # providing_args=['token']
token_revoked = django.dispatch.Signal() # providing_args=['token']# models.py
from django.db import models
from django.contrib.auth.models import User
from authlib.integrations.django_oauth2 import ClientMixin, AuthorizationCodeMixin, TokenMixin
class Client(models.Model, ClientMixin):
client_id = models.CharField(max_length=40, unique=True)
client_secret = models.CharField(max_length=55, blank=True)
name = models.CharField(max_length=100)
redirect_uris = models.TextField()
allowed_scopes = models.TextField(default='')
response_types = models.TextField(default='code')
grant_types = models.TextField(default='authorization_code refresh_token')
def get_allowed_redirect_uris(self):
return self.redirect_uris.split()
class AuthorizationCode(models.Model, AuthorizationCodeMixin):
user = models.ForeignKey(User, on_delete=models.CASCADE)
client = models.ForeignKey(Client, on_delete=models.CASCADE)
code = models.CharField(max_length=120, unique=True)
redirect_uri = models.TextField()
scope = models.TextField(default='')
created_at = models.DateTimeField(auto_now_add=True)
expires_at = models.DateTimeField()
code_challenge = models.TextField(blank=True)
code_challenge_method = models.CharField(max_length=10, blank=True)
@property
def client_id(self):
return self.client.client_id
@property
def user_id(self):
return self.user.id
class Token(models.Model, TokenMixin):
user = models.ForeignKey(User, on_delete=models.CASCADE)
client = models.ForeignKey(Client, on_delete=models.CASCADE)
access_token = models.CharField(max_length=255, unique=True)
refresh_token = models.CharField(max_length=255, unique=True, blank=True)
token_type = models.CharField(max_length=20, default='Bearer')
scope = models.TextField(default='')
created_at = models.DateTimeField(auto_now_add=True)
expires_in = models.IntegerField(default=3600)
revoked = models.BooleanField(default=False)
@property
def client_id(self):
return self.client.client_id
@property
def user_id(self):
return self.user.id
@property
def issued_at(self):
return int(self.created_at.timestamp())
# views.py
from django.http import HttpResponse
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from django.views.decorators.csrf import csrf_exempt
from authlib.integrations.django_oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749.grants import AuthorizationCodeGrant, RefreshTokenGrant
def query_client(client_id):
try:
return Client.objects.get(client_id=client_id)
except Client.DoesNotExist:
return None
def save_token(token, request, *args, **kwargs):
if request.grant_type == 'authorization_code':
code = request.credential
Token.objects.create(
client=request.client,
user_id=code.user_id,
access_token=token['access_token'],
refresh_token=token.get('refresh_token', ''),
scope=token.get('scope', ''),
expires_in=token.get('expires_in', 3600)
)
# Initialize authorization server
authorization_server = AuthorizationServer(
query_client=query_client,
save_token=save_token
)
authorization_server.register_grant(AuthorizationCodeGrant)
authorization_server.register_grant(RefreshTokenGrant)
@login_required
def authorize(request):
if request.method == 'GET':
try:
grant = authorization_server.validate_consent_request(request, end_user=request.user)
return render(request, 'oauth2/authorize.html', {
'grant': grant,
'user': request.user
})
except OAuth2Error as error:
return HttpResponse(f'Error: {error.error}', status=400)
if request.POST.get('confirm'):
grant_user = request.user
else:
grant_user = None
return authorization_server.create_authorization_response(request, grant_user)
@csrf_exempt
def issue_token(request):
return authorization_server.create_token_response(request)
@csrf_exempt
def revoke_token(request):
return authorization_server.create_revocation_response(request)# api/views.py
from django.http import JsonResponse
from django.contrib.auth.models import User
from authlib.integrations.django_oauth2 import ResourceProtector, BearerTokenValidator
class MyBearerTokenValidator(BearerTokenValidator):
def authenticate_token(self, token_string):
try:
return Token.objects.get(access_token=token_string, revoked=False)
except Token.DoesNotExist:
return None
def token_expired(self, token):
return token.is_expired()
def get_token_scopes(self, token):
return token.scope.split() if token.scope else []
# Initialize resource protector
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyBearerTokenValidator())
@require_oauth('profile')
def api_user(request):
token = require_oauth.acquire_token(request)
user = User.objects.get(id=token.user_id)
return JsonResponse({
'id': user.id,
'username': user.username,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name
})
@require_oauth('read')
def api_posts(request):
token = require_oauth.acquire_token(request)
# Get posts for authenticated user
posts = Post.objects.filter(author_id=token.user_id)
return JsonResponse({
'posts': [{
'id': post.id,
'title': post.title,
'content': post.content,
'created_at': post.created_at.isoformat()
} for post in posts]
})
@require_oauth('write')
def api_create_post(request):
token = require_oauth.acquire_token(request)
import json
data = json.loads(request.body)
post = Post.objects.create(
author_id=token.user_id,
title=data['title'],
content=data['content']
)
return JsonResponse({'id': post.id}, status=201)
# Optional protection
@require_oauth(optional=True)
def api_public_posts(request):
token = require_oauth.acquire_token(request, raise_error=False)
if token:
# Show personalized content for authenticated users
posts = Post.objects.filter(public=True).order_by('-created_at')
message = f'Hello {User.objects.get(id=token.user_id).username}'
else:
# Show limited content for anonymous users
posts = Post.objects.filter(public=True, featured=True).order_by('-created_at')
message = 'Hello anonymous user'
return JsonResponse({
'message': message,
'posts': [{'id': p.id, 'title': p.title} for p in posts[:10]]
})# urls.py
from django.urls import path
from . import views
urlpatterns = [
# OAuth 2.0 endpoints
path('authorize/', views.authorize, name='oauth2_authorize'),
path('token/', views.issue_token, name='oauth2_token'),
path('revoke/', views.revoke_token, name='oauth2_revoke'),
# API endpoints
path('api/user/', views.api_user, name='api_user'),
path('api/posts/', views.api_posts, name='api_posts'),
path('api/posts/create/', views.api_create_post, name='api_create_post'),
path('api/public/', views.api_public_posts, name='api_public_posts'),
]# signals.py
from django.dispatch import receiver
from authlib.integrations.django_oauth2 import client_authenticated, token_authenticated, token_revoked
import logging
logger = logging.getLogger(__name__)
@receiver(client_authenticated)
def on_client_authenticated(sender, client=None, **kwargs):
logger.info(f'Client {client.client_id} authenticated')
# Update client statistics, log authentication, etc.
@receiver(token_authenticated)
def on_token_authenticated(sender, token=None, **kwargs):
logger.info(f'Token for user {token.user_id} authenticated')
# Update user last activity, log API usage, etc.
User.objects.filter(id=token.user_id).update(last_api_access=timezone.now())
@receiver(token_revoked)
def on_token_revoked(sender, token=None, **kwargs):
logger.info(f'Token {token.access_token} revoked')
# Clean up related resources, notify user, etc.
# Send push notification about token revocation# management/commands/create_oauth_client.py
from django.core.management.base import BaseCommand
from myapp.models import Client
import secrets
class Command(BaseCommand):
help = 'Create OAuth 2.0 client'
def add_arguments(self, parser):
parser.add_argument('name', type=str, help='Client name')
parser.add_argument('--redirect-uris', required=True, help='Redirect URIs (space-separated)')
parser.add_argument('--scopes', default='read write', help='Allowed scopes')
def handle(self, *args, **options):
client = Client.objects.create(
client_id=secrets.token_urlsafe(32),
client_secret=secrets.token_urlsafe(48),
name=options['name'],
redirect_uris=options['redirect_uris'],
allowed_scopes=options['scopes']
)
self.stdout.write(self.style.SUCCESS(f'Created client: {client.client_id}'))
self.stdout.write(f'Client Secret: {client.client_secret}')# middleware.py
class OAuthMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Add OAuth token to request if available
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Bearer '):
token_string = auth_header[7:]
try:
token = Token.objects.get(access_token=token_string, revoked=False)
if not token.is_expired():
request.oauth_token = token
request.user = User.objects.get(id=token.user_id)
except Token.DoesNotExist:
pass
response = self.get_response(request)
return responseInstall with Tessl CLI
npx tessl i tessl/pypi-authlib