Search (Ctrl+K)
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-chainlit

Build production-ready conversational AI applications in minutes with rich UI components and LLM integrations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/advanced.md

Advanced Features

Advanced functionality including caching, async utilities, Model Context Protocol (MCP) support, server configuration, and production deployment features. These components enable building sophisticated, scalable conversational AI applications.

Capabilities

Caching

Function result caching for improved performance and reduced API calls.

import chainlit as cl

@cl.cache
def expensive_function(arg: str) -> str:
    """
    Simple function result caching decorator for performance optimization.
    
    Usage:
        Apply to functions that perform expensive operations or API calls.
        Results are cached based on function arguments.
        Cache persists for the duration of the application session.
    
    Args:
        Function arguments are used as cache keys
        
    Returns:
        Cached function results
        
    Note:
        Cache is argument-based and not persistent across application restarts.
        Use for computationally expensive operations, API calls, or data processing.
    """
    # Documentation stub: the body is intentionally empty. @cl.cache wraps the
    # function and memoizes its return value keyed on the call arguments.

Usage examples for caching:

import chainlit as cl
import time
import hashlib

@cl.cache
def expensive_computation(data: str, iterations: int = 1000) -> dict:
    """Expensive computation with caching.

    Burns CPU by hashing `iterations` variants of `data`; thanks to
    @cl.cache, repeat calls with the same arguments return the stored
    result instead of recomputing.
    """
    
    # Simulate expensive computation
    result = {"input": data, "hash": None, "processing_time": 0}
    
    start_time = time.time()
    
    # Simulate complex processing
    # Note: each pass overwrites result["hash"], so only the final
    # iteration's digest is kept — the loop exists purely to burn time.
    for i in range(iterations):
        hash_obj = hashlib.sha256(f"{data}_{i}".encode())
        result["hash"] = hash_obj.hexdigest()
    
    # processing_time reflects the first (uncached) call; cached calls
    # return this same stored value without re-measuring.
    result["processing_time"] = time.time() - start_time
    return result

@cl.cache
async def fetch_external_data(api_endpoint: str) -> dict:
    """Cache external API calls.

    Simulates a slow network request; @cl.cache memoizes the response per
    endpoint so only the first call for a given URL pays the delay.
    """
    
    # Simulate API call
    await cl.sleep(2)  # Simulate network delay
    
    return {
        "endpoint": api_endpoint,
        "data": f"Response from {api_endpoint}",
        "timestamp": time.time(),
        # NOTE: this flag is hard-coded and never flipped by the cache
        # layer — it does not indicate whether a cache hit occurred.
        "cached": False  # First call won't be cached
    }

@cl.cache
def process_document(file_path: str, analysis_type: str = "basic") -> dict:
    """Cache document processing results.

    Args:
        file_path: Path to a UTF-8 text file to analyze.
        analysis_type: Label stored alongside the statistics (default "basic").

    Returns:
        dict with word/char counts, the analysis type, and a timestamp.
        Cached per (file_path, analysis_type), so re-processing the same
        file is free — but edits to the file won't be seen until restart.
    """
    
    # Read with an explicit encoding so results don't vary with the
    # platform's locale default (the original relied on the default).
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    
    return {
        "file_path": file_path,
        "word_count": len(content.split()),
        "char_count": len(content),
        "analysis_type": analysis_type,
        "processed_at": time.time()
    }

@cl.on_message
async def demo_caching(message: cl.Message):
    """Demonstrate caching functionality.

    Runs a cached computation inside a UI step, then a cached fake API
    call, and reports both results back to the user.
    """
    
    user_input = message.content
    
    # First call - will compute and cache
    async with cl.Step(name="Processing (may be cached)", type="tool") as step:
        step.input = user_input
        
        # This will be fast on subsequent calls with same input
        result = expensive_computation(user_input, 500)
        step.output = result
    
    # API call - cached for same endpoint
    api_result = await fetch_external_data("https://api.example.com/data")
    
    # NOTE(review): fetch_external_data always includes a "cached" key, so
    # the membership test below always prints "Yes" — it does not actually
    # detect a cache hit. Confirm intended behavior.
    await cl.Message(
        f"Computation hash: {result['hash'][:16]}...\n"
        f"Processing time: {result['processing_time']:.3f}s\n"
        f"API response cached: {'Yes' if 'cached' in api_result else 'No'}"
    ).send()

