OAuth2 Provider for Django web applications with complete server functionality, token management, and authorization endpoints.
—
Django OAuth Toolkit provides first-class integration with Django REST Framework, offering OAuth2 authentication and comprehensive permission classes for API protection. This integration enables token-based API authentication with fine-grained scope validation.
DRF authentication backend that validates OAuth2 access tokens and provides user context.
class OAuth2Authentication(BaseAuthentication):
    """DRF authentication backend that authenticates requests by OAuth2 access token."""

    # Realm string for the WWW-Authenticate challenge; presumably consumed by
    # authenticate_header() -- confirm against the library source.
    www_authenticate_realm: str = "api"

    def authenticate(self, request) -> Optional[Tuple[User, AccessToken]]:
        """
        Authenticate request using OAuth2 access token.

        Args:
            request: DRF Request object

        Returns:
            Tuple of (user, token) if authentication succeeds, None otherwise
        """

    def authenticate_header(self, request) -> str:
        """Return WWW-Authenticate header value for OAuth2"""

Permission classes that validate OAuth2 token scopes for API endpoint protection.
class TokenHasScope(BasePermission):
    """
    Permission class that requires token to have specific scopes.

    Validates that the OAuth2 access token has required scopes.
    Must be used with OAuth2Authentication.

    Attributes:
        required_scopes: List of required scope names
        message: Custom error message for permission denied

    Usage:
        class MyAPIView(APIView):
            permission_classes = [TokenHasScope]
            required_scopes = ['write']

    NOTE(review): required scopes are obtained through ``get_scopes(request,
    view)``. For function-based views, subclass this permission and override
    ``get_scopes`` -- a ``@required_scopes`` decorator does not appear to be
    part of the library's public API; confirm against the upstream docs.
    """

    required_scopes = []

    def has_permission(self, request, view) -> bool:
        """
        Check if token has required scopes.

        Args:
            request: DRF Request object with OAuth2 authentication
            view: DRF View instance

        Returns:
            True if token has all required scopes
        """

    def get_scopes(self, request, view) -> list:
        """
        Get required scopes for this request/view.

        Args:
            request: DRF Request object
            view: DRF View instance

        Returns:
            List of required scope names
        """
class TokenHasReadWriteScope(TokenHasScope):
    """
    Permission class with automatic read/write scope assignment.

    Safe HTTP methods (GET, HEAD, OPTIONS) require 'read' scope.
    Unsafe HTTP methods require 'write' scope.
    Additional scopes can be specified via required_scopes.

    Usage:
        @api_view(['GET', 'POST'])
        @permission_classes([TokenHasReadWriteScope])
        def api_view(request):
            # GET requests need 'read' scope
            # POST/PUT/DELETE need 'write' scope
            pass
    """

    def get_scopes(self, request, view) -> list:
        """Get read/write scopes based on HTTP method"""
class TokenHasResourceScope(TokenHasScope):
    """
    Permission class for resource-specific read/write scope validation.

    Each entry of the view's ``required_scopes`` is treated as a resource
    name and combined with the access type implied by the HTTP method:
    safe methods (GET, HEAD, OPTIONS) require ``<resource>:read``, all other
    methods require ``<resource>:write``.

    Method-keyed alternative scope sets (``required_alternate_scopes``) are a
    feature of ``TokenMatchesOASRequirements``, not of this class.

    Attributes:
        required_scopes: Resource names (inherited from TokenHasScope)

    Usage:
        class MusicView(APIView):
            permission_classes = [TokenHasResourceScope]
            required_scopes = ['music']
            # GET requires 'music:read'
            # POST/PUT/PATCH/DELETE require 'music:write'
    """

    def get_scopes(self, request, view) -> list:
        """Get '<resource>:read' or '<resource>:write' scopes based on HTTP method"""
class IsAuthenticatedOrTokenHasScope(BasePermission):
    """
    Permission allowing access to authenticated users or valid token holders.

    Grants access if user is authenticated via session OR has valid OAuth2 token.
    If using OAuth2 token, validates required scopes.

    Attributes:
        required_scopes: Scopes required for OAuth2 token access

    Usage:
        # Allow both session auth and OAuth2 token auth
        class HybridView(APIView):
            permission_classes = [IsAuthenticatedOrTokenHasScope]
            required_scopes = ['api']

    NOTE(review): a ``@required_scopes`` decorator does not appear to be part
    of the library's public API, so the example declares the scopes on a
    class-based view instead; confirm against the upstream docs.
    """

    required_scopes = []

    def has_permission(self, request, view) -> bool:
        """Check session authentication or token scopes"""
