Build production-ready conversational AI applications in minutes with rich UI components and LLM integrations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
This section covers advanced functionality: caching, async utilities, Model Context Protocol (MCP) support, server configuration, and production deployment features. Together, these components enable building sophisticated, scalable conversational AI applications.
Function result caching for improved performance and reduced API calls.
import chainlit as cl
@cl.cache
def expensive_function(arg: str) -> str:
"""
Simple function result caching decorator for performance optimization.
Usage:
Apply to functions that perform expensive operations or API calls.
Results are cached based on function arguments.
Cache persists for the duration of the application session.
Args:
Function arguments are used as cache keys
Returns:
Cached function results
Note:
Cache is argument-based and not persistent across application restarts.
Use for computationally expensive operations, API calls, or data processing.
"""Usage examples for caching:
import chainlit as cl
import time
import hashlib
@cl.cache
def expensive_computation(data: str, iterations: int = 1000) -> dict:
    """Hash ``data`` repeatedly to mimic costly work; the result is cached.

    Args:
        data: Text that is hashed on every round.
        iterations: Number of hashing rounds to run (default 1000).

    Returns:
        A dict holding the input, the SHA-256 hex digest of the final round
        (``None`` when no round ran) and the elapsed time in seconds.
    """
    began = time.time()
    digest = None
    for round_no in range(iterations):
        # Each round hashes the input tagged with its round index.
        digest = hashlib.sha256(f"{data}_{round_no}".encode()).hexdigest()
    elapsed = time.time() - began
    return {"input": data, "hash": digest, "processing_time": elapsed}
# Completed responses keyed by endpoint.
# NOTE(review): ``@cl.cache`` is deliberately not used on this coroutine
# function. A decorator that stores the return value of the call would store
# the coroutine object itself, and a coroutine cannot be awaited a second
# time. Caching the awaited result avoids that.
_external_data_cache: dict = {}


async def fetch_external_data(api_endpoint: str) -> dict:
    """Fetch data for ``api_endpoint``, reusing earlier responses.

    Args:
        api_endpoint: URL of the (simulated) API to query.

    Returns:
        A dict with the endpoint, the response payload, the time of the
        original fetch and a ``cached`` flag: ``False`` for a fresh fetch,
        ``True`` when the response came from the in-memory cache.
    """
    stored = _external_data_cache.get(api_endpoint)
    if stored is not None:
        # Hand out a copy so callers cannot mutate the stored response.
        return {**stored, "cached": True}

    # Simulate API call
    await cl.sleep(2)  # Simulate network delay
    response = {
        "endpoint": api_endpoint,
        "data": f"Response from {api_endpoint}",
        "timestamp": time.time(),
        "cached": False,  # A fresh fetch is never a cache hit
    }
    _external_data_cache[api_endpoint] = response
    return dict(response)