# Manual cache management (if needed)
def clear_function_cache():
    """Reset all memoized results.

    Placeholder: the concrete clearing mechanism depends on which cache
    backend the application uses, so nothing happens here yet.
    """
    return None

Async Utilities

Utilities for async/sync interoperability and execution control.

async def make_async(func: Callable) -> Callable:
    """
    Convert synchronous function to asynchronous function.
    
    Args:
        func: Callable - Synchronous function to convert
        
    Returns:
        Callable - Asynchronous version of the function
        
    Usage:
        Wrap sync functions for use in async contexts.
        Useful for integrating synchronous libraries with async Chainlit code.
    """
    # Documentation stub — the real implementation ships with chainlit.
    # NOTE(review): verify against the installed chainlit version whether
    # make_async itself is a coroutine (as declared here) or a plain
    # function returning an async wrapper; the calling convention differs.

def run_sync(coro: Awaitable) -> Any:
    """
    Run async function in synchronous context.
    
    Args:
        coro: Awaitable - Async function or coroutine to execute
        
    Returns:
        Any - Result of the async function
        
    Usage:
        Execute async code from synchronous functions.
        Creates event loop if none exists.
    """
    # Documentation stub — the real implementation ships with chainlit.
    # NOTE(review): behavior when called while an event loop is already
    # running in the same thread is not specified here — confirm.

async def sleep(duration: int) -> None:
    """
    Async sleep function for delays in conversational flows.
    
    Args:
        duration: int - Sleep duration in seconds
        
    Returns:
        None
        
    Usage:
        Add delays for natural conversation pacing or processing simulation.
        Wrapper around asyncio.sleep with Chainlit context awareness.
    """
    # Documentation stub — the real implementation ships with chainlit.
    # NOTE(review): asyncio.sleep accepts floats; the int annotation here
    # may be stricter than the actual API — confirm.

Usage examples for async utilities:

import chainlit as cl
import asyncio
from typing import Callable, Any

# Convert sync functions to async
def sync_heavy_computation(data: str) -> str:
    """Synchronous heavy computation.

    Blocks the calling thread for ~2 seconds; wrap with cl.make_async
    before using from an async handler so the event loop isn't stalled.
    """
    import time
    time.sleep(2)  # Simulate heavy work
    return f"Processed: {data.upper()}"

@cl.on_message
async def use_sync_function(message: cl.Message):
    """Use synchronous function in async context.

    Wraps the blocking helper with cl.make_async so its 2-second sleep
    doesn't block the event loop, then echoes the processed text.
    """
    
    # Convert sync function to async
    async_computation = await cl.make_async(sync_heavy_computation)
    
    # Now can be used in async context without blocking
    result = await async_computation(message.content)
    
    await cl.Message(result).send()

# Run async code from sync context
def sync_callback_example():
    """Synchronous function that needs to call async code.

    Demonstrates cl.run_sync: drives a locally-defined coroutine to
    completion from plain synchronous code and prints its result.
    """
    
    async def async_operation():
        # Sends a message and pauses briefly before returning a status.
        await cl.Message("Running async operation from sync context").send()
        await cl.sleep(1)
        return "Operation complete"
    
    # Run async code from sync context
    # NOTE(review): presumably safe only when no event loop is already
    # running on this thread — verify against cl.run_sync's contract.
    result = cl.run_sync(async_operation())
    print(f"Result: {result}")

# Demonstration of async utilities
@cl.on_message
async def async_utilities_demo(message: cl.Message):
    """Demonstrate various async utilities.

    Shows cl.sleep for pacing and cl.make_async for adapting an inline
    sync callable, sending progress messages along the way.
    """
    
    await cl.Message("Starting async operations...").send()
    
    # Use async sleep for pacing
    await cl.sleep(1)
    
    # Convert and use sync function
    # The lambda lower-cases the text and replaces spaces with underscores.
    sync_func = lambda x: x.replace(" ", "_").lower()
    async_func = await cl.make_async(sync_func)
    processed = await async_func(message.content)
    
    await cl.Message(f"Processed input: {processed}").send()
    
    # Simulate processing with sleep
    await cl.Message("Processing... (3 seconds)").send()
    await cl.sleep(3)
    
    await cl.Message("Processing complete!").send()

