or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mdauthentication.mdcallbacks.mdindex.mdinput-widgets.mdintegrations.mdmessaging.mdui-elements.mduser-management.md

advanced.mddocs/

0

# Advanced Features

1

2

Advanced functionality including caching, async utilities, Model Context Protocol (MCP) support, server configuration, and production deployment features. These components enable building sophisticated, scalable conversational AI applications.

3

4

## Capabilities

5

6

### Caching

7

8

Function result caching for improved performance and reduced API calls.

9

10

```python { .api }

11

import chainlit as cl

12

13

@cl.cache

14

def expensive_function(arg: str) -> str:

15

"""

16

Simple function result caching decorator for performance optimization.

17

18

Usage:

19

Apply to functions that perform expensive operations or API calls.

20

Results are cached based on function arguments.

21

Cache persists for the duration of the application session.

22

23

Args:

24

Function arguments are used as cache keys

25

26

Returns:

27

Cached function results

28

29

Note:

30

Cache is argument-based and not persistent across application restarts.

31

Use for computationally expensive operations, API calls, or data processing.

32

"""

33

```

34

35

Usage examples for caching:

36

37

```python

38

import chainlit as cl

39

import time

40

import hashlib

41

42

@cl.cache

43

def expensive_computation(data: str, iterations: int = 1000) -> dict:

44

"""Expensive computation with caching"""

45

46

# Simulate expensive computation

47

result = {"input": data, "hash": None, "processing_time": 0}

48

49

start_time = time.time()

50

51

# Simulate complex processing

52

for i in range(iterations):

53

hash_obj = hashlib.sha256(f"{data}_{i}".encode())

54

result["hash"] = hash_obj.hexdigest()

55

56

result["processing_time"] = time.time() - start_time

57

return result

58

59

@cl.cache

60

async def fetch_external_data(api_endpoint: str) -> dict:

61

"""Cache external API calls"""

62

63

# Simulate API call

64

await cl.sleep(2) # Simulate network delay

65

66

return {

67

"endpoint": api_endpoint,

68

"data": f"Response from {api_endpoint}",

69

"timestamp": time.time(),

70

"cached": False # First call won't be cached

71

}

72

73

@cl.cache

74

def process_document(file_path: str, analysis_type: str = "basic") -> dict:

75

"""Cache document processing results"""

76

77

# Simulate document processing

78

with open(file_path, 'r') as f:

79

content = f.read()

80

81

return {

82

"file_path": file_path,

83

"word_count": len(content.split()),

84

"char_count": len(content),

85

"analysis_type": analysis_type,

86

"processed_at": time.time()

87

}

88

89

@cl.on_message

90

async def demo_caching(message: cl.Message):

91

"""Demonstrate caching functionality"""

92

93

user_input = message.content

94

95

# First call - will compute and cache

96

async with cl.Step(name="Processing (may be cached)", type="tool") as step:

97

step.input = user_input

98

99

# This will be fast on subsequent calls with same input

100

result = expensive_computation(user_input, 500)

101

step.output = result

102

103

# API call - cached for same endpoint

104

api_result = await fetch_external_data("https://api.example.com/data")

105

106

await cl.Message(

107

f"Computation hash: {result['hash'][:16]}...\n"

108

f"Processing time: {result['processing_time']:.3f}s\n"

109

f"API response cached: {'Yes' if 'cached' in api_result else 'No'}"

110

).send()

111

112

# Manual cache management (if needed)

113

def clear_function_cache():

114

"""Example of manual cache clearing (implementation depends on cache system)"""

115

# This would clear all cached results

116

# Implementation varies based on caching system used

117

pass

118

```

119

120

### Async Utilities

121

122

Utilities for async/sync interoperability and execution control.

123

124

```python { .api }

125

async def make_async(func: Callable) -> Callable:

126

"""

127

Convert synchronous function to asynchronous function.

128

129

Args:

130

func: Callable - Synchronous function to convert

131

132

Returns:

133

Callable - Asynchronous version of the function

134

135

Usage:

136

Wrap sync functions for use in async contexts.

137

Useful for integrating synchronous libraries with async Chainlit code.

138

"""

139

140

def run_sync(coro: Awaitable) -> Any:

141

"""

142

Run async function in synchronous context.

143

144

Args:

145

coro: Awaitable - Async function or coroutine to execute

146

147

Returns:

148

Any - Result of the async function

149

150

Usage:

151

Execute async code from synchronous functions.

152

Creates event loop if none exists.

153

"""

154

155

async def sleep(duration: int) -> None:

156

"""

157

Async sleep function for delays in conversational flows.

158

159

Args:

160

duration: int - Sleep duration in seconds

161

162

Returns:

163

None

164

165

Usage:

166

Add delays for natural conversation pacing or processing simulation.

167

Wrapper around asyncio.sleep with Chainlit context awareness.

168

"""

169

```

