or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

drf-integration.mdindex.mdmanagement-commands.mdmanagement-views.mdmodels.mdoauth2-endpoints.mdoidc.mdsettings.mdview-protection.md

management-commands.mddocs/

0

# Management Commands and Utilities

1

2

Django OAuth Toolkit provides Django management commands for OAuth2 administration and utility functions for common operations. These tools help with token cleanup, application creation, and system maintenance.

3

4

## Capabilities

5

6

### Token Cleanup Command

7

8

Django management command for cleaning up expired OAuth2 tokens and grants.

9

10

```python { .api }

11

class Command(BaseCommand):

12

"""

13

Management command: python manage.py cleartokens

14

15

Removes all expired tokens and grants from the database.

16

Safe for production use with batch processing to handle large datasets.

17

18

Usage:

19

python manage.py cleartokens

20

21

What it cleans:

22

- Expired refresh tokens (if REFRESH_TOKEN_EXPIRE_SECONDS is set)

23

- Revoked refresh tokens older than expiration threshold

24

- Access tokens without refresh tokens that have expired

25

- Expired ID tokens without associated access tokens

26

- Expired authorization grants

27

28

Batch Processing:

29

Uses CLEAR_EXPIRED_TOKENS_BATCH_SIZE and CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL

30

settings to process tokens in batches, preventing memory issues.

31

"""

32

33

help = "Can be run as a cronjob or directly to clean out expired tokens"

34

35

def handle(self, *args, **options):

36

"""Execute token cleanup process"""

37

```

38

39

### Application Creation Command

40

41

Django management command for programmatically creating OAuth2 applications.

42

43

```python { .api }

44

class Command(BaseCommand):

45

"""

46

Management command: python manage.py createapplication

47

48

Creates new OAuth2 client applications from command line.

49

Useful for automation, testing, and deployment scripts.

50

51

Usage:

52

python manage.py createapplication <client_type> <authorization_grant_type> [options]

53

54

Arguments:

55

client_type: 'confidential' or 'public'

56

authorization_grant_type: 'authorization-code', 'implicit', 'password',

57

'client-credentials', or 'openid-hybrid'

58

59

Options:

60

--client-id: Custom client ID (auto-generated if not provided)

61

--client-secret: Custom client secret (auto-generated if not provided)

62

--no-hash-client-secret: Don't hash the client secret on save

63

--name: Application name

64

--user: Username of application owner

65

--redirect-uris: Space-separated redirect URIs

66

--post-logout-redirect-uris: Space-separated OIDC logout redirect URIs

67

--skip-authorization: Skip user authorization prompt

68

--algorithm: OIDC signing algorithm ('', 'RS256', 'HS256')

69

70

Examples:

71

# Confidential web application

72

python manage.py createapplication confidential authorization-code \\

73

--name "My Web App" --redirect-uris "http://localhost:8000/callback/"

74

75

# Public mobile application

76

python manage.py createapplication public authorization-code \\

77

--name "Mobile App" --redirect-uris "myapp://callback/"

78

79

# Server-to-server application

80

python manage.py createapplication confidential client-credentials \\

81

--name "API Client"

82

"""

83

84

help = "Shortcut to create a new application in a programmatic way"

85

86

def add_arguments(self, parser):

87

"""Add command line arguments"""

88

89

def handle(self, *args, **options):

90

"""Create application with provided options"""

91

```

92

93

### Utility Functions

94

95

Core utility functions for OAuth2 operations and maintenance.

96

97

```python { .api }

98

def clear_expired() -> None:

99

"""

100

Remove all expired tokens and grants from database.

101

102

Processes in batches to handle large datasets efficiently:

103

- Respects CLEAR_EXPIRED_TOKENS_BATCH_SIZE setting

104

- Uses CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL for rate limiting

105

- Handles refresh tokens, access tokens, ID tokens, and grants

106

- Uses timezone-aware expiration checking

107

- Provides detailed logging of deletion counts

108

109

Safe for production use and can be called programmatically

110

or via management command.

111

"""

112

113

def generate_client_id() -> str:

114

"""

115

Generate OAuth2 client identifier.

116

117

Returns:

118

40-character client ID suitable for OAuth2 Basic Authentication

119

120

Uses CLIENT_ID_GENERATOR_CLASS setting for customization.

121

Default generates URL-safe random string without colons.

122

"""

123

124

def generate_client_secret() -> str:

125

"""

126

Generate OAuth2 client secret.

127

128

Returns:

129

Random client secret of configurable length

130

131

Uses CLIENT_SECRET_GENERATOR_CLASS and CLIENT_SECRET_GENERATOR_LENGTH

132

settings for customization. Default is 128-character random string.

133

"""

134

135

def get_application_model():

136

"""

137

Get the active Application model class.

138

139

Returns:

140

Application model class (supports swappable models)

141

142

Respects OAUTH2_PROVIDER_APPLICATION_MODEL setting for custom models.

143

"""

144

145

def get_access_token_model():

146

"""Get the active AccessToken model class"""

147

148

def get_refresh_token_model():

149

"""Get the active RefreshToken model class"""

150

151

def get_grant_model():

152

"""Get the active Grant model class"""

153

154

def get_id_token_model():

155

"""Get the active IDToken model class"""

156

```

