CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-starlette

The little ASGI library that shines.

Overview
Eval results
Files

docs/authentication.md

Authentication Framework

Starlette provides a flexible authentication framework that supports various authentication methods through pluggable backends, with built-in decorators for authorization and user management.

Authentication Components

Authentication Backend

from starlette.authentication import AuthenticationBackend, AuthCredentials, BaseUser
from starlette.requests import HTTPConnection
from typing import Optional, Tuple

class AuthenticationBackend:
    """
    Base class that every authentication backend derives from.

    A backend is expected to:
    - Look at the credentials carried by the connection (headers, cookies, etc.)
    - Validate those credentials against a user store
    - Report the resulting user and credential information

    Subclasses must override ``authenticate``.
    """

    async def authenticate(
        self,
        conn: HTTPConnection,
    ) -> Optional[Tuple[AuthCredentials, BaseUser]]:
        """
        Authenticate a single connection.

        Args:
            conn: HTTP connection (Request or WebSocket)

        Returns:
            ``(AuthCredentials, BaseUser)`` when the connection is
            authenticated, ``None`` otherwise

        Raises:
            AuthenticationError: For authentication failures that should
                                return 401/403 responses
            NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError

Authentication Credentials

from typing import List

class AuthCredentials:
    """
    Container for the permission scopes attached to a connection.

    The ``scopes`` attribute lists the permissions granted to the
    authenticated user.
    """

    def __init__(self, scopes: List[str] = None) -> None:
        """
        Store a private copy of the granted scopes.

        Args:
            scopes: Permission scopes (e.g., ["read", "write"]); omitting the
                argument or passing an empty value yields no scopes
        """
        granted = scopes if scopes else []
        # Copy so that later mutation of the caller's sequence does not leak in.
        self.scopes = [*granted]

User Classes

class BaseUser:
    """
    Interface that user objects implement.

    Each property below raises ``NotImplementedError`` until a subclass
    overrides it.
    """

    @property
    def is_authenticated(self) -> bool:
        """Whether the user is authenticated."""
        raise NotImplementedError

    @property
    def display_name(self) -> str:
        """Name to show for the user."""
        raise NotImplementedError

    @property
    def identity(self) -> str:
        """Unique identifier for the user."""
        raise NotImplementedError

class SimpleUser(BaseUser):
    """
    Minimal authenticated user identified only by a username.

    Suitable for applications that only need username-based authentication.
    """
    
    def __init__(self, username: str) -> None:
        """
        Initialize simple user.
        
        Args:
            username: Username/identifier for the user; it is returned
                unchanged by both ``display_name`` and ``identity``
        """
        self.username = username
    
    @property
    def is_authenticated(self) -> bool:
        """Always True: a SimpleUser represents an authenticated user."""
        return True
    
    @property
    def display_name(self) -> str:
        """Returns the username."""
        return self.username
    
    @property
    def identity(self) -> str:
        """Returns the username (same value as ``display_name``)."""
        return self.username

class UnauthenticatedUser(BaseUser):
    """
    Placeholder user for connections that are not authenticated.

    Represents users who have not been authenticated or have invalid credentials.
    Every property returns a fixed value; the class holds no state.
    """
    
    @property
    def is_authenticated(self) -> bool:
        """Always False for UnauthenticatedUser."""
        return False
    
    @property
    def display_name(self) -> str:
        """Empty string: there is no name to show."""
        return ""
    
    @property
    def identity(self) -> str:
        """Empty string: there is no identifier."""
        return ""

Authentication Utilities

from starlette.requests import Request
from starlette.responses import Response, JSONResponse, RedirectResponse
from typing import Union, List, Callable

def has_required_scope(conn: HTTPConnection, scopes: List[str]) -> bool:
    """
    Tell whether the connection was granted every scope in ``scopes``.

    Args:
        conn: HTTP connection (Request or WebSocket); its ``auth.scopes``
            is consulted for each required scope
        scopes: Required permission scopes

    Returns:
        bool: True when all required scopes are present (trivially True
        for an empty ``scopes``), False as soon as one is missing
    """
    return all(required in conn.auth.scopes for required in scopes)

def requires(
    scopes: Union[str, List[str]],
    status_code: int = 403,
    redirect: str = None,
) -> Callable:
    """
    Decorator to require authentication scopes.

    The wrapped endpoint is only called when the request's user is
    authenticated and holds every required scope.

    Args:
        scopes: Required scope(s) - string or list of strings
        status_code: HTTP status code for authorization failure
        redirect: URL to redirect unauthorized users (instead of error)

    Returns:
        Decorator function for endpoints. The decorated endpoint keeps the
        original function's name, docstring and ``__wrapped__`` reference.

    Raises:
        ValueError: At call time, when no ``Request`` is found among the
            endpoint's positional or keyword arguments.

    Usage:
        @requires("authenticated")
        @requires(["read", "write"])
        @requires("admin", redirect="/login")
    """
    # Imported locally so the block does not depend on a file-level import.
    import functools

    # Snapshot the scopes so later mutation of the caller's list cannot
    # change what an already-decorated endpoint demands.
    required_scopes = [scopes] if isinstance(scopes, str) else list(scopes)

    def decorator(func):
        @functools.wraps(func)  # keep the endpoint's metadata for routing/introspection
        async def wrapper(*args, **kwargs):
            # The request may arrive positionally or by keyword.
            request = next(
                (
                    candidate
                    for candidate in (*args, *kwargs.values())
                    if isinstance(candidate, Request)
                ),
                None,
            )

            # Identity check: do not rely on the request object's truthiness.
            if request is None:
                raise ValueError("@requires decorator requires Request argument")

            # Check authentication
            if not request.user.is_authenticated:
                if redirect:
                    return RedirectResponse(redirect, status_code=307)
                return JSONResponse(
                    {"error": "Authentication required"},
                    status_code=401
                )

            # Check authorization
            if not has_required_scope(request, required_scopes):
                if redirect:
                    return RedirectResponse(redirect, status_code=307)
                return JSONResponse(
                    {"error": "Insufficient permissions"},
                    status_code=status_code
                )

            # Call original function
            return await func(*args, **kwargs)

        return wrapper
    return decorator

Authentication Exceptions

class AuthenticationError(Exception):
    """
    Signals that authenticating a connection failed.

    Authentication backends raise it for failures that should result
    in 401/403 responses.
    """

Setting Up Authentication

Basic Authentication Middleware Setup

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware

# Create authentication backend
class MyAuthBackend(AuthenticationBackend):
    """Placeholder backend; put the real credential check in ``authenticate``."""

    async def authenticate(self, conn):
        # Implementation details...
        # As written the method returns None for every connection.
        pass

# Configure application with authentication
# NOTE: `routes` is not defined in this snippet; it is presumably the
# application's route list declared elsewhere.
middleware = [
    Middleware(AuthenticationMiddleware, backend=MyAuthBackend()),
]

app = Starlette(
    routes=routes,
    middleware=middleware,
)

# Now request.user and request.auth are available in all endpoints
async def protected_endpoint(request):
    """Return the user's display name, or a 401 JSON error for anonymous requests."""
    if not request.user.is_authenticated:
        return JSONResponse({"error": "Not authenticated"}, status_code=401)
    return JSONResponse({"user": request.user.display_name})

