A Super Fast Async Python Web Framework with a Rust runtime.
—
Pluggable authentication system with bearer token support, custom authentication handlers, and identity management for securing routes and WebSocket connections.
Abstract base class for implementing custom authentication logic.
class AuthenticationHandler:
def __init__(self, token_getter: TokenGetter):
    """
    Initialize the authentication handler.

    Args:
        token_getter: TokenGetter instance used to extract tokens from
            incoming requests. The example handlers in this document read
            it back as ``self.token_getter``.
    """
def authenticate(self, request: Request) -> Optional[Identity]:
    """
    Authenticate a request and return the caller's identity.

    Args:
        request: HTTP request object.

    Returns:
        An Identity object if authentication succeeds, None otherwise.

    Note:
        This is an abstract method; subclasses must implement it.
    """
@property
def unauthorized_response(self) -> Response:
"""
Default response for unauthorized requests.
Returns:
Response object with 401 status and unauthorized message
"""Built-in token getter for extracting Bearer tokens from Authorization headers.
class BearerGetter:
def get_token(self, request: Request) -> Optional[str]:
    """
    Extract the Bearer token from the Authorization header.

    Args:
        request: HTTP request object.

    Returns:
        The token string if one is found, None otherwise.

    Example:
        For the header "Authorization: Bearer abc123", returns "abc123".
    """
def set_token(self, request: Request, token: str):
"""
Set Bearer token in request (for testing/internal use).
Args:
request: HTTP request object
token: Token string to set
"""Represents an authenticated user's identity and claims.
@dataclass
class Identity:
claims: dict[str, str]
"""
User identity containing authentication claims.
Attributes:
claims: Dictionary of user claims (e.g., user_id, email, roles)
"""from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity
import jwt
class JWTAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests that carry an HS256-signed JWT as a Bearer token."""

    def __init__(self, secret_key: str):
        """
        Args:
            secret_key: Key passed to ``jwt.decode`` to verify tokens.
        """
        self.secret_key = secret_key
        super().__init__(BearerGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """
        Build an Identity from the request's JWT.

        Args:
            request: Incoming request; the token is read through the
                configured token getter.

        Returns:
            An Identity holding the ``user_id``, ``email`` and ``role``
            claims (``role`` falls back to "user"), or None when no token is
            present or PyJWT rejects it.
        """
        bearer = self.token_getter.get_token(request)
        if not bearer:
            return None

        try:
            decoded = jwt.decode(bearer, self.secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            # Any token PyJWT rejects is treated as unauthenticated.
            return None

        claims = {
            "user_id": decoded.get("user_id"),
            "email": decoded.get("email"),
            "role": decoded.get("role", "user"),
        }
        return Identity(claims=claims)
app = Robyn(__file__)
# Configure authentication: routes declared with auth_required=True are
# checked through this handler.
# NOTE(review): "your-secret-key" is a placeholder for the example; load the
# real signing key from configuration instead of hard-coding it.
auth_handler = JWTAuthenticationHandler("your-secret-key")
app.configure_authentication(auth_handler)
# Public route (no authentication required)
@app.get("/")
def public_route(request):
    """Serve the unauthenticated landing endpoint."""
    body = {"message": "This is a public endpoint"}
    return body
# Protected route (authentication required)
@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return the authenticated caller's user_id, email and role claims."""
    # request.identity is the Identity produced by the authentication handler.
    claims = request.identity.claims
    return {field: claims[field] for field in ("user_id", "email", "role")}
# Protected route with role checking
@app.get("/admin", auth_required=True)
def admin_only(request):
    """Greet admins; answer every other authenticated caller with 403."""
    is_admin = request.identity.claims.get("role") == "admin"
    if is_admin:
        return {"message": "Welcome, admin!"}

    return Response(
        status_code=403,
        headers={},
        description="Forbidden: Admin access required",
    )
app.start()