@cl.cache
def process_document(file_path: str, analysis_type: str = "basic") -> dict:
    """Read a text file and return basic statistics about it (cached).

    Args:
        file_path: Path of the document to read.
        analysis_type: Label recorded in the result; defaults to "basic".

    Returns:
        A dict with the path, whitespace-separated word count, character
        count, analysis type and the time the file was processed.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Decode explicitly as UTF-8 so the counts do not depend on the
    # platform's default locale encoding.
    with open(file_path, 'r', encoding="utf-8") as f:
        content = f.read()
    return {
        "file_path": file_path,
        "word_count": len(content.split()),
        "char_count": len(content),
        "analysis_type": analysis_type,
        "processed_at": time.time(),
    }
@cl.on_message
async def demo_caching(message: cl.Message):
    """Demonstrate caching functionality.

    Runs the cached computation on the message text inside a tool step,
    fetches the (cached) API response, and reports both to the user.
    """
    user_input = message.content
    # First call - will compute and cache
    async with cl.Step(name="Processing (may be cached)", type="tool") as step:
        step.input = user_input
        # This will be fast on subsequent calls with same input
        result = expensive_computation(user_input, 500)
        step.output = result
    # API call - cached for same endpoint
    api_result = await fetch_external_data("https://api.example.com/data")
    # Check the flag's value: the "cached" key is always present, so a
    # membership test would report "Yes" even for a fresh response.
    was_cached = 'Yes' if api_result.get('cached') else 'No'
    await cl.Message(
        f"Computation hash: {result['hash'][:16]}...\n"
        f"Processing time: {result['processing_time']:.3f}s\n"
        f"API response cached: {was_cached}"
    ).send()
# Manual cache management (if needed)
def clear_function_cache():
"""Example of manual cache clearing (implementation depends on cache system)"""
# This would clear all cached results
# Implementation varies based on caching system used
passUtilities for async/sync interoperability and execution control.
def make_async(func: Callable) -> Callable:
    """
    Convert synchronous function to asynchronous function.

    ``make_async`` is itself a plain (non-async) function: call it without
    ``await`` and await the wrapper it returns, e.g.
    ``result = await cl.make_async(fn)(arg)``.

    Args:
        func: Callable - Synchronous function to convert
    Returns:
        Callable - Asynchronous version of the function
    Usage:
        Wrap sync functions for use in async contexts.
        Useful for integrating synchronous libraries with async Chainlit code.
    """
def run_sync(coro: Awaitable) -> Any:
    """
    Execute an awaitable from synchronous code and return its result.

    Args:
        coro: Awaitable - Async function or coroutine to execute
    Returns:
        Any - Result of the async function
    Usage:
        Call from synchronous functions that need to run async code.
        An event loop is created if none exists.
    """
async def sleep(duration: int) -> None:
"""
Async sleep function for delays in conversational flows.
Args:
duration: int - Sleep duration in seconds
Returns:
None
Usage:
Add delays for natural conversation pacing or processing simulation.
Wrapper around asyncio.sleep with Chainlit context awareness.
"""Usage examples for async utilities:
import chainlit as cl
import asyncio
from typing import Callable, Any
# Convert sync functions to async
def sync_heavy_computation(data: str, delay: float = 2.0) -> str:
    """Synchronous heavy computation.

    Args:
        data: Text to process.
        delay: Seconds to block while simulating heavy work. Defaults to
            2.0, the value that used to be hard-coded.

    Returns:
        The upper-cased input prefixed with "Processed: ".
    """
    import time
    time.sleep(delay)  # Simulate heavy work
    return f"Processed: {data.upper()}"
@cl.on_message
async def use_sync_function(message: cl.Message):
    """Use synchronous function in async context.

    Wraps the blocking computation with ``cl.make_async`` and replies with
    its result.
    """
    # make_async returns the async wrapper directly, so it is not awaited;
    # only the call to the wrapper is.
    async_computation = cl.make_async(sync_heavy_computation)
    # Now can be used in async context without blocking
    result = await async_computation(message.content)
    await cl.Message(result).send()
# Run async code from sync context
def sync_callback_example():
    """Drive a coroutine to completion from ordinary synchronous code."""

    async def async_operation():
        notice = cl.Message("Running async operation from sync context")
        await notice.send()
        await cl.sleep(1)
        return "Operation complete"

    # Hand the coroutine to run_sync and print whatever it returns.
    outcome = cl.run_sync(async_operation())
    print(f"Result: {outcome}")
# Demonstration of async utilities
@cl.on_message
async def async_utilities_demo(message: cl.Message):
    """Demonstrate various async utilities.

    Shows ``cl.sleep`` for pacing and ``cl.make_async`` for running a
    synchronous helper from an async handler.
    """
    await cl.Message("Starting async operations...").send()
    # Use async sleep for pacing
    await cl.sleep(1)

    # Convert and use sync function
    def sync_func(text):
        """Lower-case the text and replace spaces with underscores."""
        return text.replace(" ", "_").lower()

    # make_async returns the async wrapper directly; only the call to the
    # wrapper is awaited.
    async_func = cl.make_async(sync_func)
    processed = await async_func(message.content)
    await cl.Message(f"Processed input: {processed}").send()
    # Simulate processing with sleep
    await cl.Message("Processing... (3 seconds)").send()
    await cl.sleep(3)
    await cl.Message("Processing complete!").send()
# Advanced async patterns
@cl.on_message
async def concurrent_operations(message: cl.Message):
"""Demonstrate concurrent async operations"""
async def operation_1():
await cl.sleep(2)
return "Operation 1 complete"
async def operation_2():
await cl.sleep(3)
return "Operation 2 complete"
async def operation_3():
await cl.sleep(1)
return "Operation 3 complete"
# Run operations concurrently
await cl.Message("Starting concurrent operations...").send()
results = await asyncio.gather(
operation_1(),
operation_2(),
operation_3()
)
await cl.Message(f"All operations complete: {results}").send()Integration with Model Context Protocol for enhanced AI model communication and tool use.
@cl.on_mcp_connect
async def mcp_connect_handler(connection: McpConnection, session: ClientSession) -> None:
    """
    Hook that runs once an MCP (Model Context Protocol) connection is up.

    Args:
        connection: McpConnection - MCP connection object
        session: ClientSession - Client session for MCP communication
    Signature: Callable[[McpConnection, ClientSession], Awaitable[None]]
    Returns:
        None
    Usage:
        Initialize MCP tools, configure model capabilities, set up context.
        Invoked after the connection to the MCP server has been established.
    """
@cl.on_mcp_disconnect
async def mcp_disconnect_handler(connection_id: str, session: ClientSession) -> None:
"""
Hook executed when MCP connection is closed or lost.
Args:
connection_id: str - Identifier of the closed connection
session: ClientSession - Client session that was disconnected
Signature: Callable[[str, ClientSession], Awaitable[None]]
Returns:
None
Usage:
Cleanup MCP resources, handle connection failures, log disconnection events.
"""Usage examples for MCP integration:
import chainlit as cl
from typing import Dict, Any
# Global MCP state
mcp_connections = {}
mcp_tools = {}


def _mcp_tool_name(tool):
    """Return a tool's name whether the tool is a dict or an object."""
    if isinstance(tool, dict):
        return tool.get("name", "unknown")
    return getattr(tool, "name", "unknown")