157

158

### Generator Classes

159

160

Base classes and implementations for generating OAuth2 identifiers and secrets.

161

162

```python { .api }

163

class BaseHashGenerator:

164

"""

165

Base class for OAuth2 token generators.

166

167

All custom generators should extend this class and override hash() method.

168

Provides consistent interface for generating client IDs, secrets, and tokens.

169

"""

170

171

def hash(self) -> str:

172

"""

173

Generate hash/token value.

174

175

Returns:

176

Generated string value

177

178

Raises:

179

NotImplementedError: Must be implemented by subclasses

180

"""

181

182

class ClientIdGenerator(BaseHashGenerator):

183

"""

184

Default OAuth2 client ID generator.

185

186

Generates 40-character client ID without colon characters

187

to comply with RFC 2617 Basic Authentication requirements.

188

Uses Unicode ASCII character set for maximum compatibility.

189

"""

190

191

def hash(self) -> str:

192

"""Generate RFC-compliant client ID"""

193

194

class ClientSecretGenerator(BaseHashGenerator):

195

"""

196

Default OAuth2 client secret generator.

197

198

Generates client secret of configurable length using

199

CLIENT_SECRET_GENERATOR_LENGTH setting (default: 128 characters).

200

Uses Unicode ASCII character set for secure random generation.

201

"""

202

203

def hash(self) -> str:

204

"""Generate secure client secret"""

205

```

206

207

### JWT and Cryptographic Utilities

208

209

Utility functions for JWT token handling and cryptographic operations.

210

211

```python { .api }

212

def jwk_from_pem(pem_string: str):

213

"""

214

Convert PEM private key to JSON Web Key (cached).

215

216

Args:

217

pem_string: PEM-formatted private key string

218

219

Returns:

220

JWK object for OIDC token signing

221

222

Uses LRU cache for performance as PEM conversion is expensive

223

for large keys (especially RSA). Cache persists across requests.

224

"""

225

226

def get_timezone(time_zone: str):

227

"""

228

Get timezone info object for specified timezone.

229

230

Args:

231

time_zone: Timezone name (e.g., 'UTC', 'America/New_York')

232

233

Returns:

234

Timezone info object (zoneinfo.ZoneInfo or pytz timezone)

235

236

Automatically handles zoneinfo (Python 3.9+) vs pytz compatibility.

237

Respects USE_DEPRECATED_PYTZ setting for explicit pytz usage.

238

"""

239

```

240

241

## Usage Examples

242

243

### Token Cleanup Automation

244

245

```python

246

# Automated token cleanup with cron job

247

# Add to crontab: 0 2 * * * /path/to/venv/bin/python /path/to/manage.py cleartokens

248

249

# Manual cleanup

250

python manage.py cleartokens

251

252

# Programmatic cleanup

253

from oauth2_provider.models import clear_expired

254

255

def cleanup_oauth_tokens():

256

"""Clean up expired tokens programmatically"""

257

try:

258

clear_expired()

259

print("OAuth2 token cleanup completed successfully")

260

except Exception as e:

261

print(f"Token cleanup failed: {e}")

262

263

# In Django view or task

264

def maintenance_view(request):

265

if request.user.is_staff:

266

clear_expired()

267

return JsonResponse({'message': 'Cleanup completed'})

268

return JsonResponse({'error': 'Unauthorized'}, status=403)

269

```

270

271

### Application Creation Automation

272

273

```python

274

# Create applications via management command

275

#!/bin/bash

276

# deployment_script.sh

277

278

# Create web application

279

python manage.py createapplication confidential authorization-code \

280

--name "Production Web App" \

281

--redirect-uris "https://myapp.com/oauth/callback/" \

282

--user admin

283

284

# Create mobile application

285

python manage.py createapplication public authorization-code \

286

--name "Mobile App" \

287

--redirect-uris "myapp://oauth/callback/" \

288

--skip-authorization

289

290

# Create API client

291

python manage.py createapplication confidential client-credentials \

292

--name "API Integration" \

293

--user system

294

295

# Programmatic application creation

296

from oauth2_provider.models import Application

297

from django.contrib.auth import get_user_model

298

299

User = get_user_model()

300

301

def create_oauth_application(name, client_type, grant_type, redirect_uris=None, user=None):

302

"""Create OAuth2 application programmatically"""

303

304

application = Application.objects.create(

305

name=name,

306

client_type=client_type,

307

authorization_grant_type=grant_type,

308

user=user,

309

)

310

311

if redirect_uris:

312

application.redirect_uris = ' '.join(redirect_uris)

313

application.save()

314

315

return {

316

'client_id': application.client_id,

317

'client_secret': application.client_secret,

318

'name': application.name,

319

}

320

321

# Usage

322

admin_user = User.objects.get(username='admin')

323

app_info = create_oauth_application(

324

name="My API Client",

325

client_type=Application.CLIENT_CONFIDENTIAL,

326

grant_type=Application.GRANT_CLIENT_CREDENTIALS,

327

user=admin_user

328

)

329

print(f"Created application: {app_info}")

330

```

