Build production-ready conversational AI applications in minutes with rich UI components and LLM integrations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Secure user authentication and authorization with support for password authentication, header-based auth, OAuth providers, and custom authentication workflows. These components enable building secure conversational applications with user management and access control.
Username- and password-based authentication with custom validation logic.
import chainlit as cl
from typing import Optional
@cl.password_auth_callback
async def password_auth(username: str, password: str) -> Optional[cl.User]:
    """Authentication callback for username/password login.

    Implement custom validation logic for the submitted credentials in the
    body of this coroutine.

    Signature:
        Callable[[str, str], Awaitable[Optional[User]]]

    Args:
        username: Username or email provided by the user.
        password: Password provided by the user.

    Returns:
        A ``cl.User`` object if authentication succeeds, ``None`` if it fails.
        The returned user is stored in the session for subsequent requests.
    """


# Usage examples for password authentication:
import chainlit as cl
import bcrypt
from typing import Optional
# Mock user database (replace with real database in production)
# Placeholder for a real bcrypt hash; both demo records share the elided value.
_PLACEHOLDER_BCRYPT_HASH = "$2b$12$..."

# In-memory stand-in for a user table, keyed by e-mail address.
# Replace with a real database in production.
USERS_DB = {
    "alice@example.com": dict(
        password_hash=_PLACEHOLDER_BCRYPT_HASH,
        display_name="Alice Smith",
        role="admin",
        metadata=dict(department="engineering"),
    ),
    "bob@example.com": dict(
        password_hash=_PLACEHOLDER_BCRYPT_HASH,
        display_name="Bob Johnson",
        role="user",
        metadata=dict(department="sales"),
    ),
}
@cl.password_auth_callback
async def authenticate_user(username: str, password: str) -> Optional[cl.User]:
    """Authenticate a user against ``USERS_DB`` using a bcrypt password hash.

    Args:
        username: Login name typed by the user; matched case-insensitively
            against the keys of ``USERS_DB``.
        password: Plain-text password typed by the user.

    Returns:
        A ``cl.User`` whose metadata carries the stored role, the record's
        own metadata and a ``last_login`` ISO timestamp, or ``None`` when the
        user is unknown, the password does not match, or the stored hash is
        not a valid bcrypt hash.
    """
    # Local import: this snippet's import list does not bring datetime into
    # scope, so the success path would otherwise raise NameError.
    from datetime import datetime

    # Normalise once and reuse: the lookup key and the identifier handed to
    # Chainlit must agree, otherwise "Alice@..." and "alice@..." would be
    # treated as two different users downstream.
    normalized_username = username.lower()
    user_data = USERS_DB.get(normalized_username)
    if not user_data:
        # User not found
        return None

    try:
        password_matches = bcrypt.checkpw(
            password.encode("utf-8"),
            user_data["password_hash"].encode("utf-8"),
        )
    except ValueError:
        # bcrypt raises ValueError for a malformed stored hash (for example
        # the elided "$2b$12$..." placeholders); treat it as a failed login
        # instead of letting the exception escape the callback.
        return None
    if not password_matches:
        # Invalid password
        return None

    # Authentication successful - create User object
    return cl.User(
        identifier=normalized_username,
        display_name=user_data["display_name"],
        metadata={
            "role": user_data["role"],
            **user_data["metadata"],
            "last_login": datetime.now().isoformat(),
        },
    )
# Alternative: Simple authentication for development/testing
@cl.password_auth_callback
async def simple_auth(username: str, password: str) -> Optional[cl.User]:
    """Check credentials against a hard-coded table (development only).

    Args:
        username: Login name typed by the user.
        password: Plain-text password typed by the user.

    Returns:
        A ``cl.User`` tagged with ``auth_method="simple"`` when the pair
        matches an entry of the table, otherwise ``None``.
    """
    # Simple hardcoded authentication (NOT for production)
    valid_users = {
        "admin": "secret123",
        "user": "password",
        "demo": "demo",
    }

    expected_password = valid_users.get(username)
    if expected_password is None or expected_password != password:
        return None

    return cl.User(
        identifier=username,
        display_name=username.title(),
        metadata={"auth_method": "simple"},
    )