@cl.on_mcp_connect
async def handle_mcp_connection(connection, session):
    """Handle MCP connection establishment.

    Registers the connection, lists the tools the MCP server offers, tells
    the user about them, and stores the connection id and tool names in the
    user session.

    Args:
        connection: MCP connection object passed by Chainlit.
        session: MCP client session used to query the server.
    """
    # NOTE(review): this file's type stub declares ``id`` on the connection,
    # but the connection objects Chainlit passes may expose ``name`` instead;
    # accept either so the handler does not fail before its try block.
    connection_id = getattr(connection, "id", None) or getattr(connection, "name", None)
    mcp_connections[connection_id] = connection
    # Initialize available tools from MCP server
    try:
        # Query available tools from MCP server
        tools_response = await session.list_tools()
        # NOTE(review): the response may be a plain dict or a result object
        # with a ``tools`` attribute (presumably the latter with the MCP
        # Python SDK - confirm); handle both shapes.
        if isinstance(tools_response, dict):
            available_tools = tools_response.get("tools", [])
        else:
            available_tools = getattr(tools_response, "tools", None) or []
        mcp_tools[connection_id] = available_tools
        # Log available tools
        tool_names = [_mcp_tool_name(tool) for tool in available_tools]
        await cl.Message(
            f"✅ MCP connection established!\n"
            f"Connection ID: {connection_id}\n"
            f"Available tools: {', '.join(tool_names)}"
        ).send()
        # Store connection info in session
        cl.user_session.set("mcp_connection_id", connection_id)
        cl.user_session.set("mcp_tools", tool_names)
    except Exception as e:
        # Handler boundary: report the failure to the user instead of raising.
        await cl.Message(f"❌ Error initializing MCP tools: {str(e)}").send()
