0
# Advanced Features
1
2
Advanced functionality including caching, async utilities, Model Context Protocol (MCP) support, server configuration, and production deployment features. These components enable building sophisticated, scalable conversational AI applications.
3
4
## Capabilities
5
6
### Caching
7
8
Function result caching for improved performance and reduced API calls.
9
10
```python { .api }
11
import chainlit as cl
12
13
@cl.cache
14
def expensive_function(arg: str) -> str:
15
"""
16
Simple function result caching decorator for performance optimization.
17
18
Usage:
19
Apply to functions that perform expensive operations or API calls.
20
Results are cached based on function arguments.
21
Cache persists for the duration of the application session.
22
23
Args:
24
Function arguments are used as cache keys
25
26
Returns:
27
Cached function results
28
29
Note:
30
Cache is argument-based and not persistent across application restarts.
31
Use for computationally expensive operations, API calls, or data processing.
32
"""
33
```
34
35
Usage examples for caching:
36
37
```python
38
import chainlit as cl
39
import time
40
import hashlib
41
42
@cl.cache
def expensive_computation(data: str, iterations: int = 1000) -> dict:
    """Simulate slow work by hashing ``data`` repeatedly; memoized via ``cl.cache``.

    Args:
        data: Text mixed into every hash round.
        iterations: Number of SHA-256 rounds to run.

    Returns:
        A dict holding the original ``input``, the hex digest of the last
        round under ``hash`` (``None`` when no round runs) and the elapsed
        wall-clock seconds under ``processing_time``.
    """
    started = time.time()

    # Rounds are independent of each other, so only the final digest survives.
    digest = None
    for round_no in range(iterations):
        digest = hashlib.sha256(f"{data}_{round_no}".encode()).hexdigest()

    return {
        "input": data,
        "hash": digest,
        "processing_time": time.time() - started,
    }
58
59
# Responses already fetched, keyed by endpoint. A plain dict is used instead of
# @cl.cache because a synchronous memoizing decorator applied to a coroutine
# function stores the coroutine object itself, which can only be awaited once.
_external_data_cache: dict = {}


async def fetch_external_data(api_endpoint: str) -> dict:
    """Fetch data for ``api_endpoint``, reusing the stored response on repeat calls.

    Args:
        api_endpoint: URL (or any identifier) of the simulated API endpoint.

    Returns:
        A dict with ``endpoint``, ``data``, ``timestamp`` (time of the original
        fetch) and ``cached`` (``False`` for a fresh fetch, ``True`` when the
        response was served from the in-memory cache).
    """
    stored = _external_data_cache.get(api_endpoint)
    if stored is not None:
        # Hand back a copy so the stored entry keeps describing the original fetch.
        return {**stored, "cached": True}

    # Simulate API call
    await cl.sleep(2)  # Simulate network delay

    response = {
        "endpoint": api_endpoint,
        "data": f"Response from {api_endpoint}",
        "timestamp": time.time(),
        "cached": False,  # Fresh fetch
    }
    _external_data_cache[api_endpoint] = response
    return response
72
73
@cl.cache
def process_document(file_path: str, analysis_type: str = "basic") -> dict:
    """Read a text file and return basic statistics about it.

    The result is memoized by ``cl.cache`` on ``(file_path, analysis_type)``,
    so edits made to the file after the first call are not reflected.

    Args:
        file_path: Path of the text file to analyse.
        analysis_type: Label copied into the result; it does not alter the analysis.

    Returns:
        A dict with ``file_path``, ``word_count``, ``char_count``,
        ``analysis_type`` and ``processed_at`` (epoch seconds).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Decode explicitly as UTF-8 so the counts do not depend on the
    # platform's default encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    return {
        "file_path": file_path,
        "word_count": len(content.split()),
        "char_count": len(content),
        "analysis_type": analysis_type,
        "processed_at": time.time(),
    }
88
89
@cl.on_message
async def demo_caching(message: cl.Message):
    """Run the cached helpers for the incoming message and report the outcome.

    Args:
        message: The user's message; its text is the input of the cached
            computation.
    """
    user_input = message.content

    # First call computes and caches; repeated input is served from the cache.
    async with cl.Step(name="Processing (may be cached)", type="tool") as step:
        step.input = user_input
        result = expensive_computation(user_input, 500)
        step.output = result

    # API call - cached for same endpoint
    api_result = await fetch_external_data("https://api.example.com/data")

    # Read the flag's value rather than testing for the key: the key is part
    # of every payload, so a membership test would always report "Yes".
    cached_label = "Yes" if api_result.get("cached") else "No"

    await cl.Message(
        f"Computation hash: {result['hash'][:16]}...\n"
        f"Processing time: {result['processing_time']:.3f}s\n"
        f"API response cached: {cached_label}"
    ).send()
111
112
# Manual cache management (if needed)
113
def clear_function_cache():
114
"""Example of manual cache clearing (implementation depends on cache system)"""
115
# This would clear all cached results
116
# Implementation varies based on caching system used
117
pass
118
```
119
120
### Async Utilities
121
122
Utilities for async/sync interoperability and execution control.
123
124
```python { .api }
125
def make_async(func: Callable) -> Callable:
    """
    Wrap a synchronous function so it can be awaited from async code.

    make_async is itself a regular (non-async) function: it returns the
    wrapper immediately, and only the call to that wrapper is awaited.

    Args:
        func: Callable - Synchronous function to convert

    Returns:
        Callable - Async function that accepts the same arguments as func
        and runs it off the event loop

    Usage:
        result = await cl.make_async(blocking_function)(arg)

        Wrap sync functions for use in async contexts.
        Useful for integrating synchronous libraries with async Chainlit code.
    """
139
140
def run_sync(coro: Awaitable) -> Any:
141
"""
142
Run async function in synchronous context.
143
144
Args:
145
coro: Awaitable - Async function or coroutine to execute
146
147
Returns:
148
Any - Result of the async function
149
150
Usage:
151
Execute async code from synchronous functions.
152
Creates event loop if none exists.
153
"""
154
155
async def sleep(duration: int) -> None:
156
"""
157
Async sleep function for delays in conversational flows.
158
159
Args:
160
duration: int - Sleep duration in seconds
161
162
Returns:
163
None
164
165
Usage:
166
Add delays for natural conversation pacing or processing simulation.
167
Wrapper around asyncio.sleep with Chainlit context awareness.
168
"""
169
```
170
171
Usage examples for async utilities:
172
173
```python
174
import chainlit as cl
175
import asyncio
176
from typing import Callable, Any
177
178
# Convert sync functions to async
179
def sync_heavy_computation(data: str) -> str:
180
"""Synchronous heavy computation"""
181
import time
182
time.sleep(2) # Simulate heavy work
183
return f"Processed: {data.upper()}"
184
185
@cl.on_message
async def use_sync_function(message: cl.Message):
    """Run a blocking function for the incoming message without stalling the event loop.

    Args:
        message: The user's message; its text is passed to the blocking function.
    """
    # make_async returns the awaitable wrapper directly, so it must not be
    # awaited itself - only the call to the wrapper is.
    async_computation = cl.make_async(sync_heavy_computation)

    # Now can be used in async context without blocking
    result = await async_computation(message.content)

    await cl.Message(result).send()
196
197
# Run async code from sync context
198
def sync_callback_example():
199
"""Synchronous function that needs to call async code"""
200
201
async def async_operation():
202
await cl.Message("Running async operation from sync context").send()
203
await cl.sleep(1)
204
return "Operation complete"
205
206
# Run async code from sync context
207
result = cl.run_sync(async_operation())
208
print(f"Result: {result}")
209
210
# Demonstration of async utilities
211
@cl.on_message
async def async_utilities_demo(message: cl.Message):
    """Demonstrate ``cl.sleep`` pacing and ``cl.make_async`` conversion.

    Args:
        message: The user's message; its text is normalised and echoed back.
    """
    await cl.Message("Starting async operations...").send()

    # Use async sleep for pacing
    await cl.sleep(1)

    def normalize(text: str) -> str:
        """Replace spaces with underscores and lowercase the text."""
        return text.replace(" ", "_").lower()

    # make_async returns the async wrapper synchronously; await only the call.
    async_func = cl.make_async(normalize)
    processed = await async_func(message.content)

    await cl.Message(f"Processed input: {processed}").send()

    # Simulate processing with sleep
    await cl.Message("Processing... (3 seconds)").send()
    await cl.sleep(3)

    await cl.Message("Processing complete!").send()
232
233
# Advanced async patterns
234
@cl.on_message
235
async def concurrent_operations(message: cl.Message):
236
"""Demonstrate concurrent async operations"""
237
238
async def operation_1():
239
await cl.sleep(2)
240
return "Operation 1 complete"
241
242
async def operation_2():
243
await cl.sleep(3)
244
return "Operation 2 complete"
245
246
async def operation_3():
247
await cl.sleep(1)
248
return "Operation 3 complete"
249
250
# Run operations concurrently
251
await cl.Message("Starting concurrent operations...").send()
252
253
results = await asyncio.gather(
254
operation_1(),
255
operation_2(),
256
operation_3()
257
)
258
259
await cl.Message(f"All operations complete: {results}").send()
260
```
261
262
### Model Context Protocol (MCP)
263
264
Integration with Model Context Protocol for enhanced AI model communication and tool use.
265
266
```python { .api }
267
@cl.on_mcp_connect
268
async def mcp_connect_handler(connection: McpConnection, session: ClientSession) -> None:
269
"""
270
Hook executed when MCP (Model Context Protocol) connection is established.
271
272
Args:
273
connection: McpConnection - MCP connection object
274
session: ClientSession - Client session for MCP communication
275
276
Signature: Callable[[McpConnection, ClientSession], Awaitable[None]]
277
278
Returns:
279
None
280
281
Usage:
282
Initialize MCP tools, configure model capabilities, setup context.
283
Called when connection to MCP server is successfully established.
284
"""
285
286
@cl.on_mcp_disconnect
287
async def mcp_disconnect_handler(connection_id: str, session: ClientSession) -> None:
288
"""
289
Hook executed when MCP connection is closed or lost.
290
291
Args:
292
connection_id: str - Identifier of the closed connection
293
session: ClientSession - Client session that was disconnected
294
295
Signature: Callable[[str, ClientSession], Awaitable[None]]
296
297
Returns:
298
None
299
300
Usage:
301
Cleanup MCP resources, handle connection failures, log disconnection events.
302
"""
303
```
304
305
Usage examples for MCP integration:
306
307
```python
308
import chainlit as cl
309
from typing import Dict, Any
310
311
# Global MCP state
312
mcp_connections = {}
313
mcp_tools = {}
314
315
@cl.on_mcp_connect
async def handle_mcp_connection(connection, session):
    """Register a new MCP connection and announce the tools it offers.

    Args:
        connection: Connection descriptor supplied by Chainlit; its ``name``
            identifies the connection.
        session: MCP client session used to query the server.
    """
    # The connection is identified by its name; the same value is later
    # handed to the disconnect handler.
    connection_id = connection.name
    mcp_connections[connection_id] = connection

    # Initialize available tools from MCP server
    try:
        # list_tools() returns a result object whose ``tools`` attribute holds
        # tool objects (attribute access, not dict lookups).
        tools_response = await session.list_tools()
        available_tools = list(tools_response.tools)

        mcp_tools[connection_id] = available_tools

        # Log available tools
        tool_names = [tool.name for tool in available_tools]

        await cl.Message(
            f"✅ MCP connection established!\n"
            f"Connection ID: {connection_id}\n"
            f"Available tools: {', '.join(tool_names)}"
        ).send()

        # Store connection info in session
        cl.user_session.set("mcp_connection_id", connection_id)
        cl.user_session.set("mcp_tools", tool_names)

    except Exception as e:
        # Handler boundary: report the failure to the user instead of
        # letting the connection hook crash.
        await cl.Message(f"❌ Error initializing MCP tools: {str(e)}").send()
345
346
@cl.on_mcp_disconnect
async def handle_mcp_disconnection(connection_id: str, session):
    """Forget a closed MCP connection and tell the user about it.

    Args:
        connection_id: Identifier of the connection that was closed.
        session: Client session that was disconnected (unused here).
    """
    # Drop the connection from both module-level registries, if present.
    for registry in (mcp_connections, mcp_tools):
        registry.pop(connection_id, None)

    await cl.Message(f"🔌 MCP connection {connection_id} disconnected").send()

    # Only reset the session state when it points at the closed connection.
    if cl.user_session.get("mcp_connection_id") != connection_id:
        return
    cl.user_session.set("mcp_connection_id", None)
    cl.user_session.set("mcp_tools", [])
366
367
@cl.on_message
368
async def use_mcp_tools(message: cl.Message):
369
"""Use MCP tools in conversation"""
370
371
connection_id = cl.user_session.get("mcp_connection_id")
372
available_tools = cl.user_session.get("mcp_tools", [])
373
374
if not connection_id or not available_tools:
375
await cl.Message("No MCP tools available. Please connect to an MCP server.").send()
376
return
377
378
user_input = message.content.lower()
379
380
# Example: Use file reading tool
381
if "read file" in user_input or "file content" in user_input:
382
if "file_read" in available_tools:
383
await use_mcp_file_tool(connection_id, message.content)
384
else:
385
await cl.Message("File reading tool not available in MCP server.").send()
386
387
# Example: Use web search tool
388
elif "search" in user_input or "lookup" in user_input:
389
if "web_search" in available_tools:
390
await use_mcp_search_tool(connection_id, message.content)
391
else:
392
await cl.Message("Web search tool not available in MCP server.").send()
393
394
else:
395
# List available tools
396
tools_text = "\n".join([f"• {tool}" for tool in available_tools])
397
await cl.Message(
398
f"Available MCP tools:\n{tools_text}\n\n"
399
f"Try asking me to 'read file' or 'search' something!"
400
).send()
401
402
async def use_mcp_file_tool(connection_id: str, user_input: str):
403
"""Use MCP file reading tool"""
404
405
connection = mcp_connections.get(connection_id)
406
if not connection:
407
return
408
409
try:
410
async with cl.Step(name="MCP File Operation", type="tool") as step:
411
step.input = user_input
412
413
# Call MCP file tool (example)
414
result = await connection.call_tool("file_read", {
415
"path": "/example/file.txt" # Extract from user input
416
})
417
418
step.output = result
419
420
await cl.Message(f"File content:\n```\n{result.get('content', 'No content')}\n```").send()
421
422
except Exception as e:
423
await cl.Message(f"MCP file operation failed: {str(e)}").send()
424
425
async def use_mcp_search_tool(connection_id: str, query: str):
426
"""Use MCP web search tool"""
427
428
connection = mcp_connections.get(connection_id)
429
if not connection:
430
return
431
432
try:
433
async with cl.Step(name="MCP Web Search", type="tool") as step:
434
step.input = query
435
436
# Call MCP search tool
437
result = await connection.call_tool("web_search", {
438
"query": query,
439
"max_results": 5
440
})
441
442
step.output = result
443
444
# Format search results
445
results = result.get("results", [])
446
if results:
447
formatted_results = "\n\n".join([
448
f"**{r.get('title', 'No title')}**\n{r.get('url', '')}\n{r.get('snippet', '')}"
449
for r in results[:3] # Show top 3 results
450
])
451
await cl.Message(f"Search Results:\n\n{formatted_results}").send()
452
else:
453
await cl.Message("No search results found.").send()
454
455
except Exception as e:
456
await cl.Message(f"MCP search failed: {str(e)}").send()
457
```
458
459
### Copilot Functions
460
461
Support for copilot-style function calling and tool integration.
462
463
```python { .api }
464
@dataclass
465
class CopilotFunction:
466
"""
467
Represents a copilot function call with name and arguments.
468
469
Fields:
470
name: str - Function name to execute
471
args: Dict[str, Any] - Function arguments and parameters
472
473
Methods:
474
acall() - Execute the function call asynchronously
475
476
Usage:
477
Represents function calls from AI models that can be executed
478
in the Chainlit environment with proper tracking and observability.
479
"""
480
name: str
481
args: Dict[str, Any]
482
483
async def acall(self) -> Any:
484
"""
485
Execute the copilot function call asynchronously.
486
487
Returns:
488
Any - Result of the function execution
489
490
Usage:
491
Execute the function with provided arguments.
492
Automatically tracked as a step in Chainlit UI.
493
"""
494
```
495
496
Usage examples for copilot functions:
497
498
```python
499
import chainlit as cl
500
from dataclasses import dataclass
501
from typing import Dict, Any
502
503
# Define available copilot functions
504
# Registry of copilot function implementations, keyed by the name carried in
# CopilotFunction.name. Each entry is a thin wrapper that resolves the target
# at call time: the implementations are defined further down in this module,
# so referencing them directly here would raise NameError on import.
# NOTE: data_analysis_function and chart_generation_function are not defined in
# this example; provide them before calling "analyze_data" / "generate_chart".
COPILOT_FUNCTIONS = {
    "calculate": lambda **kwargs: calculate_function(**kwargs),
    "search_web": lambda **kwargs: web_search_function(**kwargs),
    "analyze_data": lambda **kwargs: data_analysis_function(**kwargs),
    "generate_chart": lambda **kwargs: chart_generation_function(**kwargs),
}
510
511
async def calculate_function(expression: str) -> Dict[str, Any]:
    """Evaluate an arithmetic expression without executing arbitrary code.

    The expression comes straight from chat input, so it is parsed with
    ``ast`` and only numeric literals, parentheses, unary ``+``/``-`` and the
    binary operators ``+ - * / // % **`` are evaluated. Names, calls,
    attribute access and everything else are rejected.

    Args:
        expression: Arithmetic expression, e.g. ``"2 + 3 * 4"``.

    Returns:
        ``{"result", "expression", "success": True}`` on success, or
        ``{"error", "expression", "success": False}`` when the expression is
        invalid, unsupported, or fails to evaluate (e.g. division by zero).
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    max_exponent = 1000  # keeps inputs like 9**9**9 from exhausting CPU/memory

    def evaluate(node):
        """Recursively evaluate a whitelisted AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            # bool is a subclass of int; reject it so True/False are not numbers here.
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
            raise ValueError(f"Unsupported literal: {node.value!r}")
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left, right = evaluate(node.left), evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError("Exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    try:
        result = evaluate(ast.parse(expression, mode="eval"))
    except Exception as e:
        # Tool boundary: every failure is reported as data, never raised.
        return {
            "error": str(e),
            "expression": expression,
            "success": False,
        }

    return {
        "result": result,
        "expression": expression,
        "success": True,
    }
526
527
async def web_search_function(query: str, max_results: int = 5) -> Dict[str, Any]:
    """Mock web search for copilot; replace with a real search backend.

    Args:
        query: Search text.
        max_results: Upper bound on the number of results returned; values
            below zero are treated as zero.

    Returns:
        A dict with ``query``, ``results`` (list of ``title``/``url`` dicts,
        at most ``max_results`` long), ``count`` and ``success``.
    """
    # Mock implementation - replace with real search
    await cl.sleep(1)  # Simulate search delay

    mock_results = [
        {"title": f"Result 1 for '{query}'", "url": "https://example1.com"},
        {"title": f"Result 2 for '{query}'", "url": "https://example2.com"},
    ]
    # Honour the caller's limit and keep ``count`` in step with what is returned.
    results = mock_results[:max(max_results, 0)]

    return {
        "query": query,
        "results": results,
        "count": len(results),
        "success": True,
    }
541
542
@cl.on_message
async def handle_copilot_functions(message: cl.Message):
    """Dispatch ``/calc`` and ``/search`` chat commands to copilot functions.

    Any other input gets a short help message. Both commands report failures
    back to the user instead of staying silent.

    Args:
        message: The user's message; its text is parsed for a command prefix.
    """
    user_input = message.content

    # Parse potential function calls (this would typically be done by an LLM)
    if user_input.startswith("/calc "):
        # Everything after the "/calc " prefix is the expression.
        expression = user_input[6:].strip()

        calc_function = cl.CopilotFunction(
            name="calculate",
            args={"expression": expression},
        )
        result = await execute_copilot_function(calc_function)

        if result.get("success"):
            await cl.Message(f"Calculation result: {result['result']}").send()
        else:
            # .get() keeps the error path itself from raising on a malformed result.
            await cl.Message(
                f"Calculation error: {result.get('error', 'unknown error')}"
            ).send()

    elif user_input.startswith("/search "):
        # Everything after the "/search " prefix is the query.
        query = user_input[8:].strip()

        search_function = cl.CopilotFunction(
            name="search_web",
            args={"query": query, "max_results": 3},
        )
        result = await execute_copilot_function(search_function)

        if result.get("success"):
            results_text = "\n".join(
                f"• {r['title']}: {r['url']}" for r in result["results"]
            )
            await cl.Message(f"Search results for '{query}':\n\n{results_text}").send()
        else:
            # Previously a failed search produced no reply at all.
            await cl.Message(
                f"Search error: {result.get('error', 'unknown error')}"
            ).send()

    else:
        await cl.Message(
            "Try these copilot functions:\n"
            "• `/calc 2 + 3 * 4` - Calculate expressions\n"
            "• `/search python tutorial` - Search the web\n"
        ).send()
591
592
async def execute_copilot_function(copilot_func: cl.CopilotFunction) -> Any:
    """Run the registered implementation of a copilot function inside a tracked step.

    Args:
        copilot_func: Function name plus keyword arguments to execute.

    Returns:
        Whatever the implementation returns, or
        ``{"error": ..., "success": False}`` when the name is not registered
        in ``COPILOT_FUNCTIONS`` or the implementation raises.
    """
    function_impl = COPILOT_FUNCTIONS.get(copilot_func.name)

    if not function_impl:
        return {"error": f"Function '{copilot_func.name}' not available", "success": False}

    async with cl.Step(name=f"Copilot: {copilot_func.name}", type="tool") as step:
        step.input = copilot_func.args

        try:
            # Call the implementation that was looked up above; previously the
            # lookup result was only used as an existence check.
            result = await function_impl(**copilot_func.args)
        except Exception as e:
            # Tool boundary: surface the failure as data so the step still
            # records an output.
            result = {"error": str(e), "success": False}

        step.output = result
        return result
613
614
# Alternative: LLM-driven copilot function detection
615
@cl.on_message
616
async def llm_copilot_integration(message: cl.Message):
617
"""Integrate copilot functions with LLM function calling"""
618
619
# Define function schemas for LLM
620
function_schemas = [
621
{
622
"name": "calculate",
623
"description": "Perform mathematical calculations",
624
"parameters": {
625
"type": "object",
626
"properties": {
627
"expression": {"type": "string", "description": "Mathematical expression"}
628
},
629
"required": ["expression"]
630
}
631
},
632
{
633
"name": "search_web",
634
"description": "Search the web for information",
635
"parameters": {
636
"type": "object",
637
"properties": {
638
"query": {"type": "string", "description": "Search query"},
639
"max_results": {"type": "integer", "description": "Maximum results", "default": 5}
640
},
641
"required": ["query"]
642
}
643
}
644
]
645
646
# Use LLM to determine if function calls are needed
647
# (This would integrate with your preferred LLM service)
648
649
# Example: Mock function call detection
650
if "calculate" in message.content.lower():
651
# Extract expression (in real implementation, LLM would do this)
652
import re
653
expr_match = re.search(r'calculate\s+(.+)', message.content, re.IGNORECASE)
654
if expr_match:
655
expression = expr_match.group(1).strip()
656
657
copilot_func = cl.CopilotFunction(
658
name="calculate",
659
args={"expression": expression}
660
)
661
662
result = await execute_copilot_function(copilot_func)
663
await cl.Message(f"Calculation: {expression} = {result.get('result', 'Error')}").send()
664
```
665
666
### Server and Configuration Features
667
668
Advanced server configuration, data layer management, and production deployment features.
669
670
```python { .api }
671
@cl.data_layer
672
def configure_data_layer() -> BaseDataLayer:
673
"""
674
Configure custom data layer for persistence and data management.
675
676
Signature: Callable[[], BaseDataLayer]
677
678
Returns:
679
BaseDataLayer - Custom data layer implementation
680
681
Usage:
682
Configure database connections, file storage, caching layers.
683
Customize how Chainlit stores and retrieves conversation data.
684
"""
685
```
686
687
Usage examples for server configuration:
688
689
```python
690
import chainlit as cl
691
from typing import Any, Dict
692
693
# Custom data layer implementation
694
class CustomDataLayer:
    """Skeleton data layer that remembers its connection URLs.

    The persistence methods are placeholders: each one currently does nothing
    and returns ``None``.
    """

    def __init__(self, database_url: str, redis_url: str):
        # Connections themselves would be initialised here.
        self.database_url, self.redis_url = database_url, redis_url

    async def create_user(self, user_data: Dict[str, Any]) -> str:
        """Create user in database (not implemented yet)."""
        return None

    async def get_user(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user from database (not implemented yet)."""
        return None

    async def store_message(self, message_data: Dict[str, Any]) -> str:
        """Store message in database (not implemented yet)."""
        return None
716
717
@cl.data_layer
def setup_production_data_layer():
    """Build the data layer from ``DATABASE_URL`` and ``REDIS_URL``.

    Returns:
        A ``CustomDataLayer`` bound to the configured URLs; the Redis URL is
        passed through as ``None`` when ``REDIS_URL`` is unset.

    Raises:
        ValueError: If ``DATABASE_URL`` is unset or empty.
    """
    import os

    env = os.environ
    database_url = env.get("DATABASE_URL")

    # Guard clause: a database is mandatory, Redis is not checked.
    if not database_url:
        raise ValueError("DATABASE_URL environment variable required")

    return CustomDataLayer(database_url, env.get("REDIS_URL"))
729
730
# Production configuration
731
# Process-wide settings loaded once by production_startup(). Kept at module
# level because no user session exists while the application is starting, so
# cl.user_session cannot be used from the startup hook.
startup_config: Dict[str, int] = {}


@cl.on_app_startup
async def production_startup():
    """Configure logging, start external services and load limits from the environment.

    Populates the module-level ``startup_config`` with
    ``max_concurrent_users``, ``rate_limit`` and ``session_timeout``.

    Raises:
        ValueError: If one of the numeric environment variables is not an integer.
    """
    import os
    import logging

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Initialize external services
    await init_database_pool()
    await init_redis_cache()
    await init_monitoring_services()

    # Load configuration from environment
    startup_config.update(
        {
            "max_concurrent_users": int(os.getenv("MAX_CONCURRENT_USERS", "100")),
            "rate_limit": int(os.getenv("RATE_LIMIT", "60")),
            "session_timeout": int(os.getenv("SESSION_TIMEOUT", "3600")),
        }
    )

    logging.info("Production application startup complete")
758
759
@cl.on_app_shutdown
760
async def production_shutdown():
761
"""Production application shutdown"""
762
import logging
763
764
# Cleanup resources
765
await cleanup_database_connections()
766
await cleanup_redis_connections()
767
await save_application_state()
768
769
logging.info("Production application shutdown complete")
770
771
# Health check and monitoring
772
async def health_check() -> Dict[str, Any]:
    """Probe the application's dependencies and summarise their state.

    Returns:
        ``{"status": "healthy", "timestamp", "checks": {...}}`` when every
        probe completes, or ``{"status": "unhealthy", "error", "timestamp"}``
        when any probe raises.
    """
    # Imported locally: this snippet's module-level imports do not include ``time``.
    import time

    try:
        # Check database connectivity
        db_status = await check_database_health()

        # Check Redis connectivity
        cache_status = await check_redis_health()

        # Check external service dependencies
        external_services = await check_external_services()

        return {
            "status": "healthy",
            "timestamp": time.time(),
            "checks": {
                "database": db_status,
                "cache": cache_status,
                "external_services": external_services
            }
        }

    except Exception as e:
        # Monitoring boundary: a failing probe is reported, never raised.
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": time.time()
        }
801
802
# Configuration management
803
class AppConfig:
    """Application settings read from environment variables at construction time."""

    def __init__(self):
        self.load_config()

    @staticmethod
    def _flag(variable, fallback):
        """Return True only when the variable (or ``fallback``) equals "true", ignoring case."""
        import os

        return os.getenv(variable, fallback).lower() == "true"

    def load_config(self):
        """Populate ``debug``, ``environment``, ``secret_key`` and ``features`` from the environment."""
        import os

        env = os.environ
        self.debug = self._flag("DEBUG", "false")
        self.environment = env.get("ENVIRONMENT", "development")
        self.secret_key = env.get("SECRET_KEY", "dev-secret")

        # Feature flags: (result key, environment variable, default when unset).
        flag_specs = (
            ("oauth_enabled", "OAUTH_ENABLED", "true"),
            ("mcp_enabled", "MCP_ENABLED", "false"),
            ("analytics_enabled", "ANALYTICS_ENABLED", "true"),
        )
        self.features = {
            key: self._flag(variable, fallback)
            for key, variable, fallback in flag_specs
        }

    def get_database_config(self):
        """Return the database URL and pool sizing read from the environment."""
        import os

        env = os.environ
        pool_size = int(env.get("DB_POOL_SIZE", "10"))
        max_overflow = int(env.get("DB_MAX_OVERFLOW", "20"))
        return {
            "url": env.get("DATABASE_URL"),
            "pool_size": pool_size,
            "max_overflow": max_overflow,
        }
832
833
# Initialize global configuration
834
app_config = AppConfig()
835
```
836
837
## Core Types
838
839
```python { .api }
840
from typing import Any, Dict, Callable, Awaitable
841
from dataclasses import dataclass
842
843
# MCP (Model Context Protocol) types
844
class McpConnection:
    """MCP connection descriptor passed to ``@cl.on_mcp_connect`` handlers."""

    # Name identifying the connection; the same string is passed to
    # ``@cl.on_mcp_disconnect`` handlers when the connection closes.
    name: str
    # Additional connection properties
848
849
class ClientSession:
850
"""MCP client session"""
851
# Session methods and properties
852
853
# Copilot function type
854
@dataclass
855
class CopilotFunction:
856
name: str
857
args: Dict[str, Any]
858
859
async def acall(self) -> Any: ...
860
861
# Data layer interface
862
class BaseDataLayer:
863
"""Base interface for custom data layers"""
864
865
async def create_user(self, user_data: Dict) -> str: ...
866
async def get_user(self, user_id: str) -> Dict: ...
867
async def store_message(self, message_data: Dict) -> str: ...
868
# Additional data layer methods
869
870
# Configuration types
871
AppConfiguration = Dict[str, Any]
872
FeatureFlags = Dict[str, bool]
873
HealthStatus = Dict[str, Any]
874
875
# Utility function types
876
# Any async callable: arbitrary arguments, awaitable result.
AsyncUtilityFunction = Callable[..., Awaitable[Any]]
# Shape of make_async: takes a sync callable and synchronously returns an
# async callable (the wrapper is not itself awaited, only calls to it are).
SyncToAsyncWrapper = Callable[[Callable], Callable[..., Awaitable[Any]]]
878
```