CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-robyn

A Super Fast Async Python Web Framework with a Rust runtime.

Pending
Overview
Eval results
Files

docs/authentication.md

Authentication

Pluggable authentication system with bearer token support, custom authentication handlers, and identity management for securing routes and WebSocket connections.

Capabilities

Authentication Handler

Abstract base class for implementing custom authentication logic.

class AuthenticationHandler:
    """
    Abstract base class for implementing custom authentication logic.

    Subclasses provide ``authenticate``. The usage examples in this document
    pass a token getter to ``super().__init__`` and later read it back as
    ``self.token_getter``.
    """

    def __init__(self, token_getter: TokenGetter):
        """
        Initialize authentication handler.

        Args:
            token_getter: TokenGetter instance for extracting tokens from
                requests (the examples pass e.g. ``BearerGetter()``)
        """

    def authenticate(self, request: Request) -> Optional[Identity]:
        """
        Authenticate a request and return user identity.

        Args:
            request: HTTP request object

        Returns:
            Identity object if authentication successful, None otherwise

        Note:
            This is an abstract method that must be implemented by subclasses
        """

    @property
    def unauthorized_response(self) -> Response:
        """
        Default response for unauthorized requests.

        Returns:
            Response object with 401 status and unauthorized message

        Note:
            Presumably this is what a client receives when ``authenticate``
            returns None on a protected route; the stub does not show the
            dispatch logic, so confirm against the library.
        """

Bearer Token Authentication

Built-in token getter for extracting Bearer tokens from Authorization headers.

class BearerGetter:
    """
    Built-in token getter for extracting Bearer tokens from Authorization headers.
    """

    def get_token(self, request: Request) -> Optional[str]:
        """
        Extract Bearer token from Authorization header.

        Args:
            request: HTTP request object

        Returns:
            Token string if found, None otherwise

        Example:
            For header "Authorization: Bearer abc123", returns "abc123"
        """

    def set_token(self, request: Request, token: str):
        """
        Set Bearer token in request (for testing/internal use).

        Args:
            request: HTTP request object
            token: Token string to set

        Note:
            Presumably this writes the same Authorization header that
            ``get_token`` reads; the stub does not show where the token is
            stored, so confirm against the library.
        """

Identity Object

Represents an authenticated user's identity and claims.

@dataclass
class Identity:
    """
    User identity containing authentication claims.

    Attributes:
        claims: Dictionary of user claims (e.g., user_id, email, roles)
    """

    # The docstring must be the first statement in the class body; placed
    # after the field it is a discarded string and never reaches __doc__.
    claims: dict[str, str]

Usage Examples

Basic Authentication Handler

# Optional appears in the handler's return annotation and Response builds the
# 403 reply in the admin route; both must be imported or the example raises
# NameError.
from typing import Optional

import jwt
from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity, Response

class JWTAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests that carry an HS256-signed JWT as a Bearer token."""

    def __init__(self, secret_key: str):
        """
        Args:
            secret_key: Shared secret used to verify token signatures.
        """
        self.secret_key = secret_key
        super().__init__(BearerGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """
        Decode the request's Bearer token and build an Identity from it.

        Args:
            request: HTTP request object

        Returns:
            Identity with ``user_id``, ``email`` and ``role`` claims, or None
            when the token is missing, fails verification, or does not carry
            both ``user_id`` and ``email``.
        """
        # Get token using the bearer token getter
        token = self.token_getter.get_token(request)
        if not token:
            return None

        # Keep the try body to the single call that raises InvalidTokenError
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return None

        user_id = payload.get("user_id")
        email = payload.get("email")
        if user_id is None or email is None:
            # The routes below index claims["user_id"] / claims["email"], so a
            # token without them is treated as unauthenticated.
            return None

        # Claims are declared as dict[str, str]; JWT payload values may be
        # numbers, so convert explicitly instead of passing them through.
        return Identity(claims={
            "user_id": str(user_id),
            "email": str(email),
            "role": str(payload.get("role") or "user"),
        })

app = Robyn(__file__)

# Configure authentication with the JWT handler
auth_handler = JWTAuthenticationHandler("your-secret-key")
app.configure_authentication(auth_handler)


# Public route (no authentication required)
@app.get("/")
def public_route(request):
    """Return a static message; reachable without a token."""
    return {"message": "This is a public endpoint"}


# Protected route (authentication required)
@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return the user_id, email and role claims of the authenticated caller."""
    claims = request.identity.claims
    return {field: claims[field] for field in ("user_id", "email", "role")}


