CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-blacksheep

Fast web framework for Python asyncio

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

auth.mddocs/

Authentication & Authorization

BlackSheep provides comprehensive authentication and authorization features through integration with the guardpost library, supporting JWT tokens, cookie-based authentication, and flexible authorization policies.

Authentication Overview

Authentication in BlackSheep is handled through authentication strategies that can validate user credentials from various sources like JWT tokens, cookies, or custom authentication mechanisms.

Authentication Setup

from typing import Optional

from blacksheep import Application, Request
from blacksheep.server.authentication import AuthenticationStrategy
from guardpost import AuthenticationHandler, Identity

# Basic authentication setup
app = Application()
# use_authentication() returns the strategy object; handlers are registered on it via .add()
auth_strategy = app.use_authentication()

# Custom authentication handler
class CustomAuthHandler(AuthenticationHandler):
    """Authenticate a request from the value of its ``Authorization`` header."""

    async def authenticate(self, context: Request) -> Optional[Identity]:
        """Return the caller's identity, or None when no valid credentials are sent.

        Args:
            context: The incoming request whose headers are inspected.

        Returns:
            An Identity carrying ``sub`` and ``name`` claims when the header
            value validates; None when the header is missing or invalid.
        """
        auth_header = context.get_first_header(b"Authorization")
        if not auth_header:
            return None

        # validate_token is an application-provided helper; it receives the
        # whole decoded header value and returns a falsy value on failure.
        user_id = validate_token(auth_header.decode())
        if not user_id:
            return None

        identity = Identity({"sub": user_id, "name": f"User {user_id}"})
        # Store the identity on the request so request handlers can read
        # it back through request.identity.
        context.identity = identity
        return identity

# Register an instance of the custom handler on the authentication strategy
auth_strategy.add(CustomAuthHandler())

Authentication Decorators

from blacksheep import auth, allow_anonymous

# Authenticated callers only
@app.get("/protected")
@auth()  # an authenticated user is required
async def protected_endpoint():
    """Confirm to the caller that the request was authenticated."""
    body = {"message": "You are authenticated"}
    return json(body)

# Allow anonymous access (overrides global auth requirements)
@app.get("/public")
@allow_anonymous()  # called with parentheses: the call produces the decorator
async def public_endpoint():
    """Return a message that is available without credentials."""
    return json({"message": "Public access"})

# Restrict the route to the "JWT Bearer" authentication scheme
@app.get("/jwt-only")
@auth(authentication_schemes=["JWT Bearer"])
async def jwt_only_endpoint():
    """Confirm that the caller was authenticated through the JWT scheme."""
    body = {"message": "JWT authenticated"}
    return json(body)

JWT Bearer Authentication

BlackSheep provides built-in support for JWT Bearer token authentication with OIDC integration.

JWT Configuration

from blacksheep.server.authentication.jwt import JWTBearerAuthentication

# Basic JWT setup with OIDC discovery
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api", "myapp"],
    authority="https://auth.example.com",  # OIDC authority for key discovery
    require_kid=True  # Require key ID in JWT header
)

# Register the JWT handler on the application's authentication strategy
auth_strategy = app.use_authentication()
auth_strategy.add(jwt_auth)

# Manual key configuration
# NOTE: this rebinds jwt_auth to a second handler that is not added to the
# strategy in this snippet; it shows an alternative set of constructor options.
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    valid_issuers=["https://auth.example.com"],
    keys_url="https://auth.example.com/.well-known/jwks.json",
    cache_time=3600,  # Cache keys for 1 hour
    auth_mode="JWT Bearer"
)

JWT Claims Access

from guardpost import Identity

@app.get("/profile")
@auth()
async def get_profile(request: Request):
    """Return the caller's id together with email, roles and scopes from the claims."""
    identity: Identity = request.identity

    # Subject identifier and the full claims mapping
    user_id = identity.id
    claims = identity.claims

    # "scope" is read as a single whitespace-separated string and split into a list
    profile = {
        "user_id": user_id,
        "email": claims.get("email"),
        "roles": claims.get("roles", []),
        "scopes": claims.get("scope", "").split(),
    }
    return json(profile)