# Advanced async patterns
@cl.on_message
async def concurrent_operations(message: cl.Message):
    """Demonstrate concurrent async operations.

    Launches three timed coroutines with asyncio.gather; total wait is
    the slowest one (~3s) rather than the sum (~6s).
    """
    
    async def operation_1():
        await cl.sleep(2)
        return "Operation 1 complete"
    
    async def operation_2():
        await cl.sleep(3)
        return "Operation 2 complete"
    
    async def operation_3():
        await cl.sleep(1)
        return "Operation 3 complete"
    
    # Run operations concurrently
    await cl.Message("Starting concurrent operations...").send()
    
    # gather preserves argument order in the results list, regardless of
    # which operation finishes first.
    results = await asyncio.gather(
        operation_1(),
        operation_2(), 
        operation_3()
    )
    
    await cl.Message(f"All operations complete: {results}").send()

Model Context Protocol (MCP)

Integration with Model Context Protocol for enhanced AI model communication and tool use.

@cl.on_mcp_connect
async def mcp_connect_handler(connection: McpConnection, session: ClientSession) -> None:
    """
    Hook executed when MCP (Model Context Protocol) connection is established.
    
    Args:
        connection: McpConnection - MCP connection object
        session: ClientSession - Client session for MCP communication
        
    Signature: Callable[[McpConnection, ClientSession], Awaitable[None]]
    
    Returns:
        None
        
    Usage:
        Initialize MCP tools, configure model capabilities, setup context.
        Called when connection to MCP server is successfully established.
    """
    # Documentation stub — register your own coroutine with this decorator.

@cl.on_mcp_disconnect
async def mcp_disconnect_handler(connection_id: str, session: ClientSession) -> None:
    """
    Hook executed when MCP connection is closed or lost.
    
    Args:
        connection_id: str - Identifier of the closed connection
        session: ClientSession - Client session that was disconnected
        
    Signature: Callable[[str, ClientSession], Awaitable[None]]
    
    Returns:
        None
        
    Usage:
        Cleanup MCP resources, handle connection failures, log disconnection events.
    """
    # Documentation stub — register your own coroutine with this decorator.

Usage examples for MCP integration:

import chainlit as cl
from typing import Dict, Any

# Global MCP state
mcp_connections = {}
mcp_tools = {}

@cl.on_mcp_connect
async def handle_mcp_connection(connection, session):
    """Handle MCP connection establishment.

    Records the connection and its tool list in module-level registries
    and in the user session, then announces the available tools.
    """
    
    connection_id = connection.id
    mcp_connections[connection_id] = connection
    
    # Initialize available tools from MCP server
    try:
        # Query available tools from MCP server
        # NOTE(review): this treats the response as a dict; the MCP SDK's
        # list_tools() may return a typed result object — confirm against
        # the installed SDK version.
        tools_response = await session.list_tools()
        available_tools = tools_response.get("tools", [])
        
        mcp_tools[connection_id] = available_tools
        
        # Log available tools
        tool_names = [tool.get("name", "unknown") for tool in available_tools]
        
        await cl.Message(
            f"✅ MCP connection established!\n"
            f"Connection ID: {connection_id}\n"
            f"Available tools: {', '.join(tool_names)}"
        ).send()
        
        # Store connection info in session
        # Only tool *names* go into the session; full tool specs stay in
        # the module-level mcp_tools registry.
        cl.user_session.set("mcp_connection_id", connection_id)
        cl.user_session.set("mcp_tools", tool_names)
        
    except Exception as e:
        await cl.Message(f"❌ Error initializing MCP tools: {str(e)}").send()

@cl.on_mcp_disconnect
async def handle_mcp_disconnection(connection_id: str, session):
    """Handle MCP connection loss.

    Removes the connection from the module-level registries, notifies the
    user, and clears session state if it referenced this connection.
    """
    
    # Clean up connection state
    if connection_id in mcp_connections:
        del mcp_connections[connection_id]
    
    if connection_id in mcp_tools:
        del mcp_tools[connection_id]
    
    # Notify user of disconnection
    await cl.Message(
        f"🔌 MCP connection {connection_id} disconnected"
    ).send()
    
    # Clear session state
    # Only clear if the session was pointing at the connection that died.
    if cl.user_session.get("mcp_connection_id") == connection_id:
        cl.user_session.set("mcp_connection_id", None)
        cl.user_session.set("mcp_tools", [])

