CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-blacksheep

Fast web framework for Python asyncio

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/additional.md

Additional Features

This document covers additional BlackSheep features including CORS, CSRF protection, sessions, OpenAPI documentation, security headers, compression, templating, and file handling utilities.

CORS (Cross-Origin Resource Sharing)

BlackSheep provides comprehensive CORS support for handling cross-origin requests from web browsers.

Basic CORS Setup

from blacksheep import Application
from blacksheep.server.cors import CORSPolicy, CORSStrategy

app = Application()

# Simple CORS setup (allows all origins)
app.use_cors()

# Configure CORS with specific options
cors_strategy = app.use_cors(
    allow_origins=["https://example.com", "https://app.example.com"],
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization", "X-Requested-With"],
    allow_credentials=True,
    max_age=3600,  # Cache preflight for 1 hour
    expose_headers=["X-Total-Count", "X-Page-Info"]
)

Named CORS Policies

# Multiple CORS policies for different endpoints
app.use_cors()  # Default policy

# API-specific policy
app.add_cors_policy("api", 
    allow_origins=["https://api-client.example.com"],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    allow_credentials=True
)

# Public API policy
app.add_cors_policy("public",
    allow_origins="*",
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["Content-Type"],
    allow_credentials=False
)

# Widget embed policy
app.add_cors_policy("embed",
    allow_origins=["https://widget.example.com", "https://embed.example.com"],
    allow_methods=["GET", "POST"],
    allow_headers=["Content-Type", "X-Widget-Token"],
    expose_headers=["X-Widget-Version"]
)

CORS Policy Configuration

from blacksheep.server.cors import CORSPolicy

# Manual CORS policy creation
cors_policy = CORSPolicy(
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Content-Type", "Authorization", "X-API-Key"],
    allow_origins=["https://trusted-domain.com"],
    allow_credentials=True,
    max_age=86400,  # 24 hours
    expose_headers=["X-Rate-Limit-Remaining", "X-Rate-Limit-Reset"]
)

# Advanced CORS strategy
strategy = CORSStrategy()
strategy.add_policy("default", cors_policy)

# Custom origin validation
def validate_origin(origin: str) -> bool:
    """Return True when the origin's host matches one of the allowed patterns.

    The value of an ``Origin`` header has the form ``scheme://host[:port]``.
    Only the ``host[:port]`` part is compared against the shell-style
    patterns below, so the scheme never takes part in the match.

    Args:
        origin: The raw origin string to validate.

    Returns:
        True if ``host[:port]`` matches an allowed pattern; False for
        non-matching, empty, or malformed origins.
    """
    # Local imports keep this snippet self-contained.
    from fnmatch import fnmatchcase
    from urllib.parse import urlsplit

    allowed_patterns = ["*.example.com", "localhost:*"]

    try:
        parts = urlsplit(origin)
        # `hostname` is lower-cased and stripped of any userinfo by urlsplit;
        # reading `port` raises ValueError when the port is not numeric.
        hostname = parts.hostname or ""
        port = parts.port
    except ValueError:
        return False

    if not hostname:
        return False

    candidate = hostname if port is None else f"{hostname}:{port}"
    # fnmatchcase avoids the platform-dependent case folding of fnmatch().
    return any(fnmatchcase(candidate, pattern) for pattern in allowed_patterns)

cors_policy = CORSPolicy(
    allow_origins=validate_origin,  # Function for dynamic validation
    allow_methods="*",
    allow_headers="*"
)

Route-Specific CORS

from blacksheep.server.cors import cors

# Apply CORS to specific routes
@app.get("/api/public")
@cors("public")  # Use "public" CORS policy
async def public_api():
    return json({"public": True})

@app.get("/api/private") 
@cors("api")  # Use "api" CORS policy
async def private_api():
    return json({"private": True})

# Route without an explicit CORS policy (falls back to the default policy, if one is configured)
@app.get("/internal")
async def internal_api():
    return json({"internal": True})

