
tessl/pypi-django-oauth-toolkit

OAuth2 Provider for Django web applications with complete server functionality, token management, and authorization endpoints.


Management Commands and Utilities

Django OAuth Toolkit provides Django management commands for OAuth2 administration and utility functions for common operations. These tools help with token cleanup, application creation, and system maintenance.

Capabilities

Token Cleanup Command

Django management command for cleaning up expired OAuth2 tokens and grants.

class Command(BaseCommand):
    """
    Management command: python manage.py cleartokens
    
    Removes all expired tokens and grants from the database.
    Safe for production use with batch processing to handle large datasets.
    
    Usage:
        python manage.py cleartokens
        
    What it cleans:
        - Expired refresh tokens (if REFRESH_TOKEN_EXPIRE_SECONDS is set)
        - Revoked refresh tokens older than expiration threshold
        - Access tokens without refresh tokens that have expired
        - Expired ID tokens without associated access tokens
        - Expired authorization grants
        
    Batch Processing:
        Uses CLEAR_EXPIRED_TOKENS_BATCH_SIZE and CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL
        settings to process tokens in batches, preventing memory issues.
    """
    
    help = "Can be run as a cronjob or directly to clean out expired tokens"
    
    def handle(self, *args, **options):
        """Execute token cleanup process"""

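The batching behaviour named in the docstring can be pictured with a small standalone sketch. This is not the library's implementation: `delete_in_batches`, `batch_size`, and `batch_interval` are hypothetical names standing in for what `CLEAR_EXPIRED_TOKENS_BATCH_SIZE` and `CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL` control, and an in-memory SQLite table stands in for the token table.

```python
import sqlite3
import time


def delete_in_batches(conn, now, batch_size=1000, batch_interval=0.0):
    """Delete expired rows a batch at a time; return the number deleted.

    Hypothetical helper: illustrates batching only, not the library's code.
    """
    total = 0
    while True:
        # Pick at most batch_size expired primary keys per round.
        rows = conn.execute(
            "SELECT id FROM tokens WHERE expires < ? LIMIT ?", (now, batch_size)
        ).fetchall()
        if not rows:
            break
        conn.executemany("DELETE FROM tokens WHERE id = ?", rows)
        conn.commit()
        total += len(rows)
        # Pause between batches so the database is not saturated.
        time.sleep(batch_interval)
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tokens (id INTEGER PRIMARY KEY, expires INTEGER)")
# Ten tokens: seven already expired (expires=0), three still valid (expires=100).
conn.executemany(
    "INSERT INTO tokens (expires) VALUES (?)", [(0,)] * 7 + [(100,)] * 3
)
deleted = delete_in_batches(conn, now=50, batch_size=3)
remaining = conn.execute("SELECT COUNT(*) FROM tokens").fetchone()[0]
```

Working through small batches keeps each delete statement short and bounds memory use, at the cost of more round trips; the interval trades total runtime for lower load on the database.
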
Application Creation Command

Django management command for programmatically creating OAuth2 applications.

class Command(BaseCommand):
    """
    Management command: python manage.py createapplication
    
    Creates new OAuth2 client applications from command line.
    Useful for automation, testing, and deployment scripts.
    
    Usage:
        python manage.py createapplication <client_type> <authorization_grant_type> [options]
        
    Arguments:
        client_type: 'confidential' or 'public'
        authorization_grant_type: 'authorization-code', 'implicit', 'password', 
                                 'client-credentials', or 'openid-hybrid'
                                 
    Options:
        --client-id: Custom client ID (auto-generated if not provided)
        --client-secret: Custom client secret (auto-generated if not provided)
        --no-hash-client-secret: Don't hash the client secret on save
        --name: Application name
        --user: Username of application owner
        --redirect-uris: Space-separated redirect URIs
        --post-logout-redirect-uris: Space-separated OIDC logout redirect URIs
        --skip-authorization: Skip user authorization prompt
        --algorithm: OIDC signing algorithm ('', 'RS256', 'HS256')
        
    Examples:
        # Confidential web application
        python manage.py createapplication confidential authorization-code \\
            --name "My Web App" --redirect-uris "http://localhost:8000/callback/"
            
        # Public mobile application  
        python manage.py createapplication public authorization-code \\
            --name "Mobile App" --redirect-uris "myapp://callback/"
            
        # Server-to-server application
        python manage.py createapplication confidential client-credentials \\
            --name "API Client"
    """
    
    help = "Shortcut to create a new application in a programmatic way"
    
    def add_arguments(self, parser):
        """Add command line arguments"""
    
    def handle(self, *args, **options):
        """Create application with provided options"""

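For deployment scripts written in Python, the argument list can be assembled and checked before the command runs. A minimal sketch, assuming a hypothetical helper `build_createapplication_args`; it covers only a few of the options listed above and validates against the argument values given in the docstring.

```python
import shlex

CLIENT_TYPES = {"confidential", "public"}
GRANT_TYPES = {
    "authorization-code",
    "implicit",
    "password",
    "client-credentials",
    "openid-hybrid",
}


def build_createapplication_args(
    client_type, grant_type, name=None, redirect_uris=(), skip_authorization=False
):
    """Return the argument list for `manage.py createapplication`.

    Hypothetical helper; not part of django-oauth-toolkit.
    """
    if client_type not in CLIENT_TYPES:
        raise ValueError(f"unknown client_type: {client_type!r}")
    if grant_type not in GRANT_TYPES:
        raise ValueError(f"unknown authorization_grant_type: {grant_type!r}")

    args = ["createapplication", client_type, grant_type]
    if name:
        args += ["--name", name]
    if redirect_uris:
        # All redirect URIs travel as one space-separated value.
        args += ["--redirect-uris", " ".join(redirect_uris)]
    if skip_authorization:
        args.append("--skip-authorization")
    return args


args = build_createapplication_args(
    "confidential",
    "authorization-code",
    name="My Web App",
    redirect_uris=["http://localhost:8000/callback/"],
)
# Shell-quoted form, useful for logging what is about to run.
command_line = shlex.join(["python", "manage.py", *args])
```

Inside a project, a list like this could be passed to `subprocess.run` with the interpreter and `manage.py` path prepended; keeping it as a list avoids shell-quoting problems with names that contain spaces.
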
Utility Functions

Core utility functions for OAuth2 operations and maintenance.

def clear_expired() -> None:
    """
    Remove all expired tokens and grants from database.
    
    Processes in batches to handle large datasets efficiently:
    - Respects CLEAR_EXPIRED_TOKENS_BATCH_SIZE setting
    - Uses CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL for rate limiting
    - Handles refresh tokens, access tokens, ID tokens, and grants
    - Uses timezone-aware expiration checking
    - Provides detailed logging of deletion counts
    
    Safe for production use and can be called programmatically
    or via management command.
    """

def generate_client_id() -> str:
    """
    Generate OAuth2 client identifier.
    
    Returns:
        40-character client ID suitable for OAuth2 Basic Authentication
        
    Uses CLIENT_ID_GENERATOR_CLASS setting for customization.
    Default generates URL-safe random string without colons.
    """

def generate_client_secret() -> str:
    """
    Generate OAuth2 client secret.
    
    Returns:
        Random client secret of configurable length
        
    Uses CLIENT_SECRET_GENERATOR_CLASS and CLIENT_SECRET_GENERATOR_LENGTH
    settings for customization. Default is 128-character random string.
    """

def get_application_model():
    """
    Get the active Application model class.
    
    Returns:
        Application model class (supports swappable models)
        
    Respects OAUTH2_PROVIDER_APPLICATION_MODEL setting for custom models.
    """

def get_access_token_model():
    """Get the active AccessToken model class"""

def get_refresh_token_model():
    """Get the active RefreshToken model class"""

def get_grant_model():
    """Get the active Grant model class"""

def get_id_token_model():
    """Get the active IDToken model class"""

Generator Classes

Base classes and implementations for generating OAuth2 identifiers and secrets.

class BaseHashGenerator:
    """
    Base class for OAuth2 token generators.
    
    All custom generators should extend this class and override the hash() method.
    Provides a consistent interface for generating client IDs, secrets, and tokens.
    """
    
    def hash(self) -> str:
        """
        Generate hash/token value.
        
        Returns:
            Generated string value
            
        Raises:
            NotImplementedError: Must be implemented by subclasses
        """

class ClientIdGenerator(BaseHashGenerator):
    """
    Default OAuth2 client ID generator.
    
    Generates 40-character client ID without colon characters
    to comply with RFC 2617 Basic Authentication requirements.
    Uses Unicode ASCII character set for maximum compatibility.
    """
    
    def hash(self) -> str:
        """Generate RFC-compliant client ID"""

class ClientSecretGenerator(BaseHashGenerator):
    """
    Default OAuth2 client secret generator.
    
    Generates client secret of configurable length using
    CLIENT_SECRET_GENERATOR_LENGTH setting (default: 128 characters).
    Uses Unicode ASCII character set for secure random generation.
    """
    
    def hash(self) -> str:
        """Generate secure client secret"""

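The colon restriction on client IDs follows from how HTTP Basic Authentication packs credentials as `client_id:client_secret`. The sketch below shows the round trip and what goes wrong when an ID contains a colon. `make_client_id`, `encode_basic`, and `decode_basic` are hypothetical helpers, and the alphabet is an assumption for illustration rather than the library's exact character set.

```python
import base64
import secrets
import string

# Letters and digits only, so a colon can never appear (assumed alphabet).
ALPHABET = string.ascii_letters + string.digits


def make_client_id(length=40):
    """Return a random identifier drawn from ALPHABET (hypothetical helper)."""
    return "".join(secrets.choice(ALPHABET) for _ in range(length))


def encode_basic(client_id, client_secret):
    """Build the value of an HTTP Basic Authorization header."""
    raw = f"{client_id}:{client_secret}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def decode_basic(header):
    """Split a Basic header back into (client_id, client_secret)."""
    raw = base64.b64decode(header[len("Basic "):]).decode("utf-8")
    # Everything before the FIRST colon is the identifier, so an id that
    # itself contained a colon would be cut short here.
    client_id, _, client_secret = raw.partition(":")
    return client_id, client_secret


client_id = make_client_id()
round_trip = decode_basic(encode_basic(client_id, "s3cret:with:colons"))
broken = decode_basic(encode_basic("bad:id", "s3cret"))
```

A colon in the secret survives the round trip, while a colon in the ID shifts part of the ID into the secret; a custom `CLIENT_ID_GENERATOR_CLASS` should therefore keep colons out of its output.
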
JWT and Cryptographic Utilities

Utility functions for JWT key handling and timezone lookup.

def jwk_from_pem(pem_string: str):
    """
    Convert PEM private key to JSON Web Key (cached).
    
    Args:
        pem_string: PEM-formatted private key string
        
    Returns:
        JWK object for OIDC token signing
        
    Uses an LRU cache because PEM conversion is expensive for large keys
    (especially RSA). The cache lives in process memory, so it persists
    across requests handled by the same worker process.
    """

def get_timezone(time_zone: str):
    """
    Get timezone info object for specified timezone.
    
    Args:
        time_zone: Timezone name (e.g., 'UTC', 'America/New_York')
        
    Returns:
        Timezone info object (zoneinfo.ZoneInfo or pytz timezone)
        
    Automatically handles zoneinfo (Python 3.9+) vs pytz compatibility.
    Respects USE_DEPRECATED_PYTZ setting for explicit pytz usage.
    """

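The caching described for `jwk_from_pem` can be illustrated with `functools.lru_cache` from the standard library. In this sketch `expensive_key_conversion` is a hypothetical stand-in: a SHA-256 digest replaces real key parsing so the example needs no third-party packages, and the PEM text is a placeholder rather than a real key.

```python
import functools
import hashlib

# Counts how often the function body actually runs.
calls = {"count": 0}


@functools.lru_cache()
def expensive_key_conversion(pem_string):
    """Stand-in for a costly PEM-to-JWK conversion (hypothetical)."""
    calls["count"] += 1
    return hashlib.sha256(pem_string.encode("utf-8")).hexdigest()


pem = (
    "-----BEGIN PRIVATE KEY-----\n"
    "(placeholder, not a real key)\n"
    "-----END PRIVATE KEY-----"
)
first = expensive_key_conversion(pem)
second = expensive_key_conversion(pem)  # served from the cache
info = expensive_key_conversion.cache_info()
```

Because the cache is keyed on the PEM string, a rotated key produces a new entry instead of reusing the old one; the old entry stays in memory until it is evicted or the process restarts.
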
Usage Examples

Token Cleanup Automation

# Automated token cleanup with cron job
# Add to crontab: 0 2 * * * /path/to/venv/bin/python /path/to/manage.py cleartokens

# Manual cleanup
python manage.py cleartokens

# Programmatic cleanup
from oauth2_provider.models import clear_expired

def cleanup_oauth_tokens():
    """Clean up expired tokens programmatically"""
    try:
        clear_expired()
        print("OAuth2 token cleanup completed successfully")
    except Exception as e:
        print(f"Token cleanup failed: {e}")

# In Django view or task
from django.http import JsonResponse

def maintenance_view(request):
    if request.user.is_staff:
        clear_expired()
        return JsonResponse({'message': 'Cleanup completed'})
    return JsonResponse({'error': 'Unauthorized'}, status=403)

Application Creation Automation

#!/bin/bash
# deployment_script.sh
# Create applications via management command

# Create web application
python manage.py createapplication confidential authorization-code \
    --name "Production Web App" \
    --redirect-uris "https://myapp.com/oauth/callback/" \
    --user admin

# Create mobile application
python manage.py createapplication public authorization-code \
    --name "Mobile App" \
    --redirect-uris "myapp://oauth/callback/" \
    --skip-authorization

# Create API client
python manage.py createapplication confidential client-credentials \
    --name "API Integration" \
    --user system

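# Programmatic application creation
from oauth2_provider.generators import generate_client_secret
from oauth2_provider.models import Application
from django.contrib.auth import get_user_model

User = get_user_model()

def create_oauth_application(name, client_type, grant_type, redirect_uris=None, user=None):
    """Create OAuth2 application programmatically"""
    
    # Generate the secret up front: when the secret is hashed on save
    # (see --no-hash-client-secret above), the stored value is no longer
    # the plaintext the client needs.
    client_secret = generate_client_secret()
    
    application = Application.objects.create(
        name=name,
        client_type=client_type,
        authorization_grant_type=grant_type,
        client_secret=client_secret,
        # redirect_uris is stored as a space-separated string
        redirect_uris=' '.join(redirect_uris or []),
        user=user,
    )
    
    return {
        'client_id': application.client_id,
        'client_secret': client_secret,  # plaintext; record it now
        'name': application.name,
    }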

# Usage
admin_user = User.objects.get(username='admin')
app_info = create_oauth_application(
    name="My API Client",
    client_type=Application.CLIENT_CONFIDENTIAL,
    grant_type=Application.GRANT_CLIENT_CREDENTIALS,
    user=admin_user
)
print(f"Created application: {app_info}")

Custom Token Generators

# myapp/generators.py (the module path must match the settings below)
from oauth2_provider.generators import BaseHashGenerator
import secrets
import string

class SecureClientIdGenerator(BaseHashGenerator):
    """Enhanced security client ID generator"""
    
    def hash(self):
        # Use cryptographically secure random generation
        alphabet = string.ascii_letters + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(32))

class TimestampedSecretGenerator(BaseHashGenerator):
    """Client secret generator with timestamp prefix"""
    
    def hash(self):
        import time
        timestamp = str(int(time.time()))
        secret_part = secrets.token_urlsafe(64)
        return f"{timestamp}_{secret_part}"

# settings.py
OAUTH2_PROVIDER = {
    'CLIENT_ID_GENERATOR_CLASS': 'myapp.generators.SecureClientIdGenerator',
    'CLIENT_SECRET_GENERATOR_CLASS': 'myapp.generators.TimestampedSecretGenerator',
}

# Usage
from oauth2_provider.generators import generate_client_id, generate_client_secret

client_id = generate_client_id()  # Uses custom generator
client_secret = generate_client_secret()  # Uses custom generator

Maintenance Scripts

# management/commands/maintenance.py (inside an installed app) - OAuth2 maintenance utilities
from django.core.management.base import BaseCommand
from oauth2_provider.models import clear_expired, get_access_token_model, get_application_model
from django.utils import timezone
from datetime import timedelta

class Command(BaseCommand):
    """Custom maintenance command for OAuth2 system"""
    
    help = 'Perform OAuth2 system maintenance'
    
    def add_arguments(self, parser):
        parser.add_argument('--cleanup', action='store_true', help='Clean expired tokens')
        parser.add_argument('--stats', action='store_true', help='Show system statistics')
        parser.add_argument('--audit', action='store_true', help='Audit token usage')
    
    def handle(self, *args, **options):
        if options['cleanup']:
            self.cleanup_tokens()
        
        if options['stats']:
            self.show_statistics()
            
        if options['audit']:
            self.audit_tokens()
    
    def cleanup_tokens(self):
        """Clean up expired tokens with reporting"""
        self.stdout.write('Starting token cleanup...')
        
        # Get counts before cleanup
        AccessToken = get_access_token_model()
        before_count = AccessToken.objects.count()
        
        # Perform cleanup
        clear_expired()
        
        # Report results
        after_count = AccessToken.objects.count()
        cleaned = before_count - after_count
        self.stdout.write(
            self.style.SUCCESS(f'Cleaned {cleaned} expired tokens')
        )
    
    def show_statistics(self):
        """Show OAuth2 system statistics"""
        Application = get_application_model()
        AccessToken = get_access_token_model()
        
        # Application statistics
        total_apps = Application.objects.count()
        active_apps = Application.objects.filter(
            accesstoken__expires__gt=timezone.now()
        ).distinct().count()
        
        # Token statistics
        total_tokens = AccessToken.objects.count()
        active_tokens = AccessToken.objects.filter(
            expires__gt=timezone.now()
        ).count()
        
        self.stdout.write(f'Applications: {total_apps} total, {active_apps} active')
        self.stdout.write(f'Access Tokens: {total_tokens} total, {active_tokens} active')
    
    def audit_tokens(self):
        """Audit token usage patterns"""
        AccessToken = get_access_token_model()
        
        # Find old but active tokens
        week_ago = timezone.now() - timedelta(days=7)
        old_tokens = AccessToken.objects.filter(
            created__lt=week_ago,
            expires__gt=timezone.now()
        )
        
        self.stdout.write(f'Found {old_tokens.count()} tokens older than 1 week but still active')
        
        # Token usage by application
        from django.db.models import Count
        app_usage = AccessToken.objects.values(
            'application__name'
        ).annotate(
            token_count=Count('id')
        ).order_by('-token_count')[:10]
        
        self.stdout.write('Top 10 applications by token count:')
        for item in app_usage:
            self.stdout.write(f"  {item['application__name']}: {item['token_count']}")

Monitoring and Health Checks

# monitoring.py - OAuth2 system monitoring
from oauth2_provider.models import get_access_token_model, get_application_model
from django.utils import timezone
from datetime import timedelta
import logging

logger = logging.getLogger(__name__)

def oauth2_health_check():
    """Perform OAuth2 system health check"""
    
    results = {
        'status': 'healthy',
        'checks': {}
    }
    
    try:
        # Check database connectivity
        AccessToken = get_access_token_model()
        Application = get_application_model()
        
        # Count active tokens
        active_tokens = AccessToken.objects.filter(
            expires__gt=timezone.now()
        ).count()
        results['checks']['active_tokens'] = active_tokens
        
        # Check for excessive expired tokens
        total_tokens = AccessToken.objects.count()
        if total_tokens > 0:
            expired_ratio = 1 - (active_tokens / total_tokens)
            if expired_ratio > 0.8:  # More than 80% expired
                results['status'] = 'warning'
                results['checks']['expired_ratio'] = expired_ratio
                logger.warning(f'High expired token ratio: {expired_ratio:.2%}')
        
        # Check application health
        total_apps = Application.objects.count()
        results['checks']['total_applications'] = total_apps
        
        # Check for recent token creation (system activity)
        hour_ago = timezone.now() - timedelta(hours=1)
        recent_tokens = AccessToken.objects.filter(created__gte=hour_ago).count()
        results['checks']['recent_token_activity'] = recent_tokens
        
    except Exception as e:
        results['status'] = 'unhealthy'
        results['error'] = str(e)
        logger.error(f'OAuth2 health check failed: {e}')
    
    return results

# Django view for health check endpoint
from django.http import JsonResponse

def oauth2_health_view(request):
    """Health check endpoint for OAuth2 system"""
    health = oauth2_health_check()
    
    # 'healthy' and 'warning' both mean the service is still operational
    status_code = 503 if health['status'] == 'unhealthy' else 200
    
    return JsonResponse(health, status=status_code)
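
The decision logic in the health check can also be factored into pure functions, which makes the thresholds testable without a database. A minimal sketch, assuming hypothetical helpers `classify_token_health` and `http_status_for`; the 0.8 threshold mirrors the value used above.

```python
def classify_token_health(active_tokens, total_tokens, threshold=0.8):
    """Return (status, expired_ratio) from two token counts.

    Hypothetical helper mirroring the ratio check in oauth2_health_check().
    """
    if total_tokens == 0:
        # No tokens at all: nothing has expired, so report healthy.
        return "healthy", 0.0
    expired_ratio = 1 - (active_tokens / total_tokens)
    status = "warning" if expired_ratio > threshold else "healthy"
    return status, expired_ratio


def http_status_for(status):
    """Map a health status to an HTTP status code."""
    return 503 if status == "unhealthy" else 200


mostly_expired = classify_token_health(active_tokens=10, total_tokens=100)
balanced = classify_token_health(active_tokens=50, total_tokens=100)
empty = classify_token_health(active_tokens=0, total_tokens=0)
```

A high expired ratio usually means `cleartokens` has not run recently, so a warning here is a prompt to check the cleanup schedule, not a sign that token issuance is failing.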

Batch Operations

# batch_operations.py - Batch operations for OAuth2 entities
from oauth2_provider.models import get_access_token_model, get_application_model
from django.db import transaction
from django.utils import timezone

def batch_revoke_user_tokens(user, application=None):
    """Revoke all tokens for a specific user"""
    
    AccessToken = get_access_token_model()
    queryset = AccessToken.objects.filter(user=user)
    
    if application:
        queryset = queryset.filter(application=application)
    
    with transaction.atomic():
        count = 0
        for token in queryset.iterator():
            token.revoke()
            count += 1
    
    return count

def cleanup_inactive_applications():
    """Remove applications with no recent token activity"""
    
    Application = get_application_model()
    AccessToken = get_access_token_model()
    
    # Find applications with no tokens in last 90 days
    # (timedelta belongs to the datetime module, not django.utils.timezone)
    from datetime import timedelta
    cutoff_date = timezone.now() - timedelta(days=90)
    
    inactive_apps = Application.objects.exclude(
        accesstoken__created__gte=cutoff_date
    ).distinct()
    
    results = []
    for app in inactive_apps:
        # Only delete if no active tokens
        if not AccessToken.objects.filter(
            application=app,
            expires__gt=timezone.now()
        ).exists():
            results.append({
                'name': app.name,
                'client_id': app.client_id,
                'deleted': True
            })
            app.delete()
        else:
            results.append({
                'name': app.name,  
                'client_id': app.client_id,
                'deleted': False,
                'reason': 'has_active_tokens'
            })
    
    return results

def migrate_token_scopes(old_scope, new_scope):
    """Migrate tokens from old scope to new scope"""
    
    AccessToken = get_access_token_model()
    
    tokens = AccessToken.objects.filter(scope__contains=old_scope)
    count = 0
    
    for token in tokens:
        scopes = token.scope.split()
        if old_scope in scopes:
            scopes = [new_scope if s == old_scope else s for s in scopes]
            token.scope = ' '.join(scopes)
            token.save()
            count += 1
    
    return count
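
The `split()` step in `migrate_token_scopes` matters because `scope__contains` is a substring match and will also select tokens whose scopes merely contain the old name. The rewrite step can be isolated as a pure function; `rename_scope` is a hypothetical helper mirroring the loop body above.

```python
def rename_scope(scope_string, old_scope, new_scope):
    """Return scope_string with whole-token matches of old_scope renamed."""
    scopes = scope_string.split()
    return " ".join(new_scope if s == old_scope else s for s in scopes)


exact = rename_scope("read write", "read", "view")
# "read_all" contains the substring "read" but is a different scope,
# so it must be left alone.
lookalike = rename_scope("read_all write", "read", "view")
```

Comparing whole scope names keeps lookalike scopes untouched, which is why the migration counts only tokens where `old_scope in scopes` holds.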