Authentication Backends

API Key Authentication

import secrets
from starlette.authentication import AuthenticationBackend, AuthCredentials, SimpleUser

class APIKeyAuthBackend(AuthenticationBackend):
    """Backend that authenticates a connection by its ``X-API-Key`` header."""

    def __init__(self, api_keys: dict[str, dict] = None):
        """
        Args:
            api_keys: Mapping of API key to a record that holds a
                "username" entry and, optionally, a "scopes" list
        """
        # In production, store in database
        self.api_keys = api_keys if api_keys else {}

    async def authenticate(self, conn):
        """
        Look the presented key up in the key table.

        Returns:
            ``(AuthCredentials, SimpleUser)`` for a known key; ``None`` when
            the header is missing/empty or the key has no (non-empty) record
        """
        presented_key = conn.headers.get("X-API-Key")
        # Only consult the table when a key was actually sent.
        record = self.api_keys.get(presented_key) if presented_key else None
        if not record:
            return None

        # Scopes default to none when the record does not list any.
        return (
            AuthCredentials(record.get("scopes", [])),
            SimpleUser(record["username"]),
        )

# Usage
# Sample key table: each API key maps to the username and scopes it grants.
api_keys = {
    "sk_test_123": {"username": "testuser", "scopes": ["read"]},
    "sk_live_456": {"username": "liveuser", "scopes": ["read", "write"]},
}