170

171

Usage examples for async utilities:

172

173

```python

174

import chainlit as cl

175

import asyncio

176

from typing import Callable, Any

177

178

# Convert sync functions to async

179

def sync_heavy_computation(data: str) -> str:

180

"""Synchronous heavy computation"""

181

import time

182

time.sleep(2) # Simulate heavy work

183

return f"Processed: {data.upper()}"

184

185

@cl.on_message

186

async def use_sync_function(message: cl.Message):

187

"""Use synchronous function in async context"""

188

189

# Convert sync function to async

190

async_computation = await cl.make_async(sync_heavy_computation)

191

192

# Now can be used in async context without blocking

193

result = await async_computation(message.content)

194

195

await cl.Message(result).send()

196

197

# Run async code from sync context

198

def sync_callback_example():

199

"""Synchronous function that needs to call async code"""

200

201

async def async_operation():

202

await cl.Message("Running async operation from sync context").send()

203

await cl.sleep(1)

204

return "Operation complete"

205

206

# Run async code from sync context

207

result = cl.run_sync(async_operation())

208

print(f"Result: {result}")

209

210

# Demonstration of async utilities

211

@cl.on_message

212

async def async_utilities_demo(message: cl.Message):

213

"""Demonstrate various async utilities"""

214

215

await cl.Message("Starting async operations...").send()

216

217

# Use async sleep for pacing

218

await cl.sleep(1)

219

220

# Convert and use sync function

221

sync_func = lambda x: x.replace(" ", "_").lower()

222

async_func = await cl.make_async(sync_func)

223

processed = await async_func(message.content)

224

225

await cl.Message(f"Processed input: {processed}").send()

226

227

# Simulate processing with sleep

228

await cl.Message("Processing... (3 seconds)").send()

229

await cl.sleep(3)

230

231

await cl.Message("Processing complete!").send()

232

233

# Advanced async patterns

234

@cl.on_message

235

async def concurrent_operations(message: cl.Message):

236

"""Demonstrate concurrent async operations"""

237

238

async def operation_1():

239

await cl.sleep(2)

240

return "Operation 1 complete"

241

242

async def operation_2():

243

await cl.sleep(3)

244

return "Operation 2 complete"

245

246

async def operation_3():

247

await cl.sleep(1)

248

return "Operation 3 complete"

249

250

# Run operations concurrently

251

await cl.Message("Starting concurrent operations...").send()

252

253

results = await asyncio.gather(

254

operation_1(),

255

operation_2(),

256

operation_3()

257

)

258

259

await cl.Message(f"All operations complete: {results}").send()

260

```

261

262

### Model Context Protocol (MCP)

263

264

Integration with Model Context Protocol for enhanced AI model communication and tool use.

265

266

```python { .api }

267

@cl.on_mcp_connect

268

async def mcp_connect_handler(connection: McpConnection, session: ClientSession) -> None:

269

"""

270

Hook executed when MCP (Model Context Protocol) connection is established.

271

272

Args:

273

connection: McpConnection - MCP connection object

274

session: ClientSession - Client session for MCP communication

275

276

Signature: Callable[[McpConnection, ClientSession], Awaitable[None]]

277

278

Returns:

279

None

280

281

Usage:

282

Initialize MCP tools, configure model capabilities, setup context.

283

Called when connection to MCP server is successfully established.

284

"""

285

286

@cl.on_mcp_disconnect

287

async def mcp_disconnect_handler(connection_id: str, session: ClientSession) -> None:

288

"""

289

Hook executed when MCP connection is closed or lost.

290

291

Args:

292

connection_id: str - Identifier of the closed connection

293

session: ClientSession - Client session that was disconnected

294

295

Signature: Callable[[str, ClientSession], Awaitable[None]]

296

297

Returns:

298

None

299

300

Usage:

301

Cleanup MCP resources, handle connection failures, log disconnection events.

302

"""

303

```