class TokenMatchesOASRequirements(BasePermission):
    """
    Permission class for OpenAPI Specification (OAS) scope matching.

    Validates OAuth2 scopes against OpenAPI security requirements.
    Useful for APIs documented with OpenAPI/Swagger specifications.

    Attributes:
        required_alternate_scopes: OpenAPI-style scope alternatives

    Usage:
        # Matches OpenAPI security requirement alternatives
        class WidgetView(APIView):
            permission_classes = [TokenMatchesOASRequirements]
            required_alternate_scopes = {
                'GET': [['read']],
                'POST': [['create'], ['post', 'widget']],
            }

    NOTE(review): upstream documents ``required_alternate_scopes`` as a mapping
    from HTTP method to a list of alternative scope lists (any one alternative
    suffices; every scope inside it is required) -- confirm against the
    installed version.
    """

    required_alternate_scopes = {}

    def has_permission(self, request, view) -> bool:
        """Validate against OAS-style scope requirements"""

# settings.py
# DRF configuration: OAuth2 token authentication is listed first, session
# authentication second; IsAuthenticated is the default permission.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'oauth2_provider.contrib.rest_framework.OAuth2Authentication',
        'rest_framework.authentication.SessionAuthentication',  # Optional: session auth
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
}
# Optional: OAuth2 settings
# SCOPES maps each scope name to a human-readable description.
OAUTH2_PROVIDER = {
    'SCOPES': {
        'read': 'Read scope',
        'write': 'Write scope',
        'admin': 'Admin scope',
    },
    'ACCESS_TOKEN_EXPIRE_SECONDS': 3600,
    'ERROR_RESPONSE_WITH_SCOPES': True,  # Include required scopes in error responses
}
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from oauth2_provider.contrib.rest_framework import (
    TokenHasReadWriteScope,
    TokenHasScope,
)


def token_has_scopes(*scopes):
    """Return a ``TokenHasScope`` subclass that always requires ``scopes``.

    ``TokenHasScope`` reads ``required_scopes`` from the view object. A local
    variable inside a function-based view is invisible to it, and ``@api_view``
    does not copy arbitrary attributes from the function onto the generated
    view class. Overriding ``get_scopes`` makes the requirement explicit.

    Args:
        *scopes: Scope names the access token must carry.

    Returns:
        A permission class usable in ``@permission_classes([...])``.
    """

    class FixedTokenScopes(TokenHasScope):
        def get_scopes(self, request, view):
            # Fresh list per call so callers can never mutate shared state.
            return list(scopes)

    return FixedTokenScopes


@api_view(['GET'])
@permission_classes([token_has_scopes('read')])
def read_only_api(request):
    """API endpoint requiring 'read' scope"""
    return Response({'data': 'Read-only content'})


@api_view(['POST'])
@permission_classes([token_has_scopes('write')])
def write_api(request):
    """API endpoint requiring 'write' scope"""
    return Response({'message': 'Data created'})


@api_view(['GET', 'POST'])
@permission_classes([TokenHasReadWriteScope])
def auto_scope_api(request):
    """API with automatic read/write scope assignment"""
    if request.method == 'GET':
        return Response({'data': 'Retrieved'})
    else:
        return Response({'message': 'Created'})


from rest_framework.views import APIView
from rest_framework.authentication import SessionAuthentication
from rest_framework.response import Response
from oauth2_provider.contrib.rest_framework import (
    OAuth2Authentication,
    TokenHasScope,
    TokenHasReadWriteScope,
    IsAuthenticatedOrTokenHasScope,
)
class ProtectedAPIView(APIView):
    """Minimal class-based view guarded by an OAuth2 token carrying the 'api' scope."""

    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasScope]
    required_scopes = ['api']

    def get(self, request):
        """Return the caller's username and the scopes granted to the token."""
        payload = {'user': request.user.username}
        token = request.auth
        if token:
            # The token stores its scopes as one whitespace-separated string.
            payload['scopes'] = token.scope.split()
        else:
            payload['scopes'] = []
        return Response(payload)
class ReadWriteAPIView(APIView):
    """View whose required scope follows the HTTP method (read vs. write)."""

    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasReadWriteScope]

    def get(self, request):
        """Serve content; the permission class demands the 'read' scope here."""
        body = {'data': 'content'}
        return Response(body)

    def post(self, request):
        """Acknowledge creation; the permission class demands the 'write' scope here."""
        body = {'message': 'created'}
        return Response(body)
class HybridAuthView(APIView):
    """View supporting both session and OAuth2 authentication"""

    # SessionAuthentication comes from rest_framework.authentication.
    authentication_classes = [OAuth2Authentication, SessionAuthentication]
    permission_classes = [IsAuthenticatedOrTokenHasScope]
    required_scopes = ['api']

    def get(self, request):
        """Report the authenticated user and which backend authenticated them."""
        # Ask DRF which authenticator succeeded instead of inferring it from
        # the truthiness of request.auth.
        used_oauth2 = isinstance(
            request.successful_authenticator, OAuth2Authentication
        )
        auth_type = 'oauth2' if used_oauth2 else 'session'
        return Response({
            'user': request.user.username,
            'auth_type': auth_type
        })


