or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mdauthentication.mdcallbacks.mdindex.mdinput-widgets.mdintegrations.mdmessaging.mdui-elements.mduser-management.md

authentication.mddocs/

0

# Authentication

1

2

Secure user authentication and authorization with support for password authentication, header-based auth, OAuth providers, and custom authentication workflows. These components enable building secure conversational applications with user management and access control.

3

4

## Capabilities

5

6

### Password Authentication

7

8

Username and password-based authentication with custom validation logic.

9

10

```python { .api }

11

import chainlit as cl

12

from typing import Optional

13

14

@cl.password_auth_callback

15

async def password_auth(username: str, password: str) -> Optional[cl.User]:

16

"""

17

Authentication callback for username/password login.

18

Implement custom validation logic for user credentials.

19

20

Args:

21

username: str - Username or email provided by user

22

password: str - Password provided by user

23

24

Signature: Callable[[str, str], Awaitable[Optional[User]]]

25

26

Returns:

27

Optional[cl.User] - User object if authentication succeeds, None if fails

28

29

Usage:

30

Return a User object for successful authentication, None for failure.

31

The User object will be stored in the session for subsequent requests.

32

"""

33

```

34

35

Usage examples for password authentication:

36

37

```python

38

import chainlit as cl

39

import bcrypt

40

from typing import Optional

41

42

# Mock user database (replace with real database in production)

43

USERS_DB = {

44

"alice@example.com": {

45

"password_hash": "$2b$12$...", # bcrypt hash

46

"display_name": "Alice Smith",

47

"role": "admin",

48

"metadata": {"department": "engineering"}

49

},

50

"bob@example.com": {

51

"password_hash": "$2b$12$...",

52

"display_name": "Bob Johnson",

53

"role": "user",

54

"metadata": {"department": "sales"}

55

}

56

}

57

58

@cl.password_auth_callback

59

async def authenticate_user(username: str, password: str) -> Optional[cl.User]:

60

"""Authenticate user with username/password"""

61

62

# Look up user in database

63

user_data = USERS_DB.get(username.lower())

64

if not user_data:

65

# User not found

66

return None

67

68

# Verify password using bcrypt

69

password_bytes = password.encode('utf-8')

70

hash_bytes = user_data["password_hash"].encode('utf-8')

71

72

if not bcrypt.checkpw(password_bytes, hash_bytes):

73

# Invalid password

74

return None

75

76

# Authentication successful - create User object

77

return cl.User(

78

identifier=username,

79

display_name=user_data["display_name"],

80

metadata={

81

"role": user_data["role"],

82

**user_data["metadata"],

83

"last_login": datetime.now().isoformat()

84

}

85

)

86

87

# Alternative: Simple authentication for development/testing

88

@cl.password_auth_callback

89

async def simple_auth(username: str, password: str) -> Optional[cl.User]:

90

"""Simple authentication for development"""

91

92

# Simple hardcoded authentication (NOT for production)

93

valid_users = {

94

"admin": "secret123",

95

"user": "password",

96

"demo": "demo"

97

}

98

99

if username in valid_users and valid_users[username] == password:

100

return cl.User(

101

identifier=username,

102

display_name=username.title(),

103

metadata={"auth_method": "simple"}

104

)

105

106

return None

107

108

# Database-backed authentication example

109

async def database_auth_example(username: str, password: str) -> Optional[cl.User]:

110

"""Database-backed authentication with async database calls"""

111

112

try:

113

# Query database for user (example with async database client)

114

user_record = await db_client.fetch_user_by_email(username)

115

116

if not user_record:

117

return None

118

119

# Verify password hash

120

if not verify_password(password, user_record.password_hash):

121

return None

122

123

# Check if account is active

124

if not user_record.is_active:

125

return None

126

127

# Update last login timestamp

128

await db_client.update_last_login(user_record.id)

129

130

# Create Chainlit User object

131

return cl.User(

132

identifier=user_record.email,

133

display_name=user_record.full_name,

134

metadata={

135

"user_id": user_record.id,

136

"role": user_record.role,

137

"permissions": user_record.permissions,

138

"created_at": user_record.created_at.isoformat()

139

}

140

)

141

142

except Exception as e:

143

# Log authentication error

144

print(f"Authentication error for {username}: {e}")

145

return None

146

```

147

148

### Header-Based Authentication

149

150

Authentication using HTTP headers, useful for API keys, JWT tokens, or proxy authentication.

151

152