331

332

### Custom Token Generators

333

334

```python

335

# custom_generators.py

336

from oauth2_provider.generators import BaseHashGenerator

337

import secrets

338

import string

339

340

class SecureClientIdGenerator(BaseHashGenerator):

341

"""Enhanced security client ID generator"""

342

343

def hash(self):

344

# Use cryptographically secure random generation

345

alphabet = string.ascii_letters + string.digits

346

return ''.join(secrets.choice(alphabet) for _ in range(32))

347

348

class TimestampedSecretGenerator(BaseHashGenerator):

349

"""Client secret generator with timestamp prefix"""

350

351

def hash(self):

352

import time

353

timestamp = str(int(time.time()))

354

secret_part = secrets.token_urlsafe(64)

355

return f"{timestamp}_{secret_part}"

356

357

# settings.py

358

OAUTH2_PROVIDER = {

359

'CLIENT_ID_GENERATOR_CLASS': 'myapp.generators.SecureClientIdGenerator',

360

'CLIENT_SECRET_GENERATOR_CLASS': 'myapp.generators.TimestampedSecretGenerator',

361

}

362

363

# Usage

364

from oauth2_provider.generators import generate_client_id, generate_client_secret

365

366

client_id = generate_client_id() # Uses custom generator

367

client_secret = generate_client_secret() # Uses custom generator

368

```

369

370

### Maintenance Scripts

371

372

```python

373

# maintenance.py - OAuth2 maintenance utilities

374

from django.core.management.base import BaseCommand

375

from oauth2_provider.models import clear_expired, get_access_token_model, get_application_model

376

from django.utils import timezone

377

from datetime import timedelta

378

379

class Command(BaseCommand):

380

"""Custom maintenance command for OAuth2 system"""

381

382

help = 'Perform OAuth2 system maintenance'

383

384

def add_arguments(self, parser):

385

parser.add_argument('--cleanup', action='store_true', help='Clean expired tokens')

386

parser.add_argument('--stats', action='store_true', help='Show system statistics')

387

parser.add_argument('--audit', action='store_true', help='Audit token usage')

388

389

def handle(self, *args, **options):

390

if options['cleanup']:

391

self.cleanup_tokens()

392

393

if options['stats']:

394

self.show_statistics()

395

396

if options['audit']:

397

self.audit_tokens()

398

399

def cleanup_tokens(self):

400

"""Clean up expired tokens with reporting"""

401

self.stdout.write('Starting token cleanup...')

402

403

# Get counts before cleanup

404

AccessToken = get_access_token_model()

405

before_count = AccessToken.objects.count()

406

407

# Perform cleanup

408

clear_expired()

409

410

# Report results

411

after_count = AccessToken.objects.count()

412

cleaned = before_count - after_count

413

self.stdout.write(

414

self.style.SUCCESS(f'Cleaned {cleaned} expired tokens')

415

)

416

417

def show_statistics(self):

418

"""Show OAuth2 system statistics"""

419

Application = get_application_model()

420

AccessToken = get_access_token_model()

421

422

# Application statistics

423

total_apps = Application.objects.count()

424

active_apps = Application.objects.filter(

425

accesstoken__expires__gt=timezone.now()

426

).distinct().count()

427

428

# Token statistics

429

total_tokens = AccessToken.objects.count()

430

active_tokens = AccessToken.objects.filter(

431

expires__gt=timezone.now()

432

).count()

433

434

self.stdout.write(f'Applications: {total_apps} total, {active_apps} active')

435

self.stdout.write(f'Access Tokens: {total_tokens} total, {active_tokens} active')

436

437

def audit_tokens(self):

438

"""Audit token usage patterns"""

439

AccessToken = get_access_token_model()

440

441

# Find old but active tokens

442

week_ago = timezone.now() - timedelta(days=7)

443

old_tokens = AccessToken.objects.filter(

444

created__lt=week_ago,

445

expires__gt=timezone.now()

446

)

447

448

self.stdout.write(f'Found {old_tokens.count()} tokens older than 1 week but still active')

449

450

# Token usage by application

451

from django.db.models import Count

452

app_usage = AccessToken.objects.values(

453

'application__name'

454

).annotate(

455

token_count=Count('id')

456

).order_by('-token_count')[:10]

457

458

self.stdout.write('Top 10 applications by token count:')

459

for item in app_usage:

460

self.stdout.write(f" {item['application__name']}: {item['token_count']}")

461

```