# Protected route with role checking
@app.get("/admin", auth_required=True)
def admin_only(request):
    """Greet admins; every other authenticated role gets a 403 response."""
    role = request.identity.claims.get("role")
    if role == "admin":
        return {"message": "Welcome, admin!"}

    return Response(
        status_code=403,
        headers={},
        description="Forbidden: Admin access required"
    )


app.start()

API Key Authentication

import hashlib
# Optional is used in the authenticate() return annotation below; without the
# import the class definition raises NameError.
from typing import Optional

from robyn import Robyn, AuthenticationHandler, Identity

class APIKeyGetter:
    """Token getter that treats the ``X-API-Key`` request header as the token."""

    def get_token(self, request):
        """Return whatever ``request.headers.get`` yields for ``X-API-Key``."""
        headers = request.headers
        return headers.get("X-API-Key")

    def set_token(self, request, token):
        """Store ``token`` under ``X-API-Key`` through ``request.headers.set``."""
        headers = request.headers
        headers.set("X-API-Key", token)

class APIKeyAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests by looking their API key up in a fixed mapping."""

    def __init__(self, valid_api_keys: dict):
        """
        Args:
            valid_api_keys: Mapping of API key to the claims dict of its owner.
        """
        self.valid_api_keys = valid_api_keys  # api_key -> user_info mapping
        super().__init__(APIKeyGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """
        Return the Identity registered for the request's API key.

        Returns:
            Identity built from the mapped claims, or None when the key is
            absent, empty, or not present in ``valid_api_keys``.
        """
        presented_key = self.token_getter.get_token(request)
        registry = self.valid_api_keys

        if presented_key and presented_key in registry:
            return Identity(claims=registry[presented_key])

        return None

app = Robyn(__file__)

# Configure API key authentication: each accepted key maps to its owner's claims
valid_keys = {
    "api_key_123": {"user_id": "user1", "name": "Alice", "tier": "premium"},
    "api_key_456": {"user_id": "user2", "name": "Bob", "tier": "basic"}
}

auth_handler = APIKeyAuthenticationHandler(valid_keys)
app.configure_authentication(auth_handler)


@app.get("/api/data", auth_required=True)
def get_data(request):
    """Return premium or basic data depending on the caller's ``tier`` claim."""
    claims = request.identity.claims
    # A caller without a tier claim is served as "basic"
    is_premium = claims.get("tier", "basic") == "premium"
    payload = "Premium data with extra features" if is_premium else "Basic data"
    return {"data": payload}


app.start()

Database-Based Authentication

import sqlite3
# Optional is used in the handler's annotations and Response in the route
# handlers; both must be imported or the example raises NameError.
from typing import Optional

import bcrypt
from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity, Response