```python { .api }

153

from fastapi import Headers

154

155

@cl.header_auth_callback

156

async def header_auth(headers: Headers) -> Optional[cl.User]:

157

"""

158

Authentication callback for header-based authentication.

159

Extract and validate authentication information from HTTP headers.

160

161

Args:

162

headers: Headers - FastAPI Headers object containing HTTP request headers

163

164

Signature: Callable[[Headers], Awaitable[Optional[User]]]

165

166

Returns:

167

Optional[cl.User] - User object if authentication succeeds, None if fails

168

169

Usage:

170

Extract tokens, API keys, or other auth data from headers.

171

Validate credentials and return User object for successful auth.

172

"""

173

```

174

175

Usage examples for header authentication:

176

177

```python

178

import chainlit as cl

179

from fastapi import Headers

180

import jwt

181

from typing import Optional

182

183

@cl.header_auth_callback

184

async def authenticate_with_headers(headers: Headers) -> Optional[cl.User]:

185

"""Authenticate using JWT token in Authorization header"""

186

187

# Extract Authorization header

188

auth_header = headers.get("authorization")

189

if not auth_header:

190

return None

191

192

# Check for Bearer token format

193

if not auth_header.startswith("Bearer "):

194

return None

195

196

token = auth_header[7:] # Remove "Bearer " prefix

197

198

try:

199

# Decode JWT token (replace with your JWT secret)

200

payload = jwt.decode(token, "your-jwt-secret", algorithms=["HS256"])

201

202

# Extract user information from token

203

user_id = payload.get("user_id")

204

email = payload.get("email")

205

name = payload.get("name")

206

role = payload.get("role", "user")

207

208

if not user_id or not email:

209

return None

210

211

# Create User object from token data

212

return cl.User(

213

identifier=email,

214

display_name=name or email,

215

metadata={

216

"user_id": user_id,

217

"role": role,

218

"token_exp": payload.get("exp"),

219

"auth_method": "jwt"

220

}

221

)

222

223

except jwt.ExpiredSignatureError:

224

# Token expired

225

return None

226

except jwt.InvalidTokenError:

227

# Invalid token

228

return None

229

230

@cl.header_auth_callback

231

async def api_key_auth(headers: Headers) -> Optional[cl.User]:

232

"""Authenticate using API key in custom header"""

233

234

# Extract API key from custom header

235

api_key = headers.get("x-api-key")

236

if not api_key:

237

return None

238

239

# Validate API key (example with database lookup)

240

try:

241

api_key_record = await db_client.validate_api_key(api_key)

242

243

if not api_key_record or not api_key_record.is_active:

244

return None

245

246

# Get associated user

247

user = await db_client.get_user_by_id(api_key_record.user_id)

248

249

if not user:

250

return None

251

252

return cl.User(

253

identifier=user.email,

254

display_name=user.name,

255

metadata={

256

"user_id": user.id,

257

"api_key_id": api_key_record.id,

258

"auth_method": "api_key",

259

"permissions": api_key_record.permissions

260

}

261

)

262

263

except Exception as e:

264

print(f"API key validation error: {e}")

265

return None

266

267

@cl.header_auth_callback

268

async def proxy_auth(headers: Headers) -> Optional[cl.User]:

269

"""Authenticate using headers from authentication proxy"""

270

271

# Extract user information from proxy headers

272

user_id = headers.get("x-forwarded-user")

273

user_email = headers.get("x-forwarded-email")

274

user_name = headers.get("x-forwarded-name")

275

user_groups = headers.get("x-forwarded-groups", "")

276

277

if not user_id or not user_email:

278

return None

279

280

# Parse groups/roles

281

groups = [g.strip() for g in user_groups.split(",") if g.strip()]

282

283

return cl.User(

284

identifier=user_email,

285

display_name=user_name or user_email,

286

metadata={

287

"proxy_user_id": user_id,

288

"groups": groups,

289

"auth_method": "proxy"

290

}

291

)

292

```

293

294

### OAuth Integration

295

296

OAuth authentication with support for multiple providers and custom user mapping.

297

298

```python { .api }

299

@cl.oauth_callback

300

async def oauth_callback(

301

provider_id: str,

302

token: str,

303

raw_user_data: Dict[str, str],

304

default_app_user: cl.User,

305

id_token: Optional[str] = None

306

) -> Optional[cl.User]:

307

"""

308

OAuth authentication callback for handling OAuth provider responses.

309

310

Args:

311

provider_id: str - OAuth provider identifier (e.g., "google", "github")

312

token: str - OAuth access token

313

raw_user_data: Dict[str, str] - Raw user data from OAuth provider

314

default_app_user: cl.User - Default user object created by Chainlit

315

id_token: Optional[str] - JWT ID token (if available)

316

317

Signature: Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]

318

319

Returns:

320

Optional[cl.User] - Customized user object or None to use default

321

322

Usage:

323

Customize user data mapping, add additional metadata, validate permissions.

324

Return None to use the default user object, or return a custom User object.

325

"""

326

```

