OAuth2 Provider for Django web applications with complete server functionality, token management, and authorization endpoints.
—
Django OAuth Toolkit provides Django management commands for OAuth2 administration and utility functions for common operations. These tools help with token cleanup, application creation, and system maintenance.
Django management command for cleaning up expired OAuth2 tokens and grants.
class Command(BaseCommand):
    """
    Management command: python manage.py cleartokens

    Removes all expired tokens and grants from the database.
    Safe for production use with batch processing to handle large datasets.

    Usage:
        python manage.py cleartokens

    What it cleans:
        - Expired refresh tokens (if REFRESH_TOKEN_EXPIRE_SECONDS is set)
        - Revoked refresh tokens older than the expiration threshold
        - Expired access tokens that have no refresh token
        - Expired ID tokens without associated access tokens
        - Expired authorization grants

    Batch Processing:
        Uses the CLEAR_EXPIRED_TOKENS_BATCH_SIZE and
        CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL settings to process tokens in
        batches, preventing memory issues.
    """

    help = "Can be run as a cronjob or directly to clean out expired tokens"

    def handle(self, *args, **options):
        """Execute token cleanup process"""


# Django management command for programmatically creating OAuth2 applications.
class Command(BaseCommand):
    """
    Management command: python manage.py createapplication

    Creates new OAuth2 client applications from the command line.
    Useful for automation, testing, and deployment scripts.

    Usage:
        python manage.py createapplication <client_type> <authorization_grant_type> [options]

    Arguments:
        client_type: 'confidential' or 'public'
        authorization_grant_type: 'authorization-code', 'implicit', 'password',
            'client-credentials', or 'openid-hybrid'

    Options:
        --client-id: Custom client ID (auto-generated if not provided)
        --client-secret: Custom client secret (auto-generated if not provided)
        --no-hash-client-secret: Don't hash the client secret on save
        --name: Application name
        --user: Owner of the application.
            NOTE(review): upstream documents this value as the ID (primary
            key) of the owning user rather than a username - confirm against
            the installed version before passing a username here.
        --redirect-uris: Space-separated redirect URIs
        --post-logout-redirect-uris: Space-separated OIDC logout redirect URIs
        --skip-authorization: Skip user authorization prompt
        --algorithm: OIDC signing algorithm ('', 'RS256', 'HS256')

    Examples:
        # Confidential web application
        python manage.py createapplication confidential authorization-code \\
            --name "My Web App" --redirect-uris "http://localhost:8000/callback/"

        # Public mobile application
        python manage.py createapplication public authorization-code \\
            --name "Mobile App" --redirect-uris "myapp://callback/"

        # Server-to-server application
        python manage.py createapplication confidential client-credentials \\
            --name "API Client"
    """

    help = "Shortcut to create a new application in a programmatic way"

    def add_arguments(self, parser):
        """Add command line arguments"""

    def handle(self, *args, **options):
        """Create application with provided options"""


# Core utility functions for OAuth2 operations and maintenance.
def clear_expired() -> None:
    """
    Remove all expired tokens and grants from the database.

    Processes records in batches to handle large datasets efficiently:
        - Respects the CLEAR_EXPIRED_TOKENS_BATCH_SIZE setting
        - Uses CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL for rate limiting
        - Handles refresh tokens, access tokens, ID tokens, and grants
        - Uses timezone-aware expiration checking
        - Provides detailed logging of deletion counts

    Safe for production use; can be called programmatically or via the
    management command.
    """
def generate_client_id() -> str:
    """
    Generate an OAuth2 client identifier.

    Returns:
        40-character client ID suitable for OAuth2 Basic Authentication

    Uses the CLIENT_ID_GENERATOR_CLASS setting for customization.
    The default generates a URL-safe random string without colons.
    """
def generate_client_secret() -> str:
    """
    Generate an OAuth2 client secret.

    Returns:
        Random client secret of configurable length

    Uses the CLIENT_SECRET_GENERATOR_CLASS and CLIENT_SECRET_GENERATOR_LENGTH
    settings for customization. The default is a 128-character random string.
    """
def get_application_model():
    """
    Get the active Application model class.

    Returns:
        Application model class (supports swappable models)

    Respects the OAUTH2_PROVIDER_APPLICATION_MODEL setting for custom models.
    """
def get_access_token_model():
    """Get the active AccessToken model class."""