# Database-backed authentication example
async def database_auth_example(username: str, password: str) -> Optional[cl.User]:
    """Authenticate through an async database client.

    The record is fetched by e-mail, the password is checked with
    ``verify_password`` and the account must be active. On success the
    last-login timestamp is updated before the user object is built.

    Args:
        username: E-mail address used to look the user up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        A ``cl.User`` built from the database record, or ``None`` when the
        user is missing, the password is wrong, the account is inactive, or
        any exception is raised along the way (the error is printed).
    """
    try:
        # Query database for user (example with async database client)
        user_record = await db_client.fetch_user_by_email(username)

        # Checks run in this order on purpose: existence, password, active.
        if (
            not user_record
            or not verify_password(password, user_record.password_hash)
            or not user_record.is_active
        ):
            return None

        # Update last login timestamp
        await db_client.update_last_login(user_record.id)

        # Create Chainlit User object
        return cl.User(
            identifier=user_record.email,
            display_name=user_record.full_name,
            metadata={
                "user_id": user_record.id,
                "role": user_record.role,
                "permissions": user_record.permissions,
                "created_at": user_record.created_at.isoformat(),
            },
        )
    except Exception as e:
        # Log authentication error; any failure is reported as "not logged in".
        print(f"Authentication error for {username}: {e}")
        return None


# Authentication using HTTP headers, useful for API keys, JWT tokens, or proxy authentication.
# ``Headers`` is not exported from the top-level ``fastapi`` package; it is
# re-exported by ``fastapi.datastructures``.
from fastapi.datastructures import Headers


@cl.header_auth_callback
async def header_auth(headers: Headers) -> Optional[cl.User]:
    """Authentication callback for header-based authentication.

    Extract and validate authentication information (tokens, API keys or
    other auth data) from the HTTP request headers.

    Signature:
        Callable[[Headers], Awaitable[Optional[User]]]

    Args:
        headers: Headers object containing the HTTP request headers.

    Returns:
        A ``cl.User`` object if authentication succeeds, ``None`` if it fails.
    """


# Usage examples for header authentication:
import chainlit as cl
# ``Headers`` is not exported from the top-level ``fastapi`` package; it is
# re-exported by ``fastapi.datastructures``.
from fastapi.datastructures import Headers
import jwt
from typing import Optional


@cl.header_auth_callback
async def authenticate_with_headers(headers: Headers) -> Optional[cl.User]:
    """Authenticate using a JWT bearer token in the Authorization header.

    Args:
        headers: Request headers; only ``authorization`` is read.

    Returns:
        A ``cl.User`` built from the token claims, or ``None`` when the header
        is missing, is not of the form ``Bearer <token>``, the token is
        expired or invalid, or the ``user_id`` / ``email`` claim is absent.
    """
    bearer_prefix = "Bearer "

    auth_header = headers.get("authorization")
    # Missing header, or a scheme other than Bearer: nothing to validate.
    if not auth_header or not auth_header.startswith(bearer_prefix):
        return None
    token = auth_header[len(bearer_prefix):]

    try:
        # Decode JWT token (replace with your JWT secret)
        claims = jwt.decode(token, "your-jwt-secret", algorithms=["HS256"])
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        # Expired or otherwise invalid token
        return None

    # Extract user information from token
    user_id = claims.get("user_id")
    email = claims.get("email")
    if not user_id or not email:
        return None

    # Create User object from token data
    return cl.User(
        identifier=email,
        display_name=claims.get("name") or email,
        metadata={
            "user_id": user_id,
            "role": claims.get("role", "user"),
            "token_exp": claims.get("exp"),
            "auth_method": "jwt",
        },
    )