327

328

Usage examples for OAuth integration:

329

330

```python

331

import chainlit as cl

332

from typing import Dict, Optional

333

334

@cl.oauth_callback

335

async def process_oauth_user(

336

provider_id: str,

337

token: str,

338

raw_user_data: Dict[str, str],

339

default_user: cl.User,

340

id_token: Optional[str] = None

341

) -> Optional[cl.User]:

342

"""Process OAuth authentication and customize user data"""

343

344

# Extract provider-specific user information

345

if provider_id == "google":

346

email = raw_user_data.get("email")

347

name = raw_user_data.get("name")

348

picture = raw_user_data.get("picture")

349

verified_email = raw_user_data.get("verified_email") == "true"

350

351

if not verified_email:

352

# Reject unverified email addresses

353

return None

354

355

elif provider_id == "github":

356

email = raw_user_data.get("email")

357

name = raw_user_data.get("name") or raw_user_data.get("login")

358

avatar_url = raw_user_data.get("avatar_url")

359

github_id = raw_user_data.get("id")

360

361

elif provider_id == "microsoft":

362

email = raw_user_data.get("mail") or raw_user_data.get("userPrincipalName")

363

name = raw_user_data.get("displayName")

364

365

else:

366

# Unknown provider - use default user

367

return default_user

368

369

# Check if user exists in database and get additional info

370

try:

371

existing_user = await db_client.get_user_by_oauth(provider_id, email)

372

373

if existing_user:

374

# Update existing user with fresh OAuth data

375

await db_client.update_user_oauth_data(

376

existing_user.id,

377

provider_id,

378

raw_user_data,

379

token

380

)

381

382

user_metadata = {

383

"user_id": existing_user.id,

384

"role": existing_user.role,

385

"oauth_provider": provider_id,

386

"permissions": existing_user.permissions,

387

"last_oauth_login": datetime.now().isoformat()

388

}

389

390

else:

391

# Create new user in database

392

new_user = await db_client.create_user_from_oauth(

393

email=email,

394

name=name,

395

provider_id=provider_id,

396

oauth_data=raw_user_data

397

)

398

399

user_metadata = {

400

"user_id": new_user.id,

401

"role": "user", # Default role for new users

402

"oauth_provider": provider_id,

403

"is_new_user": True,

404

"permissions": ["read"], # Default permissions

405

"created_at": datetime.now().isoformat()

406

}

407

408

# Add provider-specific metadata

409

if provider_id == "google" and picture:

410

user_metadata["avatar_url"] = picture

411

elif provider_id == "github" and avatar_url:

412

user_metadata["avatar_url"] = avatar_url

413

user_metadata["github_id"] = github_id

414

415

# Create customized User object

416

return cl.User(

417

identifier=email,

418

display_name=name or email,

419

metadata=user_metadata

420

)

421

422

except Exception as e:

423

print(f"OAuth user processing error: {e}")

424

# Return default user as fallback

425

return default_user

426

427

@cl.oauth_callback

428

async def admin_only_oauth(

429

provider_id: str,

430

token: str,

431

raw_user_data: Dict[str, str],

432

default_user: cl.User,

433

id_token: Optional[str] = None

434

) -> Optional[cl.User]:

435

"""Example: Only allow OAuth for admin users"""

436

437

email = raw_user_data.get("email", "").lower()

438

439

# Define admin email domains/addresses

440

admin_domains = ["@yourcompany.com", "@admin.example.org"]

441

admin_emails = ["admin@example.com", "support@example.com"]

442

443

# Check if user is authorized

444

is_admin = (

445

any(email.endswith(domain) for domain in admin_domains) or

446

email in admin_emails

447

)

448

449

if not is_admin:

450

# Reject non-admin users

451

return None

452

453

# Grant admin privileges

454

return cl.User(

455

identifier=email,

456

display_name=raw_user_data.get("name", email),

457

metadata={

458

"role": "admin",

459

"oauth_provider": provider_id,

460

"admin_verified": True,

461

"permissions": ["read", "write", "admin"]

462

}

463

)

464

```

465

466

### Logout Handling

467

468

Handle user logout events and cleanup sessions.

469

470

```python { .api }

471

from fastapi import Request, Response

472

473

@cl.on_logout

474

async def logout_handler(request: Request, response: Response) -> None:

475

"""

476

Hook executed when user logs out of the application.

477

478

Args:

479

request: Request - FastAPI Request object

480

response: Response - FastAPI Response object for setting cookies/headers

481

482

Signature: Callable[[Request, Response], Any]

483

484

Returns:

485

Any - Return value ignored

486

487

Usage:

488

Cleanup user sessions, invalidate tokens, log audit events.

489

Modify response headers/cookies for additional cleanup.

490

"""

491

```