def get_refresh_token_model():
    """Get the active RefreshToken model class."""
def get_grant_model():
    """Get the active Grant model class."""
def get_id_token_model():
    """Get the active IDToken model class."""


# Base classes and implementations for generating OAuth2 identifiers and secrets.
class BaseHashGenerator:
    """
    Base class for OAuth2 token generators.

    All custom generators should extend this class and override hash().
    Provides a consistent interface for generating client IDs, secrets, and
    tokens.
    """

    def hash(self) -> str:
        """
        Generate hash/token value.

        Returns:
            Generated string value

        Raises:
            NotImplementedError: Must be implemented by subclasses
        """
class ClientIdGenerator(BaseHashGenerator):
    """
    Default OAuth2 client ID generator.

    Generates a 40-character client ID without colon characters to comply
    with RFC 2617 Basic Authentication requirements.
    Uses the Unicode ASCII character set for maximum compatibility.
    """

    def hash(self) -> str:
        """Generate RFC-compliant client ID"""
class ClientSecretGenerator(BaseHashGenerator):
    """
    Default OAuth2 client secret generator.

    Generates a client secret of configurable length using the
    CLIENT_SECRET_GENERATOR_LENGTH setting (default: 128 characters).
    Uses the Unicode ASCII character set for secure random generation.
    """

    def hash(self) -> str:
        """Generate secure client secret"""


# Utility functions for JWT token handling and cryptographic operations.
def jwk_from_pem(pem_string: str):
    """
    Convert a PEM private key to a JSON Web Key (cached).

    Args:
        pem_string: PEM-formatted private key string

    Returns:
        JWK object for OIDC token signing

    Uses an LRU cache for performance, as PEM conversion is expensive for
    large keys (especially RSA). The cache persists across requests.
    """
def get_timezone(time_zone: str):
    """
    Get the timezone info object for the specified timezone.

    Args:
        time_zone: Timezone name (e.g., 'UTC', 'America/New_York')

    Returns:
        Timezone info object (zoneinfo.ZoneInfo or pytz timezone)

    Automatically handles zoneinfo (Python 3.9+) vs pytz compatibility.
    Respects the USE_DEPRECATED_PYTZ setting for explicit pytz usage.
    """


# Automated token cleanup with cron job
# Add to crontab: 0 2 * * * /path/to/venv/bin/python /path/to/manage.py cleartokens
# Manual cleanup
python manage.py cleartokens
# Programmatic cleanup
from oauth2_provider.models import clear_expired
def cleanup_oauth_tokens():
    """Run clear_expired() and print the outcome instead of raising.

    Any exception raised by the cleanup is caught and reported on stdout, so
    callers always get ``None`` back and never see the failure as an error.
    """
    try:
        clear_expired()
        outcome = "OAuth2 token cleanup completed successfully"
    except Exception as failure:
        # Best-effort helper: report the problem rather than propagate it.
        outcome = f"Token cleanup failed: {failure}"
    print(outcome)
# In Django view or task
def maintenance_view(request):
    """Trigger an expired-token cleanup on behalf of a staff user.

    Returns a JSON confirmation after running clear_expired() for staff
    users, and a JSON error with HTTP status 403 for everyone else.
    """
    # Guard clause: reject non-staff callers before doing any work.
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)

    clear_expired()
    return JsonResponse({'message': 'Cleanup completed'})


# Create applications via management command
#!/bin/bash
# deployment_script.sh
# Registers the OAuth2 client applications needed by a deployment.

# Stop at the first failing command so a half-provisioned set of
# applications is noticed instead of silently continuing.
set -euo pipefail

# NOTE(review): upstream createapplication documents --user as the ID
# (primary key) of the owning user, not a username - confirm against the
# installed version. The IDs are required inputs rather than guessed defaults.
: "${ADMIN_USER_ID:?set ADMIN_USER_ID to the primary key of the admin user}"
: "${SYSTEM_USER_ID:?set SYSTEM_USER_ID to the primary key of the system user}"

# Create web application
python manage.py createapplication confidential authorization-code \
    --name "Production Web App" \
    --redirect-uris "https://myapp.com/oauth/callback/" \
    --user "$ADMIN_USER_ID"

# Create mobile application
python manage.py createapplication public authorization-code \
    --name "Mobile App" \
    --redirect-uris "myapp://oauth/callback/" \
    --skip-authorization