Custom JWT Key Providers

from typing import Dict, Any, Optional

class CustomKeysProvider:
    """Key source that supplies a JWKS document for JWT validation."""

    async def get_keys(self) -> Dict[str, Any]:
        """Return a JWKS mapping holding one placeholder RSA key."""
        # Placeholder values; a real provider would fetch them from its own source.
        rsa_key = {
            "kty": "RSA",
            "kid": "key1",
            "n": "...",  # RSA modulus
            "e": "AQAB",  # RSA exponent
        }
        return {"keys": [rsa_key]}

# Use custom key provider: the handler is given a CustomKeysProvider instance
# through keys_provider instead of an authority or keys URL
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    keys_provider=CustomKeysProvider(),
    require_kid=True
)

Cookie Authentication

Cookie-based authentication for web applications with secure session management.

Cookie Authentication Setup

from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep.sessions.crypto import Serializer

# Basic cookie authentication
# The secret below is a placeholder; supply real keys from configuration.
cookie_auth = CookieAuthentication(
    cookie_name="auth_session",
    secret_keys=["your-secret-key-here"],
    auth_scheme="Cookie"
)

# Register the cookie handler on the application's authentication strategy
auth_strategy = app.use_authentication()
auth_strategy.add(cookie_auth)

# Advanced cookie authentication with custom serializer
custom_serializer = Serializer(
    secret_key="encryption-key",
    signing_key="signing-key"
)

# NOTE: this rebinds cookie_auth to a second handler that is not added to the
# strategy in this snippet; it passes a serializer instead of secret_keys.
cookie_auth = CookieAuthentication(
    cookie_name="secure_session",
    serializer=custom_serializer,
    auth_scheme="SecureCookie"
)

Cookie Authentication Usage

from blacksheep.cookies import Cookie
from datetime import datetime, timedelta

# Login endpoint: checks the credentials and issues the authentication cookie
@app.post("/login")
async def login(credentials: FromJSON[dict]):
    """Validate the posted username/password and set the auth cookie on success.

    Raises:
        Unauthorized: when authenticate_user returns a falsy value.
    """
    payload = credentials.value
    username = payload.get("username")
    password = payload.get("password")

    user = await authenticate_user(username, password)
    if not user:
        raise Unauthorized("Invalid credentials")

    response = json({"message": "Logged in successfully"})

    # The cookie carries the user's id and username and is marked secure
    cookie_auth.set_cookie(
        {"user_id": user.id, "username": user.username},
        response,
        secure=True,
    )
    return response

# Logout endpoint: clears the authentication cookie
@app.post("/logout")
@auth()
async def logout():
    """Confirm the logout and unset the auth cookie on the outgoing response."""
    reply = json({"message": "Logged out successfully"})
    cookie_auth.unset_cookie(reply)
    return reply

# Expose the authenticated user's data
@app.get("/me")
@auth()
async def get_current_user(request: Request):
    """Return the id and claims of the identity attached to the request."""
    identity = request.identity
    current_user = {
        "user_id": identity.id,
        "claims": identity.claims,
    }
    return json(current_user)

Authorization

Authorization in BlackSheep is policy-based, allowing fine-grained access control through roles, permissions, and custom requirements.

Authorization Setup

from blacksheep.server.authorization import AuthorizationStrategy
from guardpost.authorization import Policy, Requirement

# Basic authorization setup
app.use_authentication()  # Authentication required first
authz_strategy = app.use_authorization()

# Define authorization policies: each Policy pairs a name with a requirement
# NOTE(review): confirm that Requirement can be built from a string like "role:admin"
admin_policy = Policy("admin", Requirement("role:admin"))
moderator_policy = Policy("moderator", Requirement("role:moderator"))
api_access_policy = Policy("api_access", Requirement("scope:api.read"))