from robyn import Robyn, AuthenticationHandler, Identity
import hashlib
class APIKeyGetter:
    """Token getter that carries the API key in the X-API-Key header."""

    def get_token(self, request):
        """Return the X-API-Key header value as reported by ``request.headers.get``."""
        headers = request.headers
        return headers.get("X-API-Key")

    def set_token(self, request, token):
        """Store ``token`` as the request's X-API-Key header."""
        headers = request.headers
        headers.set("X-API-Key", token)
class APIKeyAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests by looking their API key up in a fixed mapping."""

    def __init__(self, valid_api_keys: dict):
        """
        Args:
            valid_api_keys: Mapping of API key -> user info; the user info
                becomes the claims of the returned Identity.
        """
        self.valid_api_keys = valid_api_keys
        super().__init__(APIKeyGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """Return the Identity registered for the request's API key, else None."""
        presented = self.token_getter.get_token(request)
        if presented and presented in self.valid_api_keys:
            return Identity(claims=self.valid_api_keys[presented])
        return None
app = Robyn(__file__)
# Configure API key authentication.
# Example keys only: each key maps to the user info that the handler returns
# as the caller's Identity claims.
valid_keys = {
"api_key_123": {"user_id": "user1", "name": "Alice", "tier": "premium"},
"api_key_456": {"user_id": "user2", "name": "Bob", "tier": "basic"}
}
auth_handler = APIKeyAuthenticationHandler(valid_keys)
app.configure_authentication(auth_handler)
@app.get("/api/data", auth_required=True)
def get_data(request):
    """Return premium or basic data depending on the caller's ``tier`` claim."""
    # A missing tier claim is treated as "basic".
    is_premium = request.identity.claims.get("tier", "basic") == "premium"
    payload = "Premium data with extra features" if is_premium else "Basic data"
    return {"data": payload}
app.start()