462

463

### Monitoring and Health Checks

464

465

```python

466

# monitoring.py - OAuth2 system monitoring

467

from oauth2_provider.models import get_access_token_model, get_application_model

468

from django.utils import timezone

469

from datetime import timedelta

470

import logging

471

472

logger = logging.getLogger(__name__)

473

474

def oauth2_health_check():

475

"""Perform OAuth2 system health check"""

476

477

results = {

478

'status': 'healthy',

479

'checks': {}

480

}

481

482

try:

483

# Check database connectivity

484

AccessToken = get_access_token_model()

485

Application = get_application_model()

486

487

# Count active tokens

488

active_tokens = AccessToken.objects.filter(

489

expires__gt=timezone.now()

490

).count()

491

results['checks']['active_tokens'] = active_tokens

492

493

# Check for excessive expired tokens

494

total_tokens = AccessToken.objects.count()

495

if total_tokens > 0:

496

expired_ratio = 1 - (active_tokens / total_tokens)

497

if expired_ratio > 0.8: # More than 80% expired

498

results['status'] = 'warning'

499

results['checks']['expired_ratio'] = expired_ratio

500

logger.warning(f'High expired token ratio: {expired_ratio:.2%}')

501

502

# Check application health

503

total_apps = Application.objects.count()

504

results['checks']['total_applications'] = total_apps

505

506

# Check for recent token creation (system activity)

507

hour_ago = timezone.now() - timedelta(hours=1)

508

recent_tokens = AccessToken.objects.filter(created__gte=hour_ago).count()

509

results['checks']['recent_token_activity'] = recent_tokens

510

511

except Exception as e:

512

results['status'] = 'unhealthy'

513

results['error'] = str(e)

514

logger.error(f'OAuth2 health check failed: {e}')

515

516

return results

517

518

# Django view for health check endpoint

519

from django.http import JsonResponse

520

521

def oauth2_health_view(request):

522

"""Health check endpoint for OAuth2 system"""

523

health = oauth2_health_check()

524

525

status_code = 200

526

if health['status'] == 'warning':

527

status_code = 200 # Still operational

528

elif health['status'] == 'unhealthy':

529

status_code = 503 # Service unavailable

530

531

return JsonResponse(health, status=status_code)

532

```

533

534

### Batch Operations

535

536

```python

537

# batch_operations.py - Batch operations for OAuth2 entities

538

from oauth2_provider.models import get_access_token_model, get_application_model

539

from django.db import transaction

540

from django.utils import timezone

541

542

def batch_revoke_user_tokens(user, application=None):

543

"""Revoke all tokens for a specific user"""

544

545

AccessToken = get_access_token_model()

546

queryset = AccessToken.objects.filter(user=user)

547

548

if application:

549

queryset = queryset.filter(application=application)

550

551

with transaction.atomic():

552

count = 0

553

for token in queryset.iterator():

554

token.revoke()

555

count += 1

556

557

return count

558

559

def cleanup_inactive_applications():

560

"""Remove applications with no recent token activity"""

561

562

Application = get_application_model()

563

AccessToken = get_access_token_model()

564

565

# Find applications with no tokens in last 90 days

566

cutoff_date = timezone.now() - timezone.timedelta(days=90)

567

568

inactive_apps = Application.objects.exclude(

569

accesstoken__created__gte=cutoff_date

570

).distinct()

571

572

results = []

573

for app in inactive_apps:

574

# Only delete if no active tokens

575

if not AccessToken.objects.filter(

576

application=app,

577

expires__gt=timezone.now()

578

).exists():

579

results.append({

580

'name': app.name,

581

'client_id': app.client_id,

582

'deleted': True

583

})

584

app.delete()

585

else:

586

results.append({

587

'name': app.name,

588

'client_id': app.client_id,

589

'deleted': False,

590

'reason': 'has_active_tokens'

591

})

592

593

return results

594

595

def migrate_token_scopes(old_scope, new_scope):

596

"""Migrate tokens from old scope to new scope"""

597

598

AccessToken = get_access_token_model()

599

600

tokens = AccessToken.objects.filter(scope__contains=old_scope)

601

count = 0

602

603

for token in tokens:

604

scopes = token.scope.split()

605

if old_scope in scopes:

606

scopes = [new_scope if s == old_scope else s for s in scopes]

607

token.scope = ' '.join(scopes)

608

token.save()

609

count += 1

610

611

return count

612

```