Litestar is a powerful, flexible yet opinionated ASGI web framework specifically focused on building high-performance APIs.
—
Authentication and authorization systems including JWT authentication, session-based auth, and OAuth2 support. Litestar provides comprehensive security primitives for protecting routes and managing user sessions.
JSON Web Token-based authentication with support for various token storage methods and OAuth2 flows.
class JWTAuth(BaseJWTAuth):
def __init__(
self,
token_secret: str,
retrieve_user_handler: UserHandlerProtocol,
*,
algorithm: str = "HS256",
auth_header: str = "Authorization",
default_token_expiration: timedelta = timedelta(days=1),
openapi_security_scheme_name: str = "BearerToken",
description: str = "JWT api-key authentication and authorization.",
authentication_middleware_class: type[AbstractAuthenticationMiddleware] = JWTAuthenticationMiddleware,
guards: Sequence[Guard] | None = None,
exclude: str | list[str] | None = None,
exclude_opt_key: str = "exclude_from_auth",
scopes: Scopes | None = None,
route_handlers: Sequence[BaseRouteHandler] | None = None,
dependencies: Dependencies | None = None,
middleware: Sequence[Middleware] | None = None,
):
"""
JWT Authentication configuration.
Parameters:
- token_secret: Secret key for signing tokens
- retrieve_user_handler: Function to retrieve user from token payload
- algorithm: JWT signing algorithm (HS256, RS256, etc.)
- auth_header: HTTP header containing the token
- default_token_expiration: Default token expiration time
- openapi_security_scheme_name: Name for OpenAPI security scheme
- description: Description for OpenAPI documentation
- authentication_middleware_class: Middleware class to use
- guards: Authorization guards
- exclude: Paths to exclude from authentication
- exclude_opt_key: Key for excluding specific routes
- scopes: OAuth2 scopes configuration
- route_handlers: Route handlers requiring authentication
- dependencies: Additional dependencies
- middleware: Additional middleware
"""
def create_token(
self,
identifier: str,
*,
exp: datetime | None = None,
sub: str | None = None,
aud: str | list[str] | None = None,
iss: str | None = None,
jti: str | None = None,
**kwargs: Any,
) -> str:
"""
Create a JWT token.
Parameters:
- identifier: Unique user identifier
- exp: Token expiration time
- sub: Token subject
- aud: Token audience
- iss: Token issuer
- jti: JWT ID
- **kwargs: Additional claims
Returns:
Encoded JWT token string
"""
def decode_token(self, encoded_token: str) -> dict[str, Any]:
"""
Decode and validate a JWT token.
Parameters:
- encoded_token: JWT token string
Returns:
Decoded token payload
"""
class JWTCookieAuth(JWTAuth):
def __init__(
self,
token_secret: str,
retrieve_user_handler: UserHandlerProtocol,
*,
key: str = "token",
path: str = "/",
domain: str | None = None,
secure: bool = True,
httponly: bool = True,
samesite: Literal["lax", "strict", "none"] = "lax",
**kwargs: Any,
):
"""
Cookie-based JWT authentication.
Parameters:
- token_secret: Secret key for signing tokens
- retrieve_user_handler: Function to retrieve user from token payload
- key: Cookie name for storing the token
- path: Cookie path
- domain: Cookie domain
- secure: Use secure cookies (HTTPS only)
- httponly: Use HTTP-only cookies
- samesite: SameSite cookie attribute
"""
def create_response_with_token(
self,
identifier: str,
response: Response | None = None,
**kwargs: Any,
) -> Response:
"""
Create a response with JWT token set as cookie.
Parameters:
- identifier: User identifier for token
- response: Existing response to modify (creates new if None)
- **kwargs: Additional token claims
Returns:
Response with token cookie set
"""
def create_logout_response(self, response: Response | None = None) -> Response:
"""
Create a response that clears the authentication cookie.
Parameters:
- response: Existing response to modify
Returns:
Response with cleared authentication cookie
"""OAuth2 authentication flows including password bearer and authorization code flows.
class OAuth2PasswordBearerAuth(JWTAuth):
def __init__(
self,
token_url: str,
token_secret: str,
retrieve_user_handler: UserHandlerProtocol,
*,
scopes: dict[str, str] | None = None,
**kwargs: Any,
):
"""
OAuth2 password bearer authentication.
Parameters:
- token_url: URL endpoint for token requests
- token_secret: Secret for signing tokens
- retrieve_user_handler: Function to retrieve user
- scopes: OAuth2 scopes mapping
"""
class OAuth2Login:
def __init__(
self,
*,
identifier: str,
exp: datetime | None = None,
sub: str | None = None,
aud: str | list[str] | None = None,
iss: str | None = None,
jti: str | None = None,
**kwargs: Any,
):
"""
OAuth2 login token configuration.
Parameters:
- identifier: User identifier
- exp: Expiration time
- sub: Subject
- aud: Audience
- iss: Issuer
- jti: JWT ID
"""Session-based authentication using server-side session storage.
class SessionAuth:
def __init__(
self,
retrieve_user_handler: UserHandlerProtocol,
session_backend: BaseSessionBackend,
*,
exclude: str | list[str] | None = None,
exclude_opt_key: str = "exclude_from_auth",
guards: Sequence[Guard] | None = None,
route_handlers: Sequence[BaseRouteHandler] | None = None,
dependencies: Dependencies | None = None,
middleware: Sequence[Middleware] | None = None,
):
"""
Session-based authentication.
Parameters:
- retrieve_user_handler: Function to retrieve user from session
- session_backend: Session storage backend
- exclude: Paths to exclude from authentication
- exclude_opt_key: Key for excluding specific routes
- guards: Authorization guards
- route_handlers: Route handlers requiring authentication
- dependencies: Additional dependencies
- middleware: Additional middleware
"""
async def login(
self,
identifier: str,
request: Request,
*,
response: Response | None = None,
**kwargs: Any,
) -> Response:
"""
Log in a user by creating a session.
Parameters:
- identifier: User identifier
- request: HTTP request object
- response: Existing response to modify
- **kwargs: Additional session data
Returns:
Response with session established
"""
async def logout(
self,
request: Request,
response: Response | None = None,
) -> Response:
"""
Log out a user by destroying the session.
Parameters:
- request: HTTP request object
- response: Existing response to modify
Returns:
Response with session cleared
"""Base classes for implementing custom authentication middleware.
class AbstractAuthenticationMiddleware(AbstractMiddleware):
def __init__(
self,
app: ASGIApp,
exclude: str | list[str] | None = None,
exclude_opt_key: str = "exclude_from_auth",
scopes: Scopes | None = None,
):
"""
Base authentication middleware.
Parameters:
- app: ASGI application
- exclude: Paths to exclude from authentication
- exclude_opt_key: Key for excluding specific routes
- scopes: OAuth2 scopes configuration
"""
async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
"""
Authenticate an incoming request.
Parameters:
- connection: ASGI connection object
Returns:
Authentication result with user and auth information
"""
class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):
"""JWT authentication middleware implementation."""
class JWTCookieAuthenticationMiddleware(AbstractAuthenticationMiddleware):
"""JWT cookie authentication middleware implementation."""
class SessionAuthMiddleware(AbstractAuthenticationMiddleware):
"""Session authentication middleware implementation."""Authorization guards for protecting routes based on user permissions and roles.
def guards(*guards: Guard) -> Callable[[T], T]:
"""
Decorator to apply authorization guards to route handlers.
Parameters:
- *guards: Guard functions to apply
Returns:
Decorator function
"""
# Common guard implementations
async def user_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
"""Guard that requires an authenticated user."""
if not connection.user:
raise NotAuthorizedException("Authentication required")
async def admin_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
"""Guard that requires admin privileges."""
if not connection.user or not getattr(connection.user, "is_admin", False):
raise PermissionDeniedException("Admin access required")
def role_required_guard(*required_roles: str) -> Guard:
"""
Create a guard that requires specific roles.
Parameters:
- *required_roles: Required role names
Returns:
Guard function
"""
async def guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
if not connection.user:
raise NotAuthorizedException("Authentication required")
user_roles = getattr(connection.user, "roles", [])
if not any(role in user_roles for role in required_roles):
raise PermissionDeniedException("Insufficient permissions")
return guardData structures for authentication results and user information.
class AuthenticationResult:
def __init__(self, user: Any | None = None, auth: Any | None = None):
"""
Authentication result.
Parameters:
- user: Authenticated user object
- auth: Authentication information
"""
self.user = user
self.auth = auth
class Token:
def __init__(
self,
token: str,
*,
exp: datetime | None = None,
sub: str | None = None,
aud: str | list[str] | None = None,
iss: str | None = None,
jti: str | None = None,
**claims: Any,
):
"""
JWT token representation.
Parameters:
- token: Encoded token string
- exp: Expiration time
- sub: Subject
- aud: Audience
- iss: Issuer
- jti: JWT ID
- **claims: Additional token claims
"""from litestar import Litestar, get, post
from litestar.security.jwt import JWTAuth, Token
from litestar.connection import ASGIConnection
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class User:
id: int
username: str
email: str
# User retrieval function
async def retrieve_user_handler(token: Token, connection: ASGIConnection) -> User | None:
# In a real app, you'd query your database
if token.sub == "123":
return User(id=123, username="alice", email="alice@example.com")
return None
# JWT configuration
jwt_auth = JWTAuth(
token_secret="your-secret-key",
retrieve_user_handler=retrieve_user_handler,
default_token_expiration=timedelta(hours=24),
)
@post("/login", exclude_from_auth=True)
async def login_handler(data: dict) -> dict:
# Validate credentials (simplified)
if data.get("username") == "alice" and data.get("password") == "secret":
token = jwt_auth.create_token(identifier="123")
return {"access_token": token, "token_type": "bearer"}
raise NotAuthorizedException("Invalid credentials")
@get("/profile")
async def get_profile(request: Request) -> dict:
# request.user is automatically populated by JWT middleware
user = request.user
return {
"id": user.id,
"username": user.username,
"email": user.email
}
app = Litestar(
route_handlers=[login_handler, get_profile],
on_app_init=[jwt_auth.on_app_init],
)from litestar.security.jwt import JWTCookieAuth
from litestar.response import Response
jwt_cookie_auth = JWTCookieAuth(
token_secret="your-secret-key",
retrieve_user_handler=retrieve_user_handler,
key="access_token",
httponly=True,
secure=True,
samesite="strict",
)
@post("/login", exclude_from_auth=True)
async def login_with_cookie(data: dict) -> Response:
if data.get("username") == "alice" and data.get("password") == "secret":
response = Response({"status": "logged in"})
return jwt_cookie_auth.create_response_with_token(
identifier="123",
response=response
)
raise NotAuthorizedException("Invalid credentials")
@post("/logout")
async def logout() -> Response:
response = Response({"status": "logged out"})
return jwt_cookie_auth.create_logout_response(response)from litestar.security.session_auth import SessionAuth
from litestar.middleware.session import SessionMiddleware
from litestar.stores.memory import MemoryStore
# Session storage
session_store = MemoryStore()
# Session middleware
session_middleware = SessionMiddleware(
store=session_store,
secret="session-secret-key",
)
# Session auth
session_auth = SessionAuth(
retrieve_user_handler=retrieve_user_handler,
session_backend=session_store,
)
@post("/login", exclude_from_auth=True)
async def session_login(request: Request, data: dict) -> Response:
if data.get("username") == "alice" and data.get("password") == "secret":
return await session_auth.login(
identifier="123",
request=request,
response=Response({"status": "logged in"})
)
raise NotAuthorizedException("Invalid credentials")
@post("/logout")
async def session_logout(request: Request) -> Response:
return await session_auth.logout(
request=request,
response=Response({"status": "logged out"})
)
app = Litestar(
route_handlers=[session_login, session_logout, get_profile],
middleware=[session_middleware],
on_app_init=[session_auth.on_app_init],
)from litestar.security.jwt import OAuth2PasswordBearerAuth
oauth2_auth = OAuth2PasswordBearerAuth(
token_url="/token",
token_secret="your-secret-key",
retrieve_user_handler=retrieve_user_handler,
scopes={
"read": "Read access",
"write": "Write access",
"admin": "Administrative access"
}
)
@post("/token", exclude_from_auth=True)
async def get_token(data: dict) -> dict:
# OAuth2 token endpoint
if data.get("username") == "alice" and data.get("password") == "secret":
token = oauth2_auth.create_token(
identifier="123",
scopes=["read", "write"]
)
return {
"access_token": token,
"token_type": "bearer",
"expires_in": 3600,
"scope": "read write"
}
raise NotAuthorizedException("Invalid credentials")from litestar.security import guards
@dataclass
class User:
id: int
username: str
roles: list[str]
is_active: bool = True
async def admin_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
user = connection.user
if not user or "admin" not in user.roles:
raise PermissionDeniedException("Admin access required")
@get("/admin/users", guards=[admin_guard])
async def list_all_users() -> list[dict]:
# Only accessible by admins
return [{"id": 1, "username": "alice"}]
@get("/profile", guards=[user_required_guard])
async def get_user_profile(request: Request) -> dict:
# Requires any authenticated user
return {"user": request.user.username}
# Custom role-based guard
editor_guard = role_required_guard("editor", "admin")
@post("/content", guards=[editor_guard])
async def create_content(data: dict) -> dict:
# Requires editor or admin role
return {"status": "created", "content": data}from litestar.middleware.authentication import AbstractAuthenticationMiddleware, AuthenticationResult
class APIKeyAuthMiddleware(AbstractAuthenticationMiddleware):
def __init__(self, app: ASGIApp, api_keys: dict[str, dict]):
super().__init__(app)
self.api_keys = api_keys # Map API keys to user data
async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
api_key = connection.headers.get("X-API-Key")
if not api_key:
return AuthenticationResult()
user_data = self.api_keys.get(api_key)
if user_data:
user = User(**user_data)
return AuthenticationResult(user=user, auth={"api_key": api_key})
return AuthenticationResult()
# Usage
api_keys = {
"abc123": {"id": 1, "username": "service_user", "roles": ["api"]},
"def456": {"id": 2, "username": "admin_user", "roles": ["api", "admin"]},
}
app = Litestar(
route_handlers=[...],
middleware=[APIKeyAuthMiddleware(api_keys=api_keys)],
)# User handler protocol
class UserHandlerProtocol(Protocol):
async def __call__(self, token: Token, connection: ASGIConnection) -> Any | None:
"""Retrieve user from token and connection."""
# Guard type
Guard = Callable[[ASGIConnection, BaseRouteHandler], bool | None | Awaitable[bool | None]]
# Scopes type
Scopes = dict[str, str]
# JWT algorithm types
JWTAlgorithm = Literal["HS256", "HS384", "HS512", "RS256", "RS384", "RS512", "ES256", "ES384", "ES512"]
# Authentication result types
AuthType = Any
UserType = AnyInstall with Tessl CLI
npx tessl i tessl/pypi-litestar