# Authentication Framework

Starlette provides a flexible authentication framework that supports multiple authentication methods through pluggable backends, with built-in user classes and a `requires` decorator for scope-based authorization.

## Authentication Components

### Authentication Backend

```python { .api }
9
from starlette.authentication import AuthenticationBackend, AuthCredentials, BaseUser
10
from starlette.requests import HTTPConnection
11
from typing import Optional, Tuple
12
13
class AuthenticationBackend:
    """
    Abstract base class for authentication backends.

    Authentication backends are responsible for:
    - Examining request credentials (headers, cookies, etc.)
    - Validating credentials against user store
    - Returning user and credential information
    """

    async def authenticate(
        self,
        conn: HTTPConnection
    ) -> Optional[Tuple[AuthCredentials, BaseUser]]:
        """
        Authenticate the request.

        Args:
            conn: HTTP connection (Request or WebSocket)

        Returns:
            Tuple of (AuthCredentials, BaseUser) if authenticated, None otherwise

        Raises:
            AuthenticationError: For credentials that are present but invalid.
                AuthenticationMiddleware hands the error to its ``on_error``
                handler; Starlette's default handler responds with 400, so
                supply ``on_error`` to return 401/403 instead.
        """
        raise NotImplementedError()
41
```
42
43
### Authentication Credentials
44
45
```python { .api }
46
from typing import List
47
48
class AuthCredentials:
    """
    Authentication credentials containing permission scopes.

    Represents the permissions and scopes granted to an authenticated user.
    """

    def __init__(self, scopes: Optional[List[str]] = None) -> None:
        """
        Initialize authentication credentials.

        Args:
            scopes: Permission scopes (e.g., ["read", "write"]); None means
                no scopes. The values are copied, so later changes to the
                caller's list do not affect these credentials.
        """
        self.scopes = [] if scopes is None else list(scopes)
63
```
64
65
### User Classes
66
67
```python { .api }
68
class BaseUser:
    """
    Abstract base class for user objects.

    Defines the interface that user objects must implement
    to work with Starlette's authentication system. Every property
    raises NotImplementedError until a subclass overrides it.
    """

    @property
    def is_authenticated(self) -> bool:
        """Whether the user is authenticated."""
        raise NotImplementedError()

    @property
    def display_name(self) -> str:
        """Display name for the user."""
        raise NotImplementedError()

    @property
    def identity(self) -> str:
        """Unique identifier for the user."""
        raise NotImplementedError()
90
91
class SimpleUser(BaseUser):
    """
    Simple authenticated user implementation.

    Basic user class for applications that only need username-based authentication.
    """

    def __init__(self, username: str) -> None:
        """
        Initialize simple user.

        Args:
            username: Username/identifier for the user
        """
        self.username = username

    @property
    def is_authenticated(self) -> bool:
        """Always True for SimpleUser."""
        return True

    @property
    def display_name(self) -> str:
        """Returns the username."""
        return self.username

    @property
    def identity(self) -> str:
        """Returns the username."""
        return self.username
121
122
class UnauthenticatedUser(BaseUser):
    """
    Unauthenticated user implementation.

    Represents users who have not been authenticated or have invalid credentials.
    """

    @property
    def is_authenticated(self) -> bool:
        """Always False for UnauthenticatedUser."""
        return False

    @property
    def display_name(self) -> str:
        """Empty string for unauthenticated users."""
        return ""

    @property
    def identity(self) -> str:
        """Empty string for unauthenticated users."""
        return ""
143
```
144
145
### Authentication Utilities
146
147
```python { .api }
148
from starlette.requests import Request
149
from starlette.responses import Response, JSONResponse, RedirectResponse
150
from typing import Union, List, Callable
151
152
def has_required_scope(conn: HTTPConnection, scopes: List[str]) -> bool:
    """
    Check if connection has required scopes.

    Args:
        conn: HTTP connection (Request or WebSocket)
        scopes: Required permission scopes

    Returns:
        bool: True if every required scope is present in ``conn.auth.scopes``.
        An empty ``scopes`` list is trivially satisfied and returns True.
    """
    granted = conn.auth.scopes
    return all(scope in granted for scope in scopes)
167
168
def requires(
    scopes: Union[str, List[str]],
    status_code: int = 403,
    redirect: Optional[str] = None,
) -> Callable:
    """
    Decorator to require authentication scopes.

    Args:
        scopes: Required scope(s) - string or list of strings
        status_code: HTTP status code for authorization failure
        redirect: URL to redirect unauthorized users (instead of error)

    Returns:
        Decorator function for async endpoints

    Raises:
        ValueError: When the decorated endpoint is called without a Request
            among its positional or keyword arguments

    Usage:
        @requires("authenticated")
        @requires(["read", "write"])
        @requires("admin", redirect="/login")
    """
    # Imported locally because this snippet's import list has no functools
    from functools import wraps

    # Normalise to a private list so the caller's list is never aliased
    required_scopes = [scopes] if isinstance(scopes, str) else list(scopes)

    def decorator(func):
        @wraps(func)  # keep the endpoint's __name__ and __doc__
        async def wrapper(*args, **kwargs):
            # Extract request from positional or keyword arguments
            request = next(
                (
                    arg
                    for arg in (*args, *kwargs.values())
                    if isinstance(arg, Request)
                ),
                None,
            )
            if request is None:
                raise ValueError("@requires decorator requires Request argument")

            # Check authentication
            if not request.user.is_authenticated:
                if redirect:
                    return RedirectResponse(redirect, status_code=307)
                return JSONResponse(
                    {"error": "Authentication required"},
                    status_code=401
                )

            # Check authorization
            if not has_required_scope(request, required_scopes):
                if redirect:
                    return RedirectResponse(redirect, status_code=307)
                return JSONResponse(
                    {"error": "Insufficient permissions"},
                    status_code=status_code
                )

            # Call original function
            return await func(*args, **kwargs)

        return wrapper
    return decorator
227
```
228
229
### Authentication Exceptions
230
231
```python { .api }
232
class AuthenticationError(Exception):
    """
    Exception raised for authentication errors.

    Raised by authentication backends when supplied credentials are
    invalid. AuthenticationMiddleware catches it and delegates to its
    ``on_error`` handler; Starlette's default handler responds with 400,
    so pass a custom ``on_error`` to return 401/403 instead.
    """
240
```
241
242
## Setting Up Authentication
243
244
### Basic Authentication Middleware Setup
245
246
```python { .api }
247
from starlette.applications import Starlette
248
from starlette.middleware import Middleware
249
from starlette.middleware.authentication import AuthenticationMiddleware
250
251
# Create authentication backend
252
class MyAuthBackend(AuthenticationBackend):
    """Placeholder backend; replace the body with real credential checks."""

    async def authenticate(self, conn):
        # Implementation details...
        # Returning None means "no credentials": the request stays unauthenticated.
        return None
256
257
# Configure application with authentication
258
middleware = [
    Middleware(AuthenticationMiddleware, backend=MyAuthBackend()),
]

# `routes` is the application's route list, defined elsewhere
app = Starlette(
    routes=routes,
    middleware=middleware,
)
266
267
# Now request.user and request.auth are available in all endpoints
268
async def protected_endpoint(request):
    """Return the user's display name, or a 401 error when unauthenticated."""
    if not request.user.is_authenticated:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    return JSONResponse({"user": request.user.display_name})
273
```
274
275
## Authentication Backends
276
277
### API Key Authentication
278
279
```python { .api }
280
import secrets
281
from starlette.authentication import AuthenticationBackend, AuthCredentials, SimpleUser
282
283
class APIKeyAuthBackend(AuthenticationBackend):
    """API key-based authentication backend."""

    def __init__(self, api_keys: Optional[dict[str, dict]] = None):
        """
        Args:
            api_keys: Mapping of API key -> user record. Each record needs a
                "username" entry and may carry a "scopes" list.
        """
        # In production, store in database
        self.api_keys = api_keys or {}

    async def authenticate(self, conn):
        """Return (credentials, user) for a known X-API-Key header, else None."""
        # Check for API key in header
        api_key = conn.headers.get("X-API-Key")
        if not api_key:
            return None

        # Validate API key
        user_data = self.api_keys.get(api_key)
        if not user_data:
            return None

        # Return credentials and user
        credentials = AuthCredentials(user_data.get("scopes", []))
        user = SimpleUser(user_data["username"])

        return credentials, user
306
307
# Usage
308
api_keys = {
    "sk_test_123": {"username": "testuser", "scopes": ["read"]},
    "sk_live_456": {"username": "liveuser", "scopes": ["read", "write"]},
}

backend = APIKeyAuthBackend(api_keys)
app.add_middleware(AuthenticationMiddleware, backend=backend)
315
```
316
317
### JWT Token Authentication
318
319
```python { .api }
320
import jwt
321
from datetime import datetime, timedelta
322
from starlette.authentication import AuthenticationBackend, AuthCredentials
323
324
class JWTAuthBackend(AuthenticationBackend):
    """JWT token-based authentication backend."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """
        Args:
            secret_key: Key used to verify token signatures
            algorithm: The only signing algorithm accepted when decoding
        """
        self.secret_key = secret_key
        self.algorithm = algorithm

    async def authenticate(self, conn):
        """Return (credentials, user) for a valid Bearer token, else None."""
        # Get token from Authorization header
        authorization = conn.headers.get("Authorization")
        if not authorization:
            return None

        # Expect "<scheme> <token>"; anything else is treated as no credentials
        scheme, _, token = authorization.partition(" ")
        if scheme.lower() != "bearer" or not token:
            return None

        try:
            # jwt.decode verifies the signature and, when present, the "exp"
            # claim (an expired token raises a subclass of InvalidTokenError),
            # so no manual expiry check is needed here.
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
        except jwt.InvalidTokenError:
            return None

        # A token without a subject cannot identify a user
        username = payload.get("sub")
        if not username:
            return None

        # Create user and credentials
        credentials = AuthCredentials(payload.get("scopes", []))
        user = SimpleUser(username)

        return credentials, user
368
369
# Create and sign JWT tokens
370
class JWTManager:
    """Creates signed JWT tokens."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """
        Args:
            secret_key: Key used to sign tokens
            algorithm: Signing algorithm passed to jwt.encode
        """
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_token(
        self,
        username: str,
        scopes: Optional[list[str]] = None,
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """
        Create a JWT token for user.

        Args:
            username: Stored in the "sub" claim
            scopes: Stored in the "scopes" claim; None means no scopes
            expires_delta: Token lifetime; defaults to 24 hours

        Returns:
            The encoded token.
        """
        # Imported locally because this snippet's import list has no timezone
        from datetime import timezone

        if expires_delta is None:
            expires_delta = timedelta(hours=24)

        # One timezone-aware timestamp so "iat" and "exp" share the same instant
        issued_at = datetime.now(timezone.utc)
        payload = {
            "sub": username,
            "scopes": scopes or [],
            "exp": issued_at + expires_delta,
            "iat": issued_at,
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
393
394
# Usage
395
# The manager and the backend must share the same secret key
jwt_manager = JWTManager("your-secret-key")
backend = JWTAuthBackend("your-secret-key")
app.add_middleware(AuthenticationMiddleware, backend=backend)
398
399
# Login endpoint to issue tokens
400
async def login(request):
    """Validate JSON credentials and issue a bearer token.

    Responds 400 for a body that is not a JSON object, 401 for invalid
    credentials, and otherwise returns the access token.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    username = data.get("username")
    password = data.get("password")

    # Validate credentials (implement your logic)
    if not await validate_credentials(username, password):
        return JSONResponse({"error": "Invalid credentials"}, status_code=401)

    # Create token
    token = jwt_manager.create_token(
        username=username,
        scopes=["read", "write"]
    )
    return JSONResponse({"access_token": token, "token_type": "bearer"})
415
```
416
417
### Session-based Authentication
418
419
```python { .api }
420
from starlette.middleware.sessions import SessionMiddleware
421
from starlette.authentication import AuthenticationBackend, AuthCredentials
422
423
class SessionAuthBackend(AuthenticationBackend):
    """Session-based authentication backend."""

    async def authenticate(self, conn):
        """Return (credentials, user) for the session's user_id, else None."""
        # Check if user is logged in via session
        user_id = conn.session.get("user_id")
        if not user_id:
            return None

        # Load user from database (implement your logic)
        user_data = await load_user_by_id(user_id)
        if not user_data:
            # Clear invalid session
            conn.session.clear()
            return None

        # Create credentials and user; records without scopes get "authenticated"
        credentials = AuthCredentials(user_data.get("scopes", ["authenticated"]))
        user = SimpleUser(user_data["username"])

        return credentials, user
444
445
# Setup with session middleware (order matters!)
446
middleware = [
    # Sessions come first: SessionAuthBackend reads conn.session
    Middleware(SessionMiddleware, secret_key="session-secret"),
    Middleware(AuthenticationMiddleware, backend=SessionAuthBackend()),
]
450
451
# Login endpoint
452
async def login(request):
    """Validate form credentials and store the user's id in the session."""
    data = await request.form()
    username = data.get("username")
    password = data.get("password")

    # Validate credentials
    user = await authenticate_user(username, password)
    if not user:
        return JSONResponse({"error": "Invalid credentials"}, status_code=401)

    # Store user ID in session
    request.session["user_id"] = user.id
    return RedirectResponse("/dashboard", status_code=302)
465
466
# Logout endpoint
467
async def logout(request):
    """Clear the session and redirect to the home page."""
    request.session.clear()
    return RedirectResponse("/", status_code=302)
470
```
471
472
### Multi-Backend Authentication
473
474
```python { .api }
475
class MultiAuthBackend(AuthenticationBackend):
    """Support multiple authentication methods."""

    def __init__(self, backends: list[AuthenticationBackend]):
        """
        Args:
            backends: Backends to consult, in priority order
        """
        # Copy so later changes to the caller's list do not alter the order
        self.backends = list(backends)

    async def authenticate(self, conn):
        """Return the first non-None result from the backends, else None."""
        # Try each backend in order
        for backend in self.backends:
            result = await backend.authenticate(conn)
            if result is not None:
                return result

        return None
489
490
# Combine multiple backends
491
api_key_backend = APIKeyAuthBackend(api_keys)
jwt_backend = JWTAuthBackend("secret-key")
session_backend = SessionAuthBackend()

multi_backend = MultiAuthBackend([
    api_key_backend,  # Try API key first
    jwt_backend,      # Then JWT token
    session_backend,  # Finally session
])

app.add_middleware(AuthenticationMiddleware, backend=multi_backend)
502
```
503
504
## Authorization with Decorators
505
506
### Basic Authorization
507
508
```python { .api }
509
from starlette.authentication import requires
510
511
# Require authentication
512
@requires("authenticated")
async def protected_endpoint(request):
    """Return the authenticated user's display name."""
    return JSONResponse({"user": request.user.display_name})
515
516
# Require specific scopes
517
@requires(["read", "write"])
async def admin_endpoint(request):
    """Reachable only with both the "read" and "write" scopes."""
    return JSONResponse({"message": "Admin access granted"})
520
521
# Single scope
522
@requires("admin")
async def super_admin_endpoint(request):
    """Reachable only with the "admin" scope."""
    return JSONResponse({"message": "Super admin access"})
525
526
# Custom status code
527
@requires("premium", status_code=402)  # Payment Required
async def premium_feature(request):
    """Reachable only with the "premium" scope; failures use status 402."""
    return JSONResponse({"feature": "premium content"})
530
531
# Redirect instead of error
532
@requires("authenticated", redirect="/login")
async def dashboard(request):
    """Render the dashboard; unauthenticated users are redirected to /login."""
    # Imported locally because no snippet in this guide imports HTMLResponse
    from starlette.responses import HTMLResponse

    return HTMLResponse("<h1>User Dashboard</h1>")
535
```
536
537
### Custom Authorization Decorators
538
539
```python { .api }
540
from functools import wraps
541
542
def require_user(user_id_param: str = "user_id"):
    """
    Require user to match path parameter.

    Args:
        user_id_param: Name of the path parameter holding the user id

    Returns:
        Decorator for async endpoints that take a single ``request`` argument.
        Unauthenticated requests get 401; a missing or mismatching path
        parameter gets 403.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request):
            if not request.user.is_authenticated:
                return JSONResponse({"error": "Authentication required"}, status_code=401)

            # Check if user matches path parameter. Path convertors such as
            # {user_id:int} yield non-string values, so compare as strings.
            path_user_id = request.path_params.get(user_id_param)
            if path_user_id is None or str(path_user_id) != request.user.identity:
                return JSONResponse({"error": "Access denied"}, status_code=403)

            return await func(request)
        return wrapper
    return decorator
558
559
def require_role(role: str):
    """
    Require user to have specific role.

    Args:
        role: Value that ``request.user.role`` must equal

    Returns:
        Decorator for async endpoints that take a single ``request`` argument.
        Unauthenticated requests get 401; a user without a ``role`` attribute,
        or with a different role, gets 403.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request):
            user = request.user
            if not user.is_authenticated:
                return JSONResponse({"error": "Authentication required"}, status_code=401)

            # Check user role (requires custom user class)
            if not hasattr(user, "role") or user.role != role:
                return JSONResponse({"error": "Insufficient permissions"}, status_code=403)

            return await func(request)
        return wrapper
    return decorator
574
575
# Usage
576
@require_user("user_id")
async def user_profile(request):
    """Serve /users/{user_id} only when user_id matches the authenticated user."""
    return JSONResponse({"profile": "user data"})
580
581
@require_role("admin")
async def admin_panel(request):
    """Serve the admin panel to users whose role is "admin"."""
    return JSONResponse({"message": "Admin panel"})
584
```
585
586
## Advanced User Classes
587
588
### Rich User Implementation
589
590
```python { .api }
591
from dataclasses import dataclass
592
from typing import List, Optional
593
from datetime import datetime
594
595
@dataclass
class User(BaseUser):
    """Rich user implementation with additional properties."""

    id: int
    username: str
    email: str
    full_name: str
    role: str
    permissions: List[str]
    is_active: bool = True
    last_login: Optional[datetime] = None

    @property
    def is_authenticated(self) -> bool:
        """An inactive account counts as unauthenticated."""
        return self.is_active

    @property
    def display_name(self) -> str:
        """Full name, falling back to the username when it is empty."""
        return self.full_name or self.username

    @property
    def identity(self) -> str:
        """The numeric id rendered as a string."""
        return str(self.id)

    def has_permission(self, permission: str) -> bool:
        """Check if user has specific permission."""
        return permission in self.permissions

    def is_admin(self) -> bool:
        """Check if user is an administrator."""
        return self.role == "admin"

    def is_staff(self) -> bool:
        """Check if user is staff member (administrators count as staff)."""
        return self.role in ("admin", "staff")
631
632
class DatabaseUserBackend(AuthenticationBackend):
    """Authentication backend that loads full user data."""

    async def authenticate(self, conn):
        """Return (credentials, user) for an active stored user, else None."""
        # Get user ID from token/session
        user_id = await self.get_user_id(conn)
        if user_id is None:  # an id of 0 is falsy but still a valid id
            return None

        # Load full user data from database
        user_data = await self.load_user_from_db(user_id)
        # A record without "is_active" is treated as active, matching User's default
        if not user_data or not user_data.get("is_active", True):
            return None

        # Create rich user object
        user = User(**user_data)

        # Create credentials with user permissions
        scopes = ["authenticated", *user.permissions]
        credentials = AuthCredentials(scopes)

        return credentials, user

    async def get_user_id(self, conn) -> Optional[int]:
        """Placeholder: extract the user ID from a JWT token, session, etc."""
        return None

    async def load_user_from_db(self, user_id: int) -> Optional[dict]:
        """Placeholder: load the user record from the database."""
        return None
662
```
663
664
## Error Handling
665
666
### Authentication Error Handling
667
668
```python { .api }
669
from starlette.middleware.authentication import AuthenticationMiddleware
670
from starlette.responses import JSONResponse
671
672
def auth_error_handler(conn, exc):
    """
    Custom authentication error handler.

    Must be a plain (non-async) function: AuthenticationMiddleware calls
    ``on_error`` synchronously and uses the return value as the response.

    Args:
        conn: The connection whose authentication failed
        exc: The AuthenticationError raised by the backend

    Returns:
        A 401 JSON response carrying a ``WWW-Authenticate: Bearer`` header.
    """
    return JSONResponse(
        {
            "error": "Authentication failed",
            "message": str(exc),
            "type": exc.__class__.__name__
        },
        status_code=401,
        headers={"WWW-Authenticate": "Bearer"}
    )
683
684
# Use custom error handler
685
app.add_middleware(
    AuthenticationMiddleware,
    backend=auth_backend,
    on_error=auth_error_handler
)
690
```
691
692
### Graceful Authentication Failure
693
694
```python { .api }
695
class GracefulAuthBackend(AuthenticationBackend):
    """Authentication backend that never raises exceptions."""

    async def authenticate(self, conn):
        """Run do_authenticate; on any exception, report it and return None."""
        try:
            # Attempt authentication
            return await self.do_authenticate(conn)
        except Exception as e:
            # Deliberately broad: a failure leaves the request unauthenticated
            # instead of producing an error response.
            print(f"Authentication error: {e}")
            return None

    async def do_authenticate(self, conn):
        """Placeholder for the actual authentication logic, which may raise."""
        return None
710
```
711
712
## Testing Authentication
713
714
### Testing with Authentication
715
716
```python { .api }
717
from starlette.testclient import TestClient
718
from starlette.applications import Starlette
719
720
def test_protected_endpoint():
    """Check /protected without credentials, with an API key, and with a JWT."""
    app = Starlette(routes=routes, middleware=middleware)
    client = TestClient(app)

    # Test without authentication
    response = client.get("/protected")
    assert response.status_code == 401

    # Test with API key (must be a key registered with the backend)
    headers = {"X-API-Key": "sk_test_123"}
    response = client.get("/protected", headers=headers)
    assert response.status_code == 200

    # Test with JWT token
    token = create_test_token("testuser", ["read"])
    headers = {"Authorization": f"Bearer {token}"}
    response = client.get("/protected", headers=headers)
    assert response.status_code == 200
738
739
def test_authorization():
    """Check that a read-only token can read but gets 403 on a write."""
    client = TestClient(app)

    # User with read scope
    token = create_test_token("user", ["read"])
    headers = {"Authorization": f"Bearer {token}"}

    # Should succeed
    response = client.get("/read-only", headers=headers)
    assert response.status_code == 200

    # Should fail (needs write scope)
    response = client.post("/write-data", headers=headers)
    assert response.status_code == 403
753
754
def create_test_token(username: str, scopes: List[str]) -> str:
    """Helper to create tokens for testing."""
    return jwt_manager.create_token(username, scopes)
757
```

Starlette's authentication framework provides flexible user authentication and authorization with support for multiple backends and rich user models, and it can be tested end to end with Starlette's `TestClient`.