CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-chainlit

Build production-ready conversational AI applications in minutes with rich UI components and LLM integrations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

authentication.mddocs/

Authentication

Secure user authentication and authorization with support for password authentication, header-based auth, OAuth providers, and custom authentication workflows. These components enable building secure conversational applications with user management and access control.

Capabilities

Password Authentication

Username and password-based authentication with custom validation logic.

import chainlit as cl
from typing import Optional

@cl.password_auth_callback
async def password_auth(username: str, password: str) -> Optional[cl.User]:
    """
    Authentication callback for username/password login.
    Implement custom validation logic for user credentials.
    
    Args:
        username: str - Username or email provided by user
        password: str - Password provided by user
        
    Signature: Callable[[str, str], Awaitable[Optional[User]]]
    
    Returns:
        Optional[cl.User] - User object if authentication succeeds, None if fails
        
    Usage:
        Return a User object for successful authentication, None for failure.
        The User object will be stored in the session for subsequent requests.
    """

Usage examples for password authentication:

import chainlit as cl
import bcrypt
from typing import Optional

# Mock user database (replace with real database in production)
USERS_DB = {
    "alice@example.com": {
        "password_hash": "$2b$12$...",  # bcrypt hash
        "display_name": "Alice Smith",
        "role": "admin",
        "metadata": {"department": "engineering"}
    },
    "bob@example.com": {
        "password_hash": "$2b$12$...", 
        "display_name": "Bob Johnson",
        "role": "user",
        "metadata": {"department": "sales"}
    }
}

@cl.password_auth_callback
async def authenticate_user(username: str, password: str) -> Optional[cl.User]:
    """Authenticate user with username/password"""
    
    # Look up user in database
    user_data = USERS_DB.get(username.lower())
    if not user_data:
        # User not found
        return None
    
    # Verify password using bcrypt
    password_bytes = password.encode('utf-8')
    hash_bytes = user_data["password_hash"].encode('utf-8')
    
    if not bcrypt.checkpw(password_bytes, hash_bytes):
        # Invalid password
        return None
    
    # Authentication successful - create User object
    return cl.User(
        identifier=username,
        display_name=user_data["display_name"],
        metadata={
            "role": user_data["role"],
            **user_data["metadata"],
            "last_login": datetime.now().isoformat()
        }
    )

# Alternative: Simple authentication for development/testing
@cl.password_auth_callback
async def simple_auth(username: str, password: str) -> Optional[cl.User]:
    """Simple authentication for development"""
    
    # Simple hardcoded authentication (NOT for production)
    valid_users = {
        "admin": "secret123",
        "user": "password", 
        "demo": "demo"
    }
    
    if username in valid_users and valid_users[username] == password:
        return cl.User(
            identifier=username,
            display_name=username.title(),
            metadata={"auth_method": "simple"}
        )
    
    return None

# Database-backed authentication example
async def database_auth_example(username: str, password: str) -> Optional[cl.User]:
    """Database-backed authentication with async database calls"""
    
    try:
        # Query database for user (example with async database client)
        user_record = await db_client.fetch_user_by_email(username)
        
        if not user_record:
            return None
            
        # Verify password hash
        if not verify_password(password, user_record.password_hash):
            return None
            
        # Check if account is active
        if not user_record.is_active:
            return None
            
        # Update last login timestamp
        await db_client.update_last_login(user_record.id)
        
        # Create Chainlit User object
        return cl.User(
            identifier=user_record.email,
            display_name=user_record.full_name,
            metadata={
                "user_id": user_record.id,
                "role": user_record.role,
                "permissions": user_record.permissions,
                "created_at": user_record.created_at.isoformat()
            }
        )
        
    except Exception as e:
        # Log authentication error
        print(f"Authentication error for {username}: {e}")
        return None

Header-Based Authentication

Authentication using HTTP headers, useful for API keys, JWT tokens, or proxy authentication.

from fastapi import Headers