@cl.on_message
async def use_mcp_tools(message: cl.Message):
    """Use MCP tools in conversation.

    Routes the user's message to an MCP tool by simple keyword matching,
    falling back to listing the tools stored in the session by
    handle_mcp_connection.
    """
    
    connection_id = cl.user_session.get("mcp_connection_id")
    available_tools = cl.user_session.get("mcp_tools", [])
    
    # Guard: nothing to do without an active connection and tool list.
    if not connection_id or not available_tools:
        await cl.Message("No MCP tools available. Please connect to an MCP server.").send()
        return
    
    user_input = message.content.lower()
    
    # Example: Use file reading tool
    if "read file" in user_input or "file content" in user_input:
        if "file_read" in available_tools:
            await use_mcp_file_tool(connection_id, message.content)
        else:
            await cl.Message("File reading tool not available in MCP server.").send()
    
    # Example: Use web search tool
    elif "search" in user_input or "lookup" in user_input:
        if "web_search" in available_tools:
            await use_mcp_search_tool(connection_id, message.content)
        else:
            await cl.Message("Web search tool not available in MCP server.").send()
    
    else:
        # List available tools
        tools_text = "\n".join([f"• {tool}" for tool in available_tools])
        await cl.Message(
            f"Available MCP tools:\n{tools_text}\n\n"
            f"Try asking me to 'read file' or 'search' something!"
        ).send()

async def use_mcp_file_tool(connection_id: str, user_input: str):
    """Use MCP file reading tool.

    Calls the server's "file_read" tool inside a UI step and echoes the
    returned content; errors are reported to the user rather than raised.
    """
    
    connection = mcp_connections.get(connection_id)
    # Silently bail out if the connection vanished since routing.
    if not connection:
        return
    
    try:
        async with cl.Step(name="MCP File Operation", type="tool") as step:
            step.input = user_input
            
            # Call MCP file tool (example)
            # The path is a hard-coded placeholder; a real implementation
            # would parse it out of user_input.
            result = await connection.call_tool("file_read", {
                "path": "/example/file.txt"  # Extract from user input
            })
            
            step.output = result
            
        # result is still in scope here — the async with only closes the step.
        await cl.Message(f"File content:\n```\n{result.get('content', 'No content')}\n```").send()
        
    except Exception as e:
        await cl.Message(f"MCP file operation failed: {str(e)}").send()

async def use_mcp_search_tool(connection_id: str, query: str):
    """Use MCP web search tool.

    Calls the server's "web_search" tool inside a UI step, then renders
    up to three results as markdown; errors go to the user as messages.
    """
    
    connection = mcp_connections.get(connection_id)
    # Silently bail out if the connection vanished since routing.
    if not connection:
        return
    
    try:
        async with cl.Step(name="MCP Web Search", type="tool") as step:
            step.input = query
            
            # Call MCP search tool
            result = await connection.call_tool("web_search", {
                "query": query,
                "max_results": 5
            })
            
            step.output = result
            
        # Format search results
        results = result.get("results", [])
        if results:
            formatted_results = "\n\n".join([
                f"**{r.get('title', 'No title')}**\n{r.get('url', '')}\n{r.get('snippet', '')}"
                for r in results[:3]  # Show top 3 results
            ])
            await cl.Message(f"Search Results:\n\n{formatted_results}").send()
        else:
            await cl.Message("No search results found.").send()
            
    except Exception as e:
        await cl.Message(f"MCP search failed: {str(e)}").send()

Copilot Functions

Support for copilot-style function calling and tool integration.

@dataclass
class CopilotFunction:
    """
    Represents a copilot function call with name and arguments.
    
    Fields:
        name: str - Function name to execute
        args: Dict[str, Any] - Function arguments and parameters
        
    Methods:
        acall() - Execute the function call asynchronously
        
    Usage:
        Represents function calls from AI models that can be executed
        in the Chainlit environment with proper tracking and observability.
    """
    # Tool name to dispatch on.
    name: str
    # Keyword arguments forwarded to the tool implementation.
    args: Dict[str, Any]
    
    async def acall(self) -> Any:
        """
        Execute the copilot function call asynchronously.
        
        Returns:
            Any - Result of the function execution
            
        Usage:
            Execute the function with provided arguments.
            Automatically tracked as a step in Chainlit UI.
        """
        # Documentation stub — the real implementation ships with chainlit.

Usage examples for copilot functions:

import chainlit as cl
from dataclasses import dataclass
from typing import Dict, Any

# Copilot function implementations. These must be defined BEFORE the
# registry below — the original declared the registry first, which raised
# NameError at import time because the names did not exist yet (and two
# of the registered functions were never defined at all).