from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity
import sqlite3
import bcrypt
class DatabaseAuthenticationHandler(AuthenticationHandler):
    """Bearer-token authentication backed by SQLite ``users``/``sessions`` tables."""

    def __init__(self, db_path: str):
        """
        Args:
            db_path: Path of the SQLite database file; the tables are created
                on construction if they do not exist yet.
        """
        self.db_path = db_path
        super().__init__(BearerGetter())
        self._init_db()

    def _init_db(self):
        """Create the users and sessions tables when they are missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    username TEXT UNIQUE,
                    email TEXT UNIQUE,
                    password_hash TEXT,
                    role TEXT DEFAULT 'user'
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    token TEXT PRIMARY KEY,
                    user_id INTEGER,
                    expires_at DATETIME,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            conn.commit()
        finally:
            # Close the connection even if a statement fails.
            conn.close()

    def authenticate(self, request) -> Optional[Identity]:
        """
        Resolve the request's bearer token to the owning user.

        Returns:
            An Identity with ``user_id``, ``username``, ``email`` and ``role``
            claims when the token matches an unexpired session, else None.
        """
        token = self.token_getter.get_token(request)
        if not token:
            return None

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # datetime('now') is UTC; create_session stores expiries in the
            # same UTC "YYYY-MM-DD HH:MM:SS" text format so this comparison
            # is meaningful.
            cursor.execute('''
                SELECT u.id, u.username, u.email, u.role
                FROM users u
                JOIN sessions s ON u.id = s.user_id
                WHERE s.token = ? AND s.expires_at > datetime('now')
            ''', (token,))
            result = cursor.fetchone()
        finally:
            conn.close()

        if result is None:
            return None

        user_id, username, email, role = result
        return Identity(claims={
            "user_id": str(user_id),
            "username": username,
            "email": email,
            "role": role
        })

    def create_user(self, username: str, email: str, password: str, role: str = "user"):
        """
        Helper method to create a new user.

        Returns:
            The new row id, or None when the username or email is already
            taken (the UNIQUE constraint raises IntegrityError).
        """
        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash, role)
                VALUES (?, ?, ?, ?)
            ''', (username, email, password_hash, role))
            conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None
        finally:
            conn.close()

    def create_session(self, username: str, password: str) -> Optional[str]:
        """
        Helper method to create a session token.

        Verifies the credentials and, on success, stores a random token that
        expires 24 hours from now.

        Returns:
            The session token, or None when the credentials are missing,
            unknown or wrong.
        """
        import secrets
        from datetime import datetime, timedelta, timezone

        # Missing JSON fields arrive as None; treat them as bad credentials
        # instead of crashing on password.encode().
        if not isinstance(username, str) or not isinstance(password, str):
            return None

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Verify user credentials
            cursor.execute('''
                SELECT id, password_hash FROM users WHERE username = ?
            ''', (username,))
            result = cursor.fetchone()
            if not result:
                return None

            user_id, stored_hash = result
            if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
                return None

            # Create session token
            token = secrets.token_urlsafe(32)
            # Store the expiry as UTC text in SQLite's own datetime format.
            # authenticate() compares it with datetime('now'), which is UTC;
            # a naive local timestamp would skew the session lifetime by the
            # server's UTC offset.
            expires_at = (
                datetime.now(timezone.utc) + timedelta(hours=24)
            ).strftime("%Y-%m-%d %H:%M:%S")
            cursor.execute('''
                INSERT INTO sessions (token, user_id, expires_at)
                VALUES (?, ?, ?)
            ''', (token, user_id, expires_at))
            conn.commit()
            return token
        finally:
            # Single exit point for cleanup, including when a query raises.
            conn.close()
app = Robyn(__file__)
# Configure database authentication.
# Constructing the handler runs _init_db(), which creates the users and
# sessions tables in users.db if they do not exist yet.
auth_handler = DatabaseAuthenticationHandler("users.db")
app.configure_authentication(auth_handler)
# Login endpoint
@app.post("/login")
def login(request):
    """
    Exchange a username/password JSON body for a session token.

    Returns:
        A dict with the token on success, otherwise a 401 JSON response.
    """
    import json  # local import, like the helper imports elsewhere in this example

    data = request.json()
    username = data.get("username")
    password = data.get("password")

    # Absent fields come back as None; reject them here rather than letting
    # create_session fail on password.encode().
    has_credentials = isinstance(username, str) and isinstance(password, str)
    token = auth_handler.create_session(username, password) if has_credentials else None
    if token:
        return {"token": token, "message": "Login successful"}

    # Response descriptions must be str/bytes, so serialise the payload
    # explicitly instead of passing a dict.
    return Response(
        401,
        {"Content-Type": "application/json"},
        json.dumps({"error": "Invalid credentials"}),
    )
# Registration endpoint
@app.post("/register")
def register(request):
    """
    Create a user from a JSON body with username, email and password.

    Returns:
        A dict with the new user id on success, otherwise a 400 JSON
        response (missing fields, or username/email already registered).
    """
    import json  # local import, like the helper imports elsewhere in this example

    json_headers = {"Content-Type": "application/json"}
    data = request.json()
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")

    # create_user calls password.encode(); validate first so a missing field
    # yields a 400 instead of an unhandled AttributeError.
    fields = (username, email, password)
    if not all(isinstance(value, str) and value for value in fields):
        return Response(
            400,
            json_headers,
            json.dumps({"error": "username, email and password are required"}),
        )

    user_id = auth_handler.create_user(username, email, password)
    if user_id:
        return {"message": "User created successfully", "user_id": user_id}

    # Response descriptions must be str/bytes, so serialise the payload
    # explicitly instead of passing a dict.
    return Response(400, json_headers, json.dumps({"error": "User already exists"}))
# Protected endpoints
@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return every claim attached to the authenticated caller."""
    identity = request.identity
    return {"user": identity.claims}