# Register the backend on an existing application object (`app`).
backend = APIKeyAuthBackend(api_keys)
app.add_middleware(AuthenticationMiddleware, backend=backend)

JWT Token Authentication

import jwt
from datetime import datetime, timedelta
from starlette.authentication import AuthenticationBackend, AuthCredentials

class JWTAuthBackend(AuthenticationBackend):
    """JWT token-based authentication backend.

    Reads a bearer token from the ``Authorization`` header, verifies it with
    the configured secret and algorithm, and maps the ``sub`` and ``scopes``
    claims to a ``SimpleUser`` and ``AuthCredentials``.
    """

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """
        Args:
            secret_key: Key used to verify token signatures
            algorithm: The only signing algorithm accepted when decoding
        """
        self.secret_key = secret_key
        self.algorithm = algorithm

    async def authenticate(self, conn):
        """
        Authenticate a connection from its bearer token.

        Args:
            conn: Connection whose ``headers`` are inspected

        Returns:
            ``(AuthCredentials, SimpleUser)`` for a valid token; ``None`` when
            the header is absent, is not a bearer token, the token fails
            validation, has expired, or carries no ``sub`` claim
        """
        # Imported here because the file-level import only brings in
        # ``datetime`` and ``timedelta``.
        from datetime import timezone

        # Get token from Authorization header
        authorization = conn.headers.get("Authorization")
        if not authorization:
            return None

        scheme, _, token = authorization.partition(" ")
        if scheme.lower() != "bearer" or not token:
            return None

        try:
            # Decode and validate token
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
        except jwt.InvalidTokenError:
            return None

        # Explicit expiry check using timezone-aware datetimes; the naive
        # utcnow()/utcfromtimestamp() helpers are deprecated.
        # NOTE(review): PyJWT also verifies "exp" during decode by default,
        # so this is a second line of defence - confirm and drop if redundant.
        exp = payload.get("exp")
        if exp is not None:
            expires_at = datetime.fromtimestamp(exp, tz=timezone.utc)
            if expires_at < datetime.now(timezone.utc):
                return None

        # A token without a subject identifies nobody; do not build a
        # SimpleUser(None) from it.
        username = payload.get("sub")
        if not username:
            return None

        scopes = payload.get("scopes", [])
        return AuthCredentials(scopes), SimpleUser(username)

