Fast web framework for Python asyncio
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
BlackSheep provides comprehensive authentication and authorization features through integration with the guardpost library, supporting JWT tokens, cookie-based authentication, and flexible authorization policies.
Authentication in BlackSheep is handled through authentication strategies that can validate user credentials from various sources like JWT tokens, cookies, or custom authentication mechanisms.
from blacksheep import Application
from blacksheep.server.authentication import AuthenticationStrategy
from guardpost import Identity
# Basic authentication setup
app = Application()
auth_strategy = app.use_authentication()
# Custom authentication handler
class CustomAuthHandler:
async def authenticate(self, context: Request) -> Optional[Identity]:
# Extract and validate credentials
auth_header = context.get_first_header(b"Authorization")
if auth_header:
# Validate token/credentials
user_id = validate_token(auth_header.decode())
if user_id:
return Identity({"sub": user_id, "name": f"User {user_id}"})
return None
auth_strategy.add(CustomAuthHandler())from blacksheep import auth, allow_anonymous
# Require authentication
@app.get("/protected")
@auth() # Requires authenticated user
async def protected_endpoint():
return json({"message": "You are authenticated"})
# Allow anonymous access (overrides global auth requirements)
@app.get("/public")
@allow_anonymous
async def public_endpoint():
return json({"message": "Public access"})
# Require specific authentication schemes
@app.get("/jwt-only")
@auth(authentication_schemes=["JWT Bearer"])
async def jwt_only_endpoint():
return json({"message": "JWT authenticated"})BlackSheep provides built-in support for JWT Bearer token authentication with OIDC integration.
from blacksheep.server.authentication.jwt import JWTBearerAuthentication
# Basic JWT setup with OIDC discovery
jwt_auth = JWTBearerAuthentication(
valid_audiences=["api", "myapp"],
authority="https://auth.example.com", # OIDC authority for key discovery
require_kid=True # Require key ID in JWT header
)
auth_strategy = app.use_authentication()
auth_strategy.add(jwt_auth)
# Manual key configuration
jwt_auth = JWTBearerAuthentication(
valid_audiences=["api"],
valid_issuers=["https://auth.example.com"],
keys_url="https://auth.example.com/.well-known/jwks.json",
cache_time=3600, # Cache keys for 1 hour
auth_mode="JWT Bearer"
)from guardpost import Identity
@app.get("/profile")
@auth()
async def get_profile(request: Request):
identity: Identity = request.identity
# Standard JWT claims
user_id = identity.id # Subject (sub claim)
claims = identity.claims # All claims dict
# Access specific claims
email = claims.get("email")
roles = claims.get("roles", [])
scopes = claims.get("scope", "").split()
return json({
"user_id": user_id,
"email": email,
"roles": roles,
"scopes": scopes
})from typing import Dict, Any, Optional
class CustomKeysProvider:
"""Custom key provider for JWT validation"""
async def get_keys(self) -> Dict[str, Any]:
"""Return JWKS keys for token validation"""
# Fetch keys from custom source
return {
"keys": [
{
"kty": "RSA",
"kid": "key1",
"n": "...", # RSA modulus
"e": "AQAB" # RSA exponent
}
]
}
# Use custom key provider
jwt_auth = JWTBearerAuthentication(
valid_audiences=["api"],
keys_provider=CustomKeysProvider(),
require_kid=True
)Cookie-based authentication for web applications with secure session management.
from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep.sessions.crypto import Serializer
# Basic cookie authentication
cookie_auth = CookieAuthentication(
cookie_name="auth_session",
secret_keys=["your-secret-key-here"],
auth_scheme="Cookie"
)
auth_strategy = app.use_authentication()
auth_strategy.add(cookie_auth)
# Advanced cookie authentication with custom serializer
custom_serializer = Serializer(
secret_key="encryption-key",
signing_key="signing-key"
)
cookie_auth = CookieAuthentication(
cookie_name="secure_session",
serializer=custom_serializer,
auth_scheme="SecureCookie"
)from blacksheep.cookies import Cookie
from datetime import datetime, timedelta
# Login endpoint that sets authentication cookie
@app.post("/login")
async def login(credentials: FromJSON[dict]):
username = credentials.value.get("username")
password = credentials.value.get("password")
# Validate credentials
user = await authenticate_user(username, password)
if not user:
raise Unauthorized("Invalid credentials")
# Create response
response = json({"message": "Logged in successfully"})
# Set authentication cookie
auth_data = {"user_id": user.id, "username": user.username}
cookie_auth.set_cookie(auth_data, response, secure=True)
return response
# Logout endpoint that removes cookie
@app.post("/logout")
@auth()
async def logout():
response = json({"message": "Logged out successfully"})
cookie_auth.unset_cookie(response)
return response
# Access authenticated user data
@app.get("/me")
@auth()
async def get_current_user(request: Request):
# User identity is automatically available
identity = request.identity
return json({
"user_id": identity.id,
"claims": identity.claims
})Authorization in BlackSheep is policy-based, allowing fine-grained access control through roles, permissions, and custom requirements.
from blacksheep.server.authorization import AuthorizationStrategy
from guardpost.authorization import Policy, Requirement
# Basic authorization setup
app.use_authentication() # Authentication required first
authz_strategy = app.use_authorization()
# Define authorization policies
admin_policy = Policy("admin", Requirement("role:admin"))
moderator_policy = Policy("moderator", Requirement("role:moderator"))
api_access_policy = Policy("api_access", Requirement("scope:api.read"))
authz_strategy.add(admin_policy)
authz_strategy.add(moderator_policy)
authz_strategy.add(api_access_policy)
# Default policy (just requires authentication)
default_policy = Policy("authenticated", AuthenticatedRequirement())
authz_strategy.default_policy = default_policy# Require specific policy
@app.get("/admin/users")
@auth("admin") # Requires "admin" policy
async def admin_users():
return json({"admin_access": True})
# Require multiple policies (all must pass)
@app.get("/moderator/posts")
@auth(["moderator", "api_access"])
async def moderate_posts():
return json({"moderator_access": True})
# Default authenticated policy
@app.get("/profile")
@auth() # Uses default "authenticated" policy
async def user_profile():
return json({"user_access": True})
# Allow anonymous (bypass authorization)
@app.get("/public")
@allow_anonymous
async def public_access():
return json({"public": True})from guardpost.authorization import Requirement, AsyncRequirement
from guardpost import Identity
class RoleRequirement(Requirement):
"""Require specific role"""
def __init__(self, required_role: str):
self.required_role = required_role
def handle(self, identity: Identity) -> bool:
user_roles = identity.claims.get("roles", [])
return self.required_role in user_roles
class PermissionRequirement(AsyncRequirement):
"""Require specific permission (async validation)"""
def __init__(self, permission: str):
self.permission = permission
async def handle(self, identity: Identity) -> bool:
user_id = identity.id
# Check permission in database
has_permission = await check_user_permission(user_id, self.permission)
return has_permission
# Use custom requirements
admin_role_policy = Policy("admin_role", RoleRequirement("admin"))
write_permission_policy = Policy("can_write", PermissionRequirement("posts:write"))
authz_strategy.add(admin_role_policy)
authz_strategy.add(write_permission_policy)
@app.post("/posts")
@auth("can_write") # Uses async permission check
async def create_post(post_data: FromJSON[dict]):
return json({"post_created": True})class ResourceOwnerRequirement(AsyncRequirement):
"""Check if user owns the resource"""
async def handle(self, identity: Identity, resource_id: str = None) -> bool:
if not resource_id:
return False
user_id = identity.id
resource = await get_resource(resource_id)
return resource and resource.owner_id == user_id
# Resource-specific authorization
@app.get("/posts/{post_id:int}")
@auth()
async def get_post(post_id: FromRoute[int], request: Request):
# Manual authorization check
identity = request.identity
post = await get_post_by_id(post_id.value)
if post.is_private and post.owner_id != identity.id:
raise Forbidden("Access denied to private post")
return json(post.to_dict())Handle authentication challenges and provide appropriate responses for different authentication failures.
from blacksheep.server.authentication import AuthenticateChallenge
# Custom challenge handler
async def custom_auth_challenge_handler(app: Application, request: Request, exception: AuthenticateChallenge):
"""Handle authentication challenges"""
# Get WWW-Authenticate header
auth_header = exception.get_header()
# Custom challenge response
if request.path.startswith("/api/"):
# API endpoints get JSON response
response = Response(401, content=JSONContent({
"error": "authentication_required",
"message": "Valid authentication required",
"scheme": exception.scheme
}))
else:
# Web endpoints redirect to login
response = redirect("/login")
response.headers.add(*auth_header)
return response
# Register challenge handler
app.exception_handler(AuthenticateChallenge)(custom_auth_challenge_handler)Comprehensive error handling for authentication and authorization failures.
from blacksheep.exceptions import Unauthorized, Forbidden
from blacksheep.server.authorization import ForbiddenError
from guardpost.authorization import UnauthorizedError
# Handle 401 Unauthorized
@app.exception_handler(401)
async def handle_unauthorized(app: Application, request: Request, exc: HTTPException):
if request.path.startswith("/api/"):
return Response(401, content=JSONContent({
"error": "unauthorized",
"message": "Authentication required"
}))
else:
return redirect("/login")
# Handle 403 Forbidden
@app.exception_handler(403)
async def handle_forbidden(app: Application, request: Request, exc: HTTPException):
return Response(403, content=JSONContent({
"error": "forbidden",
"message": "Insufficient permissions"
}))
# Handle authorization errors
@app.exception_handler(UnauthorizedError)
async def handle_authz_error(app: Application, request: Request, exc: UnauthorizedError):
if isinstance(exc, ForbiddenError):
status = 403
message = "Access forbidden"
else:
status = 401
message = "Authentication required"
return Response(status, content=JSONContent({
"error": "authorization_failed",
"message": message
}))Secure session management with encryption and signing for web applications.
from blacksheep.sessions import SessionMiddleware
from blacksheep.sessions.crypto import Encryptor, Signer
# Basic session setup
app.use_sessions(
secret_key="your-secret-key-here",
session_cookie="session",
session_max_age=3600 # 1 hour
)
# Advanced session configuration
custom_encryptor = Encryptor("encryption-key")
custom_signer = Signer("signing-key")
session_middleware = SessionMiddleware(
secret_key="session-secret",
session_cookie="secure_session",
encryptor=custom_encryptor,
signer=custom_signer,
session_max_age=86400 # 24 hours
)
app.middlewares.append(session_middleware)from blacksheep.sessions import Session
# Access session in handlers
@app.get("/dashboard")
async def dashboard(request: Request):
session: Session = request.session
# Get session data
user_id = session.get("user_id")
preferences = session.get("preferences", {})
if not user_id:
return redirect("/login")
return json({
"user_id": user_id,
"preferences": preferences
})
# Set session data
@app.post("/login")
async def login(credentials: FromJSON[dict]):
# Authenticate user
user = await authenticate_user(credentials.value)
if not user:
raise Unauthorized("Invalid credentials")
# Set session data
session = request.session
session["user_id"] = user.id
session["username"] = user.username
session["login_time"] = datetime.utcnow().isoformat()
return json({"message": "Login successful"})
# Clear session
@app.post("/logout")
async def logout(request: Request):
session = request.session
session.clear() # Clear all session data
return json({"message": "Logged out"})Here's a complete example showing authentication and authorization in a BlackSheep application:
from blacksheep import Application, json, Request, FromJSON
from blacksheep.server.authentication.jwt import JWTBearerAuthentication
from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep import auth, allow_anonymous
from guardpost.authorization import Policy, Requirement
from guardpost import Identity
app = Application()
# Setup authentication strategies
auth_strategy = app.use_authentication()
# JWT for API access
jwt_auth = JWTBearerAuthentication(
valid_audiences=["api"],
authority="https://auth.example.com"
)
auth_strategy.add(jwt_auth)
# Cookie auth for web interface
cookie_auth = CookieAuthentication(
cookie_name="webapp_session",
secret_keys=["webapp-secret-key"]
)
auth_strategy.add(cookie_auth)
# Setup authorization
authz_strategy = app.use_authorization()
authz_strategy.add(Policy("admin", Requirement("role:admin")))
authz_strategy.add(Policy("user", Requirement("role:user")))
# Public endpoints
@app.get("/")
@allow_anonymous
async def home():
return json({"message": "Welcome to BlackSheep API"})
# User endpoints (require authentication)
@app.get("/profile")
@auth()
async def get_profile(request: Request):
identity: Identity = request.identity
return json({
"user_id": identity.id,
"claims": identity.claims
})
# Admin endpoints (require admin role)
@app.get("/admin/users")
@auth("admin")
async def list_all_users():
# Only admins can access this
users = await get_all_users()
return json({"users": users})
# API endpoints (JWT only)
@app.get("/api/data")
@auth(authentication_schemes=["JWT Bearer"])
async def api_data():
return json({"data": "API response"})
# Run with: uvicorn main:appThis authentication and authorization system provides secure, flexible access control for both API and web application use cases with support for multiple authentication methods and fine-grained authorization policies.
Install with Tessl CLI
npx tessl i tessl/pypi-blacksheep