@cl.header_auth_callback
async def header_auth(headers: Headers) -> Optional[cl.User]:
    """
    Authentication callback for header-based authentication.
    Extract and validate authentication information from HTTP headers.
    
    Args:
        headers: Headers - FastAPI Headers object containing HTTP request headers
        
    Signature: Callable[[Headers], Awaitable[Optional[User]]]
    
    Returns:
        Optional[cl.User] - User object if authentication succeeds, None if fails
        
    Usage:
        Extract tokens, API keys, or other auth data from headers.
        Validate credentials and return User object for successful auth.
    """

Usage examples for header authentication:

import chainlit as cl
from fastapi import Headers
import jwt
from typing import Optional

@cl.header_auth_callback
async def authenticate_with_headers(headers: Headers) -> Optional[cl.User]:
    """Authenticate using JWT token in Authorization header"""
    
    # Extract Authorization header
    auth_header = headers.get("authorization")
    if not auth_header:
        return None
    
    # Check for Bearer token format
    if not auth_header.startswith("Bearer "):
        return None
    
    token = auth_header[7:]  # Remove "Bearer " prefix
    
    try:
        # Decode JWT token (replace with your JWT secret)
        payload = jwt.decode(token, "your-jwt-secret", algorithms=["HS256"])
        
        # Extract user information from token
        user_id = payload.get("user_id")
        email = payload.get("email")
        name = payload.get("name")
        role = payload.get("role", "user")
        
        if not user_id or not email:
            return None
        
        # Create User object from token data
        return cl.User(
            identifier=email,
            display_name=name or email,
            metadata={
                "user_id": user_id,
                "role": role,
                "token_exp": payload.get("exp"),
                "auth_method": "jwt"
            }
        )
        
    except jwt.ExpiredSignatureError:
        # Token expired
        return None
    except jwt.InvalidTokenError:
        # Invalid token
        return None

@cl.header_auth_callback  
async def api_key_auth(headers: Headers) -> Optional[cl.User]:
    """Authenticate using API key in custom header"""
    
    # Extract API key from custom header
    api_key = headers.get("x-api-key")
    if not api_key:
        return None
    
    # Validate API key (example with database lookup)
    try:
        api_key_record = await db_client.validate_api_key(api_key)
        
        if not api_key_record or not api_key_record.is_active:
            return None
        
        # Get associated user
        user = await db_client.get_user_by_id(api_key_record.user_id)
        
        if not user:
            return None
            
        return cl.User(
            identifier=user.email,
            display_name=user.name,
            metadata={
                "user_id": user.id,
                "api_key_id": api_key_record.id,
                "auth_method": "api_key",
                "permissions": api_key_record.permissions
            }
        )
        
    except Exception as e:
        print(f"API key validation error: {e}")
        return None

@cl.header_auth_callback
async def proxy_auth(headers: Headers) -> Optional[cl.User]:
    """Authenticate using headers from authentication proxy"""
    
    # Extract user information from proxy headers
    user_id = headers.get("x-forwarded-user")
    user_email = headers.get("x-forwarded-email") 
    user_name = headers.get("x-forwarded-name")
    user_groups = headers.get("x-forwarded-groups", "")
    
    if not user_id or not user_email:
        return None
    
    # Parse groups/roles
    groups = [g.strip() for g in user_groups.split(",") if g.strip()]
    
    return cl.User(
        identifier=user_email,
        display_name=user_name or user_email,
        metadata={
            "proxy_user_id": user_id,
            "groups": groups,
            "auth_method": "proxy"
        }
    )

OAuth Integration

OAuth authentication with support for multiple providers and custom user mapping.

