# Management Commands and Utilities

Django OAuth Toolkit ships Django management commands for OAuth2 administration and utility functions for common operations: token cleanup, application creation, and system maintenance.

## Capabilities

### Token Cleanup Command

Django management command for cleaning up expired OAuth2 tokens and grants.

```python { .api }
class Command(BaseCommand):
    """
    Management command: python manage.py cleartokens

    Removes all expired tokens and grants from the database.
    Safe for production use with batch processing to handle large datasets.

    Usage:
        python manage.py cleartokens

    What it cleans:
    - Expired refresh tokens (if REFRESH_TOKEN_EXPIRE_SECONDS is set)
    - Revoked refresh tokens older than expiration threshold
    - Access tokens without refresh tokens that have expired
    - Expired ID tokens without associated access tokens
    - Expired authorization grants

    Batch Processing:
        Uses CLEAR_EXPIRED_TOKENS_BATCH_SIZE and CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL
        settings to process tokens in batches, preventing memory issues.
    """

    help = "Can be run as a cronjob or directly to clean out expired tokens"

    def handle(self, *args, **options):
        """Execute token cleanup process"""
```
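
The batch processing described above can be pictured with a small stand-alone sketch. It is not the library's implementation: `delete_in_batches`, `fetch`, and `delete` are hypothetical names, the in-memory list stands in for a table of expired tokens, and the default values are illustrative only. It shows the general idea of deleting a bounded chunk, pausing, and repeating until nothing is left.

```python
import time
from typing import Callable, List


def delete_in_batches(
    fetch_expired_ids: Callable[[int], List[int]],
    delete_ids: Callable[[List[int]], int],
    batch_size: int = 1000,
    batch_interval: float = 0.0,
) -> int:
    """Delete expired rows in fixed-size chunks, pausing between chunks."""
    total_deleted = 0
    while True:
        # Load at most batch_size primary keys, so memory use stays bounded.
        ids = fetch_expired_ids(batch_size)
        if not ids:
            break
        total_deleted += delete_ids(ids)
        # Optional pause between batches to reduce database load.
        if batch_interval > 0:
            time.sleep(batch_interval)
    return total_deleted


# Toy in-memory "table" standing in for expired token rows (hypothetical data).
expired_rows = list(range(25))


def fetch(limit: int) -> List[int]:
    return expired_rows[:limit]


def delete(ids: List[int]) -> int:
    for row_id in ids:
        expired_rows.remove(row_id)
    return len(ids)


deleted = delete_in_batches(fetch, delete, batch_size=10)
print(f"Deleted {deleted} rows in batches of 10")
```

In this sketch a smaller batch size trades more round trips for shorter-lived deletes, and a non-zero interval spreads the load over time.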

### Application Creation Command

Django management command for programmatically creating OAuth2 applications.

```python { .api }
class Command(BaseCommand):
    """
    Management command: python manage.py createapplication

    Creates new OAuth2 client applications from command line.
    Useful for automation, testing, and deployment scripts.

    Usage:
        python manage.py createapplication <client_type> <authorization_grant_type> [options]

    Arguments:
        client_type: 'confidential' or 'public'
        authorization_grant_type: 'authorization-code', 'implicit', 'password',
                                  'client-credentials', or 'openid-hybrid'

    Options:
        --client-id: Custom client ID (auto-generated if not provided)
        --client-secret: Custom client secret (auto-generated if not provided)
        --no-hash-client-secret: Don't hash the client secret on save
        --name: Application name
        --user: ID (primary key) of the user who owns the application
        --redirect-uris: Space-separated redirect URIs
        --post-logout-redirect-uris: Space-separated OIDC logout redirect URIs
        --skip-authorization: Skip user authorization prompt
        --algorithm: OIDC signing algorithm ('', 'RS256', 'HS256')

    Examples:
        # Confidential web application
        python manage.py createapplication confidential authorization-code \\
            --name "My Web App" --redirect-uris "http://localhost:8000/callback/"

        # Public mobile application
        python manage.py createapplication public authorization-code \\
            --name "Mobile App" --redirect-uris "myapp://callback/"

        # Server-to-server application
        python manage.py createapplication confidential client-credentials \\
            --name "API Client"
    """

    help = "Shortcut to create a new application in a programmatic way"

    def add_arguments(self, parser):
        """Add command line arguments"""

    def handle(self, *args, **options):
        """Create application with provided options"""
```
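
To make the argument layout above concrete, here is a stand-alone `argparse` sketch that mirrors a subset of the documented arguments. It is an illustration, not the command's actual parser: `build_parser` and `to_application_kwargs` are hypothetical helpers, and the use of `choices` for validation is an assumption made for this example.

```python
import argparse

CLIENT_TYPES = ("confidential", "public")
GRANT_TYPES = (
    "authorization-code",
    "implicit",
    "password",
    "client-credentials",
    "openid-hybrid",
)


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with two positional arguments and a few options."""
    parser = argparse.ArgumentParser(prog="createapplication")
    parser.add_argument("client_type", choices=CLIENT_TYPES)
    parser.add_argument("authorization_grant_type", choices=GRANT_TYPES)
    parser.add_argument("--name", default="")
    parser.add_argument("--redirect-uris", default="")
    parser.add_argument("--skip-authorization", action="store_true")
    return parser


def to_application_kwargs(argv):
    """Turn parsed arguments into keyword arguments, dropping unset options."""
    ns = build_parser().parse_args(argv)
    kwargs = {
        "client_type": ns.client_type,
        "authorization_grant_type": ns.authorization_grant_type,
    }
    for key in ("name", "redirect_uris"):
        value = getattr(ns, key)
        if value:  # skip options the caller did not provide
            kwargs[key] = value
    if ns.skip_authorization:
        kwargs["skip_authorization"] = True
    return kwargs


kwargs = to_application_kwargs(
    ["confidential", "client-credentials", "--name", "API Client"]
)
print(kwargs)
```

Dropping unset options, as done here, leaves room for model defaults (such as generated client IDs) to apply when a value is not given.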

### Utility Functions

Core utility functions for OAuth2 operations and maintenance.

```python { .api }
# Importable from oauth2_provider.models
def clear_expired() -> None:
    """
    Remove all expired tokens and grants from database.

    Processes in batches to handle large datasets efficiently:
    - Respects CLEAR_EXPIRED_TOKENS_BATCH_SIZE setting
    - Uses CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL for rate limiting
    - Handles refresh tokens, access tokens, ID tokens, and grants
    - Uses timezone-aware expiration checking
    - Provides detailed logging of deletion counts

    Safe for production use and can be called programmatically
    or via management command.
    """

# Importable from oauth2_provider.generators
def generate_client_id() -> str:
    """
    Generate OAuth2 client identifier.

    Returns:
        40-character client ID suitable for OAuth2 Basic Authentication

    Uses CLIENT_ID_GENERATOR_CLASS setting for customization.
    Default generates URL-safe random string without colons.
    """

def generate_client_secret() -> str:
    """
    Generate OAuth2 client secret.

    Returns:
        Random client secret of configurable length

    Uses CLIENT_SECRET_GENERATOR_CLASS and CLIENT_SECRET_GENERATOR_LENGTH
    settings for customization. Default is 128-character random string.
    """

# Importable from oauth2_provider.models
def get_application_model():
    """
    Get the active Application model class.

    Returns:
        Application model class (supports swappable models)

    Respects OAUTH2_PROVIDER_APPLICATION_MODEL setting for custom models.
    """

def get_access_token_model():
    """Get the active AccessToken model class"""

def get_refresh_token_model():
    """Get the active RefreshToken model class"""

def get_grant_model():
    """Get the active Grant model class"""

def get_id_token_model():
    """Get the active IDToken model class"""
```

### Generator Classes

Base classes and implementations for generating OAuth2 identifiers and secrets.

```python { .api }
class BaseHashGenerator:
    """
    Base class for OAuth2 token generators.

    All custom generators should extend this class and override hash() method.
    Provides consistent interface for generating client IDs, secrets, and tokens.
    """

    def hash(self) -> str:
        """
        Generate hash/token value.

        Returns:
            Generated string value

        Raises:
            NotImplementedError: Must be implemented by subclasses
        """

class ClientIdGenerator(BaseHashGenerator):
    """
    Default OAuth2 client ID generator.

    Generates 40-character client ID without colon characters
    to comply with RFC 2617 Basic Authentication requirements.
    Uses Unicode ASCII character set for maximum compatibility.
    """

    def hash(self) -> str:
        """Generate RFC-compliant client ID"""

class ClientSecretGenerator(BaseHashGenerator):
    """
    Default OAuth2 client secret generator.

    Generates client secret of configurable length using
    CLIENT_SECRET_GENERATOR_LENGTH setting (default: 128 characters).
    Uses Unicode ASCII character set for secure random generation.
    """

    def hash(self) -> str:
        """Generate secure client secret"""
```
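
The colon restriction on client IDs follows from how HTTP Basic credentials are encoded: the ID and secret are joined as `client_id:client_secret`, so the first colon marks the boundary. The standard-library sketch below illustrates this. It does not reproduce the library's generators; `make_client_id`, `basic_auth_header`, and `parse_basic_auth_header` are hypothetical helpers, and the letters-and-digits alphabet is an assumption for the example.

```python
import base64
import secrets
import string

# Assumed alphabet for this sketch: ASCII letters and digits, so no ":" can occur.
ALPHABET = string.ascii_letters + string.digits


def make_client_id(length: int = 40) -> str:
    """Return a random identifier that never contains a colon."""
    return "".join(secrets.choice(ALPHABET) for _ in range(length))


def basic_auth_header(client_id: str, client_secret: str) -> str:
    """Encode credentials as an HTTP Basic Authorization header value."""
    raw = f"{client_id}:{client_secret}".encode("ascii")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def parse_basic_auth_header(header: str):
    """Split a Basic header back into (client_id, client_secret)."""
    encoded = header.split(" ", 1)[1]
    decoded = base64.b64decode(encoded).decode("ascii")
    # Only the first colon separates the two parts, so the ID must not
    # contain one, while the secret may.
    client_id, _, client_secret = decoded.partition(":")
    return client_id, client_secret


client_id = make_client_id()
header = basic_auth_header(client_id, "s3cr:et")
parsed_id, parsed_secret = parse_basic_auth_header(header)
```

Because parsing splits on the first colon, a colon inside the secret survives the round trip, whereas a colon inside the ID would shift the boundary.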

### JWT and Cryptographic Utilities

Utility functions for JWT token handling and cryptographic operations.

```python { .api }
def jwk_from_pem(pem_string: str):
    """
    Convert PEM private key to JSON Web Key (cached).

    Args:
        pem_string: PEM-formatted private key string

    Returns:
        JWK object for OIDC token signing

    Uses LRU cache for performance as PEM conversion is expensive
    for large keys (especially RSA). Cache persists across requests.
    """

def get_timezone(time_zone: str):
    """
    Get timezone info object for specified timezone.

    Args:
        time_zone: Timezone name (e.g., 'UTC', 'America/New_York')

    Returns:
        Timezone info object (zoneinfo.ZoneInfo or pytz timezone)

    Automatically handles zoneinfo (Python 3.9+) vs pytz compatibility.
    Respects USE_DEPRECATED_PYTZ setting for explicit pytz usage.
    """
```

## Usage Examples

### Token Cleanup Automation

```python
# Automated token cleanup with a cron job
# Add to crontab: 0 2 * * * /path/to/venv/bin/python /path/to/manage.py cleartokens

# Manual cleanup (run from a shell):
#   python manage.py cleartokens

# Programmatic cleanup
from django.http import JsonResponse
from oauth2_provider.models import clear_expired

def cleanup_oauth_tokens():
    """Clean up expired tokens programmatically"""
    try:
        clear_expired()
        print("OAuth2 token cleanup completed successfully")
    except Exception as e:
        print(f"Token cleanup failed: {e}")

# In a Django view or task
def maintenance_view(request):
    if request.user.is_staff:
        clear_expired()
        return JsonResponse({'message': 'Cleanup completed'})
    return JsonResponse({'error': 'Forbidden'}, status=403)
```

### Application Creation Automation

```python
# Create applications via the management command, for example from a
# deployment shell script (deployment_script.sh):
#
#   #!/bin/bash
#
#   # Create web application (--user takes the owner's user ID; 1 is a placeholder)
#   python manage.py createapplication confidential authorization-code \
#       --name "Production Web App" \
#       --redirect-uris "https://myapp.com/oauth/callback/" \
#       --user 1
#
#   # Create mobile application
#   python manage.py createapplication public authorization-code \
#       --name "Mobile App" \
#       --redirect-uris "myapp://oauth/callback/" \
#       --skip-authorization
#
#   # Create API client
#   python manage.py createapplication confidential client-credentials \
#       --name "API Integration" \
#       --user 1

# Programmatic application creation
from django.contrib.auth import get_user_model
from oauth2_provider.generators import generate_client_secret
from oauth2_provider.models import Application

User = get_user_model()

def create_oauth_application(name, client_type, grant_type, redirect_uris=None, user=None):
    """Create OAuth2 application programmatically"""

    # Generate the secret up front: releases that hash client secrets on save
    # replace the stored value, so it cannot be read back in plaintext later.
    client_secret = generate_client_secret()

    application = Application.objects.create(
        name=name,
        client_type=client_type,
        authorization_grant_type=grant_type,
        redirect_uris=' '.join(redirect_uris or []),
        client_secret=client_secret,
        user=user,
    )

    return {
        'client_id': application.client_id,
        'client_secret': client_secret,  # plaintext; store it securely now
        'name': application.name,
    }

# Usage
admin_user = User.objects.get(username='admin')
app_info = create_oauth_application(
    name="My API Client",
    client_type=Application.CLIENT_CONFIDENTIAL,
    grant_type=Application.GRANT_CLIENT_CREDENTIALS,
    user=admin_user
)
print(f"Created application: {app_info['name']} ({app_info['client_id']})")
```

### Custom Token Generators

```python
# myapp/generators.py (module path must match the settings below)
import secrets
import string
import time

from oauth2_provider.generators import BaseHashGenerator

class SecureClientIdGenerator(BaseHashGenerator):
    """Enhanced security client ID generator"""

    def hash(self):
        # Use cryptographically secure random generation;
        # letters and digits only, so the ID never contains a colon
        alphabet = string.ascii_letters + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(32))

class TimestampedSecretGenerator(BaseHashGenerator):
    """Client secret generator with timestamp prefix"""

    def hash(self):
        timestamp = str(int(time.time()))
        secret_part = secrets.token_urlsafe(64)
        return f"{timestamp}_{secret_part}"

# settings.py
OAUTH2_PROVIDER = {
    'CLIENT_ID_GENERATOR_CLASS': 'myapp.generators.SecureClientIdGenerator',
    'CLIENT_SECRET_GENERATOR_CLASS': 'myapp.generators.TimestampedSecretGenerator',
}

# Usage
from oauth2_provider.generators import generate_client_id, generate_client_secret

client_id = generate_client_id()  # Uses custom generator
client_secret = generate_client_secret()  # Uses custom generator
```

### Maintenance Scripts

```python
# maintenance.py - OAuth2 maintenance utilities
# (place under <your_app>/management/commands/ so Django discovers the command)
from datetime import timedelta

from django.core.management.base import BaseCommand
from django.db.models import Count
from django.utils import timezone
from oauth2_provider.models import clear_expired, get_access_token_model, get_application_model

class Command(BaseCommand):
    """Custom maintenance command for OAuth2 system"""

    help = 'Perform OAuth2 system maintenance'

    def add_arguments(self, parser):
        parser.add_argument('--cleanup', action='store_true', help='Clean expired tokens')
        parser.add_argument('--stats', action='store_true', help='Show system statistics')
        parser.add_argument('--audit', action='store_true', help='Audit token usage')

    def handle(self, *args, **options):
        if options['cleanup']:
            self.cleanup_tokens()

        if options['stats']:
            self.show_statistics()

        if options['audit']:
            self.audit_tokens()

    def cleanup_tokens(self):
        """Clean up expired tokens with reporting"""
        self.stdout.write('Starting token cleanup...')

        # Get counts before cleanup
        AccessToken = get_access_token_model()
        before_count = AccessToken.objects.count()

        # Perform cleanup
        clear_expired()

        # Report results (access tokens only; other token types are not counted)
        after_count = AccessToken.objects.count()
        cleaned = before_count - after_count
        self.stdout.write(
            self.style.SUCCESS(f'Cleaned {cleaned} expired access tokens')
        )

    def show_statistics(self):
        """Show OAuth2 system statistics"""
        Application = get_application_model()
        AccessToken = get_access_token_model()

        # Application statistics: "active" means at least one unexpired token
        total_apps = Application.objects.count()
        active_apps = Application.objects.filter(
            accesstoken__expires__gt=timezone.now()
        ).distinct().count()

        # Token statistics
        total_tokens = AccessToken.objects.count()
        active_tokens = AccessToken.objects.filter(
            expires__gt=timezone.now()
        ).count()

        self.stdout.write(f'Applications: {total_apps} total, {active_apps} active')
        self.stdout.write(f'Access Tokens: {total_tokens} total, {active_tokens} active')

    def audit_tokens(self):
        """Audit token usage patterns"""
        AccessToken = get_access_token_model()

        # Find old but active tokens
        week_ago = timezone.now() - timedelta(days=7)
        old_tokens = AccessToken.objects.filter(
            created__lt=week_ago,
            expires__gt=timezone.now()
        )

        self.stdout.write(f'Found {old_tokens.count()} tokens older than 1 week but still active')

        # Token usage by application
        app_usage = AccessToken.objects.values(
            'application__name'
        ).annotate(
            token_count=Count('id')
        ).order_by('-token_count')[:10]

        self.stdout.write('Top 10 applications by token count:')
        for item in app_usage:
            self.stdout.write(f"  {item['application__name']}: {item['token_count']}")
```

### Monitoring and Health Checks

```python
# monitoring.py - OAuth2 system monitoring
import logging
from datetime import timedelta

from django.http import JsonResponse
from django.utils import timezone
from oauth2_provider.models import get_access_token_model, get_application_model

logger = logging.getLogger(__name__)

def oauth2_health_check():
    """Perform OAuth2 system health check"""

    results = {
        'status': 'healthy',
        'checks': {}
    }

    try:
        # Resolve the models; the queries below also exercise database connectivity
        AccessToken = get_access_token_model()
        Application = get_application_model()

        # Count active tokens
        active_tokens = AccessToken.objects.filter(
            expires__gt=timezone.now()
        ).count()
        results['checks']['active_tokens'] = active_tokens

        # Check for excessive expired tokens
        total_tokens = AccessToken.objects.count()
        if total_tokens > 0:
            expired_ratio = 1 - (active_tokens / total_tokens)
            if expired_ratio > 0.8:  # More than 80% expired
                results['status'] = 'warning'
                results['checks']['expired_ratio'] = expired_ratio
                logger.warning(f'High expired token ratio: {expired_ratio:.2%}')

        # Check application health
        total_apps = Application.objects.count()
        results['checks']['total_applications'] = total_apps

        # Check for recent token creation (system activity)
        hour_ago = timezone.now() - timedelta(hours=1)
        recent_tokens = AccessToken.objects.filter(created__gte=hour_ago).count()
        results['checks']['recent_token_activity'] = recent_tokens

    except Exception as e:
        results['status'] = 'unhealthy'
        results['error'] = str(e)
        logger.error(f'OAuth2 health check failed: {e}')

    return results

# Django view for health check endpoint
def oauth2_health_view(request):
    """Health check endpoint for OAuth2 system"""
    health = oauth2_health_check()

    # 'warning' is still operational (200); only 'unhealthy' maps to 503
    status_code = 503 if health['status'] == 'unhealthy' else 200

    return JsonResponse(health, status=status_code)
```
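
The threshold logic in the health check can be pulled into pure functions, which makes it testable without a database. The sketch below assumes the same 80% threshold and status names as the example above; `classify_token_health` and `http_status_for` are hypothetical helper names, not part of Django OAuth Toolkit.

```python
def classify_token_health(active_tokens: int, total_tokens: int,
                          warn_ratio: float = 0.8) -> dict:
    """Return a status and the expired-token ratio for the given counts."""
    if total_tokens <= 0:
        # No tokens at all: nothing can be "excessively expired".
        return {"status": "healthy", "expired_ratio": 0.0}
    expired_ratio = 1 - (active_tokens / total_tokens)
    status = "warning" if expired_ratio > warn_ratio else "healthy"
    return {"status": status, "expired_ratio": expired_ratio}


def http_status_for(status: str) -> int:
    """Map a health status to an HTTP status code."""
    # A warning still means the service is operational.
    return 503 if status == "unhealthy" else 200


report = classify_token_health(active_tokens=10, total_tokens=100)
print(report["status"], http_status_for(report["status"]))
```

A high expired ratio in this scheme usually just signals that the cleanup command has not run recently, which is why it maps to a warning rather than a failure.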

### Batch Operations

```python
# batch_operations.py - Batch operations for OAuth2 entities
from datetime import timedelta

from django.db import transaction
from django.utils import timezone
from oauth2_provider.models import get_access_token_model, get_application_model

def batch_revoke_user_tokens(user, application=None):
    """Revoke all access tokens for a specific user"""

    AccessToken = get_access_token_model()
    queryset = AccessToken.objects.filter(user=user)

    if application:
        queryset = queryset.filter(application=application)

    # Revoke inside one transaction so a failure leaves no partial state
    with transaction.atomic():
        count = 0
        for token in queryset.iterator():
            token.revoke()
            count += 1

    return count

def cleanup_inactive_applications():
    """Remove applications with no recent token activity"""

    Application = get_application_model()
    AccessToken = get_access_token_model()

    # Find applications with no tokens created in the last 90 days.
    # Note: this also matches applications that have never issued a token.
    cutoff_date = timezone.now() - timedelta(days=90)

    inactive_apps = Application.objects.exclude(
        accesstoken__created__gte=cutoff_date
    ).distinct()

    results = []
    for app in inactive_apps:
        # Only delete if no active tokens
        if not AccessToken.objects.filter(
            application=app,
            expires__gt=timezone.now()
        ).exists():
            results.append({
                'name': app.name,
                'client_id': app.client_id,
                'deleted': True
            })
            app.delete()
        else:
            results.append({
                'name': app.name,
                'client_id': app.client_id,
                'deleted': False,
                'reason': 'has_active_tokens'
            })

    return results

def migrate_token_scopes(old_scope, new_scope):
    """Migrate tokens from old scope to new scope"""

    AccessToken = get_access_token_model()

    # scope__contains is a substring prefilter (it also matches e.g. "read_all"
    # for "read"); the exact per-scope comparison happens in the loop below
    tokens = AccessToken.objects.filter(scope__contains=old_scope)
    count = 0

    for token in tokens:
        scopes = token.scope.split()
        if old_scope in scopes:
            scopes = [new_scope if s == old_scope else s for s in scopes]
            token.scope = ' '.join(scopes)
            token.save()
            count += 1

    return count
```
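
The scope rewrite inside `migrate_token_scopes` is plain string handling and can be checked in isolation. The sketch below uses a hypothetical helper, `replace_scope`, which is not part of the library. It shows why the comparison is done on whole, space-separated scope names rather than substrings, and additionally drops duplicates, a step the example above does not take.

```python
def replace_scope(scope_string: str, old_scope: str, new_scope: str) -> str:
    """Replace old_scope with new_scope in a space-separated scope string."""
    scopes = scope_string.split()
    if old_scope not in scopes:
        # No exact match (a substring match such as "read_all" does not count).
        return scope_string
    replaced = []
    for scope in scopes:
        scope = new_scope if scope == old_scope else scope
        if scope not in replaced:  # keep order, skip duplicates
            replaced.append(scope)
    return " ".join(replaced)


renamed = replace_scope("read write", "read", "view")
untouched = replace_scope("read_all write", "read", "view")
merged = replace_scope("read view", "read", "view")
```

Here `renamed` becomes `"view write"`, `untouched` stays `"read_all write"` because `read` is only a substring of `read_all`, and `merged` collapses to `"view"` since the token already held the new scope.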