# Create API client
python manage.py createapplication confidential client-credentials \
    --name "API Integration" \
    --user "$SYSTEM_USER_ID"
# Programmatic application creation
from oauth2_provider.models import Application
from django.contrib.auth import get_user_model
User = get_user_model()
def create_oauth_application(name, client_type, grant_type, redirect_uris=None, user=None):
    """Create an OAuth2 application and return its usable credentials.

    Args:
        name: Application name.
        client_type: Client type stored on the application, e.g.
            ``Application.CLIENT_CONFIDENTIAL``.
        grant_type: Authorization grant type, e.g.
            ``Application.GRANT_CLIENT_CREDENTIALS``.
        redirect_uris: Optional iterable of redirect URI strings; they are
            stored joined by single spaces. Omitted or empty means none.
        user: Optional owner of the application.

    Returns:
        dict with ``client_id``, ``client_secret`` and ``name``. The secret
        is the cleartext value captured before saving; it cannot be read
        back from the database afterwards when secrets are hashed on save.
    """
    application = Application(
        name=name,
        client_type=client_type,
        authorization_grant_type=grant_type,
        user=user,
    )
    if redirect_uris:
        application.redirect_uris = ' '.join(redirect_uris)

    # The client secret is hashed on save (unless hashing is disabled), which
    # replaces the attribute on the instance. Keep the generated cleartext
    # first, otherwise the caller would only ever receive the unusable hash.
    cleartext_secret = application.client_secret

    # Single INSERT: the redirect URIs are already set, so no follow-up save.
    application.save()

    return {
        'client_id': application.client_id,
        'client_secret': cleartext_secret,
        'name': application.name,
    }
# Usage
# Fetch the user named 'admin' to act as the owner of the new application.
admin_user = User.objects.get(username='admin')
# Register a confidential client that uses the client-credentials grant.
app_info = create_oauth_application(
    name="My API Client",
    client_type=Application.CLIENT_CONFIDENTIAL,
    grant_type=Application.GRANT_CLIENT_CREDENTIALS,
    user=admin_user
)
print(f"Created application: {app_info}")

# myapp/generators.py - custom generators (this module path must match the
# 'myapp.generators.*' dotted paths used in the OAUTH2_PROVIDER settings)
from oauth2_provider.generators import BaseHashGenerator
import secrets
import string
class SecureClientIdGenerator(BaseHashGenerator):
    """Client ID generator that emits 32 random alphanumeric characters."""

    def hash(self):
        """Return a 32-character ID drawn from ASCII letters and digits."""
        # The secrets module is used so each character comes from a
        # cryptographically secure source.
        allowed_chars = string.ascii_letters + string.digits
        picked = [secrets.choice(allowed_chars) for _ in range(32)]
        return ''.join(picked)
class TimestampedSecretGenerator(BaseHashGenerator):
    """Client secret generator that prefixes the secret with its creation time."""

    def hash(self):
        """Return ``<unix-seconds>_<random>`` with a URL-safe random part."""
        import time

        # Whole seconds since the epoch at generation time.
        issued_at = int(time.time())
        # token_urlsafe(64) draws 64 random bytes and encodes them URL-safely.
        return f"{issued_at}_{secrets.token_urlsafe(64)}"
# settings.py
# Select the custom generators by dotted import path; each path must resolve
# to the module that actually defines the corresponding class.
OAUTH2_PROVIDER = {
    'CLIENT_ID_GENERATOR_CLASS': 'myapp.generators.SecureClientIdGenerator',
    'CLIENT_SECRET_GENERATOR_CLASS': 'myapp.generators.TimestampedSecretGenerator',
}
# Usage
from oauth2_provider.generators import generate_client_id, generate_client_secret
client_id = generate_client_id() # Uses custom generator
client_secret = generate_client_secret() # Uses custom generator