# Create and sign JWT tokens
class JWTManager:
    """Creates signed JWT tokens for users."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        """
        Args:
            secret_key: Key used to sign tokens
            algorithm: Signing algorithm passed to ``jwt.encode``
        """
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_token(
        self,
        username: str,
        scopes: list[str] = None,
        expires_delta: timedelta = None
    ) -> str:
        """Create a JWT token for user.

        Args:
            username: Stored as the ``sub`` claim
            scopes: Stored as the ``scopes`` claim; defaults to an empty list
            expires_delta: Token lifetime; defaults to 24 hours

        Returns:
            The encoded token as returned by ``jwt.encode``
        """
        # Imported here because the file-level import only brings in
        # ``datetime`` and ``timedelta``.
        from datetime import timezone

        if expires_delta is None:
            expires_delta = timedelta(hours=24)

        # Read the clock once so "iat" and "exp" come from the same instant,
        # and use an aware UTC datetime instead of the deprecated utcnow().
        issued_at = datetime.now(timezone.utc)

        payload = {
            "sub": username,
            "scopes": scopes or [],
            "exp": issued_at + expires_delta,
            "iat": issued_at,
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

# Usage
# The manager (signing) and the backend (verifying) are given the same key;
# "your-secret-key" is a placeholder value.
jwt_manager = JWTManager("your-secret-key")
backend = JWTAuthBackend("your-secret-key")
app.add_middleware(AuthenticationMiddleware, backend=backend)

# Login endpoint to issue tokens
async def login(request):
    """Issue a bearer token for a JSON body carrying valid username/password."""
    body = await request.json()
    username = body.get("username")
    password = body.get("password")

    # Validate credentials (implement your logic)
    credentials_ok = await validate_credentials(username, password)
    if not credentials_ok:
        return JSONResponse({"error": "Invalid credentials"}, status_code=401)

    # Create token
    token = jwt_manager.create_token(username=username, scopes=["read", "write"])
    return JSONResponse({"access_token": token, "token_type": "bearer"})

Session-based Authentication

from starlette.middleware.sessions import SessionMiddleware
from starlette.authentication import AuthenticationBackend, AuthCredentials

class SessionAuthBackend(AuthenticationBackend):
    """Backend that authenticates from the ``user_id`` stored in the session."""

    async def authenticate(self, conn):
        """
        Resolve the session's user.

        Returns:
            ``(AuthCredentials, SimpleUser)`` when the session holds a
            ``user_id`` that ``load_user_by_id`` can resolve; ``None``
            otherwise. An unresolvable ``user_id`` also clears the session.
        """
        # Check if user is logged in via session
        session_user_id = conn.session.get("user_id")
        if not session_user_id:
            return None

        # Load user from database (implement your logic)
        record = await load_user_by_id(session_user_id)
        if record:
            # Scopes fall back to ["authenticated"] when the record has none.
            granted = AuthCredentials(record.get("scopes", ["authenticated"]))
            return granted, SimpleUser(record["username"])

        # Clear invalid session
        conn.session.clear()
        return None

# Setup with session middleware (order matters!)
# SessionMiddleware is listed before AuthenticationMiddleware because
# SessionAuthBackend reads conn.session while authenticating.
# "session-secret" is a placeholder value.
middleware = [
    Middleware(SessionMiddleware, secret_key="session-secret"),
    Middleware(AuthenticationMiddleware, backend=SessionAuthBackend()),
]

# Login endpoint
async def login(request):
    """Log a user in from form data and record their id in the session."""
    form = await request.form()
    username = form.get("username")
    password = form.get("password")

    # Validate credentials
    user = await authenticate_user(username, password)
    if not user:
        return JSONResponse({"error": "Invalid credentials"}, status_code=401)

    # Store user ID in session
    request.session["user_id"] = user.id
    return RedirectResponse("/dashboard", status_code=302)

# Logout endpoint
async def logout(request):
    """Clear the whole session, then redirect to "/" with a 302."""
    request.session.clear()
    return RedirectResponse("/", status_code=302)

Multi-Backend Authentication

class MultiAuthBackend(AuthenticationBackend):
    """Backend that delegates to an ordered list of other backends."""

    def __init__(self, backends: list[AuthenticationBackend]):
        """
        Args:
            backends: Backends to consult, in priority order
        """
        self.backends = backends

    async def authenticate(self, conn):
        """
        Return the first non-None result produced by the backends.

        Backends are awaited one at a time in list order; an exception raised
        by a backend propagates and stops the search. Returns ``None`` when
        no backend authenticates the connection.
        """
        for candidate in self.backends:
            outcome = await candidate.authenticate(conn)
            if outcome is None:
                continue
            return outcome

        return None

# Combine multiple backends
# "secret-key" is a placeholder value.
api_key_backend = APIKeyAuthBackend(api_keys)
jwt_backend = JWTAuthBackend("secret-key")
session_backend = SessionAuthBackend()

# List order is the order in which MultiAuthBackend tries the backends.
multi_backend = MultiAuthBackend([
    api_key_backend,    # Try API key first
    jwt_backend,        # Then JWT token
    session_backend,    # Finally session
])

app.add_middleware(AuthenticationMiddleware, backend=multi_backend)

Authorization with Decorators

Basic Authorization

from starlette.authentication import requires

# Require authentication
@requires("authenticated")
async def protected_endpoint(request):
    """Return the user's display name; guarded by the "authenticated" scope."""
    return JSONResponse({"user": request.user.display_name})

# Require specific scopes
@requires(["read", "write"])
async def admin_endpoint(request):
    """Return a fixed confirmation; guarded by the "read" and "write" scopes."""
    return JSONResponse({"message": "Admin access granted"})

# Single scope
@requires("admin")
async def super_admin_endpoint(request):
    """Return a fixed confirmation; guarded by the single "admin" scope."""
    return JSONResponse({"message": "Super admin access"})

# Custom status code
@requires("premium", status_code=402)  # Payment Required
async def premium_feature(request):
    """Return premium content; guarded by the "premium" scope with a 402 failure status."""
    return JSONResponse({"feature": "premium content"})

