CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-litestar

Litestar is a powerful, flexible yet opinionated ASGI web framework specifically focused on building high-performance APIs.

Pending
Overview
Eval results
Files

security.mddocs/

Security and Authentication

Authentication and authorization systems including JWT authentication, session-based auth, and OAuth2 support. Litestar provides comprehensive security primitives for protecting routes and managing user sessions.

Capabilities

JWT Authentication

JSON Web Token-based authentication with support for various token storage methods and OAuth2 flows.

class JWTAuth(BaseJWTAuth):
    def __init__(
        self,
        token_secret: str,
        retrieve_user_handler: UserHandlerProtocol,
        *,
        algorithm: str = "HS256",
        auth_header: str = "Authorization",
        default_token_expiration: timedelta = timedelta(days=1),
        openapi_security_scheme_name: str = "BearerToken",
        description: str = "JWT api-key authentication and authorization.",
        authentication_middleware_class: type[AbstractAuthenticationMiddleware] = JWTAuthenticationMiddleware,
        guards: Sequence[Guard] | None = None,
        exclude: str | list[str] | None = None,
        exclude_opt_key: str = "exclude_from_auth",
        scopes: Scopes | None = None,
        route_handlers: Sequence[BaseRouteHandler] | None = None,
        dependencies: Dependencies | None = None,
        middleware: Sequence[Middleware] | None = None,
    ):
        """
        JWT Authentication configuration.

        Parameters:
        - token_secret: Secret key for signing tokens
        - retrieve_user_handler: Function to retrieve user from token payload
        - algorithm: JWT signing algorithm (HS256, RS256, etc.)
        - auth_header: HTTP header containing the token
        - default_token_expiration: Default token expiration time
        - openapi_security_scheme_name: Name for OpenAPI security scheme
        - description: Description for OpenAPI documentation
        - authentication_middleware_class: Middleware class to use
        - guards: Authorization guards
        - exclude: Paths to exclude from authentication
        - exclude_opt_key: Key for excluding specific routes
        - scopes: OAuth2 scopes configuration
        - route_handlers: Route handlers requiring authentication
        - dependencies: Additional dependencies
        - middleware: Additional middleware
        """

    def create_token(
        self,
        identifier: str,
        *,
        exp: datetime | None = None,
        sub: str | None = None,
        aud: str | list[str] | None = None,
        iss: str | None = None,
        jti: str | None = None,
        **kwargs: Any,
    ) -> str:
        """
        Create a JWT token.

        Parameters:
        - identifier: Unique user identifier
        - exp: Token expiration time
        - sub: Token subject
        - aud: Token audience
        - iss: Token issuer
        - jti: JWT ID
        - **kwargs: Additional claims

        Returns:
        Encoded JWT token string
        """

    def decode_token(self, encoded_token: str) -> dict[str, Any]:
        """
        Decode and validate a JWT token.

        Parameters:
        - encoded_token: JWT token string

        Returns:
        Decoded token payload
        """

class JWTCookieAuth(JWTAuth):
    def __init__(
        self,
        token_secret: str,
        retrieve_user_handler: UserHandlerProtocol,
        *,
        key: str = "token",
        path: str = "/",
        domain: str | None = None,
        secure: bool = True,
        httponly: bool = True,
        samesite: Literal["lax", "strict", "none"] = "lax",
        **kwargs: Any,
    ):
        """
        Cookie-based JWT authentication.

        Parameters:
        - token_secret: Secret key for signing tokens
        - retrieve_user_handler: Function to retrieve user from token payload
        - key: Cookie name for storing the token
        - path: Cookie path
        - domain: Cookie domain
        - secure: Use secure cookies (HTTPS only)
        - httponly: Use HTTP-only cookies
        - samesite: SameSite cookie attribute
        """

    def create_response_with_token(
        self,
        identifier: str,
        response: Response | None = None,
        **kwargs: Any,
    ) -> Response:
        """
        Create a response with JWT token set as cookie.

        Parameters:
        - identifier: User identifier for token
        - response: Existing response to modify (creates new if None)
        - **kwargs: Additional token claims

        Returns:
        Response with token cookie set
        """

    def create_logout_response(self, response: Response | None = None) -> Response:
        """
        Create a response that clears the authentication cookie.

        Parameters:
        - response: Existing response to modify

        Returns:
        Response with cleared authentication cookie
        """