304

305

Usage examples for MCP integration:

306

307

```python

308

import chainlit as cl

309

from typing import Dict, Any

310

311

# Global MCP state

312

mcp_connections = {}

313

mcp_tools = {}

314

315

@cl.on_mcp_connect

316

async def handle_mcp_connection(connection, session):

317

"""Handle MCP connection establishment"""

318

319

connection_id = connection.id

320

mcp_connections[connection_id] = connection

321

322

# Initialize available tools from MCP server

323

try:

324

# Query available tools from MCP server

325

tools_response = await session.list_tools()

326

available_tools = tools_response.get("tools", [])

327

328

mcp_tools[connection_id] = available_tools

329

330

# Log available tools

331

tool_names = [tool.get("name", "unknown") for tool in available_tools]

332

333

await cl.Message(

334

f"✅ MCP connection established!\n"

335

f"Connection ID: {connection_id}\n"

336

f"Available tools: {', '.join(tool_names)}"

337

).send()

338

339

# Store connection info in session

340

cl.user_session.set("mcp_connection_id", connection_id)

341

cl.user_session.set("mcp_tools", tool_names)

342

343

except Exception as e:

344

await cl.Message(f"❌ Error initializing MCP tools: {str(e)}").send()

345

346

@cl.on_mcp_disconnect

347

async def handle_mcp_disconnection(connection_id: str, session):

348

"""Handle MCP connection loss"""

349

350

# Clean up connection state

351

if connection_id in mcp_connections:

352

del mcp_connections[connection_id]

353

354

if connection_id in mcp_tools:

355

del mcp_tools[connection_id]

356

357

# Notify user of disconnection

358

await cl.Message(

359

f"🔌 MCP connection {connection_id} disconnected"

360

).send()

361

362

# Clear session state

363

if cl.user_session.get("mcp_connection_id") == connection_id:

364

cl.user_session.set("mcp_connection_id", None)

365

cl.user_session.set("mcp_tools", [])

366

367

@cl.on_message

368

async def use_mcp_tools(message: cl.Message):

369

"""Use MCP tools in conversation"""

370

371

connection_id = cl.user_session.get("mcp_connection_id")

372

available_tools = cl.user_session.get("mcp_tools", [])

373

374

if not connection_id or not available_tools:

375

await cl.Message("No MCP tools available. Please connect to an MCP server.").send()

376

return

377

378

user_input = message.content.lower()

379

380

# Example: Use file reading tool

381

if "read file" in user_input or "file content" in user_input:

382

if "file_read" in available_tools:

383

await use_mcp_file_tool(connection_id, message.content)

384

else:

385

await cl.Message("File reading tool not available in MCP server.").send()

386

387

# Example: Use web search tool

388

elif "search" in user_input or "lookup" in user_input:

389

if "web_search" in available_tools:

390

await use_mcp_search_tool(connection_id, message.content)

391

else:

392

await cl.Message("Web search tool not available in MCP server.").send()

393

394

else:

395

# List available tools

396

tools_text = "\n".join([f"• {tool}" for tool in available_tools])

397

await cl.Message(

398

f"Available MCP tools:\n{tools_text}\n\n"

399

f"Try asking me to 'read file' or 'search' something!"

400

).send()

401

402

async def use_mcp_file_tool(connection_id: str, user_input: str):

403

"""Use MCP file reading tool"""

404

405

connection = mcp_connections.get(connection_id)

406

if not connection:

407

return

408

409

try:

410

async with cl.Step(name="MCP File Operation", type="tool") as step:

411

step.input = user_input

412

413

# Call MCP file tool (example)

414

result = await connection.call_tool("file_read", {

415

"path": "/example/file.txt" # Extract from user input

416

})

417

418

step.output = result

419

420

await cl.Message(f"File content:\n```\n{result.get('content', 'No content')}\n```").send()

421

422

except Exception as e:

423

await cl.Message(f"MCP file operation failed: {str(e)}").send()

424

425

async def use_mcp_search_tool(connection_id: str, query: str):

426

"""Use MCP web search tool"""

427

428

connection = mcp_connections.get(connection_id)

429

if not connection:

430

return

431

432

try:

433

async with cl.Step(name="MCP Web Search", type="tool") as step:

434

step.input = query

435

436

# Call MCP search tool

437

result = await connection.call_tool("web_search", {

438

"query": query,

439

"max_results": 5

440

})

441

442

step.output = result

443

444

# Format search results

445

results = result.get("results", [])

446

if results:

447

formatted_results = "\n\n".join([

448

f"**{r.get('title', 'No title')}**\n{r.get('url', '')}\n{r.get('snippet', '')}"

449

for r in results[:3] # Show top 3 results

450

])

451

await cl.Message(f"Search Results:\n\n{formatted_results}").send()

452

else:

453

await cl.Message("No search results found.").send()

454

455

except Exception as e:

456

await cl.Message(f"MCP search failed: {str(e)}").send()

457

```