# Redirect instead of error
@requires("authenticated", redirect="/login")
async def dashboard(request):
    """Render the dashboard page; guarded by ``@requires`` with a redirect to /login."""
    # HTMLResponse is not among the names imported earlier in this file, so
    # import it here to avoid a NameError when the endpoint runs.
    from starlette.responses import HTMLResponse

    return HTMLResponse("<h1>User Dashboard</h1>")

Custom Authorization Decorators

from functools import wraps

def require_user(user_id_param: str = "user_id"):
    """Require user to match path parameter.

    Args:
        user_id_param: Name of the path parameter that holds the user id

    Returns:
        A decorator for ``async def endpoint(request)`` functions. The wrapped
        endpoint answers 401 when the user is not authenticated, 403 when the
        path parameter is missing or differs from ``request.user.identity``,
        and otherwise awaits the original endpoint.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request):
            if not request.user.is_authenticated:
                return JSONResponse({"error": "Authentication required"}, status_code=401)

            # Check if user matches path parameter.
            # Path parameters may be non-string (e.g. an int from an
            # "{user_id:int}" route) while identity is a string, so compare
            # string forms. A missing parameter never matches.
            path_user_id = request.path_params.get(user_id_param)
            if path_user_id is None or str(path_user_id) != str(request.user.identity):
                return JSONResponse({"error": "Access denied"}, status_code=403)

            return await func(request)
        return wrapper
    return decorator

def require_role(role: str):
    """Build a decorator that only admits users whose ``role`` equals ``role``.

    The wrapped endpoint answers 401 for unauthenticated users and 403 when
    the user object has no ``role`` attribute or a different role.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request):
            current_user = request.user
            if not current_user.is_authenticated:
                return JSONResponse({"error": "Authentication required"}, status_code=401)

            # Check user role (requires custom user class)
            role_matches = hasattr(current_user, 'role') and current_user.role == role
            if not role_matches:
                return JSONResponse({"error": "Insufficient permissions"}, status_code=403)

            return await func(request)
        return wrapper
    return decorator

# Usage
@require_user("user_id")
async def user_profile(request):
    """Return profile data; guarded by ``require_user`` on the "user_id" path parameter."""
    # Only allows access to /users/{user_id} if user_id matches authenticated user
    return JSONResponse({"profile": "user data"})

@require_role("admin")  
async def admin_panel(request):
    """Return the admin panel payload; guarded by ``require_role("admin")``."""
    return JSONResponse({"message": "Admin panel"})

Advanced User Classes

Rich User Implementation

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime

@dataclass
class User(BaseUser):
    """User record with profile fields, a role and a permission list."""

    id: int
    username: str
    email: str
    full_name: str
    role: str
    permissions: List[str]
    # An inactive user is reported as not authenticated.
    is_active: bool = True
    last_login: Optional[datetime] = None

    @property
    def is_authenticated(self) -> bool:
        """Mirror of ``is_active``."""
        return self.is_active

    @property
    def display_name(self) -> str:
        """Full name when it is set (truthy), otherwise the username."""
        if self.full_name:
            return self.full_name
        return self.username

    @property
    def identity(self) -> str:
        """The numeric ``id`` rendered as a string."""
        return str(self.id)

    def has_permission(self, permission: str) -> bool:
        """Tell whether ``permission`` appears in ``permissions``."""
        granted = self.permissions
        return permission in granted

    def is_admin(self) -> bool:
        """Tell whether the role is exactly "admin"."""
        return self.role == "admin"

    def is_staff(self) -> bool:
        """Tell whether the role is "admin" or "staff"."""
        return self.role in ("admin", "staff")

class DatabaseUserBackend(AuthenticationBackend):
    """Backend that builds a full ``User`` from a stored record.

    ``get_user_id`` and ``load_user_from_db`` are hooks left for the
    application to implement; as written both return ``None``.
    """

    async def authenticate(self, conn):
        """
        Resolve the connection to a ``User`` and its scopes.

        Returns:
            ``(AuthCredentials, User)`` when a user id is found and its
            record is active; ``None`` otherwise. The scopes are
            "authenticated" followed by the user's permissions.
        """
        # Get user ID from token/session
        resolved_id = await self.get_user_id(conn)
        if not resolved_id:
            return None

        # Load full user data from database
        record = await self.load_user_from_db(resolved_id)
        if not (record and record["is_active"]):
            return None

        # Create rich user object
        user = User(**record)

        # Create credentials with user permissions
        granted = AuthCredentials(["authenticated"] + user.permissions)
        return granted, user

    async def get_user_id(self, conn) -> Optional[int]:
        """Hook: extract the user ID from JWT token, session, etc."""
        return None

    async def load_user_from_db(self, user_id: int) -> Optional[dict]:
        """Hook: load the user's data from the database."""
        return None