CSRF Protection

Cross-Site Request Forgery protection for web applications.

CSRF Setup

from blacksheep.server.csrf import CSRFMiddleware, CSRFTokenProvider

# Basic CSRF protection
csrf_middleware = CSRFMiddleware(
    secret_key="csrf-secret-key",
    cookie_name="csrf_token",
    header_name="X-CSRF-Token",
    safe_methods=["GET", "HEAD", "OPTIONS", "TRACE"]
)

app.middlewares.append(csrf_middleware)

# Advanced CSRF configuration
csrf_middleware = CSRFMiddleware(
    secret_key="csrf-secret-key",
    cookie_name="csrf_token",
    header_name="X-CSRF-Token",
    form_field_name="csrf_token",  # Form field name for token
    token_length=32,               # Token length in bytes
    max_age=3600,                  # Token expiration in seconds
    domain=".example.com",         # Cookie domain
    secure=True,                   # Secure cookie flag
    same_site="Strict"             # SameSite policy
)

CSRF Token Usage

from blacksheep import Request, Response

# Generate CSRF token for forms
@app.get("/form")
async def show_form(request: Request):
    csrf_token = request.csrf_token  # Auto-generated token
    
    form_html = f"""
    <form method="post" action="/submit">
        <input type="hidden" name="csrf_token" value="{csrf_token}">
        <input type="text" name="data" placeholder="Enter data">
        <button type="submit">Submit</button>
    </form>
    """
    
    return HTMLContent(form_html)

# CSRF-protected endpoint
@app.post("/submit")
async def submit_form(request: Request, form_data: FromForm[dict]):
    # CSRF validation happens automatically in middleware
    data = form_data.value
    return json({"submitted": True, "data": data})

# API endpoint with CSRF token in header
@app.post("/api/action")
async def api_action(request: Request, data: FromJSON[dict]):
    # Client should send X-CSRF-Token header
    return json({"action": "completed", "data": data.value})

CSRF Exemptions

from blacksheep.server.csrf import csrf_exempt

# Exempt specific endpoints from CSRF protection
@app.post("/webhook")
@csrf_exempt
async def webhook_handler(request: Request):
    # Webhooks don't need CSRF protection
    payload = await request.json()
    return json({"webhook": "processed"})

# API endpoints with other authentication
@app.post("/api/oauth")
@csrf_exempt 
async def oauth_endpoint(request: Request):
    # OAuth endpoints have their own protection
    return json({"oauth": "handled"})

Sessions

Secure session management with encryption and signing.

Session Configuration

from blacksheep.sessions import SessionMiddleware
from blacksheep.sessions.crypto import Encryptor, Signer, SessionSerializer

# Basic session setup
app.use_sessions(
    secret_key="session-secret-key",
    session_cookie="session",
    session_max_age=3600  # 1 hour
)

# Advanced session configuration
custom_encryptor = Encryptor("encryption-key-32-bytes-long-key")
custom_signer = Signer("signing-key")
custom_serializer = SessionSerializer(
    encryptor=custom_encryptor,
    signer=custom_signer
)

session_middleware = SessionMiddleware(
    secret_key="master-secret-key",
    session_cookie="secure_session",
    serializer=custom_serializer,
    session_max_age=86400,  # 24 hours
    cookie_domain=".example.com",
    cookie_secure=True,
    cookie_same_site="Lax"
)

app.middlewares.append(session_middleware)

Session Usage

from blacksheep.sessions import Session