458

459

### Copilot Functions

460

461

Support for copilot-style function calling and tool integration.

462

463

```python { .api }

464

@dataclass

465

class CopilotFunction:

466

"""

467

Represents a copilot function call with name and arguments.

468

469

Fields:

470

name: str - Function name to execute

471

args: Dict[str, Any] - Function arguments and parameters

472

473

Methods:

474

acall() - Execute the function call asynchronously

475

476

Usage:

477

Represents function calls from AI models that can be executed

478

in the Chainlit environment with proper tracking and observability.

479

"""

480

name: str

481

args: Dict[str, Any]

482

483

async def acall(self) -> Any:

484

"""

485

Execute the copilot function call asynchronously.

486

487

Returns:

488

Any - Result of the function execution

489

490

Usage:

491

Execute the function with provided arguments.

492

Automatically tracked as a step in Chainlit UI.

493

"""

494

```

495

496

Usage examples for copilot functions:

497

498

```python

499

import chainlit as cl

500

from dataclasses import dataclass

501

from typing import Dict, Any

502

503

# Define available copilot functions

504

COPILOT_FUNCTIONS = {

505

"calculate": calculate_function,

506

"search_web": web_search_function,

507

"analyze_data": data_analysis_function,

508

"generate_chart": chart_generation_function

509

}

510

511

async def calculate_function(expression: str) -> Dict[str, Any]:

512

"""Calculator function for copilot"""

513

try:

514

result = eval(expression) # Use safe eval in production

515

return {

516

"result": result,

517

"expression": expression,

518

"success": True

519

}

520

except Exception as e:

521

return {

522

"error": str(e),

523

"expression": expression,

524

"success": False

525

}

526

527

async def web_search_function(query: str, max_results: int = 5) -> Dict[str, Any]:

528

"""Web search function for copilot"""

529

# Mock implementation - replace with real search

530

await cl.sleep(1) # Simulate search delay

531

532

return {

533

"query": query,

534

"results": [

535

{"title": f"Result 1 for '{query}'", "url": "https://example1.com"},

536

{"title": f"Result 2 for '{query}'", "url": "https://example2.com"}

537

],

538

"count": 2,

539

"success": True

540

}

541

542

@cl.on_message

543

async def handle_copilot_functions(message: cl.Message):

544

"""Handle messages that may contain copilot function calls"""

545

546

user_input = message.content

547

548

# Parse potential function calls (this would typically be done by an LLM)

549

if user_input.startswith("/calc "):

550

# Extract calculation expression

551

expression = user_input[6:].strip()

552

553

# Create copilot function

554

calc_function = cl.CopilotFunction(

555

name="calculate",

556

args={"expression": expression}

557

)

558

559

# Execute function

560

result = await execute_copilot_function(calc_function)

561

562

if result.get("success"):

563

await cl.Message(f"Calculation result: {result['result']}").send()

564

else:

565

await cl.Message(f"Calculation error: {result['error']}").send()

566

567

elif user_input.startswith("/search "):

568

# Extract search query

569

query = user_input[8:].strip()

570

571

search_function = cl.CopilotFunction(

572

name="search_web",

573

args={"query": query, "max_results": 3}

574

)

575

576

result = await execute_copilot_function(search_function)

577

578

if result.get("success"):

579

results_text = "\n".join([

580

f"• {r['title']}: {r['url']}"

581

for r in result['results']

582

])

583

await cl.Message(f"Search results for '{query}':\n\n{results_text}").send()

584

585

else:

586

await cl.Message(

587

"Try these copilot functions:\n"

588

"• `/calc 2 + 3 * 4` - Calculate expressions\n"

589

"• `/search python tutorial` - Search the web\n"

590

).send()

591

592

async def execute_copilot_function(copilot_func: cl.CopilotFunction) -> Any:

593

"""Execute a copilot function with step tracking"""

594

595

function_impl = COPILOT_FUNCTIONS.get(copilot_func.name)

596

597

if not function_impl:

598

return {"error": f"Function '{copilot_func.name}' not available", "success": False}

599

600

async with cl.Step(name=f"Copilot: {copilot_func.name}", type="tool") as step:

601

step.input = copilot_func.args

602

603

try:

604

# Execute the copilot function

605

result = await copilot_func.acall()

606

step.output = result

607

return result

608

609

except Exception as e:

610

error_result = {"error": str(e), "success": False}

611

step.output = error_result

612

return error_result

613

614

# Alternative: LLM-driven copilot function detection

615

@cl.on_message

616

async def llm_copilot_integration(message: cl.Message):

617

"""Integrate copilot functions with LLM function calling"""

618

619

# Define function schemas for LLM

620

function_schemas = [

621

{

622

"name": "calculate",

623

"description": "Perform mathematical calculations",

624

"parameters": {

625

"type": "object",

626

"properties": {

627

"expression": {"type": "string", "description": "Mathematical expression"}

628

},

629

"required": ["expression"]

630

}

631

},

632

{

633

"name": "search_web",

634

"description": "Search the web for information",

635

"parameters": {

636

"type": "object",

637

"properties": {

638

"query": {"type": "string", "description": "Search query"},

639

"max_results": {"type": "integer", "description": "Maximum results", "default": 5}

640

},

641

"required": ["query"]

642

}

643

}

644

]

645

646

# Use LLM to determine if function calls are needed

647

# (This would integrate with your preferred LLM service)

648

649

# Example: Mock function call detection

650

if "calculate" in message.content.lower():

651

# Extract expression (in real implementation, LLM would do this)

652

import re

653

expr_match = re.search(r'calculate\s+(.+)', message.content, re.IGNORECASE)

654

if expr_match:

655

expression = expr_match.group(1).strip()

656

657

copilot_func = cl.CopilotFunction(

658

name="calculate",

659

args={"expression": expression}

660

)

661

662

result = await execute_copilot_function(copilot_func)

663

await cl.Message(f"Calculation: {expression} = {result.get('result', 'Error')}").send()

664

```