async def calculate_function(expression: str) -> Dict[str, Any]:
    """Calculator function for copilot.

    Returns a dict with "result" on success or "error" on failure, plus
    the original expression and a "success" flag.
    """
    try:
        # WARNING: eval() executes arbitrary code — swap in a safe
        # expression evaluator before exposing this to untrusted input.
        result = eval(expression)  # Use safe eval in production
        return {
            "result": result,
            "expression": expression,
            "success": True
        }
    except Exception as e:
        return {
            "error": str(e),
            "expression": expression,
            "success": False
        }

async def web_search_function(query: str, max_results: int = 5) -> Dict[str, Any]:
    """Web search function for copilot"""
    # Mock implementation - replace with real search
    await cl.sleep(1)  # Simulate search delay
    
    return {
        "query": query,
        "results": [
            {"title": f"Result 1 for '{query}'", "url": "https://example1.com"},
            {"title": f"Result 2 for '{query}'", "url": "https://example2.com"}
        ],
        "count": 2,
        "success": True
    }

async def data_analysis_function(**kwargs: Any) -> Dict[str, Any]:
    """Placeholder so the registry resolves; implement real analysis here."""
    return {"success": False, "error": "analyze_data not implemented"}

async def chart_generation_function(**kwargs: Any) -> Dict[str, Any]:
    """Placeholder so the registry resolves; implement real charting here."""
    return {"success": False, "error": "generate_chart not implemented"}

# Define available copilot functions: registry mapping tool name -> coroutine.
COPILOT_FUNCTIONS = {
    "calculate": calculate_function,
    "search_web": web_search_function,
    "analyze_data": data_analysis_function,
    "generate_chart": chart_generation_function
}

@cl.on_message
async def handle_copilot_functions(message: cl.Message):
    """Handle messages that may contain copilot function calls.

    Parses slash-command style prefixes ("/calc ", "/search ") out of the
    message, wraps them as CopilotFunction calls, and reports results.
    """
    
    user_input = message.content
    
    # Parse potential function calls (this would typically be done by an LLM)
    if user_input.startswith("/calc "):
        # Extract calculation expression
        # [6:] skips the "/calc " prefix (6 characters).
        expression = user_input[6:].strip()
        
        # Create copilot function
        calc_function = cl.CopilotFunction(
            name="calculate",
            args={"expression": expression}
        )
        
        # Execute function
        result = await execute_copilot_function(calc_function)
        
        if result.get("success"):
            await cl.Message(f"Calculation result: {result['result']}").send()
        else:
            await cl.Message(f"Calculation error: {result['error']}").send()
    
    elif user_input.startswith("/search "):
        # Extract search query
        # [8:] skips the "/search " prefix (8 characters).
        query = user_input[8:].strip()
        
        search_function = cl.CopilotFunction(
            name="search_web",
            args={"query": query, "max_results": 3}
        )
        
        result = await execute_copilot_function(search_function)
        
        # NOTE(review): unlike the /calc branch, a failed search produces
        # no user-facing message — confirm whether that is intentional.
        if result.get("success"):
            results_text = "\n".join([
                f"• {r['title']}: {r['url']}"
                for r in result['results']
            ])
            await cl.Message(f"Search results for '{query}':\n\n{results_text}").send()
    
    else:
        await cl.Message(
            "Try these copilot functions:\n"
            "• `/calc 2 + 3 * 4` - Calculate expressions\n"
            "• `/search python tutorial` - Search the web\n"
        ).send()

async def execute_copilot_function(copilot_func: cl.CopilotFunction) -> Any:
    """Execute a copilot function with step tracking.

    Validates the name against COPILOT_FUNCTIONS, then runs the call
    inside a cl.Step so input/output appear in the UI. Exceptions are
    converted into {"error": ..., "success": False} dicts.
    """
    
    function_impl = COPILOT_FUNCTIONS.get(copilot_func.name)
    
    # function_impl is used only as an availability check here; the actual
    # execution goes through CopilotFunction.acall() below.
    if not function_impl:
        return {"error": f"Function '{copilot_func.name}' not available", "success": False}
    
    async with cl.Step(name=f"Copilot: {copilot_func.name}", type="tool") as step:
        step.input = copilot_func.args
        
        try:
            # Execute the copilot function
            result = await copilot_func.acall()
            step.output = result
            return result
            
        except Exception as e:
            error_result = {"error": str(e), "success": False}
            step.output = error_result
            return error_result