# Register every named policy on the authorization strategy
authz_strategy.add(admin_policy)
authz_strategy.add(moderator_policy)
authz_strategy.add(api_access_policy)

# Default policy (just requires authentication)
# NOTE(review): AuthenticatedRequirement is not imported in this snippet — confirm its import path
default_policy = Policy("authenticated", AuthenticatedRequirement())
authz_strategy.default_policy = default_policy

Authorization Decorators

# Guard the route with a single named policy
@app.get("/admin/users")
@auth("admin")  # the "admin" policy must be satisfied
async def admin_users():
    """Return a flag confirming admin access."""
    body = {"admin_access": True}
    return json(body)

# Guard the route with a list of policy names
@app.get("/moderator/posts")
@auth(["moderator", "api_access"])
async def moderate_posts():
    """Return a flag confirming moderator access."""
    body = {"moderator_access": True}
    return json(body)

# No policy name given: the default policy applies
@app.get("/profile")
@auth()
async def user_profile():
    """Return a flag confirming access for an authenticated user."""
    body = {"user_access": True}
    return json(body)

# Allow anonymous (bypass authorization)
@app.get("/public")
@allow_anonymous()  # called with parentheses: the call produces the decorator
async def public_access():
    """Return a flag from a route that needs no authorization."""
    return json({"public": True})

Custom Requirements

from guardpost.authorization import Requirement, AsyncRequirement
from guardpost import Identity

class RoleRequirement(Requirement):
    """Requirement met when the identity's "roles" claim contains a given role."""

    def __init__(self, required_role: str):
        """Remember the role name that identities must carry."""
        self.required_role = required_role

    def handle(self, identity: Identity) -> bool:
        """Return True when ``required_role`` appears in the identity's roles."""
        # A missing "roles" claim is treated as an empty list
        return self.required_role in identity.claims.get("roles", [])

class PermissionRequirement(AsyncRequirement):
    """Requirement met when an asynchronous lookup grants a given permission."""

    def __init__(self, permission: str):
        """Remember the permission name to look up."""
        self.permission = permission

    async def handle(self, identity: Identity) -> bool:
        """Return the result of check_user_permission for this identity."""
        # check_user_permission is an application-provided coroutine
        return await check_user_permission(identity.id, self.permission)

# Use custom requirements: wrap each requirement instance in a named policy
admin_role_policy = Policy("admin_role", RoleRequirement("admin"))
write_permission_policy = Policy("can_write", PermissionRequirement("posts:write"))

# Register both policies so routes can refer to them by name
authz_strategy.add(admin_role_policy)
authz_strategy.add(write_permission_policy)

@app.post("/posts")
@auth("can_write")  # the "can_write" policy runs the async permission check
async def create_post(post_data: FromJSON[dict]):
    """Return a flag confirming the post was created; post_data is not read."""
    body = {"post_created": True}
    return json(body)

Resource-Based Authorization

class ResourceOwnerRequirement(AsyncRequirement):
    """Requirement that passes only when the identity owns the given resource."""

    async def handle(self, identity: Identity, resource_id: Optional[str] = None) -> bool:
        """Return True when the resource exists and is owned by ``identity``.

        Args:
            identity: The caller's identity; its ``id`` is compared to the owner.
            resource_id: Identifier passed to ``get_resource``. A missing or
                empty value fails the check without performing a lookup.

        Returns:
            True only when a resource is found and its ``owner_id`` equals
            ``identity.id``; False in every other case.
        """
        if not resource_id:
            return False

        user_id = identity.id
        resource = await get_resource(resource_id)
        if not resource:
            # Return a real bool rather than the falsy lookup result itself
            return False
        return resource.owner_id == user_id