# Login with session
@app.post("/login")
async def login(request: Request, credentials: FromJSON[dict]):
    """Authenticate the posted credentials and store the user in the session.

    Reads ``username`` and ``password`` from the JSON body, passes them to
    ``authenticate_user``, and raises ``Unauthorized`` when no user is
    returned. On success it writes the user's id, username, login time and
    permissions to ``request.session`` and returns a JSON confirmation.
    """
    # Local import keeps the snippet self-contained.
    from datetime import datetime, timezone

    payload = credentials.value
    username = payload.get("username")
    password = payload.get("password")

    # Validate credentials
    user = await authenticate_user(username, password)
    if not user:
        raise Unauthorized("Invalid credentials")

    # Set session data
    session: Session = request.session
    session["user_id"] = user.id
    session["username"] = user.username
    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp serialises with an explicit "+00:00" offset instead.
    session["login_time"] = datetime.now(timezone.utc).isoformat()
    session["permissions"] = user.permissions

    return json({"message": "Login successful"})

# Access session data
@app.get("/dashboard")
async def dashboard(request: Request):
    session = request.session
    
    user_id = session.get("user_id")
    if not user_id:
        return redirect("/login")
    
    return json({
        "user_id": user_id,
        "username": session.get("username"),
        "login_time": session.get("login_time")
    })

# Logout
@app.post("/logout")
async def logout(request: Request):
    session = request.session
    session.clear()  # Clear all session data
    return json({"message": "Logged out successfully"})

# Partial session update
@app.post("/preferences")
async def update_preferences(request: Request, prefs: FromJSON[dict]):
    session = request.session
    
    if "user_id" not in session:
        raise Unauthorized("Not logged in")
    
    # Update specific session keys
    session["theme"] = prefs.value.get("theme", "light")
    session["language"] = prefs.value.get("language", "en")
    
    return json({"preferences": "updated"})

OpenAPI Documentation

Automatic API documentation generation with interactive UI.

OpenAPI Setup

from blacksheep.server.openapi.v3 import OpenAPIHandler
from blacksheep.server.openapi.ui import ReDocMiddleware, SwaggerUIMiddleware

app = Application()

# Configure OpenAPI
docs = OpenAPIHandler(
    info={
        "title": "My API",
        "version": "1.0.0",
        "description": "A comprehensive API built with BlackSheep"
    },
    servers=[
        {"url": "https://api.example.com", "description": "Production"},
        {"url": "https://staging-api.example.com", "description": "Staging"}
    ]
)

# Add OpenAPI documentation endpoints
docs.bind_app(app)

# Add Swagger UI
app.middlewares.append(SwaggerUIMiddleware(docs, path="/docs"))

# Add ReDoc UI  
app.middlewares.append(ReDocMiddleware(docs, path="/redoc"))

API Documentation

from typing import Optional, List
from dataclasses import dataclass
from blacksheep.server.openapi.v3 import doc

@dataclass
class User:
    id: int
    name: str
    email: str
    age: Optional[int] = None

@dataclass
class CreateUserRequest:
    name: str
    email: str
    age: Optional[int] = None

@dataclass
class ErrorResponse:
    error: str
    message: str
    code: Optional[int] = None

# Document endpoints with OpenAPI decorators
@app.get("/users")
@doc(
    summary="List all users",
    description="Retrieve a paginated list of all users in the system",
    responses={
        200: "List of users retrieved successfully",
        401: "Authentication required"
    },
    tags=["Users"]
)
async def list_users(
    page: FromQuery[int] = FromQuery(1),
    limit: FromQuery[int] = FromQuery(10)
) -> List[User]:
    # Implementation here
    users = await get_users(page.value, limit.value)
    return json([user.__dict__ for user in users])

@app.post("/users")
@doc(
    summary="Create new user",
    description="Create a new user account with the provided information",
    responses={
        201: "User created successfully",
        400: "Invalid user data",
        409: "User already exists"
    },
    tags=["Users"]
)
async def create_user(data: FromJSON[CreateUserRequest]) -> User:
    user = await create_user_in_db(data.value)
    return json(user.__dict__)