@cl.header_auth_callback
async def api_key_auth(headers: Headers) -> Optional[cl.User]:
    """Authenticate using an API key sent in the ``x-api-key`` header.

    Args:
        headers: Request headers; only ``x-api-key`` is read.

    Returns:
        A ``cl.User`` for the key's owner, or ``None`` when the header is
        missing, the key is unknown or inactive, the owning user cannot be
        found, or the lookup raises (the error is printed).
    """
    # Extract API key from custom header
    api_key = headers.get("x-api-key")
    if not api_key:
        return None

    try:
        # Validate API key (example with database lookup)
        key_record = await db_client.validate_api_key(api_key)
        if not (key_record and key_record.is_active):
            return None

        # Get associated user
        owner = await db_client.get_user_by_id(key_record.user_id)
        if not owner:
            return None

        user_metadata = {
            "user_id": owner.id,
            "api_key_id": key_record.id,
            "auth_method": "api_key",
            "permissions": key_record.permissions,
        }
        return cl.User(
            identifier=owner.email,
            display_name=owner.name,
            metadata=user_metadata,
        )
    except Exception as e:
        print(f"API key validation error: {e}")
        return None
@cl.header_auth_callback
async def proxy_auth(headers: Headers) -> Optional[cl.User]:
    """Authenticate using identity headers set by an authentication proxy.

    Reads ``x-forwarded-user``, ``x-forwarded-email``, ``x-forwarded-name``
    and the comma-separated ``x-forwarded-groups`` header.

    Args:
        headers: Request headers as forwarded to the application.

    Returns:
        A ``cl.User`` identified by the forwarded e-mail, or ``None`` when the
        user id or e-mail header is missing.
    """
    # NOTE(review): these headers are taken at face value; presumably the
    # deployment guarantees that only the proxy can set them - confirm.
    user_id = headers.get("x-forwarded-user")
    user_email = headers.get("x-forwarded-email")
    if not user_id or not user_email:
        return None

    user_name = headers.get("x-forwarded-name")

    # Parse groups/roles: split on commas, drop blanks and surrounding spaces.
    raw_groups = headers.get("x-forwarded-groups", "")
    groups = [part.strip() for part in raw_groups.split(",") if part.strip()]

    return cl.User(
        identifier=user_email,
        display_name=user_name or user_email,
        metadata={
            "proxy_user_id": user_id,
            "groups": groups,
            "auth_method": "proxy",
        },
    )


# OAuth authentication with support for multiple providers and custom user mapping.
# The annotations below need these names at definition time.
from typing import Dict, Optional


@cl.oauth_callback
async def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_app_user: cl.User,
    id_token: Optional[str] = None,
) -> Optional[cl.User]:
    """OAuth authentication callback for handling OAuth provider responses.

    Use it to customize user data mapping, add additional metadata, or
    validate permissions.

    Signature:
        Callable[[str, str, Dict[str, str], User, Optional[str]],
                 Awaitable[Optional[User]]]

    Args:
        provider_id: OAuth provider identifier (e.g., "google", "github").
        token: OAuth access token.
        raw_user_data: Raw user data from the OAuth provider.
        default_app_user: Default user object created by Chainlit.
        id_token: JWT ID token (if available).

    Returns:
        A customized ``cl.User`` object, or ``None`` to use the default user
        object.
    """