# Per-resource check performed inside the handler
@app.get("/posts/{post_id:int}")
@auth()
async def get_post(post_id: FromRoute[int], request: Request):
    """Return the post as JSON unless it is private and owned by someone else.

    Raises:
        Forbidden: when the post is private and the caller is not its owner.
    """
    identity = request.identity
    post = await get_post_by_id(post_id.value)

    # Private posts are visible to their owner only
    caller_is_owner = post.owner_id == identity.id if post.is_private else True
    if not caller_is_owner:
        raise Forbidden("Access denied to private post")

    return json(post.to_dict())

Authentication Challenges

Handle authentication challenges and provide appropriate responses for different authentication failures.

Challenge Handling

from blacksheep.server.authentication import AuthenticateChallenge

# Custom challenge handler
async def custom_auth_challenge_handler(app: Application, request: Request, exception: AuthenticateChallenge):
    """Answer an authentication challenge with JSON for API paths, else a redirect.

    The header obtained from the challenge is added to whichever response is built.
    """
    # Header taken from the challenge; unpacked into headers.add() below
    challenge_header = exception.get_header()

    if not request.path.startswith("/api/"):
        # Web endpoints are sent to the login page
        response = redirect("/login")
    else:
        # API endpoints receive a JSON error body
        error_body = {
            "error": "authentication_required",
            "message": "Valid authentication required",
            "scheme": exception.scheme,
        }
        response = Response(401, content=JSONContent(error_body))

    response.headers.add(*challenge_header)
    return response

# Register challenge handler: exception_handler(...) returns a decorator,
# which is applied here by calling it directly with the handler function
app.exception_handler(AuthenticateChallenge)(custom_auth_challenge_handler)

Error Handling

Comprehensive error handling for authentication and authorization failures.

Authentication Errors

from blacksheep.exceptions import Unauthorized, Forbidden
from blacksheep.server.authorization import ForbiddenError
from guardpost.authorization import UnauthorizedError

# 401 Unauthorized: JSON for API paths, login redirect for everything else
@app.exception_handler(401)
async def handle_unauthorized(app: Application, request: Request, exc: HTTPException):
    """Build the response for a 401 according to the request path."""
    if not request.path.startswith("/api/"):
        return redirect("/login")

    error_body = {
        "error": "unauthorized",
        "message": "Authentication required",
    }
    return Response(401, content=JSONContent(error_body))

# 403 Forbidden: always answered with JSON
@app.exception_handler(403)
async def handle_forbidden(app: Application, request: Request, exc: HTTPException):
    """Answer every 403 with a fixed JSON error body."""
    error_body = {
        "error": "forbidden",
        "message": "Insufficient permissions",
    }
    return Response(403, content=JSONContent(error_body))

# Authorization errors: 403 for ForbiddenError, 401 for any other UnauthorizedError
@app.exception_handler(UnauthorizedError)
async def handle_authz_error(app: Application, request: Request, exc: UnauthorizedError):
    """Map an authorization error to a JSON response with a matching status."""
    status, message = (
        (403, "Access forbidden")
        if isinstance(exc, ForbiddenError)
        else (401, "Authentication required")
    )

    error_body = {
        "error": "authorization_failed",
        "message": message,
    }
    return Response(status, content=JSONContent(error_body))

Session Management

Secure session management with encryption and signing for web applications.

Session Configuration

from blacksheep.sessions import SessionMiddleware
from blacksheep.sessions.crypto import Encryptor, Signer

# Basic session setup
# The secret below is a placeholder; supply a real key from configuration.
app.use_sessions(
    secret_key="your-secret-key-here",
    session_cookie="session",
    session_max_age=3600  # 1 hour
)

# Advanced session configuration
custom_encryptor = Encryptor("encryption-key")
custom_signer = Signer("signing-key")

# Build the middleware by hand to pass the custom encryptor and signer
session_middleware = SessionMiddleware(
    secret_key="session-secret",
    session_cookie="secure_session",
    encryptor=custom_encryptor,
    signer=custom_signer,
    session_max_age=86400  # 24 hours
)

# NOTE(review): presumably an alternative to use_sessions() above rather than
# an addition to it — confirm before enabling both in one application
app.middlewares.append(session_middleware)