@app.get("/users/{user_id}")
@doc(
    summary="Get user by ID", 
    description="Retrieve a specific user by their unique identifier",
    responses={
        200: "User found and returned",
        404: "User not found"
    },
    tags=["Users"]
)
async def get_user(user_id: FromRoute[int]) -> User:
    user = await get_user_by_id(user_id.value)
    if not user:
        raise NotFound("User not found")
    return json(user.__dict__)

Security Schemes

# Define security schemes
docs.security_schemes = {
    "BearerAuth": {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
    },
    "ApiKeyAuth": {
        "type": "apiKey",
        "in": "header",
        "name": "X-API-Key"
    },
    "OAuth2": {
        "type": "oauth2",
        "flows": {
            "authorizationCode": {
                "authorizationUrl": "https://auth.example.com/oauth/authorize",
                "tokenUrl": "https://auth.example.com/oauth/token",
                "scopes": {
                    "read": "Read access",
                    "write": "Write access"
                }
            }
        }
    }
}

# Apply security to endpoints
@app.get("/protected")
@doc(
    summary="Protected endpoint",
    security=[{"BearerAuth": []}],
    tags=["Protected"]
)
@auth()
async def protected_endpoint():
    return json({"protected": True})

Security Headers

HTTP security headers for improved application security.

HSTS (HTTP Strict Transport Security)

from blacksheep.server.security.hsts import HSTSMiddleware

# HSTS middleware
hsts_middleware = HSTSMiddleware(
    max_age=31536000,      # 1 year in seconds
    include_subdomains=True,
    preload=True           # Include in HSTS preload list
)

app.middlewares.append(hsts_middleware)

# Custom HSTS configuration
hsts_middleware = HSTSMiddleware(
    max_age=86400,         # 1 day
    include_subdomains=False,
    preload=False,
    only_https=True        # Only add header for HTTPS requests
)

Security Headers Middleware

async def security_headers_middleware(request: Request, handler):
    """Add a fixed set of security headers to every response.

    Awaits ``handler(request)`` and then appends the headers below to the
    returned response via ``response.headers.add``.

    Args:
        request: The incoming request, passed through to ``handler`` unchanged.
        handler: The next awaitable handler in the middleware chain.

    Returns:
        The response produced by ``handler``, with the headers added.
    """
    response = await handler(request)

    # Content Security Policy.
    # NOTE(review): 'unsafe-inline' for scripts and styles weakens the XSS
    # protection a CSP provides; prefer nonces or hashes if templates allow it.
    csp = (
        b"default-src 'self'; "
        b"script-src 'self' 'unsafe-inline'; "
        b"style-src 'self' 'unsafe-inline'; "
        b"img-src 'self' data: https:; "
        b"font-src 'self'; "
        b"connect-src 'self'; "
        b"frame-ancestors 'none';"
    )

    security_headers = (
        (b"X-Content-Type-Options", b"nosniff"),
        (b"X-Frame-Options", b"DENY"),
        # The legacy XSS auditor is deprecated and "1; mode=block" can itself
        # introduce vulnerabilities; current guidance is to disable it
        # explicitly and rely on the Content-Security-Policy instead.
        (b"X-XSS-Protection", b"0"),
        (b"Referrer-Policy", b"strict-origin-when-cross-origin"),
        (b"Content-Security-Policy", csp),
    )
    for name, value in security_headers:
        response.headers.add(name, value)

    return response

app.middlewares.append(security_headers_middleware)

Response Compression

Compress responses to reduce bandwidth usage.

Gzip Compression

from blacksheep.server.gzip import GzipMiddleware

# Basic gzip compression
gzip_middleware = GzipMiddleware(
    minimum_size=1024,     # Only compress responses > 1KB
    compression_level=6,    # Compression level (1-9)
    content_types={         # MIME types to compress
        "text/html",
        "text/css", 
        "text/javascript",
        "application/json",
        "application/xml"
    }
)

app.middlewares.append(gzip_middleware)

