0
# Security and Authentication
1
2
Litestar's authentication and authorization systems include JWT authentication, session-based authentication, and OAuth2 support. The framework provides comprehensive security primitives for protecting routes and managing user sessions.
3
4
## Capabilities
5
6
### JWT Authentication
7
8
JSON Web Token-based authentication with support for various token storage methods and OAuth2 flows.
9
10
```python { .api }
11
class JWTAuth(BaseJWTAuth):
12
def __init__(
13
self,
14
token_secret: str,
15
retrieve_user_handler: UserHandlerProtocol,
16
*,
17
algorithm: str = "HS256",
18
auth_header: str = "Authorization",
19
default_token_expiration: timedelta = timedelta(days=1),
20
openapi_security_scheme_name: str = "BearerToken",
21
description: str = "JWT api-key authentication and authorization.",
22
authentication_middleware_class: type[AbstractAuthenticationMiddleware] = JWTAuthenticationMiddleware,
23
guards: Sequence[Guard] | None = None,
24
exclude: str | list[str] | None = None,
25
exclude_opt_key: str = "exclude_from_auth",
26
scopes: Scopes | None = None,
27
route_handlers: Sequence[BaseRouteHandler] | None = None,
28
dependencies: Dependencies | None = None,
29
middleware: Sequence[Middleware] | None = None,
30
):
31
"""
32
JWT Authentication configuration.
33
34
Parameters:
35
- token_secret: Secret key for signing tokens
36
- retrieve_user_handler: Function to retrieve user from token payload
37
- algorithm: JWT signing algorithm (HS256, RS256, etc.)
38
- auth_header: HTTP header containing the token
39
- default_token_expiration: Default token expiration time
40
- openapi_security_scheme_name: Name for OpenAPI security scheme
41
- description: Description for OpenAPI documentation
42
- authentication_middleware_class: Middleware class to use
43
- guards: Authorization guards
44
- exclude: Paths to exclude from authentication
45
- exclude_opt_key: Key for excluding specific routes
46
- scopes: ASGI scope types ("http", "websocket") processed by the authentication middleware
47
- route_handlers: Route handlers requiring authentication
48
- dependencies: Additional dependencies
49
- middleware: Additional middleware
50
"""
51
52
def create_token(
53
self,
54
identifier: str,
55
*,
56
exp: datetime | None = None,
57
sub: str | None = None,
58
aud: str | list[str] | None = None,
59
iss: str | None = None,
60
jti: str | None = None,
61
**kwargs: Any,
62
) -> str:
63
"""
64
Create a JWT token.
65
66
Parameters:
67
- identifier: Unique user identifier
68
- exp: Token expiration time
69
- sub: Token subject
70
- aud: Token audience
71
- iss: Token issuer
72
- jti: JWT ID
73
- **kwargs: Additional claims
74
75
Returns:
76
Encoded JWT token string
77
"""
78
79
def decode_token(self, encoded_token: str) -> dict[str, Any]:
80
"""
81
Decode and validate a JWT token.
82
83
Parameters:
84
- encoded_token: JWT token string
85
86
Returns:
87
Decoded token payload
88
"""
89
90
class JWTCookieAuth(JWTAuth):
91
def __init__(
92
self,
93
token_secret: str,
94
retrieve_user_handler: UserHandlerProtocol,
95
*,
96
key: str = "token",
97
path: str = "/",
98
domain: str | None = None,
99
secure: bool = True,
100
httponly: bool = True,
101
samesite: Literal["lax", "strict", "none"] = "lax",
102
**kwargs: Any,
103
):
104
"""
105
Cookie-based JWT authentication.
106
107
Parameters:
108
- token_secret: Secret key for signing tokens
109
- retrieve_user_handler: Function to retrieve user from token payload
110
- key: Cookie name for storing the token
111
- path: Cookie path
112
- domain: Cookie domain
113
- secure: Use secure cookies (HTTPS only)
114
- httponly: Use HTTP-only cookies
115
- samesite: SameSite cookie attribute
116
"""
117
118
def create_response_with_token(
119
self,
120
identifier: str,
121
response: Response | None = None,
122
**kwargs: Any,
123
) -> Response:
124
"""
125
Create a response with JWT token set as cookie.
126
127
Parameters:
128
- identifier: User identifier for token
129
- response: Existing response to modify (creates new if None)
130
- **kwargs: Additional token claims
131
132
Returns:
133
Response with token cookie set
134
"""
135
136
def create_logout_response(self, response: Response | None = None) -> Response:
137
"""
138
Create a response that clears the authentication cookie.
139
140
Parameters:
141
- response: Existing response to modify
142
143
Returns:
144
Response with cleared authentication cookie
145
"""
146
```
147
148
### OAuth2 Authentication
149
150
OAuth2 authentication flows including password bearer and authorization code flows.
151
152
```python { .api }
153
class OAuth2PasswordBearerAuth(JWTAuth):
154
def __init__(
155
self,
156
token_url: str,
157
token_secret: str,
158
retrieve_user_handler: UserHandlerProtocol,
159
*,
160
scopes: dict[str, str] | None = None,
161
**kwargs: Any,
162
):
163
"""
164
OAuth2 password bearer authentication.
165
166
Parameters:
167
- token_url: URL endpoint for token requests
168
- token_secret: Secret for signing tokens
169
- retrieve_user_handler: Function to retrieve user
170
- scopes: OAuth2 scopes mapping
171
"""
172
173
class OAuth2Login:
174
def __init__(
175
self,
176
*,
177
identifier: str,
178
exp: datetime | None = None,
179
sub: str | None = None,
180
aud: str | list[str] | None = None,
181
iss: str | None = None,
182
jti: str | None = None,
183
**kwargs: Any,
184
):
185
"""
186
OAuth2 login token configuration.
187
188
Parameters:
189
- identifier: User identifier
190
- exp: Expiration time
191
- sub: Subject
192
- aud: Audience
193
- iss: Issuer
194
- jti: JWT ID
195
"""
196
```
197
198
### Session Authentication
199
200
Session-based authentication using server-side session storage.
201
202
```python { .api }
203
class SessionAuth:
204
def __init__(
205
self,
206
retrieve_user_handler: UserHandlerProtocol,
207
session_backend: BaseSessionBackend,
208
*,
209
exclude: str | list[str] | None = None,
210
exclude_opt_key: str = "exclude_from_auth",
211
guards: Sequence[Guard] | None = None,
212
route_handlers: Sequence[BaseRouteHandler] | None = None,
213
dependencies: Dependencies | None = None,
214
middleware: Sequence[Middleware] | None = None,
215
):
216
"""
217
Session-based authentication.
218
219
Parameters:
220
- retrieve_user_handler: Function to retrieve user from session
221
- session_backend: Session storage backend
222
- exclude: Paths to exclude from authentication
223
- exclude_opt_key: Key for excluding specific routes
224
- guards: Authorization guards
225
- route_handlers: Route handlers requiring authentication
226
- dependencies: Additional dependencies
227
- middleware: Additional middleware
228
"""
229
230
async def login(
231
self,
232
identifier: str,
233
request: Request,
234
*,
235
response: Response | None = None,
236
**kwargs: Any,
237
) -> Response:
238
"""
239
Log in a user by creating a session.
240
241
Parameters:
242
- identifier: User identifier
243
- request: HTTP request object
244
- response: Existing response to modify
245
- **kwargs: Additional session data
246
247
Returns:
248
Response with session established
249
"""
250
251
async def logout(
252
self,
253
request: Request,
254
response: Response | None = None,
255
) -> Response:
256
"""
257
Log out a user by destroying the session.
258
259
Parameters:
260
- request: HTTP request object
261
- response: Existing response to modify
262
263
Returns:
264
Response with session cleared
265
"""
266
```
267
268
### Authentication Middleware
269
270
Base classes for implementing custom authentication middleware.
271
272
```python { .api }
273
class AbstractAuthenticationMiddleware(AbstractMiddleware):
274
def __init__(
275
self,
276
app: ASGIApp,
277
exclude: str | list[str] | None = None,
278
exclude_opt_key: str = "exclude_from_auth",
279
scopes: Scopes | None = None,
280
):
281
"""
282
Base authentication middleware.
283
284
Parameters:
285
- app: ASGI application
286
- exclude: Paths to exclude from authentication
287
- exclude_opt_key: Key for excluding specific routes
288
- scopes: ASGI scope types ("http", "websocket") processed by the authentication middleware
289
"""
290
291
async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
292
"""
293
Authenticate an incoming request.
294
295
Parameters:
296
- connection: ASGI connection object
297
298
Returns:
299
Authentication result with user and auth information
300
"""
301
302
class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):
303
"""JWT authentication middleware implementation."""
304
305
class JWTCookieAuthenticationMiddleware(AbstractAuthenticationMiddleware):
306
"""JWT cookie authentication middleware implementation."""
307
308
class SessionAuthMiddleware(AbstractAuthenticationMiddleware):
309
"""Session authentication middleware implementation."""
310
```
311
312
### Authorization and Guards
313
314
Authorization guards for protecting routes based on user permissions and roles.
315
316
```python { .api }
317
def guards(*guards: Guard) -> Callable[[T], T]:
318
"""
319
Decorator to apply authorization guards to route handlers.
320
321
Parameters:
322
- *guards: Guard functions to apply
323
324
Returns:
325
Decorator function
326
"""
327
328
# Example guard implementations (application-level helpers; define them in your own code)
329
async def user_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
330
"""Guard that requires an authenticated user."""
331
if not connection.user:
332
raise NotAuthorizedException("Authentication required")
333
334
async def admin_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
335
"""Guard that requires admin privileges."""
336
if not connection.user or not getattr(connection.user, "is_admin", False):
337
raise PermissionDeniedException("Admin access required")
338
339
def role_required_guard(*required_roles: str) -> Guard:
340
"""
341
Create a guard that requires specific roles.
342
343
Parameters:
344
- *required_roles: Required role names
345
346
Returns:
347
Guard function
348
"""
349
async def guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
350
if not connection.user:
351
raise NotAuthorizedException("Authentication required")
352
353
user_roles = getattr(connection.user, "roles", [])
354
if not any(role in user_roles for role in required_roles):
355
raise PermissionDeniedException("Insufficient permissions")
356
357
return guard
358
```
359
360
### Authentication Result
361
362
Data structures for authentication results and user information.
363
364
```python { .api }
365
class AuthenticationResult:
366
def __init__(self, user: Any | None = None, auth: Any | None = None):
367
"""
368
Authentication result.
369
370
Parameters:
371
- user: Authenticated user object
372
- auth: Authentication information
373
"""
374
self.user = user
375
self.auth = auth
376
377
class Token:
378
def __init__(
379
self,
380
token: str,
381
*,
382
exp: datetime | None = None,
383
sub: str | None = None,
384
aud: str | list[str] | None = None,
385
iss: str | None = None,
386
jti: str | None = None,
387
**claims: Any,
388
):
389
"""
390
JWT token representation.
391
392
Parameters:
393
- token: Encoded token string
394
- exp: Expiration time
395
- sub: Subject
396
- aud: Audience
397
- iss: Issuer
398
- jti: JWT ID
399
- **claims: Additional token claims
400
"""
401
```
402
403
## Usage Examples
404
405
### Basic JWT Authentication
406
407
```python
408
from litestar import Litestar, Request, get, post
409
from litestar.security.jwt import JWTAuth, Token
410
from litestar.connection import ASGIConnection
from litestar.exceptions import NotAuthorizedException
411
from dataclasses import dataclass
412
from datetime import datetime, timedelta
413
414
@dataclass
415
class User:
416
id: int
417
username: str
418
email: str
419
420
# User retrieval function
421
async def retrieve_user_handler(token: Token, connection: ASGIConnection) -> User | None:
422
# In a real app, you'd query your database
423
if token.sub == "123":
424
return User(id=123, username="alice", email="alice@example.com")
425
return None
426
427
# JWT configuration
428
jwt_auth = JWTAuth(
429
token_secret="your-secret-key",
430
retrieve_user_handler=retrieve_user_handler,
431
default_token_expiration=timedelta(hours=24),
432
)
433
434
@post("/login", exclude_from_auth=True)
435
async def login_handler(data: dict) -> dict:
436
# Validate credentials (simplified)
437
if data.get("username") == "alice" and data.get("password") == "secret":
438
token = jwt_auth.create_token(identifier="123")
439
return {"access_token": token, "token_type": "bearer"}
440
441
raise NotAuthorizedException("Invalid credentials")
442
443
@get("/profile")
444
async def get_profile(request: Request) -> dict:
445
# request.user is automatically populated by JWT middleware
446
user = request.user
447
return {
448
"id": user.id,
449
"username": user.username,
450
"email": user.email
451
}
452
453
app = Litestar(
454
route_handlers=[login_handler, get_profile],
455
on_app_init=[jwt_auth.on_app_init],
456
)
457
```
458
459
### Cookie-Based JWT Authentication
460
461
```python
462
from litestar.security.jwt import JWTCookieAuth
463
from litestar.response import Response
464
465
jwt_cookie_auth = JWTCookieAuth(
466
token_secret="your-secret-key",
467
retrieve_user_handler=retrieve_user_handler,
468
key="access_token",
469
httponly=True,
470
secure=True,
471
samesite="strict",
472
)
473
474
@post("/login", exclude_from_auth=True)
475
async def login_with_cookie(data: dict) -> Response:
476
if data.get("username") == "alice" and data.get("password") == "secret":
477
response = Response({"status": "logged in"})
478
return jwt_cookie_auth.create_response_with_token(
479
identifier="123",
480
response=response
481
)
482
483
raise NotAuthorizedException("Invalid credentials")
484
485
@post("/logout")
486
async def logout() -> Response:
487
response = Response({"status": "logged out"})
488
return jwt_cookie_auth.create_logout_response(response)
489
```
490
491
### Session Authentication
492
493
```python
494
from litestar.security.session_auth import SessionAuth
495
from litestar.middleware.session import SessionMiddleware
496
from litestar.stores.memory import MemoryStore
497
498
# Session storage
499
session_store = MemoryStore()
500
501
# Session middleware
502
session_middleware = SessionMiddleware(
503
store=session_store,
504
secret="session-secret-key",
505
)
506
507
# Session auth
508
session_auth = SessionAuth(
509
retrieve_user_handler=retrieve_user_handler,
510
session_backend=session_store,
511
)
512
513
@post("/login", exclude_from_auth=True)
514
async def session_login(request: Request, data: dict) -> Response:
515
if data.get("username") == "alice" and data.get("password") == "secret":
516
return await session_auth.login(
517
identifier="123",
518
request=request,
519
response=Response({"status": "logged in"})
520
)
521
522
raise NotAuthorizedException("Invalid credentials")
523
524
@post("/logout")
525
async def session_logout(request: Request) -> Response:
526
return await session_auth.logout(
527
request=request,
528
response=Response({"status": "logged out"})
529
)
530
531
app = Litestar(
532
route_handlers=[session_login, session_logout, get_profile],
533
middleware=[session_middleware],
534
on_app_init=[session_auth.on_app_init],
535
)
536
```
537
538
### OAuth2 Password Bearer
539
540
```python
541
from litestar.security.jwt import OAuth2PasswordBearerAuth
542
543
oauth2_auth = OAuth2PasswordBearerAuth(
544
token_url="/token",
545
token_secret="your-secret-key",
546
retrieve_user_handler=retrieve_user_handler,
547
scopes={
548
"read": "Read access",
549
"write": "Write access",
550
"admin": "Administrative access"
551
}
552
)
553
554
@post("/token", exclude_from_auth=True)
555
async def get_token(data: dict) -> dict:
556
# OAuth2 token endpoint
557
if data.get("username") == "alice" and data.get("password") == "secret":
558
token = oauth2_auth.create_token(
559
identifier="123",
560
scopes=["read", "write"]
561
)
562
return {
563
"access_token": token,
564
"token_type": "bearer",
565
"expires_in": 86400,  # seconds; matches the default one-day token expiration
566
"scope": "read write"
567
}
568
569
raise NotAuthorizedException("Invalid credentials")
570
```
571
572
### Authorization Guards
573
574
```python
575
from litestar.security import guards
576
577
@dataclass
578
class User:
579
id: int
580
username: str
581
roles: list[str]
582
is_active: bool = True
583
584
async def admin_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
585
user = connection.user
586
if not user or "admin" not in user.roles:
587
raise PermissionDeniedException("Admin access required")
588
589
@get("/admin/users", guards=[admin_guard])
590
async def list_all_users() -> list[dict]:
591
# Only accessible by admins
592
return [{"id": 1, "username": "alice"}]
593
594
@get("/profile", guards=[user_required_guard])
595
async def get_user_profile(request: Request) -> dict:
596
# Requires any authenticated user
597
return {"user": request.user.username}
598
599
# Custom role-based guard
600
editor_guard = role_required_guard("editor", "admin")
601
602
@post("/content", guards=[editor_guard])
603
async def create_content(data: dict) -> dict:
604
# Requires editor or admin role
605
return {"status": "created", "content": data}
606
```
607
608
### Custom Authentication Middleware
609
610
```python
611
from litestar.middleware.authentication import AbstractAuthenticationMiddleware, AuthenticationResult
612
613
class APIKeyAuthMiddleware(AbstractAuthenticationMiddleware):
614
def __init__(self, app: ASGIApp, api_keys: dict[str, dict]):
615
super().__init__(app)
616
self.api_keys = api_keys # Map API keys to user data
617
618
async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
619
api_key = connection.headers.get("X-API-Key")
620
621
if not api_key:
622
return AuthenticationResult()
623
624
user_data = self.api_keys.get(api_key)
625
if user_data:
626
user = User(**user_data)
627
return AuthenticationResult(user=user, auth={"api_key": api_key})
628
629
return AuthenticationResult()
630
631
# Usage
632
api_keys = {
633
"abc123": {"id": 1, "username": "service_user", "roles": ["api"]},
634
"def456": {"id": 2, "username": "admin_user", "roles": ["api", "admin"]},
635
}
636
637
app = Litestar(
638
route_handlers=[...],
639
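# DefineMiddleware (from litestar.middleware) defers construction so Litestar can supply `app`
middleware=[DefineMiddleware(APIKeyAuthMiddleware, api_keys=api_keys)],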
640
)
641
```
642
643
## Types
644
645
```python { .api }
646
# User handler protocol
647
class UserHandlerProtocol(Protocol):
648
async def __call__(self, token: Token, connection: ASGIConnection) -> Any | None:
649
"""Retrieve user from token and connection."""
650
651
# Guard type
652
Guard = Callable[[ASGIConnection, BaseRouteHandler], bool | None | Awaitable[bool | None]]
653
654
# Scopes type
655
Scopes = set[Literal["http", "websocket"]]
656
657
# JWT algorithm types
658
JWTAlgorithm = Literal["HS256", "HS384", "HS512", "RS256", "RS384", "RS512", "ES256", "ES384", "ES512"]
659
660
# Authentication result types
661
AuthType = Any
662
UserType = Any
663
```