# maintenance.py - OAuth2 maintenance utilities
from django.core.management.base import BaseCommand
from oauth2_provider.models import clear_expired, get_access_token_model, get_application_model
from django.utils import timezone
from datetime import timedelta
class Command(BaseCommand):
    """Maintenance command bundling token cleanup, statistics and an audit report."""

    help = 'Perform OAuth2 system maintenance'

    def add_arguments(self, parser):
        """Register the three opt-in boolean switches."""
        switches = (
            ('--cleanup', 'Clean expired tokens'),
            ('--stats', 'Show system statistics'),
            ('--audit', 'Audit token usage'),
        )
        for flag, description in switches:
            parser.add_argument(flag, action='store_true', help=description)

    def handle(self, *args, **options):
        """Run each requested task, always in the order cleanup, stats, audit."""
        tasks = (
            ('cleanup', self.cleanup_tokens),
            ('stats', self.show_statistics),
            ('audit', self.audit_tokens),
        )
        for option_name, task in tasks:
            if options[option_name]:
                task()

    def cleanup_tokens(self):
        """Run clear_expired() and report how many access tokens disappeared."""
        self.stdout.write('Starting token cleanup...')

        token_model = get_access_token_model()
        # Only access tokens are counted, before and after the cleanup.
        count_before = token_model.objects.count()
        clear_expired()
        removed = count_before - token_model.objects.count()

        message = self.style.SUCCESS(f'Cleaned {removed} expired tokens')
        self.stdout.write(message)

    def show_statistics(self):
        """Print total and active counts for applications and access tokens."""
        app_model = get_application_model()
        token_model = get_access_token_model()

        # An application counts as active when it has at least one access
        # token that has not expired yet.
        app_total = app_model.objects.count()
        apps_with_live_tokens = app_model.objects.filter(
            accesstoken__expires__gt=timezone.now()
        ).distinct()
        app_active = apps_with_live_tokens.count()

        token_total = token_model.objects.count()
        live_tokens = token_model.objects.filter(expires__gt=timezone.now())
        token_active = live_tokens.count()

        self.stdout.write(f'Applications: {app_total} total, {app_active} active')
        self.stdout.write(f'Access Tokens: {token_total} total, {token_active} active')

    def audit_tokens(self):
        """Report long-lived active tokens and the ten busiest applications."""
        token_model = get_access_token_model()

        # Tokens created more than seven days ago that are still unexpired.
        threshold = timezone.now() - timedelta(days=7)
        stale_but_live = token_model.objects.filter(
            created__lt=threshold,
            expires__gt=timezone.now(),
        )
        self.stdout.write(f'Found {stale_but_live.count()} tokens older than 1 week but still active')

        from django.db.models import Count

        # Group tokens by application name and keep the ten largest groups.
        per_application = token_model.objects.values('application__name')
        ranked = per_application.annotate(token_count=Count('id')).order_by('-token_count')
        self.stdout.write('Top 10 applications by token count:')
        for row in ranked[:10]:
            self.stdout.write(f"  {row['application__name']}: {row['token_count']}")


# monitoring.py - OAuth2 system monitoring
from oauth2_provider.models import get_access_token_model, get_application_model
from django.utils import timezone
from datetime import timedelta
import logging
logger = logging.getLogger(__name__)
def oauth2_health_check():
    """Collect basic health indicators for the OAuth2 token store.

    Returns:
        dict with:
            - ``status``: ``'healthy'``, ``'warning'`` (more than 80% of the
              access tokens are expired) or ``'unhealthy'`` (a check raised).
            - ``checks``: ``active_tokens``, ``total_applications`` and
              ``recent_token_activity`` counts, plus ``expired_ratio`` when
              the warning threshold is exceeded.
            - ``error``: message of the exception, only when unhealthy.

    The function never raises; failures are reported through the result.
    """
    results = {
        'status': 'healthy',
        'checks': {}
    }
    try:
        # Running these queries doubles as the database connectivity check.
        AccessToken = get_access_token_model()
        Application = get_application_model()

        # Count active (not yet expired) tokens
        active_tokens = AccessToken.objects.filter(
            expires__gt=timezone.now()
        ).count()
        results['checks']['active_tokens'] = active_tokens

        # Check for excessive expired tokens; skipped for an empty table to
        # avoid dividing by zero.
        total_tokens = AccessToken.objects.count()
        if total_tokens > 0:
            expired_ratio = 1 - (active_tokens / total_tokens)
            if expired_ratio > 0.8:  # More than 80% expired
                results['status'] = 'warning'
                results['checks']['expired_ratio'] = expired_ratio
                # Lazy %-style arguments: the message is only formatted when
                # the record is actually emitted.
                logger.warning('High expired token ratio: %.2f%%', expired_ratio * 100)

        # Check application health
        total_apps = Application.objects.count()
        results['checks']['total_applications'] = total_apps

        # Check for recent token creation (system activity)
        hour_ago = timezone.now() - timedelta(hours=1)
        recent_tokens = AccessToken.objects.filter(created__gte=hour_ago).count()
        results['checks']['recent_token_activity'] = recent_tokens
    except Exception as e:
        # Top-level boundary of a health probe: report instead of raising,
        # but keep the traceback in the log so the cause can be diagnosed.
        results['status'] = 'unhealthy'
        results['error'] = str(e)
        logger.exception('OAuth2 health check failed: %s', e)
    return results