# Usage examples for OAuth integration:
import chainlit as cl
from typing import Dict, Optional
@cl.oauth_callback
async def process_oauth_user(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
    id_token: Optional[str] = None,
) -> Optional[cl.User]:
    """Process OAuth authentication and customize user data.

    Maps the provider payload ("google", "github" or "microsoft") to an
    e-mail and display name, then looks the user up through ``db_client``,
    refreshing an existing record or creating a new one.

    Args:
        provider_id: OAuth provider identifier.
        token: OAuth access token; stored with the refreshed OAuth data of
            an existing user.
        raw_user_data: Raw profile payload returned by the provider.
        default_user: User object to fall back to.
        id_token: Optional ID token (unused here).

    Returns:
        ``None`` for a Google account whose e-mail is not verified;
        ``default_user`` for an unknown provider, a payload without an
        e-mail, or any database error (which is printed); otherwise a
        customized ``cl.User``.
    """
    # Local import: this snippet's import list does not bring datetime into
    # scope, so the timestamps below would otherwise raise NameError.
    from datetime import datetime

    # Provider-specific extras, collected up front so that no variable is
    # bound on only some branches.
    provider_metadata = {}

    if provider_id == "google":
        email = raw_user_data.get("email")
        name = raw_user_data.get("name")
        # The flag may arrive as a JSON boolean or as the string "true";
        # comparing against the string alone rejects the boolean form.
        verified_flag = raw_user_data.get("verified_email")
        if str(verified_flag).lower() != "true":
            # Reject unverified email addresses
            return None
        picture = raw_user_data.get("picture")
        if picture:
            provider_metadata["avatar_url"] = picture
    elif provider_id == "github":
        email = raw_user_data.get("email")
        name = raw_user_data.get("name") or raw_user_data.get("login")
        avatar_url = raw_user_data.get("avatar_url")
        if avatar_url:
            provider_metadata["avatar_url"] = avatar_url
            provider_metadata["github_id"] = raw_user_data.get("id")
    elif provider_id == "microsoft":
        email = raw_user_data.get("mail") or raw_user_data.get("userPrincipalName")
        name = raw_user_data.get("displayName")
    else:
        # Unknown provider - use default user
        return default_user

    if not email:
        # Without an e-mail there is no key for the database lookup and no
        # identifier for the user object; keep the default user instead.
        return default_user

    try:
        # Check if user exists in database and get additional info
        existing_user = await db_client.get_user_by_oauth(provider_id, email)
        if existing_user:
            # Update existing user with fresh OAuth data
            await db_client.update_user_oauth_data(
                existing_user.id,
                provider_id,
                raw_user_data,
                token,
            )
            user_metadata = {
                "user_id": existing_user.id,
                "role": existing_user.role,
                "oauth_provider": provider_id,
                "permissions": existing_user.permissions,
                "last_oauth_login": datetime.now().isoformat(),
            }
        else:
            # Create new user in database
            new_user = await db_client.create_user_from_oauth(
                email=email,
                name=name,
                provider_id=provider_id,
                oauth_data=raw_user_data,
            )
            user_metadata = {
                "user_id": new_user.id,
                "role": "user",  # Default role for new users
                "oauth_provider": provider_id,
                "is_new_user": True,
                "permissions": ["read"],  # Default permissions
                "created_at": datetime.now().isoformat(),
            }

        # Add provider-specific metadata
        user_metadata.update(provider_metadata)

        # Create customized User object
        return cl.User(
            identifier=email,
            display_name=name or email,
            metadata=user_metadata,
        )
    except Exception as e:
        print(f"OAuth user processing error: {e}")
        # Return default user as fallback
        return default_user
@cl.oauth_callback
async def admin_only_oauth(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
    id_token: Optional[str] = None,
) -> Optional[cl.User]:
    """Example: only allow OAuth logins for admin users.

    A user counts as admin when the lower-cased e-mail ends with one of the
    listed domains or equals one of the listed addresses.

    Args:
        provider_id: OAuth provider identifier; recorded in the metadata.
        token: OAuth access token (unused here).
        raw_user_data: Raw profile payload; ``email`` and ``name`` are read.
        default_user: Default user object (unused here).
        id_token: Optional ID token (unused here).

    Returns:
        A ``cl.User`` with the admin role and permissions, or ``None`` when
        the e-mail is missing or not on the allow list.
    """
    # ``or ""`` also covers a payload where the key exists but holds None,
    # which ``.get("email", "")`` alone would pass on to ``.lower()``.
    email = (raw_user_data.get("email") or "").lower()

    # Define admin email domains/addresses
    admin_domains = ("@yourcompany.com", "@admin.example.org")
    admin_emails = {"admin@example.com", "support@example.com"}

    # Check if user is authorized (str.endswith accepts a tuple of suffixes)
    is_admin = email.endswith(admin_domains) or email in admin_emails
    if not is_admin:
        # Reject non-admin users
        return None

    # Grant admin privileges
    return cl.User(
        identifier=email,
        display_name=raw_user_data.get("name") or email,
        metadata={
            "role": "admin",
            "oauth_provider": provider_id,
            "admin_verified": True,
            "permissions": ["read", "write", "admin"],
        },
    )