@cl.on_mcp_disconnect
async def handle_mcp_disconnection(connection_id: str, session):
    """Forget a dropped MCP connection and tell the user about it."""
    # Remove the connection from both module-level registries, if present.
    for registry in (mcp_connections, mcp_tools):
        registry.pop(connection_id, None)

    await cl.Message(
        f"🔌 MCP connection {connection_id} disconnected"
    ).send()

    # Reset session state only when it points at the connection that closed.
    active_id = cl.user_session.get("mcp_connection_id")
    if active_id == connection_id:
        cl.user_session.set("mcp_connection_id", None)
        cl.user_session.set("mcp_tools", [])
@cl.on_message
async def use_mcp_tools(message: cl.Message):
    """Dispatch a chat message to an MCP tool based on keywords in the text."""
    connection_id = cl.user_session.get("mcp_connection_id")
    available_tools = cl.user_session.get("mcp_tools", [])
    if not (connection_id and available_tools):
        await cl.Message("No MCP tools available. Please connect to an MCP server.").send()
        return

    lowered = message.content.lower()

    # File-reading request
    if "read file" in lowered or "file content" in lowered:
        if "file_read" not in available_tools:
            await cl.Message("File reading tool not available in MCP server.").send()
            return
        await use_mcp_file_tool(connection_id, message.content)
        return

    # Web-search request
    if "search" in lowered or "lookup" in lowered:
        if "web_search" not in available_tools:
            await cl.Message("Web search tool not available in MCP server.").send()
            return
        await use_mcp_search_tool(connection_id, message.content)
        return

    # Anything else: show what is available.
    bullet_list = "\n".join(f"• {tool}" for tool in available_tools)
    await cl.Message(
        f"Available MCP tools:\n{bullet_list}\n\n"
        f"Try asking me to 'read file' or 'search' something!"
    ).send()
async def use_mcp_file_tool(connection_id: str, user_input: str):
    """Call the MCP ``file_read`` tool and show the file content to the user.

    Does nothing when ``connection_id`` is not a registered connection.
    """
    connection = mcp_connections.get(connection_id)
    if connection is None or not connection:
        return

    # The path is a fixed example; a real handler would extract it from
    # user_input.
    tool_args = {"path": "/example/file.txt"}
    try:
        async with cl.Step(name="MCP File Operation", type="tool") as step:
            step.input = user_input
            result = await connection.call_tool("file_read", tool_args)
            step.output = result
        await cl.Message(f"File content:\n```\n{result.get('content', 'No content')}\n```").send()
    except Exception as e:
        await cl.Message(f"MCP file operation failed: {str(e)}").send()
async def use_mcp_search_tool(connection_id: str, query: str):
"""Use MCP web search tool"""
connection = mcp_connections.get(connection_id)
if not connection:
return
try:
async with cl.Step(name="MCP Web Search", type="tool") as step:
step.input = query
# Call MCP search tool
result = await connection.call_tool("web_search", {
"query": query,
"max_results": 5
})
step.output = result
# Format search results
results = result.get("results", [])
if results:
formatted_results = "\n\n".join([
f"**{r.get('title', 'No title')}**\n{r.get('url', '')}\n{r.get('snippet', '')}"
for r in results[:3] # Show top 3 results
])
await cl.Message(f"Search Results:\n\n{formatted_results}").send()
else:
await cl.Message("No search results found.").send()
except Exception as e:
await cl.Message(f"MCP search failed: {str(e)}").send()Support for copilot-style function calling and tool integration.
@dataclass
class CopilotFunction:
"""
Represents a copilot function call with name and arguments.
Fields:
name: str - Function name to execute
args: Dict[str, Any] - Function arguments and parameters
Methods:
acall() - Execute the function call asynchronously
Usage:
Represents function calls from AI models that can be executed
in the Chainlit environment with proper tracking and observability.
"""
name: str
args: Dict[str, Any]
async def acall(self) -> Any:
"""
Execute the copilot function call asynchronously.
Returns:
Any - Result of the function execution
Usage:
Execute the function with provided arguments.
Automatically tracked as a step in Chainlit UI.
"""Usage examples for copilot functions:
import chainlit as cl
from dataclasses import dataclass
from typing import Dict, Any
async def calculate_function(expression: str) -> Dict[str, Any]:
    """Calculator function for copilot.

    Args:
        expression: Python expression to evaluate, e.g. "2 + 3 * 4".

    Returns:
        On success: ``{"result", "expression", "success": True}``.
        On failure: ``{"error", "expression", "success": False}``.
    """
    try:
        # SECURITY: eval() executes arbitrary Python. The expression comes
        # straight from chat input, so this must be replaced with a
        # restricted expression evaluator before any real deployment.
        result = eval(expression)  # Use safe eval in production
        return {
            "result": result,
            "expression": expression,
            "success": True
        }
    except Exception as e:
        return {
            "error": str(e),
            "expression": expression,
            "success": False
        }