# Alternative: LLM-driven copilot function detection
@cl.on_message
async def llm_copilot_integration(message: cl.Message):
    """Integrate copilot functions with LLM function calling.

    Declares JSON schemas an LLM could use for tool selection, then mocks
    the detection step with a regex so the example runs without an LLM.
    """
    
    # Define function schemas for LLM
    # NOTE: function_schemas is illustrative only — it is never sent
    # anywhere in this mock implementation.
    function_schemas = [
        {
            "name": "calculate", 
            "description": "Perform mathematical calculations",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {"type": "string", "description": "Mathematical expression"}
                },
                "required": ["expression"]
            }
        },
        {
            "name": "search_web",
            "description": "Search the web for information", 
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"},
                    "max_results": {"type": "integer", "description": "Maximum results", "default": 5}
                },
                "required": ["query"]
            }
        }
    ]
    
    # Use LLM to determine if function calls are needed
    # (This would integrate with your preferred LLM service)
    
    # Example: Mock function call detection
    if "calculate" in message.content.lower():
        # Extract expression (in real implementation, LLM would do this)
        import re
        # Captures everything after the word "calculate" as the expression.
        expr_match = re.search(r'calculate\s+(.+)', message.content, re.IGNORECASE)
        if expr_match:
            expression = expr_match.group(1).strip()
            
            copilot_func = cl.CopilotFunction(
                name="calculate",
                args={"expression": expression}
            )
            
            result = await execute_copilot_function(copilot_func)
            await cl.Message(f"Calculation: {expression} = {result.get('result', 'Error')}").send()

Server and Configuration Features

Advanced server configuration, data layer management, and production deployment features.

@cl.data_layer
def configure_data_layer() -> BaseDataLayer:
    """
    Configure custom data layer for persistence and data management.
    
    Signature: Callable[[], BaseDataLayer]
    
    Returns:
        BaseDataLayer - Custom data layer implementation
        
    Usage:
        Configure database connections, file storage, caching layers.
        Customize how Chainlit stores and retrieves conversation data.
    """
    # Documentation stub — register your own factory with this decorator.

Usage examples for server configuration:

import chainlit as cl
from typing import Any, Dict

# Custom data layer implementation
class CustomDataLayer:
    """Production-oriented data layer backed by a SQL database and Redis.

    Only stores the connection URLs for now; the persistence methods are
    skeletons to be filled in with real database/cache calls.
    """

    def __init__(self, database_url: str, redis_url: str):
        # Keep the raw URLs; real connection pools would be opened here.
        self.database_url = database_url
        self.redis_url = redis_url

    async def create_user(self, user_data: Dict[str, Any]) -> str:
        """Persist a new user record and return its id (not implemented)."""

    async def get_user(self, user_id: str) -> Dict[str, Any]:
        """Look up a user record by id (not implemented)."""

    async def store_message(self, message_data: Dict[str, Any]) -> str:
        """Persist one conversation message and return its id (not implemented)."""

@cl.data_layer
def setup_production_data_layer():
    """Configure production data layer.

    Builds a CustomDataLayer from DATABASE_URL and REDIS_URL environment
    variables. Only DATABASE_URL is validated; REDIS_URL may be None.
    """
    import os
    
    database_url = os.getenv("DATABASE_URL")
    redis_url = os.getenv("REDIS_URL")
    
    # Fail fast at startup rather than on first query.
    if not database_url:
        raise ValueError("DATABASE_URL environment variable required")
    
    return CustomDataLayer(database_url, redis_url)

# Production configuration
@cl.on_app_startup
async def production_startup():
    """Production application startup configuration.

    Configures logging, initializes external services, and loads runtime
    limits from environment variables.
    """
    import os
    import logging
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Initialize external services
    # NOTE(review): these helpers are not defined in this file — they must
    # be provided elsewhere or startup will fail with NameError.
    await init_database_pool()
    await init_redis_cache()
    await init_monitoring_services()
    
    # Load configuration from environment
    config = {
        "max_concurrent_users": int(os.getenv("MAX_CONCURRENT_USERS", "100")),
        "rate_limit": int(os.getenv("RATE_LIMIT", "60")),
        "session_timeout": int(os.getenv("SESSION_TIMEOUT", "3600")),
    }
    
    # NOTE(review): cl.user_session is per-user session state; whether it
    # is available during app startup (outside any session) should be
    # confirmed — app-wide config may belong in a module-level object.
    cl.user_session.set("app_config", config)
    
    logging.info("Production application startup complete")