# Handle user logout events and cleanup sessions.
from fastapi import Request, Response
@cl.on_logout
async def logout_handler(request: Request, response: Response) -> None:
    """Hook executed when the user logs out of the application.

    Use it to clean up user sessions, invalidate tokens, log audit events,
    or modify response headers/cookies for additional cleanup.

    Signature:
        Callable[[Request, Response], Any]

    Args:
        request: FastAPI Request object.
        response: FastAPI Response object for setting cookies/headers.

    Returns:
        The return value is ignored.
    """


# Usage examples for logout handling:
import chainlit as cl
from fastapi import Request, Response
@cl.on_logout
async def handle_logout(request: Request, response: Response):
    """Handle user logout and cleanup.

    When a user is found in the session, the logout is written to the audit
    log, a JWT presented in the Authorization header is invalidated (for
    users whose ``auth_method`` is "jwt") and user-specific resources are
    cleaned up. The ``session_id`` and ``user_preferences`` cookies are
    deleted in every case.

    Args:
        request: Incoming logout request; its client address and
            Authorization header are read.
        response: Outgoing response on which the cookies are deleted.
    """
    # Local import: this snippet's import list does not bring datetime into
    # scope, so the audit timestamp would otherwise raise NameError.
    from datetime import datetime

    bearer_prefix = "Bearer "

    # Get user information before logout
    user = cl.user_session.get("user")
    if user:
        # ``request.client`` can be None, so guard before reading ``.host``.
        client = request.client

        # Log logout event
        await log_user_event(
            user_id=user.metadata.get("user_id"),
            event_type="logout",
            timestamp=datetime.now().isoformat(),
            ip_address=client.host if client else None,
        )

        # Invalidate user tokens in database
        if user.metadata.get("auth_method") == "jwt":
            auth_header = request.headers.get("authorization", "")
            # Strip only a leading scheme prefix; a blanket replace() would
            # also rewrite any later occurrence inside the value.
            if auth_header.startswith(bearer_prefix):
                token = auth_header[len(bearer_prefix):]
            else:
                token = auth_header
            if token:
                # Nothing to invalidate when no token was presented.
                await db_client.invalidate_token(token)

        # Cleanup user-specific resources
        await cleanup_user_resources(user.identifier)

    # Clear additional cookies
    response.delete_cookie("session_id")
    response.delete_cookie("user_preferences")

    # Optional: Redirect to custom logout page
    # response.headers["Location"] = "/logout-success"
    print(f"User {user.identifier if user else 'unknown'} logged out")
async def log_user_event(user_id: str, event_type: str, timestamp: str, **kwargs):
    """Write one authentication event to the audit log (best effort).

    Args:
        user_id: Identifier of the user the event belongs to.
        event_type: Event name, e.g. "logout".
        timestamp: Time of the event as a string.
        **kwargs: Extra fields stored alongside the three fixed ones.

    A failure to insert is printed and otherwise ignored.
    """
    audit_entry = {
        "user_id": user_id,
        "event_type": event_type,
        "timestamp": timestamp,
        **kwargs,
    }
    try:
        await db_client.insert_audit_log(audit_entry)
    except Exception as e:
        print(f"Failed to log user event: {e}")