# Advanced compression configuration
gzip_middleware = GzipMiddleware(
    minimum_size=500,
    compression_level=9,    # Maximum compression
    content_types={
        "text/*",           # All text types
        "application/json",
        "application/xml",
        "application/javascript"
    },
    exclude_paths={"/health", "/metrics"}  # Paths to exclude
)

Templating

Template engine integration for server-side rendering.

Template Configuration

from blacksheep import use_templates
from jinja2 import Environment, FileSystemLoader

# Configure Jinja2 templates
templates_env = Environment(
    loader=FileSystemLoader("templates"),
    autoescape=True,
    enable_async=True
)

use_templates(app, templates_env)

# Template rendering endpoint
@app.get("/page/{name}")
async def render_page(name: str, request: Request):
    # Render template with context
    template = templates_env.get_template(f"{name}.html")
    content = await template.render_async({
        "title": f"Page: {name}",
        "user": request.identity,
        "current_time": datetime.now()
    })
    
    return HTMLContent(content)

# Template with form
@app.get("/contact")
async def contact_form():
    template = templates_env.get_template("contact.html")
    content = await template.render_async({
        "csrf_token": generate_csrf_token()
    })
    return HTMLContent(content)

File Handling

Advanced file serving and handling capabilities.

Dynamic File Serving

from blacksheep.server.files.dynamic import serve_files_dynamic
from blacksheep.server.files import DefaultFileOptions

# Dynamic file serving with real-time discovery
serve_files_dynamic(
    app,
    source_folder="./uploads",
    discovery=True,                # Allow directory listing
    cache_time=300,                # Cache for 5 minutes
    extensions={".jpg", ".png", ".pdf", ".doc"},
    root_path="/uploads",
    allow_anonymous=False,         # Require authentication
    default_file_options=DefaultFileOptions(
        cache_time=3600,
        content_disposition_type="attachment"  # Force download
    )
)

# Secure file serving
@app.get("/secure-files/{filename}")
@auth()
async def secure_file_download(filename: str, request: Request):
    """Serve a file from ``secure_uploads`` to a user allowed to read it.

    The requested name is resolved against the ``secure_uploads`` directory
    and rejected unless the result lies strictly inside it. The caller's
    access is then checked with ``user_can_access_file``.

    Raises:
        Forbidden: If the name escapes the base directory or the user is not
            allowed to access the file.
    """
    # Local import keeps the snippet self-contained.
    from pathlib import Path

    base_dir = Path("secure_uploads").resolve()
    target = (base_dir / filename).resolve()
    # Containment check: after resolving "..", absolute paths and symlinks,
    # the target must sit below base_dir (and must not be base_dir itself).
    if base_dir not in target.parents:
        raise Forbidden("Access denied")

    # Validate user has access to file
    user_id = request.identity.id
    if not await user_can_access_file(user_id, filename):
        raise Forbidden("Access denied")

    # Serve file securely
    return file(str(target), content_disposition_type="attachment")

File Upload Handling

import os
from pathlib import Path

@app.post("/upload")
async def upload_files(files: FromFiles, request: Request):
    """Store uploaded files under ``uploads/`` and report a result per file.

    Parts without a file name are skipped. Parts whose declared content type
    is not allowed, or whose sanitized name is unusable, produce an error
    entry instead of being written. All other parts are written to
    ``uploads/<sanitized name>``.

    Returns:
        A JSON response of the form ``{"files": [...]}`` with one entry per
        processed part.
    """
    # Loop-invariant, so built once rather than per uploaded part.
    # NOTE(review): the content type is the value declared in the upload,
    # not a check of the actual bytes; confirm whether that is sufficient.
    allowed_types = {"image/jpeg", "image/png", "application/pdf"}
    upload_dir = Path("uploads")
    results = []

    for file_part in files.value:
        # Validate file
        if not file_part.file_name:
            continue

        filename = file_part.file_name.decode()
        content_type = file_part.content_type.decode() if file_part.content_type else "unknown"

        # Security: validate file type
        if content_type not in allowed_types:
            results.append({"filename": filename, "error": "File type not allowed"})
            continue

        # Security: sanitize filename
        safe_filename = sanitize_filename(filename)
        # An empty or dot-only name would make the target path a directory,
        # so open() would raise and abort the whole request.
        if safe_filename in {"", ".", ".."}:
            results.append({"filename": filename, "error": "Invalid file name"})
            continue

        # Save file
        upload_dir.mkdir(exist_ok=True)
        upload_path = upload_dir / safe_filename

        with open(upload_path, "wb") as f:
            f.write(file_part.data)

        results.append({
            "filename": safe_filename,
            "size": len(file_part.data),
            "type": content_type,
            "uploaded": True
        })

    return json({"files": results})