# Django view for health check endpoint
from django.http import JsonResponse
def oauth2_health_view(request):
    """Expose the OAuth2 health report as JSON.

    Responds with HTTP 503 when the report status is 'unhealthy' and with
    HTTP 200 otherwise; a 'warning' status still counts as operational.
    """
    report = oauth2_health_check()
    http_status = 503 if report['status'] == 'unhealthy' else 200
    return JsonResponse(report, status=http_status)


# batch_operations.py - Batch operations for OAuth2 entities
from oauth2_provider.models import get_access_token_model, get_application_model
from django.db import transaction
from django.utils import timezone
def batch_revoke_user_tokens(user, application=None):
    """Revoke a user's access tokens, optionally limited to one application.

    Args:
        user: Owner of the tokens to revoke.
        application: When truthy, only tokens issued to this application
            are revoked.

    Returns:
        Number of tokens on which ``revoke()`` was called.
    """
    token_model = get_access_token_model()
    tokens = token_model.objects.filter(user=user)
    tokens = tokens.filter(application=application) if application else tokens

    revoked = 0
    # One transaction around the whole loop, so a failure part-way through
    # propagates out of the atomic block.
    with transaction.atomic():
        for revoked, token in enumerate(tokens.iterator(), start=1):
            token.revoke()
    return revoked
def cleanup_inactive_applications():
    """Delete applications that show no token activity in the last 90 days.

    An application is a candidate when none of its access tokens was created
    within the last 90 days. A candidate is deleted only if it also has no
    unexpired access token; otherwise it is kept and reported.

    NOTE(review): applications that never issued a token are candidates too,
    which includes ones registered moments ago - confirm that is intended
    before running this against production data.

    Returns:
        list of dicts, one per candidate, with ``name``, ``client_id`` and
        ``deleted``; kept applications also carry
        ``reason='has_active_tokens'``.
    """
    # Imported from the standard library directly: django.utils.timezone does
    # not document a ``timedelta`` attribute, so ``timezone.timedelta`` only
    # works through an internal import of that module.
    from datetime import timedelta

    Application = get_application_model()
    AccessToken = get_access_token_model()

    # One reference instant for the whole run keeps every check consistent.
    now = timezone.now()
    cutoff_date = now - timedelta(days=90)

    # Find applications with no tokens created in the last 90 days
    inactive_apps = Application.objects.exclude(
        accesstoken__created__gte=cutoff_date
    ).distinct()

    results = []
    for app in inactive_apps:
        has_active_tokens = AccessToken.objects.filter(
            application=app,
            expires__gt=now,
        ).exists()

        if has_active_tokens:
            results.append({
                'name': app.name,
                'client_id': app.client_id,
                'deleted': False,
                'reason': 'has_active_tokens',
            })
            continue

        # Record the identifying fields before the row is removed.
        results.append({
            'name': app.name,
            'client_id': app.client_id,
            'deleted': True,
        })
        app.delete()
    return results
def migrate_token_scopes(old_scope, new_scope):
    """Rename a scope on every access token that carries it.

    Args:
        old_scope: Scope name to replace.
        new_scope: Scope name to write in its place.

    Returns:
        Number of tokens that were modified and saved.
    """
    token_model = get_access_token_model()
    migrated = 0
    for token in token_model.objects.filter(scope__contains=old_scope):
        current_scopes = token.scope.split()
        # The database filter is a substring match, so skip tokens where
        # old_scope only appears inside a longer scope name.
        if old_scope not in current_scopes:
            continue
        token.scope = ' '.join(
            [new_scope if name == old_scope else name for name in current_scopes]
        )
        token.save()
        migrated += 1
    return migrated


# Install with Tessl CLI
npx tessl i tessl/pypi-django-oauth-toolkit