class DatabaseAuthenticationHandler(AuthenticationHandler):
    """
    Authenticate Bearer tokens against sessions stored in a SQLite database.

    Users are kept in a ``users`` table and login sessions in a ``sessions``
    table; both tables are created on construction if they do not exist.
    Every method opens its own connection and closes it in a ``finally``
    block, so a failing statement does not leak the connection.
    """

    def __init__(self, db_path: str):
        """
        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        super().__init__(BearerGetter())
        self._init_db()

    def _init_db(self):
        """Initialize database with users and sessions tables."""
        create_users_sql = '''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                email TEXT UNIQUE,
                password_hash TEXT,
                role TEXT DEFAULT 'user'
            )
        '''
        create_sessions_sql = '''
            CREATE TABLE IF NOT EXISTS sessions (
                token TEXT PRIMARY KEY,
                user_id INTEGER,
                expires_at DATETIME,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        '''

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(create_users_sql)
            cursor.execute(create_sessions_sql)
            conn.commit()
        finally:
            conn.close()

    def authenticate(self, request) -> Optional[Identity]:
        """
        Resolve the request's Bearer token to the user owning the session.

        Args:
            request: HTTP request object

        Returns:
            Identity with ``user_id``, ``username``, ``email`` and ``role``
            claims, or None when there is no token or no unexpired session
            row matches it.
        """
        token = self.token_getter.get_token(request)
        if not token:
            return None

        # Check if token is valid and not expired
        session_lookup_sql = '''
            SELECT u.id, u.username, u.email, u.role
            FROM users u
            JOIN sessions s ON u.id = s.user_id
            WHERE s.token = ? AND s.expires_at > datetime('now')
        '''

        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(session_lookup_sql, (token,)).fetchone()
        finally:
            conn.close()

        if row is None:
            return None

        user_id, username, email, role = row
        return Identity(claims={
            "user_id": str(user_id),
            "username": username,
            "email": email,
            "role": role
        })

    def create_user(self, username: str, email: str, password: str, role: str = "user"):
        """
        Helper method to create a new user.

        The password is stored as a salted bcrypt hash.

        Returns:
            The new row id, or None when the insert violates a constraint
            (``username`` and ``email`` are UNIQUE).
        """
        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash, role)
                VALUES (?, ?, ?, ?)
            ''', (username, email, password_hash, role))
            conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None
        finally:
            conn.close()

    def create_session(self, username: str, password: str) -> Optional[str]:
        """
        Helper method to create a session token.

        Verifies the password against the stored bcrypt hash and, on success,
        inserts a session row that expires 24 hours from now.

        Returns:
            The new session token, or None when the user does not exist or
            the password does not match.
        """
        import secrets
        from datetime import datetime, timedelta, timezone

        # Verify user credentials
        credentials_sql = '''
            SELECT id, password_hash FROM users WHERE username = ?
        '''
        insert_session_sql = '''
            INSERT INTO sessions (token, user_id, expires_at)
            VALUES (?, ?, ?)
        '''

        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(credentials_sql, (username,)).fetchone()
            if row is None:
                return None

            user_id, stored_hash = row
            if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
                return None

            # Create session token
            token = secrets.token_urlsafe(32)

            # authenticate() compares expires_at with SQLite's datetime('now'),
            # which is UTC text in "YYYY-MM-DD HH:MM:SS" form. Store the expiry
            # in that same zone and format; a naive local time would shift the
            # session lifetime by the server's UTC offset.
            expiry = datetime.now(timezone.utc) + timedelta(hours=24)
            expires_at = expiry.strftime("%Y-%m-%d %H:%M:%S")

            conn.execute(insert_session_sql, (token, user_id, expires_at))
            conn.commit()
            return token
        finally:
            conn.close()

app = Robyn(__file__)

# Configure database authentication
auth_handler = DatabaseAuthenticationHandler("users.db")
app.configure_authentication(auth_handler)


def _json_error(status_code, message):
    """Build an error Response whose body is the JSON object {"error": message}."""
    import json  # local import keeps this snippet self-contained

    # Robyn documents Response.description as str/bytes, so the dict is
    # serialized here instead of being passed through as-is.
    body = json.dumps({"error": message})
    return Response(status_code, {"Content-Type": "application/json"}, body)


# Login endpoint
@app.post("/login")
def login(request):
    """Exchange a username/password JSON body for a session token."""
    data = request.json()
    username = data.get("username")
    password = data.get("password")

    # create_session() calls password.encode(); a missing field would raise
    # there, so it is rejected up front as a failed login.
    if not username or not password:
        return _json_error(401, "Invalid credentials")

    token = auth_handler.create_session(username, password)
    if not token:
        return _json_error(401, "Invalid credentials")

    return {"token": token, "message": "Login successful"}


# Registration endpoint
@app.post("/register")
def register(request):
    """Create a user from a username/email/password JSON body."""
    data = request.json()
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")

    # create_user() hashes password.encode(); reject incomplete bodies first
    if not username or not email or not password:
        return _json_error(400, "username, email and password are required")

    user_id = auth_handler.create_user(username, email, password)
    if not user_id:
        return _json_error(400, "User already exists")

    return {"message": "User created successfully", "user_id": user_id}


# Protected endpoints
@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return the authenticated caller's claims."""
    return {"user": request.identity.claims}