@app.get("/admin/users", auth_required=True)
def list_users(request):
    """List users for admins; other authenticated callers get a 403 JSON response."""
    import json  # local import, like the helper imports elsewhere in this example

    if request.identity.claims.get("role") != "admin":
        # Response descriptions must be str/bytes, so serialise the payload
        # explicitly instead of passing a dict.
        return Response(
            403,
            {"Content-Type": "application/json"},
            json.dumps({"error": "Admin access required"}),
        )
    # Return list of users (implementation depends on your needs)
    return {"users": ["user1", "user2"]}
app.start()

from robyn import Robyn, AuthenticationHandler, Identity
class CookieTokenGetter:
    """Token getter that reads the token from a named request cookie."""

    def __init__(self, cookie_name: str = "auth_token"):
        """
        Args:
            cookie_name: Name of the cookie that holds the token.
        """
        self.cookie_name = cookie_name

    def get_token(self, request) -> Optional[str]:
        """
        Extract the token from the request's Cookie header.

        Returns:
            The value of the configured cookie, or None when the header is
            absent or empty or the cookie is not present.
        """
        raw_header = request.headers.get("Cookie")
        if not raw_header:
            return None

        # Simple cookie parsing (in production, use a proper cookie parser).
        # Fragments without "=" are skipped; when a name repeats, the last
        # occurrence wins.
        pairs = (fragment.strip().partition("=") for fragment in raw_header.split(";"))
        jar = {name: value for name, separator, value in pairs if separator}
        return jar.get(self.cookie_name)

    def set_token(self, request, token):
        """No-op; this would typically be used in tests or internal processing."""
        return None
class SessionAuthenticationHandler(AuthenticationHandler):
    """Cookie-based session authentication with an in-memory session store."""

    def __init__(self):
        # Use custom cookie token getter
        super().__init__(CookieTokenGetter("session_id"))
        # In production, use Redis or database
        self.sessions = {}

    def authenticate(self, request) -> Optional[Identity]:
        """Return the Identity stored for the request's session cookie, else None."""
        session_id = self.token_getter.get_token(request)
        if not session_id:
            return None
        try:
            user_data = self.sessions[session_id]
        except KeyError:
            # Unknown session id.
            return None
        return Identity(claims=user_data)

    def create_session(self, user_data: dict) -> str:
        """Store ``user_data`` under a fresh UUID4 session id and return that id."""
        import uuid

        new_id = str(uuid.uuid4())
        self.sessions[new_id] = user_data
        return new_id
app = Robyn(__file__)
# The handler keeps its sessions in an in-memory dict, so they do not outlive
# this process.
auth_handler = SessionAuthenticationHandler()
app.configure_authentication(auth_handler)
@app.post("/login")
def login(request):
    """
    Start a session for the posted username and return it as a cookie.

    Returns:
        A 200 JSON response carrying the ``session_id`` cookie, or a 401
        JSON response when no username was supplied.
    """
    import json  # local import, like the uuid import in create_session

    json_headers = {"Content-Type": "application/json"}

    # Simulate login validation
    data = request.json()
    username = data.get("username")
    if not username:  # Simplified validation
        # Response descriptions must be str/bytes, so serialise the payload
        # explicitly instead of passing a dict.
        return Response(401, json_headers, json.dumps({"error": "Invalid login"}))

    session_id = auth_handler.create_session({
        "username": username,
        "user_id": "123",
        "role": "user"
    })
    # Set session cookie in response
    response = Response(200, json_headers, json.dumps({"message": "Login successful"}))
    response.set_cookie("session_id", session_id)
    return response
@app.get("/dashboard", auth_required=True)
def dashboard(request):
    """Greet the authenticated caller by the ``username`` claim."""
    claims = request.identity.claims
    username = claims["username"]
    greeting = f"Welcome to your dashboard, {username}!"
    return {"message": greeting}
app.start()

Install with Tessl CLI
npx tessl i tessl/pypi-robyn