OAuth2 Authentication

OAuth2 authentication flows including password bearer and authorization code flows.

class OAuth2PasswordBearerAuth(JWTAuth):
    def __init__(
        self,
        token_url: str,
        token_secret: str,
        retrieve_user_handler: UserHandlerProtocol,
        *,
        scopes: dict[str, str] | None = None,
        **kwargs: Any,
    ):
        """
        OAuth2 password bearer authentication.

        Parameters:
        - token_url: URL endpoint for token requests
        - token_secret: Secret for signing tokens
        - retrieve_user_handler: Function to retrieve user
        - scopes: OAuth2 scopes mapping
        """

class OAuth2Login:
    def __init__(
        self,
        *,
        identifier: str,
        exp: datetime | None = None,
        sub: str | None = None,
        aud: str | list[str] | None = None,
        iss: str | None = None,
        jti: str | None = None,
        **kwargs: Any,
    ):
        """
        OAuth2 login token configuration.

        Parameters:
        - identifier: User identifier
        - exp: Expiration time
        - sub: Subject
        - aud: Audience
        - iss: Issuer
        - jti: JWT ID
        """

Session Authentication

Session-based authentication using server-side session storage.

class SessionAuth:
    def __init__(
        self,
        retrieve_user_handler: UserHandlerProtocol,
        session_backend: BaseSessionBackend,
        *,
        exclude: str | list[str] | None = None,
        exclude_opt_key: str = "exclude_from_auth",
        guards: Sequence[Guard] | None = None,
        route_handlers: Sequence[BaseRouteHandler] | None = None,
        dependencies: Dependencies | None = None,
        middleware: Sequence[Middleware] | None = None,
    ):
        """
        Session-based authentication.

        Parameters:
        - retrieve_user_handler: Function to retrieve user from session
        - session_backend: Session storage backend
        - exclude: Paths to exclude from authentication
        - exclude_opt_key: Key for excluding specific routes
        - guards: Authorization guards
        - route_handlers: Route handlers requiring authentication
        - dependencies: Additional dependencies
        - middleware: Additional middleware
        """

    async def login(
        self,
        identifier: str,
        request: Request,
        *,
        response: Response | None = None,
        **kwargs: Any,
    ) -> Response:
        """
        Log in a user by creating a session.

        Parameters:
        - identifier: User identifier
        - request: HTTP request object
        - response: Existing response to modify
        - **kwargs: Additional session data

        Returns:
        Response with session established
        """

    async def logout(
        self,
        request: Request,
        response: Response | None = None,
    ) -> Response:
        """
        Log out a user by destroying the session.

        Parameters:
        - request: HTTP request object
        - response: Existing response to modify

        Returns:
        Response with session cleared
        """

Authentication Middleware

Base classes for implementing custom authentication middleware.

class AbstractAuthenticationMiddleware(AbstractMiddleware):
    def __init__(
        self,
        app: ASGIApp,
        exclude: str | list[str] | None = None,
        exclude_opt_key: str = "exclude_from_auth",
        scopes: Scopes | None = None,
    ):
        """
        Base authentication middleware.

        Parameters:
        - app: ASGI application
        - exclude: Paths to exclude from authentication
        - exclude_opt_key: Key for excluding specific routes
        - scopes: OAuth2 scopes configuration
        """

    async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
        """
        Authenticate an incoming request.

        Parameters:
        - connection: ASGI connection object

        Returns:
        Authentication result with user and auth information
        """

class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):
    """JWT authentication middleware implementation."""

class JWTCookieAuthenticationMiddleware(AbstractAuthenticationMiddleware):
    """JWT cookie authentication middleware implementation."""

class SessionAuthMiddleware(AbstractAuthenticationMiddleware):
    """Session authentication middleware implementation."""

Authorization and Guards

Authorization guards for protecting routes based on user permissions and roles.

def guards(*guards: Guard) -> Callable[[T], T]:
    """
    Decorator to apply authorization guards to route handlers.

    Parameters:
    - *guards: Guard functions to apply

    Returns:
    Decorator function
    """

# Common guard implementations
async def user_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
    """Guard that requires an authenticated user."""
    if not connection.user:
        raise NotAuthorizedException("Authentication required")