Error Handling

Authentication Error Handling

from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import JSONResponse

def auth_error_handler(conn, exc):
    """Custom authentication error handler.

    Deliberately a plain (non-async) function: AuthenticationMiddleware calls
    ``on_error`` synchronously and uses the return value as the response, so
    a coroutine function would hand it an un-awaited coroutine instead.

    Args:
        conn: The connection whose authentication failed (unused here)
        exc: The error raised by the backend

    Returns:
        A 401 JSON response describing the error, with a
        ``WWW-Authenticate: Bearer`` header.
    """
    return JSONResponse(
        {
            "error": "Authentication failed",
            "message": str(exc),
            "type": exc.__class__.__name__
        },
        status_code=401,
        headers={"WWW-Authenticate": "Bearer"}
    )

# Use custom error handler
# NOTE: `auth_backend` is not defined in this snippet; it stands for any
# AuthenticationBackend instance.
app.add_middleware(
    AuthenticationMiddleware,
    backend=auth_backend,
    on_error=auth_error_handler
)

Graceful Authentication Failure

class GracefulAuthBackend(AuthenticationBackend):
    """Authentication backend that never raises exceptions.

    Any exception from ``do_authenticate`` is printed and turned into
    ``None``, so a failure is treated the same as "not authenticated".
    """
    
    async def authenticate(self, conn):
        try:
            # Attempt authentication
            return await self.do_authenticate(conn)
        except Exception as e:
            # Deliberately broad: this backend's contract is to never raise.
            # Log error but don't raise
            print(f"Authentication error: {e}")
            return None
    
    async def do_authenticate(self, conn):
        # Actual authentication logic that might raise exceptions
        # Placeholder: as written this returns None.
        pass

Testing Authentication

Testing with Authentication

from starlette.testclient import TestClient
from starlette.applications import Starlette

def test_protected_endpoint():
    """Request /protected anonymously, with an API key, and with a JWT bearer token."""
    # `routes` and `middleware` come from the surrounding application setup.
    app = Starlette(routes=routes, middleware=middleware)
    client = TestClient(app)
    
    # Test without authentication
    response = client.get("/protected")
    assert response.status_code == 401
    
    # Test with API key
    # NOTE(review): "valid-key" must be a key the configured backend accepts;
    # the sample key table earlier in this document uses different keys.
    headers = {"X-API-Key": "valid-key"}
    response = client.get("/protected", headers=headers)
    assert response.status_code == 200
    
    # Test with JWT token
    token = create_test_token("testuser", ["read"])
    headers = {"Authorization": f"Bearer {token}"}
    response = client.get("/protected", headers=headers)
    assert response.status_code == 200

def test_authorization():
    """Check that a read-only token can GET /read-only but gets 403 on POST /write-data."""
    # Uses the module-level `app` rather than building a new application.
    client = TestClient(app)
    
    # User with read scope
    token = create_test_token("user", ["read"])
    headers = {"Authorization": f"Bearer {token}"}
    
    # Should succeed
    response = client.get("/read-only", headers=headers)
    assert response.status_code == 200
    
    # Should fail (needs write scope)
    response = client.post("/write-data", headers=headers)
    assert response.status_code == 403

def create_test_token(username: str, scopes: List[str]) -> str:
    """Helper to create tokens for testing.

    Delegates to the module-level ``jwt_manager``, so the token is signed
    with that manager's key and default lifetime.
    """
    return jwt_manager.create_token(username, scopes)

Starlette's authentication framework provides flexible, secure user authentication and authorization with support for multiple backends, rich user models, and comprehensive testing capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-starlette

docs

authentication.md

core-application.md

data-structures.md

exceptions-status.md

index.md

middleware.md

requests-responses.md

routing.md

static-files.md

testing.md

websockets.md

tile.json