@cl.on_app_shutdown
async def production_shutdown():
    """Production application shutdown.

    Releases external resources before the process exits. The cleanup
    helpers are not defined in this file and must be provided elsewhere.
    """
    import logging
    
    # Cleanup resources
    await cleanup_database_connections()
    await cleanup_redis_connections()
    await save_application_state()
    
    logging.info("Production application shutdown complete")

# Health check and monitoring
async def health_check() -> Dict[str, Any]:
    """Application health check for monitoring.

    Probes the database, Redis, and external-service dependencies.

    Returns:
        dict with "status" ("healthy"/"unhealthy"), a UNIX "timestamp",
        and per-dependency results under "checks" — or an "error" string
        when any probe raises.
    """
    # Fix: this snippet never imported time, so time.time() raised
    # NameError inside the try block and the check always reported
    # unhealthy (with the wrong error).
    import time
    
    try:
        # Check database connectivity
        db_status = await check_database_health()
        
        # Check Redis connectivity
        cache_status = await check_redis_health()
        
        # Check external service dependencies
        external_services = await check_external_services()
        
        return {
            "status": "healthy",
            "timestamp": time.time(),
            "checks": {
                "database": db_status,
                "cache": cache_status,
                "external_services": external_services
            }
        }
        
    except Exception as e:
        # Degrade gracefully: monitoring gets a structured failure record.
        return {
            "status": "unhealthy", 
            "error": str(e),
            "timestamp": time.time()
        }

# Configuration management
class AppConfig:
    """Central application configuration sourced from environment variables."""
    
    def __init__(self):
        self.load_config()
    
    def load_config(self):
        """Read core settings and feature flags from the environment."""
        import os
        
        def _flag(var, default):
            # Environment flags are strings; any casing of "true" means on.
            return os.getenv(var, default).lower() == "true"
        
        self.debug = _flag("DEBUG", "false")
        self.environment = os.getenv("ENVIRONMENT", "development")
        self.secret_key = os.getenv("SECRET_KEY", "dev-secret")
        
        # Feature flags toggled per deployment.
        self.features = {
            "oauth_enabled": _flag("OAUTH_ENABLED", "true"),
            "mcp_enabled": _flag("MCP_ENABLED", "false"),
            "analytics_enabled": _flag("ANALYTICS_ENABLED", "true"),
        }
    
    def get_database_config(self):
        """Assemble database connection settings as a dict."""
        import os
        return {
            "url": os.getenv("DATABASE_URL"),
            "pool_size": int(os.getenv("DB_POOL_SIZE", "10")),
            "max_overflow": int(os.getenv("DB_MAX_OVERFLOW", "20")),
        }

# Initialize global configuration
app_config = AppConfig()

Core Types

from typing import Any, Dict, Callable, Awaitable
from dataclasses import dataclass

# MCP (Model Context Protocol) types
class McpConnection:
    """MCP connection object"""
    # Unique identifier for this server connection.
    id: str
    # Additional connection properties

class ClientSession:
    """MCP client session"""
    # Session methods and properties

# Copilot function type
# Mirror of the CopilotFunction dataclass documented above; acall() is the
# async entry point that executes the named tool with its arguments.
@dataclass
class CopilotFunction:
    name: str
    args: Dict[str, Any]
    
    async def acall(self) -> Any: ...

# Data layer interface
class BaseDataLayer:
    """Base interface for custom data layers"""
    
    async def create_user(self, user_data: Dict) -> str: ...
    async def get_user(self, user_id: str) -> Dict: ...
    async def store_message(self, message_data: Dict) -> str: ...
    # Additional data layer methods

# Configuration types
# Loose dict aliases — no runtime validation is implied by these.
AppConfiguration = Dict[str, Any]
FeatureFlags = Dict[str, bool]
HealthStatus = Dict[str, Any]

# Utility function types
AsyncUtilityFunction = Callable[..., Awaitable[Any]]
SyncToAsyncWrapper = Callable[[Callable], Awaitable[Callable]]

Install with Tessl CLI

npx tessl i tessl/pypi-chainlit

docs

advanced.md

authentication.md

callbacks.md

index.md

input-widgets.md

integrations.md

messaging.md

ui-elements.md

user-management.md

tile.json