from rest_framework.viewsets import ModelViewSet
from rest_framework.routers import DefaultRouter
from oauth2_provider.contrib.rest_framework import OAuth2Authentication, TokenHasScope
class BookViewSet(ModelViewSet):
    """Model viewset for books, protected by OAuth2 with per-action scopes."""

    queryset = Book.objects.all()
    serializer_class = BookSerializer
    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasScope]
    required_scopes = ['books']

    def get_permissions(self):
        """Pick the scope for the current action, then build the permissions.

        Read actions need 'books:read', mutating actions need 'books:write';
        any other action keeps whatever ``required_scopes`` already holds.
        """
        scope_by_action = {
            'list': 'books:read',
            'retrieve': 'books:read',
            'create': 'books:write',
            'update': 'books:write',
            'partial_update': 'books:write',
            'destroy': 'books:write',
        }
        chosen = scope_by_action.get(self.action)
        if chosen is not None:
            self.required_scopes = [chosen]
        return super().get_permissions()
# URL configuration
# The viewset is registered under the 'books' prefix before the router's URL
# list is read, so the generated patterns include the book routes.
router = DefaultRouter()
router.register(r'books', BookViewSet)
urlpatterns = router.urls
from oauth2_provider.contrib.rest_framework import TokenHasScope
class AdminOrTokenHasScope(TokenHasScope):
    """Grant access to staff users outright; everyone else needs the token scopes."""

    def has_permission(self, request, view):
        """Return True for authenticated staff, otherwise defer to the scope check."""
        caller = request.user
        is_admin = caller.is_authenticated and caller.is_staff
        if not is_admin:
            # Non-staff callers fall back to the regular token-scope validation.
            return super().has_permission(request, view)
        return True
class ResourceOwnerOrTokenHasScope(TokenHasScope):
    """Object-level permission: the object's owner passes, others need the token scopes."""

    # NOTE(review): DRF normally evaluates has_permission() before
    # has_object_permission(), so the inherited view-level scope check
    # presumably still applies to owners -- confirm this is intended.
    def has_object_permission(self, request, view, obj):
        """Return True when ``obj.user`` is the requester, else run the scope check."""
        owned_by_caller = hasattr(obj, 'user') and obj.user == request.user
        if not owned_by_caller:
            # Not the owner (or the object has no ``user``): require scopes.
            return super().has_permission(request, view)
        return True
class ConditionalTokenScope(TokenHasScope):
    """Conditional scope requirements based on request context"""

    def get_scopes(self, request, view):
        """Return the view's scopes plus any scopes implied by the request.

        Args:
            request: DRF Request object
            view: DRF View instance

        Returns:
            A new list of required scope names; the list held by the view is
            never modified.
        """
        # Copy first: the parent presumably hands back the view's own
        # ``required_scopes`` list, and appending to that shared object would
        # leak the extra scopes into every later request.
        scopes = list(super().get_scopes(request, view))
        # Add conditional scopes
        if request.query_params.get('include_sensitive'):
            scopes.append('sensitive_data')
        if request.method in ['PUT', 'PATCH', 'DELETE']:
            scopes.append('modify')
        return scopes


from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from oauth2_provider.contrib.rest_framework import OAuth2Authentication, TokenHasScope
class ErrorHandlingView(APIView):
    """Example of handling OAuth2 failure cases by hand inside a DRF view."""

    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasScope]
    required_scopes = ['read']

    def get(self, request):
        """Run manual token checks and answer with the matching status code."""
        token = getattr(request, 'auth', None)

        # No token at all: the request was not authenticated through OAuth2.
        if not token:
            body = {'error': 'OAuth2 authentication required'}
            return Response(body, status=status.HTTP_401_UNAUTHORIZED)

        # A token is present but no longer valid.
        if token.is_expired():
            body = {'error': 'Token has expired'}
            return Response(body, status=status.HTTP_401_UNAUTHORIZED)

        # Manual scope check on top of the permission class.
        if token.allow_scopes(['read', 'special']):
            return Response({'message': 'Access granted'})

        body = {
            'error': 'Insufficient scopes',
            'required_scopes': ['read', 'special'],
            'provided_scopes': token.scope.split()
        }
        return Response(body, status=status.HTTP_403_FORBIDDEN)