@cl.oauth_callback
async def oauth_callback(
    provider_id: str,
    token: str, 
    raw_user_data: Dict[str, str],
    default_app_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """
    OAuth authentication callback for handling OAuth provider responses.
    
    Args:
        provider_id: str - OAuth provider identifier (e.g., "google", "github")
        token: str - OAuth access token
        raw_user_data: Dict[str, str] - Raw user data from OAuth provider
        default_app_user: cl.User - Default user object created by Chainlit
        id_token: Optional[str] - JWT ID token (if available)
        
    Signature: Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]
    
    Returns:
        Optional[cl.User] - Customized user object or None to use default
        
    Usage:
        Customize user data mapping, add additional metadata, validate permissions.
        Return None to use the default user object, or return a custom User object.
    """

Usage examples for OAuth integration:

import chainlit as cl
from typing import Dict, Optional

@cl.oauth_callback
async def process_oauth_user(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str], 
    default_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """Process OAuth authentication and customize user data"""
    
    # Extract provider-specific user information
    if provider_id == "google":
        email = raw_user_data.get("email")
        name = raw_user_data.get("name")
        picture = raw_user_data.get("picture")
        verified_email = raw_user_data.get("verified_email") == "true"
        
        if not verified_email:
            # Reject unverified email addresses
            return None
            
    elif provider_id == "github":
        email = raw_user_data.get("email")
        name = raw_user_data.get("name") or raw_user_data.get("login")
        avatar_url = raw_user_data.get("avatar_url")
        github_id = raw_user_data.get("id")
        
    elif provider_id == "microsoft":
        email = raw_user_data.get("mail") or raw_user_data.get("userPrincipalName")
        name = raw_user_data.get("displayName")
        
    else:
        # Unknown provider - use default user
        return default_user
    
    # Check if user exists in database and get additional info
    try:
        existing_user = await db_client.get_user_by_oauth(provider_id, email)
        
        if existing_user:
            # Update existing user with fresh OAuth data
            await db_client.update_user_oauth_data(
                existing_user.id,
                provider_id,
                raw_user_data,
                token
            )
            
            user_metadata = {
                "user_id": existing_user.id,
                "role": existing_user.role,
                "oauth_provider": provider_id,
                "permissions": existing_user.permissions,
                "last_oauth_login": datetime.now().isoformat()
            }
            
        else:
            # Create new user in database
            new_user = await db_client.create_user_from_oauth(
                email=email,
                name=name,
                provider_id=provider_id,
                oauth_data=raw_user_data
            )
            
            user_metadata = {
                "user_id": new_user.id,
                "role": "user",  # Default role for new users
                "oauth_provider": provider_id,
                "is_new_user": True,
                "permissions": ["read"],  # Default permissions
                "created_at": datetime.now().isoformat()
            }
        
        # Add provider-specific metadata
        if provider_id == "google" and picture:
            user_metadata["avatar_url"] = picture
        elif provider_id == "github" and avatar_url:
            user_metadata["avatar_url"] = avatar_url
            user_metadata["github_id"] = github_id
            
        # Create customized User object
        return cl.User(
            identifier=email,
            display_name=name or email,
            metadata=user_metadata
        )
        
    except Exception as e:
        print(f"OAuth user processing error: {e}")
        # Return default user as fallback
        return default_user

@cl.oauth_callback
async def admin_only_oauth(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """Example: Only allow OAuth for admin users"""
    
    email = raw_user_data.get("email", "").lower()
    
    # Define admin email domains/addresses
    admin_domains = ["@yourcompany.com", "@admin.example.org"]
    admin_emails = ["admin@example.com", "support@example.com"]
    
    # Check if user is authorized
    is_admin = (
        any(email.endswith(domain) for domain in admin_domains) or
        email in admin_emails
    )
    
    if not is_admin:
        # Reject non-admin users
        return None
    
    # Grant admin privileges
    return cl.User(
        identifier=email,
        display_name=raw_user_data.get("name", email),
        metadata={
            "role": "admin",
            "oauth_provider": provider_id,
            "admin_verified": True,
            "permissions": ["read", "write", "admin"]
        }
    )

Logout Handling

Handle user logout events and cleanup sessions.

from fastapi import Request, Response

@cl.on_logout
async def logout_handler(request: Request, response: Response) -> None:
    """
    Hook executed when user logs out of the application.
    
    Args:
        request: Request - FastAPI Request object
        response: Response - FastAPI Response object for setting cookies/headers
        
    Signature: Callable[[Request, Response], Any]
    
    Returns:
        Any - Return value ignored
        
    Usage:
        Cleanup user sessions, invalidate tokens, log audit events.
        Modify response headers/cookies for additional cleanup.
    """

Usage examples for logout handling:

import chainlit as cl
from fastapi import Request, Response

@cl.on_logout
async def handle_logout(request: Request, response: Response):
    """Handle user logout and cleanup"""
    
    # Get user information before logout
    user = cl.user_session.get("user")
    
    if user:
        # Log logout event
        await log_user_event(
            user_id=user.metadata.get("user_id"),
            event_type="logout",
            timestamp=datetime.now().isoformat(),
            ip_address=request.client.host
        )
        
        # Invalidate user tokens in database
        if user.metadata.get("auth_method") == "jwt":
            token = request.headers.get("authorization", "").replace("Bearer ", "")
            await db_client.invalidate_token(token)
        
        # Cleanup user-specific resources
        await cleanup_user_resources(user.identifier)
    
    # Clear additional cookies
    response.delete_cookie("session_id")
    response.delete_cookie("user_preferences")
    
    # Optional: Redirect to custom logout page
    # response.headers["Location"] = "/logout-success"
    
    print(f"User {user.identifier if user else 'unknown'} logged out")

async def log_user_event(user_id: str, event_type: str, timestamp: str, **kwargs):
    """Log user authentication events for auditing"""
    try:
        await db_client.insert_audit_log({
            "user_id": user_id,
            "event_type": event_type,
            "timestamp": timestamp,
            **kwargs
        })
    except Exception as e:
        print(f"Failed to log user event: {e}")

async def cleanup_user_resources(user_identifier: str):
    """Cleanup user-specific resources on logout"""
    try:
        # Clear user caches
        await cache_client.delete_pattern(f"user:{user_identifier}:*")
        
        # Cleanup temporary files
        await cleanup_temp_files(user_identifier)
        
        # Notify other services of logout
        await notify_logout_event(user_identifier)
        
    except Exception as e:
        print(f"Failed to cleanup resources for {user_identifier}: {e}")

Authentication Configuration

Configure authentication settings and customize the authentication flow.

import chainlit as cl

# Example: Configure authentication in app startup
@cl.on_app_startup
async def configure_auth():
    """Configure authentication settings"""
    
    # Set up database connections for auth
    await initialize_auth_database()
    
    # Configure OAuth providers (if using external OAuth)
    configure_oauth_providers()
    
    # Set up session management
    configure_session_settings()
    
    print("Authentication system initialized")

# Example: Conditional authentication based on environment
@cl.password_auth_callback
async def environment_aware_auth(username: str, password: str) -> Optional[cl.User]:
    """Authentication that varies by environment"""
    
    import os
    environment = os.getenv("ENVIRONMENT", "development")
    
    if environment == "development":
        # Simplified auth for development
        if username == "dev" and password == "dev":
            return cl.User(
                identifier="dev@example.com",
                display_name="Developer",
                metadata={"role": "admin", "env": "development"}
            )
    
    elif environment == "production":
        # Full authentication for production
        return await full_database_auth(username, password)
    
    return None

# Example: Multi-method authentication
@cl.password_auth_callback
async def multi_method_auth(username: str, password: str) -> Optional[cl.User]:
    """Try multiple authentication methods"""
    
    # Try LDAP authentication first
    user = await try_ldap_auth(username, password)
    if user:
        return user
    
    # Fall back to local database
    user = await try_database_auth(username, password)
    if user:
        return user
    
    # Fall back to external API
    user = await try_external_auth(username, password)
    return user

Core Types

from typing import Dict, Any, Optional
from fastapi import Headers, Request, Response

# Authentication callback types
PasswordAuthCallback = Callable[[str, str], Awaitable[Optional[User]]]
HeaderAuthCallback = Callable[[Headers], Awaitable[Optional[User]]]
OAuthCallback = Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]
LogoutCallback = Callable[[Request, Response], Any]

# OAuth provider data
OAuthProviderData = Dict[str, str]  # Raw user data from OAuth provider
OAuthToken = str                    # OAuth access token
ProviderID = str                   # OAuth provider identifier

# User metadata structure
UserMetadata = Dict[str, Any]      # Additional user information and permissions

Install with Tessl CLI

npx tessl i tessl/pypi-chainlit

docs

advanced.md

authentication.md

callbacks.md

index.md

input-widgets.md

integrations.md

messaging.md

ui-elements.md

user-management.md

tile.json