492

493

Usage examples for logout handling:

494

495

```python

496

import chainlit as cl

497

from fastapi import Request, Response

498

499

@cl.on_logout

500

async def handle_logout(request: Request, response: Response):

501

"""Handle user logout and cleanup"""

502

503

# Get user information before logout

504

user = cl.user_session.get("user")

505

506

if user:

507

# Log logout event

508

await log_user_event(

509

user_id=user.metadata.get("user_id"),

510

event_type="logout",

511

timestamp=datetime.now().isoformat(),

512

ip_address=request.client.host

513

)

514

515

# Invalidate user tokens in database

516

if user.metadata.get("auth_method") == "jwt":

517

token = request.headers.get("authorization", "").replace("Bearer ", "")

518

await db_client.invalidate_token(token)

519

520

# Cleanup user-specific resources

521

await cleanup_user_resources(user.identifier)

522

523

# Clear additional cookies

524

response.delete_cookie("session_id")

525

response.delete_cookie("user_preferences")

526

527

# Optional: Redirect to custom logout page

528

# response.headers["Location"] = "/logout-success"

529

530

print(f"User {user.identifier if user else 'unknown'} logged out")

531

532

async def log_user_event(user_id: str, event_type: str, timestamp: str, **kwargs):

533

"""Log user authentication events for auditing"""

534

try:

535

await db_client.insert_audit_log({

536

"user_id": user_id,

537

"event_type": event_type,

538

"timestamp": timestamp,

539

**kwargs

540

})

541

except Exception as e:

542

print(f"Failed to log user event: {e}")

543

544

async def cleanup_user_resources(user_identifier: str):

545

"""Cleanup user-specific resources on logout"""

546

try:

547

# Clear user caches

548

await cache_client.delete_pattern(f"user:{user_identifier}:*")

549

550

# Cleanup temporary files

551

await cleanup_temp_files(user_identifier)

552

553

# Notify other services of logout

554

await notify_logout_event(user_identifier)

555

556

except Exception as e:

557

print(f"Failed to cleanup resources for {user_identifier}: {e}")

558

```

559

560

### Authentication Configuration

561

562

Configure authentication settings and customize the authentication flow.

563

564

```python

565

import chainlit as cl

566

567

# Example: Configure authentication in app startup

568

@cl.on_app_startup

569

async def configure_auth():

570

"""Configure authentication settings"""

571

572

# Set up database connections for auth

573

await initialize_auth_database()

574

575

# Configure OAuth providers (if using external OAuth)

576

configure_oauth_providers()

577

578

# Set up session management

579

configure_session_settings()

580

581

print("Authentication system initialized")

582

583

# Example: Conditional authentication based on environment

584

@cl.password_auth_callback

585

async def environment_aware_auth(username: str, password: str) -> Optional[cl.User]:

586

"""Authentication that varies by environment"""

587

588

import os

589

environment = os.getenv("ENVIRONMENT", "development")

590

591

if environment == "development":

592

# Simplified auth for development

593

if username == "dev" and password == "dev":

594

return cl.User(

595

identifier="dev@example.com",

596

display_name="Developer",

597

metadata={"role": "admin", "env": "development"}

598

)

599

600

elif environment == "production":

601

# Full authentication for production

602

return await full_database_auth(username, password)

603

604

return None

605

606

# Example: Multi-method authentication

607

@cl.password_auth_callback

608

async def multi_method_auth(username: str, password: str) -> Optional[cl.User]:

609

"""Try multiple authentication methods"""

610

611

# Try LDAP authentication first

612

user = await try_ldap_auth(username, password)

613

if user:

614

return user

615

616

# Fall back to local database

617

user = await try_database_auth(username, password)

618

if user:

619

return user

620

621

# Fall back to external API

622

user = await try_external_auth(username, password)

623

return user

624

```

625

626

## Core Types

627

628

```python { .api }

629

from typing import Dict, Any, Optional

630

from fastapi import Headers, Request, Response

631

632

# Authentication callback types

633

PasswordAuthCallback = Callable[[str, str], Awaitable[Optional[User]]]

634

HeaderAuthCallback = Callable[[Headers], Awaitable[Optional[User]]]

635

OAuthCallback = Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]

636

LogoutCallback = Callable[[Request, Response], Any]

637

638

# OAuth provider data

639

OAuthProviderData = Dict[str, str] # Raw user data from OAuth provider

640

OAuthToken = str # OAuth access token

641

ProviderID = str # OAuth provider identifier

642

643

# User metadata structure

644

UserMetadata = Dict[str, Any] # Additional user information and permissions

645

```