async def admin_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
    """Guard that requires admin privileges."""
    if not connection.user or not getattr(connection.user, "is_admin", False):
        raise PermissionDeniedException("Admin access required")

def role_required_guard(*required_roles: str) -> Guard:
    """
    Create a guard that requires specific roles.

    Parameters:
    - *required_roles: Required role names

    Returns:
    Guard function
    """
    async def guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
        if not connection.user:
            raise NotAuthorizedException("Authentication required")
        
        user_roles = getattr(connection.user, "roles", [])
        if not any(role in user_roles for role in required_roles):
            raise PermissionDeniedException("Insufficient permissions")
    
    return guard

Authentication Result

Data structures for authentication results and user information.

class AuthenticationResult:
    def __init__(self, user: Any | None = None, auth: Any | None = None):
        """
        Authentication result.

        Parameters:
        - user: Authenticated user object
        - auth: Authentication information
        """
        self.user = user
        self.auth = auth

class Token:
    def __init__(
        self,
        token: str,
        *,
        exp: datetime | None = None,
        sub: str | None = None,
        aud: str | list[str] | None = None,
        iss: str | None = None,
        jti: str | None = None,
        **claims: Any,
    ):
        """
        JWT token representation.

        Parameters:
        - token: Encoded token string
        - exp: Expiration time
        - sub: Subject
        - aud: Audience
        - iss: Issuer
        - jti: JWT ID
        - **claims: Additional token claims
        """

Usage Examples

Basic JWT Authentication

from litestar import Litestar, get, post
from litestar.security.jwt import JWTAuth, Token
from litestar.connection import ASGIConnection
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class User:
    id: int
    username: str
    email: str

# User retrieval function
async def retrieve_user_handler(token: Token, connection: ASGIConnection) -> User | None:
    # In a real app, you'd query your database
    if token.sub == "123":
        return User(id=123, username="alice", email="alice@example.com")
    return None

# JWT configuration
jwt_auth = JWTAuth(
    token_secret="your-secret-key",
    retrieve_user_handler=retrieve_user_handler,
    default_token_expiration=timedelta(hours=24),
)

@post("/login", exclude_from_auth=True)
async def login_handler(data: dict) -> dict:
    # Validate credentials (simplified)
    if data.get("username") == "alice" and data.get("password") == "secret":
        token = jwt_auth.create_token(identifier="123")
        return {"access_token": token, "token_type": "bearer"}
    
    raise NotAuthorizedException("Invalid credentials")

@get("/profile")
async def get_profile(request: Request) -> dict:
    # request.user is automatically populated by JWT middleware
    user = request.user
    return {
        "id": user.id,
        "username": user.username,
        "email": user.email
    }

app = Litestar(
    route_handlers=[login_handler, get_profile],
    on_app_init=[jwt_auth.on_app_init],
)

Cookie-Based JWT Authentication

from litestar.security.jwt import JWTCookieAuth
from litestar.response import Response

jwt_cookie_auth = JWTCookieAuth(
    token_secret="your-secret-key",
    retrieve_user_handler=retrieve_user_handler,
    key="access_token",
    httponly=True,
    secure=True,
    samesite="strict",
)

@post("/login", exclude_from_auth=True)
async def login_with_cookie(data: dict) -> Response:
    if data.get("username") == "alice" and data.get("password") == "secret":
        response = Response({"status": "logged in"})
        return jwt_cookie_auth.create_response_with_token(
            identifier="123",
            response=response
        )
    
    raise NotAuthorizedException("Invalid credentials")

@post("/logout")
async def logout() -> Response:
    response = Response({"status": "logged out"})
    return jwt_cookie_auth.create_logout_response(response)

Session Authentication

from litestar.security.session_auth import SessionAuth
from litestar.middleware.session import SessionMiddleware
from litestar.stores.memory import MemoryStore

# Session storage
session_store = MemoryStore()

# Session middleware
session_middleware = SessionMiddleware(
    store=session_store,
    secret="session-secret-key",
)

# Session auth
session_auth = SessionAuth(
    retrieve_user_handler=retrieve_user_handler,
    session_backend=session_store,
)

