The little ASGI library that shines.
Starlette provides a flexible authentication framework that supports various authentication methods through pluggable backends, with built-in decorators for authorization and user management.
from starlette.authentication import AuthenticationBackend, AuthCredentials, BaseUser
from starlette.requests import HTTPConnection
from typing import Optional, Tuple
class AuthenticationBackend:
"""
Abstract base class for authentication backends.
Authentication backends are responsible for:
- Examining request credentials (headers, cookies, etc.)
- Validating credentials against user store
- Returning user and credential information
"""
async def authenticate(
self,
conn: HTTPConnection
) -> Optional[Tuple[AuthCredentials, BaseUser]]:
"""
Authenticate the request.
Args:
conn: HTTP connection (Request or WebSocket)
Returns:
Tuple of (AuthCredentials, BaseUser) if authenticated, None otherwise
Raises:
AuthenticationError: For authentication failures that should
return 401/403 responses
"""
raise NotImplementedError()from typing import List
class AuthCredentials:
"""
Authentication credentials containing permission scopes.
Represents the permissions and scopes granted to an authenticated user.
"""
def __init__(self, scopes: List[str] = None) -> None:
"""
Initialize authentication credentials.
Args:
scopes: List of permission scopes (e.g., ["read", "write"])
"""
self.scopes = list(scopes or [])class BaseUser:
"""
Abstract base class for user objects.
Defines the interface that user objects must implement
to work with Starlette's authentication system.
"""
@property
def is_authenticated(self) -> bool:
"""Whether the user is authenticated."""
raise NotImplementedError()
@property
def display_name(self) -> str:
"""Display name for the user."""
raise NotImplementedError()
@property
def identity(self) -> str:
"""Unique identifier for the user."""
raise NotImplementedError()
class SimpleUser(BaseUser):
"""
Simple authenticated user implementation.
Basic user class for applications that only need username-based authentication.
"""
def __init__(self, username: str) -> None:
"""
Initialize simple user.
Args:
username: Username/identifier for the user
"""
self.username = username
@property
def is_authenticated(self) -> bool:
"""Always True for SimpleUser."""
return True
@property
def display_name(self) -> str:
"""Returns the username."""
return self.username
@property
def identity(self) -> str:
"""Returns the username."""
return self.username
class UnauthenticatedUser(BaseUser):
"""
Unauthenticated user implementation.
Represents users who have not been authenticated or have invalid credentials.
"""
@property
def is_authenticated(self) -> bool:
"""Always False for UnauthenticatedUser."""
return False
@property
def display_name(self) -> str:
"""Empty string for unauthenticated users."""
return ""
@property
def identity(self) -> str:
"""Empty string for unauthenticated users."""
return ""from starlette.requests import Request
from starlette.responses import Response, JSONResponse, RedirectResponse
from typing import Union, List, Callable
def has_required_scope(conn: HTTPConnection, scopes: List[str]) -> bool:
"""
Check if connection has required scopes.
Args:
conn: HTTP connection (Request or WebSocket)
scopes: Required permission scopes
Returns:
bool: True if user has all required scopes
"""
for scope in scopes:
if scope not in conn.auth.scopes:
return False
return True
def requires(
scopes: Union[str, List[str]],
status_code: int = 403,
redirect: str = None,
) -> Callable:
"""
Decorator to require authentication scopes.
Args:
scopes: Required scope(s) - string or list of strings
status_code: HTTP status code for authorization failure
redirect: URL to redirect unauthorized users (instead of error)
Returns:
Decorator function for endpoints
Usage:
@requires("authenticated")
@requires(["read", "write"])
@requires("admin", redirect="/login")
"""
if isinstance(scopes, str):
scopes = [scopes]
def decorator(func):
async def wrapper(*args, **kwargs):
# Extract request from arguments
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if not request:
raise ValueError("@requires decorator requires Request argument")
# Check authentication
if not request.user.is_authenticated:
if redirect:
return RedirectResponse(redirect, status_code=307)
return JSONResponse(
{"error": "Authentication required"},
status_code=401
)
# Check authorization
if not has_required_scope(request, scopes):
if redirect:
return RedirectResponse(redirect, status_code=307)
return JSONResponse(
{"error": "Insufficient permissions"},
status_code=status_code
)
# Call original function
return await func(*args, **kwargs)
return wrapper
return decoratorclass AuthenticationError(Exception):
"""
Exception raised for authentication errors.
Used by authentication backends to signal authentication
failures that should result in 401/403 responses.
"""
passfrom starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
# Create authentication backend
class MyAuthBackend(AuthenticationBackend):
async def authenticate(self, conn):
# Implementation details...
pass
# Configure application with authentication
middleware = [
Middleware(AuthenticationMiddleware, backend=MyAuthBackend()),
]
app = Starlette(
routes=routes,
middleware=middleware,
)
# Now request.user and request.auth are available in all endpoints
async def protected_endpoint(request):
if request.user.is_authenticated:
return JSONResponse({"user": request.user.display_name})
else:
return JSONResponse({"error": "Not authenticated"}, status_code=401)import secrets
from starlette.authentication import AuthenticationBackend, AuthCredentials, SimpleUser
class APIKeyAuthBackend(AuthenticationBackend):
"""API key-based authentication backend."""
def __init__(self, api_keys: dict[str, dict] = None):
# In production, store in database
self.api_keys = api_keys or {}
async def authenticate(self, conn):
# Check for API key in header
api_key = conn.headers.get("X-API-Key")
if not api_key:
return None
# Validate API key
user_data = self.api_keys.get(api_key)
if not user_data:
return None
# Return credentials and user
credentials = AuthCredentials(user_data.get("scopes", []))
user = SimpleUser(user_data["username"])
return credentials, user
# Usage
api_keys = {
"sk_test_123": {"username": "testuser", "scopes": ["read"]},
"sk_live_456": {"username": "liveuser", "scopes": ["read", "write"]},
}
backend = APIKeyAuthBackend(api_keys)
app.add_middleware(AuthenticationMiddleware, backend=backend)import jwt
from datetime import datetime, timedelta
from starlette.authentication import AuthenticationBackend, AuthCredentials
class JWTAuthBackend(AuthenticationBackend):
"""JWT token-based authentication backend."""
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
async def authenticate(self, conn):
# Get token from Authorization header
authorization = conn.headers.get("Authorization")
if not authorization:
return None
try:
scheme, token = authorization.split(" ", 1)
if scheme.lower() != "bearer":
return None
except ValueError:
return None
try:
# Decode and validate token
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
# Check expiration
exp = payload.get("exp")
if exp and datetime.utcfromtimestamp(exp) < datetime.utcnow():
return None
# Create user and credentials
username = payload.get("sub")
scopes = payload.get("scopes", [])
credentials = AuthCredentials(scopes)
user = SimpleUser(username)
return credentials, user
except jwt.InvalidTokenError:
return None
# Create and sign JWT tokens
class JWTManager:
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(
self,
username: str,
scopes: list[str] = None,
expires_delta: timedelta = None
) -> str:
"""Create a JWT token for user."""
if expires_delta is None:
expires_delta = timedelta(hours=24)
payload = {
"sub": username,
"scopes": scopes or [],
"exp": datetime.utcnow() + expires_delta,
"iat": datetime.utcnow(),
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
# Usage
jwt_manager = JWTManager("your-secret-key")
backend = JWTAuthBackend("your-secret-key")
app.add_middleware(AuthenticationMiddleware, backend=backend)
# Login endpoint to issue tokens
async def login(request):
data = await request.json()
username = data.get("username")
password = data.get("password")
# Validate credentials (implement your logic)
if await validate_credentials(username, password):
# Create token
token = jwt_manager.create_token(
username=username,
scopes=["read", "write"]
)
return JSONResponse({"access_token": token, "token_type": "bearer"})
else:
return JSONResponse({"error": "Invalid credentials"}, status_code=401)from starlette.middleware.sessions import SessionMiddleware
from starlette.authentication import AuthenticationBackend, AuthCredentials
class SessionAuthBackend(AuthenticationBackend):
"""Session-based authentication backend."""
async def authenticate(self, conn):
# Check if user is logged in via session
user_id = conn.session.get("user_id")
if not user_id:
return None
# Load user from database (implement your logic)
user_data = await load_user_by_id(user_id)
if not user_data:
# Clear invalid session
conn.session.clear()
return None
# Create credentials and user
credentials = AuthCredentials(user_data.get("scopes", ["authenticated"]))
user = SimpleUser(user_data["username"])
return credentials, user
# Setup with session middleware (order matters!)
middleware = [
Middleware(SessionMiddleware, secret_key="session-secret"),
Middleware(AuthenticationMiddleware, backend=SessionAuthBackend()),
]
# Login endpoint
async def login(request):
data = await request.form()
username = data.get("username")
password = data.get("password")
# Validate credentials
user = await authenticate_user(username, password)
if user:
# Store user ID in session
request.session["user_id"] = user.id
return RedirectResponse("/dashboard", status_code=302)
else:
return JSONResponse({"error": "Invalid credentials"}, status_code=401)
# Logout endpoint
async def logout(request):
request.session.clear()
return RedirectResponse("/", status_code=302)class MultiAuthBackend(AuthenticationBackend):
"""Support multiple authentication methods."""
def __init__(self, backends: list[AuthenticationBackend]):
self.backends = backends
async def authenticate(self, conn):
# Try each backend in order
for backend in self.backends:
result = await backend.authenticate(conn)
if result is not None:
return result
return None
# Combine multiple backends
api_key_backend = APIKeyAuthBackend(api_keys)
jwt_backend = JWTAuthBackend("secret-key")
session_backend = SessionAuthBackend()
multi_backend = MultiAuthBackend([
api_key_backend, # Try API key first
jwt_backend, # Then JWT token
session_backend, # Finally session
])
app.add_middleware(AuthenticationMiddleware, backend=multi_backend)from starlette.authentication import requires
# Require authentication
@requires("authenticated")
async def protected_endpoint(request):
return JSONResponse({"user": request.user.display_name})
# Require specific scopes
@requires(["read", "write"])
async def admin_endpoint(request):
return JSONResponse({"message": "Admin access granted"})
# Single scope
@requires("admin")
async def super_admin_endpoint(request):
return JSONResponse({"message": "Super admin access"})
# Custom status code
@requires("premium", status_code=402) # Payment Required
async def premium_feature(request):
return JSONResponse({"feature": "premium content"})
# Redirect instead of error
@requires("authenticated", redirect="/login")
async def dashboard(request):
return HTMLResponse("<h1>User Dashboard</h1>")from functools import wraps
def require_user(user_id_param: str = "user_id"):
"""Require user to match path parameter."""
def decorator(func):
@wraps(func)
async def wrapper(request):
if not request.user.is_authenticated:
return JSONResponse({"error": "Authentication required"}, status_code=401)
# Check if user matches path parameter
path_user_id = request.path_params.get(user_id_param)
if path_user_id != request.user.identity:
return JSONResponse({"error": "Access denied"}, status_code=403)
return await func(request)
return wrapper
return decorator
def require_role(role: str):
"""Require user to have specific role."""
def decorator(func):
@wraps(func)
async def wrapper(request):
if not request.user.is_authenticated:
return JSONResponse({"error": "Authentication required"}, status_code=401)
# Check user role (requires custom user class)
if not hasattr(request.user, 'role') or request.user.role != role:
return JSONResponse({"error": "Insufficient permissions"}, status_code=403)
return await func(request)
return wrapper
return decorator
# Usage
@require_user("user_id")
async def user_profile(request):
# Only allows access to /users/{user_id} if user_id matches authenticated user
return JSONResponse({"profile": "user data"})
@require_role("admin")
async def admin_panel(request):
return JSONResponse({"message": "Admin panel"})from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class User(BaseUser):
"""Rich user implementation with additional properties."""
id: int
username: str
email: str
full_name: str
role: str
permissions: List[str]
is_active: bool = True
last_login: Optional[datetime] = None
@property
def is_authenticated(self) -> bool:
return self.is_active
@property
def display_name(self) -> str:
return self.full_name or self.username
@property
def identity(self) -> str:
return str(self.id)
def has_permission(self, permission: str) -> bool:
"""Check if user has specific permission."""
return permission in self.permissions
def is_admin(self) -> bool:
"""Check if user is an administrator."""
return self.role == "admin"
def is_staff(self) -> bool:
"""Check if user is staff member."""
return self.role in ["admin", "staff"]
class DatabaseUserBackend(AuthenticationBackend):
"""Authentication backend that loads full user data."""
async def authenticate(self, conn):
# Get user ID from token/session
user_id = await self.get_user_id(conn)
if not user_id:
return None
# Load full user data from database
user_data = await self.load_user_from_db(user_id)
if not user_data or not user_data["is_active"]:
return None
# Create rich user object
user = User(**user_data)
# Create credentials with user permissions
scopes = ["authenticated"] + user.permissions
credentials = AuthCredentials(scopes)
return credentials, user
async def get_user_id(self, conn) -> Optional[int]:
# Extract user ID from JWT token, session, etc.
pass
async def load_user_from_db(self, user_id: int) -> Optional[dict]:
# Load user data from database
passfrom starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import JSONResponse
async def auth_error_handler(conn, exc):
"""Custom authentication error handler."""
return JSONResponse(
{
"error": "Authentication failed",
"message": str(exc),
"type": exc.__class__.__name__
},
status_code=401,
headers={"WWW-Authenticate": "Bearer"}
)
# Use custom error handler
app.add_middleware(
AuthenticationMiddleware,
backend=auth_backend,
on_error=auth_error_handler
)class GracefulAuthBackend(AuthenticationBackend):
"""Authentication backend that never raises exceptions."""
async def authenticate(self, conn):
try:
# Attempt authentication
return await self.do_authenticate(conn)
except Exception as e:
# Log error but don't raise
print(f"Authentication error: {e}")
return None
async def do_authenticate(self, conn):
# Actual authentication logic that might raise exceptions
passfrom starlette.testclient import TestClient
from starlette.applications import Starlette
def test_protected_endpoint():
app = Starlette(routes=routes, middleware=middleware)
client = TestClient(app)
# Test without authentication
response = client.get("/protected")
assert response.status_code == 401
# Test with API key
headers = {"X-API-Key": "valid-key"}
response = client.get("/protected", headers=headers)
assert response.status_code == 200
# Test with JWT token
token = create_test_token("testuser", ["read"])
headers = {"Authorization": f"Bearer {token}"}
response = client.get("/protected", headers=headers)
assert response.status_code == 200
def test_authorization():
client = TestClient(app)
# User with read scope
token = create_test_token("user", ["read"])
headers = {"Authorization": f"Bearer {token}"}
# Should succeed
response = client.get("/read-only", headers=headers)
assert response.status_code == 200
# Should fail (needs write scope)
response = client.post("/write-data", headers=headers)
assert response.status_code == 403
def create_test_token(username: str, scopes: List[str]) -> str:
"""Helper to create tokens for testing."""
return jwt_manager.create_token(username, scopes)Starlette's authentication framework provides flexible, secure user authentication and authorization with support for multiple backends, rich user models, and comprehensive testing capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-starlette