async def web_search_function(query: str, max_results: int = 5) -> Dict[str, Any]:
    """Web search function for copilot.

    Mock implementation: waits one second and returns two canned results.
    ``max_results`` is accepted but not used by the mock.
    """
    # Mock implementation - replace with real search
    await cl.sleep(1)  # Simulate search delay
    return {
        "query": query,
        "results": [
            {"title": f"Result 1 for '{query}'", "url": "https://example1.com"},
            {"title": f"Result 2 for '{query}'", "url": "https://example2.com"}
        ],
        "count": 2,
        "success": True
    }


# Define available copilot functions
# The registry is built after the functions it refers to; building it first
# raises NameError at import time. Only functions that are actually defined
# are registered - add "analyze_data" / "generate_chart" entries here once
# their implementations exist.
COPILOT_FUNCTIONS = {
    "calculate": calculate_function,
    "search_web": web_search_function,
}
@cl.on_message
async def handle_copilot_functions(message: cl.Message):
    """Handle messages that may contain copilot function calls.

    Recognizes ``/calc <expression>`` and ``/search <query>``; any other
    message gets a short help text listing both commands.
    """
    user_input = message.content
    # Parse potential function calls (this would typically be done by an LLM)
    if user_input.startswith("/calc "):
        # Extract calculation expression
        expression = user_input[6:].strip()
        # Create copilot function
        calc_function = cl.CopilotFunction(
            name="calculate",
            args={"expression": expression}
        )
        # Execute function
        result = await execute_copilot_function(calc_function)
        if result.get("success"):
            await cl.Message(f"Calculation result: {result['result']}").send()
        else:
            await cl.Message(f"Calculation error: {result['error']}").send()
    elif user_input.startswith("/search "):
        # Extract search query
        query = user_input[8:].strip()
        search_function = cl.CopilotFunction(
            name="search_web",
            args={"query": query, "max_results": 3}
        )
        result = await execute_copilot_function(search_function)
        if result.get("success"):
            results_text = "\n".join([
                f"• {r['title']}: {r['url']}"
                for r in result['results']
            ])
            await cl.Message(f"Search results for '{query}':\n\n{results_text}").send()
        else:
            # A failed search used to produce no reply at all; report it the
            # same way calculation failures are reported.
            await cl.Message(f"Search error: {result.get('error', 'Unknown error')}").send()
    else:
        await cl.Message(
            "Try these copilot functions:\n"
            "• `/calc 2 + 3 * 4` - Calculate expressions\n"
            "• `/search python tutorial` - Search the web\n"
        ).send()
async def execute_copilot_function(copilot_func: cl.CopilotFunction) -> Any:
    """Execute a copilot function with step tracking.

    Args:
        copilot_func: The function call (name and keyword arguments).

    Returns:
        The registered function's result, or a dict with ``"error"`` and
        ``"success": False`` when the name is not registered or the
        function raises.
    """
    function_impl = COPILOT_FUNCTIONS.get(copilot_func.name)
    if not function_impl:
        return {"error": f"Function '{copilot_func.name}' not available", "success": False}
    async with cl.Step(name=f"Copilot: {copilot_func.name}", type="tool") as step:
        step.input = copilot_func.args
        try:
            # Invoke the implementation that was looked up in the registry.
            # NOTE(review): the lookup result was previously discarded in
            # favour of copilot_func.acall(), which never consults
            # COPILOT_FUNCTIONS and has no implementation in this file's
            # CopilotFunction stub.
            result = await function_impl(**copilot_func.args)
        except Exception as e:
            # Step boundary: record the failure on the step and return it.
            error_result = {"error": str(e), "success": False}
            step.output = error_result
            return error_result
        step.output = result
        return result