@post("/login", exclude_from_auth=True)
async def session_login(request: Request, data: dict) -> Response:
    if data.get("username") == "alice" and data.get("password") == "secret":
        return await session_auth.login(
            identifier="123",
            request=request,
            response=Response({"status": "logged in"})
        )
    
    raise NotAuthorizedException("Invalid credentials")

@post("/logout")
async def session_logout(request: Request) -> Response:
    return await session_auth.logout(
        request=request,
        response=Response({"status": "logged out"})
    )

app = Litestar(
    route_handlers=[session_login, session_logout, get_profile],
    middleware=[session_middleware],
    on_app_init=[session_auth.on_app_init],
)

OAuth2 Password Bearer

from litestar.security.jwt import OAuth2PasswordBearerAuth

oauth2_auth = OAuth2PasswordBearerAuth(
    token_url="/token",
    token_secret="your-secret-key",
    retrieve_user_handler=retrieve_user_handler,
    scopes={
        "read": "Read access",
        "write": "Write access",
        "admin": "Administrative access"
    }
)

@post("/token", exclude_from_auth=True)
async def get_token(data: dict) -> dict:
    # OAuth2 token endpoint
    if data.get("username") == "alice" and data.get("password") == "secret":
        token = oauth2_auth.create_token(
            identifier="123",
            scopes=["read", "write"]
        )
        return {
            "access_token": token,
            "token_type": "bearer",
            "expires_in": 3600,
            "scope": "read write"
        }
    
    raise NotAuthorizedException("Invalid credentials")

Authorization Guards

from litestar.security import guards

@dataclass
class User:
    id: int
    username: str
    roles: list[str]
    is_active: bool = True

async def admin_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:
    user = connection.user
    if not user or "admin" not in user.roles:
        raise PermissionDeniedException("Admin access required")

@get("/admin/users", guards=[admin_guard])
async def list_all_users() -> list[dict]:
    # Only accessible by admins
    return [{"id": 1, "username": "alice"}]

@get("/profile", guards=[user_required_guard])
async def get_user_profile(request: Request) -> dict:
    # Requires any authenticated user
    return {"user": request.user.username}

# Custom role-based guard
editor_guard = role_required_guard("editor", "admin")

@post("/content", guards=[editor_guard])
async def create_content(data: dict) -> dict:
    # Requires editor or admin role
    return {"status": "created", "content": data}

Custom Authentication Middleware

from litestar.middleware.authentication import AbstractAuthenticationMiddleware, AuthenticationResult

class APIKeyAuthMiddleware(AbstractAuthenticationMiddleware):
    def __init__(self, app: ASGIApp, api_keys: dict[str, dict]):
        super().__init__(app)
        self.api_keys = api_keys  # Map API keys to user data

    async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:
        api_key = connection.headers.get("X-API-Key")
        
        if not api_key:
            return AuthenticationResult()
        
        user_data = self.api_keys.get(api_key)
        if user_data:
            user = User(**user_data)
            return AuthenticationResult(user=user, auth={"api_key": api_key})
        
        return AuthenticationResult()

# Usage
api_keys = {
    "abc123": {"id": 1, "username": "service_user", "roles": ["api"]},
    "def456": {"id": 2, "username": "admin_user", "roles": ["api", "admin"]},
}

app = Litestar(
    route_handlers=[...],
    middleware=[APIKeyAuthMiddleware(api_keys=api_keys)],
)

Types

# User handler protocol
class UserHandlerProtocol(Protocol):
    async def __call__(self, token: Token, connection: ASGIConnection) -> Any | None:
        """Retrieve user from token and connection."""

# Guard type
Guard = Callable[[ASGIConnection, BaseRouteHandler], bool | None | Awaitable[bool | None]]

# Scopes type
Scopes = dict[str, str]

# JWT algorithm types
JWTAlgorithm = Literal["HS256", "HS384", "HS512", "RS256", "RS384", "RS512", "ES256", "ES384", "ES512"]

# Authentication result types
AuthType = Any
UserType = Any

Install with Tessl CLI

npx tessl i tessl/pypi-litestar

docs

application-routing.md

configuration.md

dto.md

exceptions.md

http-handlers.md

index.md

middleware.md

openapi.md

plugins.md

request-response.md

security.md

testing.md

websocket.md

tile.json