@app.get("/admin/users", auth_required=True)
def list_users(request):
    """Return the user list; callers whose role is not admin get a 403."""
    if request.identity.claims.get("role") != "admin":
        return _json_error(403, "Admin access required")

    # Return list of users (implementation depends on your needs)
    return {"users": ["user1", "user2"]}


app.start()

Custom Token Extraction

# Optional is used in the getter/handler annotations and Response in the login
# route; both must be imported or the example raises NameError.
from typing import Optional

from robyn import Robyn, AuthenticationHandler, Identity, Response

class CookieTokenGetter:
    """Token getter that reads the token from a named cookie in the ``Cookie`` header."""

    def __init__(self, cookie_name: str = "auth_token"):
        """
        Args:
            cookie_name: Name of the cookie whose value is the token.
        """
        self.cookie_name = cookie_name

    def get_token(self, request) -> Optional[str]:
        """
        Return the value of the configured cookie.

        Returns:
            The cookie value, or None when the request has no (or an empty)
            ``Cookie`` header or the cookie is not among the parsed pairs.
        """
        raw_header = request.headers.get("Cookie")
        if not raw_header:
            return None

        # Simple cookie parsing (in production, use a proper cookie parser):
        # fragments without "=" are skipped, only the first "=" separates the
        # name from the value, and a later duplicate name overrides an earlier one.
        pairs = (
            fragment.strip().partition('=')
            for fragment in raw_header.split(';')
            if '=' in fragment
        )
        jar = {name: value for name, _, value in pairs}

        return jar.get(self.cookie_name)

    def set_token(self, request, token):
        """Do nothing; this would typically be used in tests or internal processing."""
        return None

class SessionAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests by the ``session_id`` cookie against an in-memory store."""

    def __init__(self):
        # Use custom cookie token getter
        super().__init__(CookieTokenGetter("session_id"))
        self.sessions = {}  # In production, use Redis or database

    def authenticate(self, request) -> Optional[Identity]:
        """
        Return the Identity stored for the request's session cookie.

        Returns:
            Identity built from the stored user data, or None when the cookie
            is missing or does not match a known session.
        """
        session_id = self.token_getter.get_token(request)

        if session_id and session_id in self.sessions:
            return Identity(claims=self.sessions[session_id])

        return None

    def create_session(self, user_data: dict) -> str:
        """
        Store ``user_data`` under a fresh session id and return that id.

        Args:
            user_data: Claims to attach to the session.
        """
        import secrets

        # The session id is a bearer credential, so it comes from the secrets
        # module, matching how the database example creates its session
        # tokens. token_urlsafe output contains no ";" or "=", so it survives
        # the simple cookie parsing used by CookieTokenGetter.
        session_id = secrets.token_urlsafe(32)
        self.sessions[session_id] = user_data
        return session_id

app = Robyn(__file__)

auth_handler = SessionAuthenticationHandler()
app.configure_authentication(auth_handler)


def _json_response(status_code, payload):
    """Build a Response whose body is ``payload`` serialized as JSON."""
    import json  # local import keeps this snippet self-contained

    # Robyn documents Response.description as str/bytes, so the dict is
    # serialized here instead of being passed through as-is.
    return Response(status_code, {"Content-Type": "application/json"}, json.dumps(payload))


@app.post("/login")
def login(request):
    """Start a session for the posted username and return it as a cookie."""
    # Simulate login validation
    data = request.json()
    username = data.get("username")

    if not username:  # Simplified validation
        return _json_response(401, {"error": "Invalid login"})

    session_id = auth_handler.create_session({
        "username": username,
        "user_id": "123",
        "role": "user"
    })

    # Set session cookie in response
    response = _json_response(200, {"message": "Login successful"})
    response.set_cookie("session_id", session_id)
    return response


@app.get("/dashboard", auth_required=True)
def dashboard(request):
    """Greet the authenticated user by the username stored in the session."""
    username = request.identity.claims["username"]
    return {"message": f"Welcome to your dashboard, {username}!"}


app.start()

Install with Tessl CLI

npx tessl i tessl/pypi-robyn

docs

authentication.md

core-app.md

exceptions.md

index.md

mcp.md

openapi.md

request-response.md

status-codes.md

templating.md

websocket.md

tile.json