def sanitize_filename(filename: str) -> str:
    """Return a safe, non-empty file name with no directory components.

    Both ``/`` and ``\\`` are treated as path separators and only the last
    component is kept. Every character other than alphanumerics, ``.``,
    ``_`` and ``-`` is dropped, leading dots are stripped, and the result is
    limited to 100 characters.

    Args:
        filename: The untrusted, client-supplied file name.

    Returns:
        The sanitized name, or ``"unnamed"`` when nothing usable remains.
    """
    # os.path.basename only splits on the local separator, so normalise
    # Windows-style backslashes first.
    safe_name = os.path.basename(filename.replace("\\", "/"))
    safe_name = "".join(c for c in safe_name if c.isalnum() or c in "._-")
    # Stripping leading dots rules out "." and ".." and hidden-file names.
    safe_name = safe_name.lstrip(".")[:100]  # Limit length
    # Never return an empty name: joined to a directory it would point at
    # the directory itself.
    return safe_name or "unnamed"

Utilities and Helpers

Various utility functions and helper classes.

URL and Path Utilities

from blacksheep.utils import ensure_bytes, ensure_str, join_fragments

# String/bytes conversion
text = "Hello, World!"
bytes_data = ensure_bytes(text)  # b"Hello, World!"
text_again = ensure_str(bytes_data)  # "Hello, World!"

# URL path joining
path = join_fragments("api", "v1", "users", "123")  # "api/v1/users/123"
path_with_slashes = join_fragments("/api/", "/v1/", "/users/")  # "api/v1/users"

Asyncio Utilities

from blacksheep.utils.aio import get_running_loop
import asyncio

# Get current event loop
loop = get_running_loop()

# Async context management helpers
@asynccontextmanager
async def database_context():
    db = await connect_database()
    try:
        yield db
    finally:
        await db.close()

# Use the context manager (this must run inside a coroutine: `async with` is not valid at module level)
async with database_context() as db:
    users = await db.query("SELECT * FROM users")

Middleware Utilities

from blacksheep.middlewares import get_middlewares_chain

# Create middleware chain
async def middleware1(request, handler):
    print("Middleware 1 - before")
    response = await handler(request)
    print("Middleware 1 - after")
    return response

async def middleware2(request, handler):
    print("Middleware 2 - before") 
    response = await handler(request)
    print("Middleware 2 - after")
    return response

async def final_handler(request):
    return json({"message": "Final handler"})

# Chain middlewares
middlewares = [middleware1, middleware2]
chained_handler = get_middlewares_chain(middlewares, final_handler)

# Execute the chain (this must run inside a coroutine: `await` is not valid at module level)
response = await chained_handler(request)
# Output:
# Middleware 1 - before
# Middleware 2 - before  
# Middleware 2 - after
# Middleware 1 - after

These additional features provide comprehensive functionality for building secure, performant, and well-documented web applications with BlackSheep. The framework's modular design allows you to use only the features you need while maintaining excellent performance and developer experience.

Install with Tessl CLI

npx tessl i tessl/pypi-blacksheep

docs

additional.md

auth.md

client.md

core-server.md

index.md

request-response.md

testing.md

websockets.md

tile.json