Session Usage

from blacksheep.sessions import Session

# Read values from the session inside a handler
@app.get("/dashboard")
async def dashboard(request: Request):
    """Return the session's user id and preferences, or redirect to the login page."""
    session: Session = request.session

    user_id = session.get("user_id")
    preferences = session.get("preferences", {})

    if user_id:
        return json({
            "user_id": user_id,
            "preferences": preferences,
        })

    # No user id stored in the session: send the visitor to the login page
    return redirect("/login")

# Set session data
@app.post("/login")
async def login(credentials: FromJSON[dict], request: Request):
    """Authenticate the posted credentials and record the user in the session.

    Args:
        credentials: JSON body with the login data, passed to authenticate_user.
        request: The current request; its session receives the login details.

    Raises:
        Unauthorized: when authenticate_user returns a falsy value.
    """
    # Authenticate user
    user = await authenticate_user(credentials.value)
    if not user:
        raise Unauthorized("Invalid credentials")

    # The session is reached through the request, which is why the handler
    # declares a request parameter.
    session = request.session
    session["user_id"] = user.id
    session["username"] = user.username
    session["login_time"] = datetime.utcnow().isoformat()

    return json({"message": "Login successful"})

# Empty the session on logout
@app.post("/logout")
async def logout(request: Request):
    """Clear all session data and confirm the logout."""
    request.session.clear()
    return json({"message": "Logged out"})

Complete Authentication Example

Here's a complete example showing authentication and authorization in a BlackSheep application:

from blacksheep import Application, json, Request, FromJSON
from blacksheep.server.authentication.jwt import JWTBearerAuthentication
from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep import auth, allow_anonymous
from guardpost.authorization import Policy, Requirement
from guardpost import Identity

app = Application()

# Setup authentication strategies: both handlers below are added to one strategy
auth_strategy = app.use_authentication()

# JWT for API access
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    authority="https://auth.example.com"
)
auth_strategy.add(jwt_auth)

# Cookie auth for web interface
# The secret below is a placeholder; supply real keys from configuration.
cookie_auth = CookieAuthentication(
    cookie_name="webapp_session",
    secret_keys=["webapp-secret-key"]
)
auth_strategy.add(cookie_auth)

# Setup authorization: register the "admin" and "user" policies by name
# NOTE(review): confirm that Requirement can be built from a string like "role:admin"
authz_strategy = app.use_authorization()
authz_strategy.add(Policy("admin", Requirement("role:admin")))
authz_strategy.add(Policy("user", Requirement("role:user")))

# Public endpoints
@app.get("/")
@allow_anonymous()  # called with parentheses: the call produces the decorator
async def home():
    """Return the welcome message; no credentials are needed."""
    return json({"message": "Welcome to BlackSheep API"})

# User endpoints: any authenticated caller
@app.get("/profile")
@auth()
async def get_profile(request: Request):
    """Return the id and claims of the identity attached to the request."""
    identity: Identity = request.identity
    profile = {
        "user_id": identity.id,
        "claims": identity.claims,
    }
    return json(profile)

# Admin endpoints: guarded by the "admin" policy
@app.get("/admin/users")
@auth("admin")
async def list_all_users():
    """Return every user as JSON under the "users" key."""
    # get_all_users is an application-provided coroutine
    return json({"users": await get_all_users()})

# API endpoints: restricted to the "JWT Bearer" scheme
@app.get("/api/data")
@auth(authentication_schemes=["JWT Bearer"])
async def api_data():
    """Return the fixed API payload."""
    body = {"data": "API response"}
    return json(body)

# Run with: uvicorn main:app

This authentication and authorization system provides secure, flexible access control for both API and web application use cases with support for multiple authentication methods and fine-grained authorization policies.

Install with Tessl CLI

npx tessl i tessl/pypi-blacksheep

docs

additional.md

auth.md

client.md

core-server.md

index.md

request-response.md

testing.md

websockets.md

tile.json