# OAuth2 authentication errors are automatically handled:
# - No token: HTTP 401 with WWW-Authenticate header
# - Invalid token: HTTP 401 Unauthorized
# - Insufficient scopes: HTTP 403 Forbidden (with scope details if configured)
from rest_framework.views import APIView
from rest_framework.response import Response
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
# OAuth2Authentication is used in authentication_classes below, so it must be
# imported alongside TokenHasScope.
from oauth2_provider.contrib.rest_framework import OAuth2Authentication, TokenHasScope


class DocumentedAPIView(APIView):
    """API view with OAuth2 documentation for swagger/openapi"""

    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasScope]
    required_scopes = ['read']

    @swagger_auto_schema(
        operation_description="Get protected data",
        # Advertises the same scope that required_scopes enforces.
        security=[{'oauth2': ['read']}],
        responses={
            200: openapi.Response('Success', examples={
                'application/json': {'data': 'content'}
            }),
            401: openapi.Response('Unauthorized'),
            403: openapi.Response('Forbidden - insufficient scopes')
        }
    )
    def get(self, request):
        """Return the protected payload."""
        return Response({'data': 'protected content'})
# settings.py for drf-yasg OAuth2 configuration
SWAGGER_SETTINGS = {
    'SECURITY_DEFINITIONS': {
        'oauth2': {
            'type': 'oauth2',
            # drf-yasg emits Swagger/OpenAPI 2.0, whose oauth2 flow values are
            # 'implicit', 'password', 'application' and 'accessCode'. The
            # authorization-code grant is spelled 'accessCode' there;
            # 'authorization_code' is not a valid value.
            'flow': 'accessCode',
            'authorizationUrl': 'https://example.com/o/authorize/',
            'tokenUrl': 'https://example.com/o/token/',
            'scopes': {
                'read': 'Read access',
                'write': 'Write access',
                'admin': 'Admin access'
            }
        }
    }
}
from rest_framework.views import APIView
from rest_framework.response import Response
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
class TokenInfoView(APIView):
    """View to inspect current OAuth2 token details"""

    authentication_classes = [OAuth2Authentication]

    def get(self, request):
        """Return details of the presented token and, when present, its user.

        Returns:
            401 with an error body when no token accompanies the request;
            otherwise a body with ``token_info`` and ``user_info``.
            ``user_info`` is None when the token is not bound to a user.
        """
        token = getattr(request, 'auth', None)
        if not token:
            return Response({'error': 'No OAuth2 token provided'}, status=401)

        # Tokens may exist without a user (e.g. presumably client-credentials
        # grants) and the application link may be empty -- guard both instead
        # of raising AttributeError.
        application = token.application
        user = request.user
        user_info = None
        if user is not None and getattr(user, 'is_authenticated', False):
            user_info = {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'is_staff': user.is_staff,
            }

        return Response({
            'token_info': {
                'scopes': token.scope.split(),
                'application': application.name if application else None,
                'expires_at': token.expires.isoformat(),
                'is_expired': token.is_expired(),
                'user': token.user.username if token.user else None,
            },
            'user_info': user_info,
        })


from rest_framework.throttling import UserRateThrottle
# Response is returned by ThrottledAPIView.get below, so it is imported here.
from rest_framework.response import Response
from rest_framework.views import APIView
from oauth2_provider.contrib.rest_framework import OAuth2Authentication, TokenHasScope


class OAuth2UserRateThrottle(UserRateThrottle):
    """Custom throttle that works with OAuth2 authentication"""

    def get_cache_key(self, request, view):
        """Build the throttle cache key for the requester.

        The user bound to the OAuth2 token is preferred, with ``request.user``
        as fallback. Requests without an authenticated user are keyed by the
        client identity from ``get_ident``.
        """
        # Use OAuth2 user if available, fallback to session user
        if hasattr(request, 'auth') and request.auth and request.auth.user:
            user = request.auth.user
        else:
            user = request.user

        # ``user`` may be None when the token is not tied to a user; treat
        # that like an anonymous request instead of raising AttributeError.
        if user is not None and user.is_authenticated:
            ident = user.pk
        else:
            ident = self.get_ident(request)

        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }


class ThrottledAPIView(APIView):
    """API view with OAuth2 authentication and throttling"""

    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasScope]
    throttle_classes = [OAuth2UserRateThrottle]
    # NOTE(review): DRF consults ``throttle_scope`` only for ScopedRateThrottle;
    # the rate here presumably comes from the throttle class's own ``scope``
    # ('user') -- confirm whether this attribute is needed.
    throttle_scope = 'user'
    required_scopes = ['api']

    def get(self, request):
        """Return a fixed payload; the throttle runs before this handler."""
        return Response({'message': 'throttled endpoint'})
# settings.py
# Throttle rates keyed by scope name; each value is a '<count>/<period>' string.
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'user': '100/hour',
        'anon': '10/hour',
    }
}

Install with Tessl CLI
npx tessl i tessl/pypi-django-oauth-toolkit