665

666

### Server and Configuration Features

667

668

Advanced server configuration, data layer management, and production deployment features.

669

670

```python { .api }

671

@cl.data_layer

672

def configure_data_layer() -> BaseDataLayer:

673

"""

674

Configure custom data layer for persistence and data management.

675

676

Signature: Callable[[], BaseDataLayer]

677

678

Returns:

679

BaseDataLayer - Custom data layer implementation

680

681

Usage:

682

Configure database connections, file storage, caching layers.

683

Customize how Chainlit stores and retrieves conversation data.

684

"""

685

```

686

687

Usage examples for server configuration:

688

689

```python

690

import chainlit as cl

691

from typing import Any, Dict

692

693

# Custom data layer implementation

694

class CustomDataLayer:

695

"""Custom data layer for production deployment"""

696

697

def __init__(self, database_url: str, redis_url: str):

698

self.database_url = database_url

699

self.redis_url = redis_url

700

# Initialize connections

701

702

async def create_user(self, user_data: Dict[str, Any]) -> str:

703

"""Create user in database"""

704

# Implementation for user creation

705

pass

706

707

async def get_user(self, user_id: str) -> Dict[str, Any]:

708

"""Retrieve user from database"""

709

# Implementation for user retrieval

710

pass

711

712

async def store_message(self, message_data: Dict[str, Any]) -> str:

713

"""Store message in database"""

714

# Implementation for message storage

715

pass

716

717

@cl.data_layer

718

def setup_production_data_layer():

719

"""Configure production data layer"""

720

import os

721

722

database_url = os.getenv("DATABASE_URL")

723

redis_url = os.getenv("REDIS_URL")

724

725

if not database_url:

726

raise ValueError("DATABASE_URL environment variable required")

727

728

return CustomDataLayer(database_url, redis_url)

729

730

# Production configuration

731

@cl.on_app_startup

732

async def production_startup():

733

"""Production application startup configuration"""

734

import os

735

import logging

736

737

# Configure logging

738

logging.basicConfig(

739

level=logging.INFO,

740

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'

741

)

742

743

# Initialize external services

744

await init_database_pool()

745

await init_redis_cache()

746

await init_monitoring_services()

747

748

# Load configuration from environment

749

config = {

750

"max_concurrent_users": int(os.getenv("MAX_CONCURRENT_USERS", "100")),

751

"rate_limit": int(os.getenv("RATE_LIMIT", "60")),

752

"session_timeout": int(os.getenv("SESSION_TIMEOUT", "3600")),

753

}

754

755

cl.user_session.set("app_config", config)

756

757

logging.info("Production application startup complete")

758

759

@cl.on_app_shutdown

760

async def production_shutdown():

761

"""Production application shutdown"""

762

import logging

763

764

# Cleanup resources

765

await cleanup_database_connections()

766

await cleanup_redis_connections()

767

await save_application_state()

768

769

logging.info("Production application shutdown complete")

770

771

# Health check and monitoring

772

async def health_check() -> Dict[str, Any]:

773

"""Application health check for monitoring"""

774

775

try:

776

# Check database connectivity

777

db_status = await check_database_health()

778

779

# Check Redis connectivity

780

cache_status = await check_redis_health()

781

782

# Check external service dependencies

783

external_services = await check_external_services()

784

785

return {

786

"status": "healthy",

787

"timestamp": time.time(),

788

"checks": {

789

"database": db_status,

790

"cache": cache_status,

791

"external_services": external_services

792

}

793

}

794

795

except Exception as e:

796

return {

797

"status": "unhealthy",

798

"error": str(e),

799

"timestamp": time.time()

800

}

801

802

# Configuration management

803

class AppConfig:

804

"""Application configuration management"""

805

806

def __init__(self):

807

self.load_config()

808

809

def load_config(self):

810

"""Load configuration from environment and files"""

811

import os

812

813

self.debug = os.getenv("DEBUG", "false").lower() == "true"

814

self.environment = os.getenv("ENVIRONMENT", "development")

815

self.secret_key = os.getenv("SECRET_KEY", "dev-secret")

816

817

# Load feature flags

818

self.features = {

819

"oauth_enabled": os.getenv("OAUTH_ENABLED", "true").lower() == "true",

820

"mcp_enabled": os.getenv("MCP_ENABLED", "false").lower() == "true",

821

"analytics_enabled": os.getenv("ANALYTICS_ENABLED", "true").lower() == "true"

822

}

823

824

def get_database_config(self):

825

"""Get database configuration"""

826

import os

827

return {

828

"url": os.getenv("DATABASE_URL"),

829

"pool_size": int(os.getenv("DB_POOL_SIZE", "10")),

830

"max_overflow": int(os.getenv("DB_MAX_OVERFLOW", "20"))

831

}

832

833

# Initialize global configuration

834

app_config = AppConfig()

835

```

836

837

## Core Types

838

839

```python { .api }

840

from typing import Any, Dict, Callable, Awaitable

841

from dataclasses import dataclass

842

843

# MCP (Model Context Protocol) types

844

class McpConnection:

845

"""MCP connection object"""

846

id: str

847

# Additional connection properties

848

849

class ClientSession:

850

"""MCP client session"""

851

# Session methods and properties

852

853

# Copilot function type

854

@dataclass

855

class CopilotFunction:

856

name: str

857

args: Dict[str, Any]

858

859

async def acall(self) -> Any: ...

860

861

# Data layer interface

862

class BaseDataLayer:

863

"""Base interface for custom data layers"""

864

865

async def create_user(self, user_data: Dict) -> str: ...

866

async def get_user(self, user_id: str) -> Dict: ...

867

async def store_message(self, message_data: Dict) -> str: ...

868

# Additional data layer methods

869

870

# Configuration types

871

AppConfiguration = Dict[str, Any]

872

FeatureFlags = Dict[str, bool]

873

HealthStatus = Dict[str, Any]

874

875

# Utility function types

876

AsyncUtilityFunction = Callable[..., Awaitable[Any]]

877

SyncToAsyncWrapper = Callable[[Callable], Awaitable[Callable]]

878

```