async def cleanup_user_resources(user_identifier: str):
    """Cleanup user-specific resources on logout (best effort).

    Runs three steps in order: delete the user's cache entries, clean up
    temporary files, notify other services. The first step that raises
    aborts the remaining ones; the error is printed and not re-raised.

    Args:
        user_identifier: Identifier of the user who logged out.
    """
    try:
        # Clear user caches
        await cache_client.delete_pattern(f"user:{user_identifier}:*")
        # Cleanup temporary files
        await cleanup_temp_files(user_identifier)
        # Notify other services of logout
        await notify_logout_event(user_identifier)
    except Exception as e:
        print(f"Failed to cleanup resources for {user_identifier}: {e}")


# Configure authentication settings and customize the authentication flow.
import chainlit as cl
# Example: Configure authentication in app startup
@cl.on_app_startup
async def configure_auth():
    """Prepare the authentication system when the app starts.

    Steps, in order: initialise the auth database (awaited), configure the
    OAuth providers, configure session settings, then print a confirmation.
    """
    await initialize_auth_database()  # database connections for auth

    configure_oauth_providers()  # only relevant when using external OAuth
    configure_session_settings()  # session management

    print("Authentication system initialized")
# Example: Conditional authentication based on environment
@cl.password_auth_callback
async def environment_aware_auth(username: str, password: str) -> Optional[cl.User]:
    """Authentication that varies by environment.

    The ``ENVIRONMENT`` variable selects the behaviour: "development"
    accepts the single hard-coded ``dev`` / ``dev`` login, "production"
    delegates to ``full_database_auth``. Any other value rejects every
    login.

    Args:
        username: Login name typed by the user.
        password: Plain-text password typed by the user.

    Returns:
        A ``cl.User`` on success, otherwise ``None``.
    """
    import os

    # Fail closed: when ENVIRONMENT is unset, behave as production. A
    # "development" default would enable the hard-coded admin login on any
    # deployment that merely forgot to set the variable.
    environment = os.getenv("ENVIRONMENT", "production")

    if environment == "development":
        # Simplified auth for development
        if username == "dev" and password == "dev":
            return cl.User(
                identifier="dev@example.com",
                display_name="Developer",
                metadata={"role": "admin", "env": "development"},
            )
        return None

    if environment == "production":
        # Full authentication for production
        return await full_database_auth(username, password)

    return None
# Example: Multi-method authentication
@cl.password_auth_callback
async def multi_method_auth(username: str, password: str) -> Optional[cl.User]:
    """Try multiple authentication methods in a fixed order.

    LDAP is tried first, then the local database, then the external API.
    A later method is only called when every earlier one returned a falsy
    result.

    Args:
        username: Login name typed by the user.
        password: Plain-text password typed by the user.

    Returns:
        The first truthy result, or whatever the external API returned when
        neither earlier method succeeded.
    """
    # ``or`` short-circuits, so each fallback is awaited only when needed.
    return (
        await try_ldap_auth(username, password)
        or await try_database_auth(username, password)
        or await try_external_auth(username, password)
    )


from typing import Dict, Any, Optional
from typing import Any, Awaitable, Callable, Dict, Optional

from chainlit import User
from fastapi import Request, Response
# ``Headers`` is not exported from the top-level ``fastapi`` package; it is
# re-exported by ``fastapi.datastructures``.
from fastapi.datastructures import Headers

# Authentication callback types
PasswordAuthCallback = Callable[[str, str], Awaitable[Optional[User]]]
HeaderAuthCallback = Callable[[Headers], Awaitable[Optional[User]]]
OAuthCallback = Callable[
    [str, str, Dict[str, str], User, Optional[str]],
    Awaitable[Optional[User]],
]
LogoutCallback = Callable[[Request, Response], Any]

# OAuth provider data
OAuthProviderData = Dict[str, str]  # Raw user data from OAuth provider
OAuthToken = str  # OAuth access token
ProviderID = str  # OAuth provider identifier

# User metadata structure
UserMetadata = Dict[str, Any]  # Additional user information and permissions

# Install with Tessl CLI
#   npx tessl i tessl/pypi-chainlit