# Alternative: LLM-driven copilot function detection
@cl.on_message
async def llm_copilot_integration(message: cl.Message):
"""Integrate copilot functions with LLM function calling"""
# Define function schemas for LLM
function_schemas = [
{
"name": "calculate",
"description": "Perform mathematical calculations",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "Mathematical expression"}
},
"required": ["expression"]
}
},
{
"name": "search_web",
"description": "Search the web for information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"max_results": {"type": "integer", "description": "Maximum results", "default": 5}
},
"required": ["query"]
}
}
]
# Use LLM to determine if function calls are needed
# (This would integrate with your preferred LLM service)
# Example: Mock function call detection
if "calculate" in message.content.lower():
# Extract expression (in real implementation, LLM would do this)
import re
expr_match = re.search(r'calculate\s+(.+)', message.content, re.IGNORECASE)
if expr_match:
expression = expr_match.group(1).strip()
copilot_func = cl.CopilotFunction(
name="calculate",
args={"expression": expression}
)
result = await execute_copilot_function(copilot_func)
await cl.Message(f"Calculation: {expression} = {result.get('result', 'Error')}").send()Advanced server configuration, data layer management, and production deployment features.
@cl.data_layer
def configure_data_layer() -> BaseDataLayer:
"""
Configure custom data layer for persistence and data management.
Signature: Callable[[], BaseDataLayer]
Returns:
BaseDataLayer - Custom data layer implementation
Usage:
Configure database connections, file storage, caching layers.
Customize how Chainlit stores and retrieves conversation data.
"""Usage examples for server configuration:
import chainlit as cl
from typing import Any, Dict
# Custom data layer implementation
class CustomDataLayer:
    """Skeleton data layer for production use; methods are placeholders."""

    def __init__(self, database_url: str, redis_url: str):
        # Only the URLs are stored here; no connection is opened yet.
        self.database_url, self.redis_url = database_url, redis_url

    async def create_user(self, user_data: Dict[str, Any]) -> str:
        """Create user in database (not implemented; returns None)."""
        return None

    async def get_user(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user from database (not implemented; returns None)."""
        return None

    async def store_message(self, message_data: Dict[str, Any]) -> str:
        """Store message in database (not implemented; returns None)."""
        return None
@cl.data_layer
def setup_production_data_layer():
    """Build the production data layer from environment variables.

    Raises:
        ValueError: If DATABASE_URL is unset or empty.
    """
    from os import environ

    db_url = environ.get("DATABASE_URL")
    if not db_url:
        raise ValueError("DATABASE_URL environment variable required")
    # REDIS_URL is optional here; None is passed through when it is unset.
    return CustomDataLayer(db_url, environ.get("REDIS_URL"))
# Production configuration
# Application-wide settings loaded once at startup.
# NOTE(review): these were previously written to ``cl.user_session``, which
# is scoped to a single chat session; this hook runs once for the whole app,
# presumably before any session exists, so the settings are kept at module
# level instead.
app_runtime_config: dict = {}


@cl.on_app_startup
async def production_startup():
    """Production application startup configuration.

    Configures logging, initializes external services, and loads numeric
    limits from the environment into ``app_runtime_config``.

    Raises:
        ValueError: If one of the numeric environment variables is not a
            valid integer.
    """
    import os
    import logging
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Initialize external services
    await init_database_pool()
    await init_redis_cache()
    await init_monitoring_services()
    # Load configuration from environment
    config = {
        "max_concurrent_users": int(os.getenv("MAX_CONCURRENT_USERS", "100")),
        "rate_limit": int(os.getenv("RATE_LIMIT", "60")),
        "session_timeout": int(os.getenv("SESSION_TIMEOUT", "3600")),
    }
    app_runtime_config.update(config)
    logging.info("Production application startup complete")
@cl.on_app_shutdown
async def production_shutdown():
    """Release resources and persist state when the application stops."""
    from logging import info

    # The cleanup steps run one after another, in this order.
    await cleanup_database_connections()
    await cleanup_redis_connections()
    await save_application_state()
    info("Production application shutdown complete")
# Health check and monitoring
async def health_check() -> Dict[str, Any]:
    """Report application health for monitoring.

    Runs the database, Redis and external-service checks in that order.
    Returns a "healthy" report carrying each check's result, or an
    "unhealthy" report with the error text if any check raises.
    """
    try:
        checks = {
            "database": await check_database_health(),
            "cache": await check_redis_health(),
            "external_services": await check_external_services(),
        }
    except Exception as exc:
        # Any failure while probing is reported, not raised.
        return {
            "status": "unhealthy",
            "error": str(exc),
            "timestamp": time.time(),
        }
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "checks": checks,
    }
# Configuration management
class AppConfig:
    """Application settings read from environment variables."""

    def __init__(self):
        self.load_config()

    @staticmethod
    def _env_flag(name, default):
        """Return True when the variable (or its default) is "true", ignoring case."""
        import os
        return os.getenv(name, default).lower() == "true"

    def load_config(self):
        """Populate debug, environment, secret_key and the feature flags."""
        import os
        self.debug = self._env_flag("DEBUG", "false")
        self.environment = os.getenv("ENVIRONMENT", "development")
        self.secret_key = os.getenv("SECRET_KEY", "dev-secret")
        # Feature flags: (flag name, environment variable, default)
        flag_specs = (
            ("oauth_enabled", "OAUTH_ENABLED", "true"),
            ("mcp_enabled", "MCP_ENABLED", "false"),
            ("analytics_enabled", "ANALYTICS_ENABLED", "true"),
        )
        self.features = {
            flag: self._env_flag(variable, default)
            for flag, variable, default in flag_specs
        }

    def get_database_config(self):
        """Return the database URL and pool sizing read from the environment."""
        import os
        pool_size = int(os.getenv("DB_POOL_SIZE", "10"))
        max_overflow = int(os.getenv("DB_MAX_OVERFLOW", "20"))
        return {
            "url": os.getenv("DATABASE_URL"),
            "pool_size": pool_size,
            "max_overflow": max_overflow,
        }
# Initialize global configuration
app_config = AppConfig()

from typing import Any, Dict, Callable, Awaitable
from dataclasses import dataclass
# --- MCP (Model Context Protocol) types ---
class McpConnection:
    """Describes one MCP connection."""

    # Identifier of the connection; further properties are not listed here.
    id: str


class ClientSession:
    """Client-side session for talking to an MCP server."""

    # Session methods and properties are not listed here.


# --- Copilot function type ---
@dataclass
class CopilotFunction:
    name: str
    args: Dict[str, Any]

    async def acall(self) -> Any:
        """Execute the call (signature only)."""


# --- Data layer interface ---
class BaseDataLayer:
    """Interface that custom data layers implement (signatures only)."""

    async def create_user(self, user_data: Dict) -> str:
        """Create a user record."""

    async def get_user(self, user_id: str) -> Dict:
        """Look up a user record."""

    async def store_message(self, message_data: Dict) -> str:
        """Persist a message."""

    # Additional data layer methods are not listed here.


# --- Configuration types ---
AppConfiguration = Dict[str, Any]
FeatureFlags = Dict[str, bool]
HealthStatus = Dict[str, Any]
# Utility function types
AsyncUtilityFunction = Callable[..., Awaitable[Any]]
SyncToAsyncWrapper = Callable[[Callable], Awaitable[Callable]]Install with Tessl CLI
npx tessl i tessl/pypi-chainlit