0
# Authentication
1
2
Secure user authentication and authorization with support for password authentication, header-based auth, OAuth providers, and custom authentication workflows. These components enable building secure conversational applications with user management and access control.
3
4
## Capabilities
5
6
### Password Authentication
7
8
Username and password-based authentication with custom validation logic.
9
10
```python { .api }
11
import chainlit as cl
12
from typing import Optional
13
14
@cl.password_auth_callback
async def password_auth(username: str, password: str) -> Optional[cl.User]:
    """
    Callback invoked for username/password logins.
    Put your own credential validation inside this coroutine.

    Args:
        username: str - Username or email entered by the user
        password: str - Password entered by the user

    Signature: Callable[[str, str], Awaitable[Optional[User]]]

    Returns:
        Optional[cl.User] - A User object when the credentials are accepted,
        None when they are rejected

    Usage:
        Return a User object to accept the login and None to refuse it.
        The returned User object is stored in the session and reused for
        subsequent requests.
    """
33
```
34
35
Usage examples for password authentication:
36
37
```python
38
import chainlit as cl
39
import bcrypt
40
from typing import Optional
41
42
# Mock user database keyed by email address
# (replace with real database in production)
USERS_DB = {
    "alice@example.com": dict(
        password_hash="$2b$12$...",  # bcrypt hash
        display_name="Alice Smith",
        role="admin",
        metadata=dict(department="engineering"),
    ),
    "bob@example.com": dict(
        password_hash="$2b$12$...",
        display_name="Bob Johnson",
        role="user",
        metadata=dict(department="sales"),
    ),
}
57
58
@cl.password_auth_callback
async def authenticate_user(username: str, password: str) -> Optional[cl.User]:
    """Authenticate a user against USERS_DB using a bcrypt password check.

    Args:
        username: Login name; matched case-insensitively against the
            USERS_DB keys.
        password: Plain-text password to verify.

    Returns:
        A cl.User on success. None when the user is unknown, the password
        does not match, or the stored hash cannot be parsed by bcrypt.
    """
    # Imported locally: this snippet uses datetime without importing it.
    from datetime import datetime

    # Normalise once so the lookup key and the returned identifier agree;
    # otherwise "Alice@Example.com" and "alice@example.com" would log in
    # as two different identifiers.
    normalized_username = username.lower()

    # Look up user in database
    user_data = USERS_DB.get(normalized_username)
    if not user_data:
        # User not found
        return None

    # Verify password using bcrypt
    try:
        password_ok = bcrypt.checkpw(
            password.encode("utf-8"),
            user_data["password_hash"].encode("utf-8"),
        )
    except ValueError:
        # bcrypt raises ValueError ("Invalid salt") for a malformed hash,
        # such as the "$2b$12$..." placeholders above. Treat it as a failed
        # login instead of letting the callback crash.
        return None

    if not password_ok:
        # Invalid password
        return None

    # Authentication successful - create User object
    return cl.User(
        identifier=normalized_username,
        display_name=user_data["display_name"],
        metadata={
            "role": user_data["role"],
            **user_data["metadata"],
            "last_login": datetime.now().isoformat(),
        },
    )
86
87
# Alternative: Simple authentication for development/testing
88
@cl.password_auth_callback
async def simple_auth(username: str, password: str) -> Optional[cl.User]:
    """Check credentials against a hardcoded table (development only)."""

    # Hardcoded username -> password table (NOT for production)
    valid_users = {
        "admin": "secret123",
        "user": "password",
        "demo": "demo",
    }

    # Guard clause: unknown user or wrong password ends the attempt.
    if username not in valid_users or valid_users[username] != password:
        return None

    return cl.User(
        identifier=username,
        display_name=username.title(),
        metadata={"auth_method": "simple"},
    )
107
108
# Database-backed authentication example
109
async def database_auth_example(username: str, password: str) -> Optional[cl.User]:
    """Authenticate against a user record fetched through an async DB client.

    Returns a cl.User for an active account with a matching password, and
    None for every other outcome, including any exception raised on the way.
    """
    try:
        # Fetch the record by email (example with async database client)
        record = await db_client.fetch_user_by_email(username)

        # Reject in the same order as before: missing record, wrong
        # password, inactive account. Short-circuiting keeps later checks
        # from running once an earlier one fails.
        if (
            not record
            or not verify_password(password, record.password_hash)
            or not record.is_active
        ):
            return None

        # Record the successful login before building the user object
        await db_client.update_last_login(record.id)

        metadata = {
            "user_id": record.id,
            "role": record.role,
            "permissions": record.permissions,
            "created_at": record.created_at.isoformat(),
        }
        return cl.User(
            identifier=record.email,
            display_name=record.full_name,
            metadata=metadata,
        )

    except Exception as e:
        # Log authentication error
        print(f"Authentication error for {username}: {e}")
        return None
146
```
147
148
### Header-Based Authentication
149
150
Authentication using HTTP headers, useful for API keys, JWT tokens, or proxy authentication.
151
152
```python { .api }
153
# FastAPI exports `Header` (a parameter declaration helper), not `Headers`;
# the request-headers mapping type is provided by Starlette.
from starlette.datastructures import Headers

@cl.header_auth_callback
async def header_auth(headers: Headers) -> Optional[cl.User]:
    """
    Authentication callback for header-based authentication.
    Extract and validate authentication information from HTTP headers.

    Args:
        headers: Headers - Starlette Headers object containing the HTTP
            request headers

    Signature: Callable[[Headers], Awaitable[Optional[User]]]

    Returns:
        Optional[cl.User] - User object if authentication succeeds, None if fails

    Usage:
        Extract tokens, API keys, or other auth data from headers.
        Validate credentials and return User object for successful auth.
    """
173
```
174
175
Usage examples for header authentication:
176
177
```python
178
import chainlit as cl
# FastAPI has no `Headers` export; the mapping type lives in Starlette.
from starlette.datastructures import Headers
import jwt
from typing import Optional

@cl.header_auth_callback
async def authenticate_with_headers(headers: Headers) -> Optional[cl.User]:
    """Authenticate using a JWT bearer token from the Authorization header.

    Args:
        headers: Request headers; only "authorization" is read.

    Returns:
        A cl.User built from the token claims, or None when the header is
        missing, is not a bearer credential, carries an empty token, fails
        JWT validation, or lacks the "user_id" or "email" claim.
    """
    # Extract Authorization header
    auth_header = headers.get("authorization")
    if not auth_header:
        return None

    # Split "<scheme> <token>". HTTP auth scheme names are case-insensitive,
    # so "bearer" and "BEARER" are accepted as well as "Bearer".
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None

    try:
        # Decode JWT token (replace with your JWT secret)
        payload = jwt.decode(token, "your-jwt-secret", algorithms=["HS256"])
    except jwt.InvalidTokenError:
        # Covers expired tokens too: ExpiredSignatureError is a subclass
        # of InvalidTokenError.
        return None

    # Extract user information from token
    user_id = payload.get("user_id")
    email = payload.get("email")
    name = payload.get("name")
    role = payload.get("role", "user")

    if not user_id or not email:
        return None

    # Create User object from token data
    return cl.User(
        identifier=email,
        display_name=name or email,
        metadata={
            "user_id": user_id,
            "role": role,
            "token_exp": payload.get("exp"),
            "auth_method": "jwt",
        },
    )
229
230
@cl.header_auth_callback
async def api_key_auth(headers: Headers) -> Optional[cl.User]:
    """Authenticate a request by the API key in the "x-api-key" header.

    Returns a cl.User for an active key whose owning user exists, and None
    otherwise, including when validation raises.
    """
    key = headers.get("x-api-key")
    if not key:
        return None

    # Validate API key (example with database lookup)
    try:
        key_record = await db_client.validate_api_key(key)
        if not (key_record and key_record.is_active):
            return None

        # Resolve the user that owns this key
        owner = await db_client.get_user_by_id(key_record.user_id)
        if not owner:
            return None

        metadata = {
            "user_id": owner.id,
            "api_key_id": key_record.id,
            "auth_method": "api_key",
            "permissions": key_record.permissions,
        }
        return cl.User(
            identifier=owner.email,
            display_name=owner.name,
            metadata=metadata,
        )

    except Exception as e:
        print(f"API key validation error: {e}")
        return None
266
267
@cl.header_auth_callback
async def proxy_auth(headers: Headers) -> Optional[cl.User]:
    """Build a user from the x-forwarded-* headers set by an auth proxy.

    Returns None unless both "x-forwarded-user" and "x-forwarded-email"
    are present and non-empty.
    """
    # NOTE(review): these headers are taken at face value; presumably the
    # proxy strips client-supplied copies - confirm for your deployment.
    proxy_user_id = headers.get("x-forwarded-user")
    email = headers.get("x-forwarded-email")
    if not proxy_user_id or not email:
        return None

    full_name = headers.get("x-forwarded-name")
    raw_groups = headers.get("x-forwarded-groups", "")

    # Comma-separated list -> trimmed, non-empty group names
    trimmed = (part.strip() for part in raw_groups.split(","))
    groups = [group for group in trimmed if group]

    return cl.User(
        identifier=email,
        display_name=full_name or email,
        metadata={
            "proxy_user_id": proxy_user_id,
            "groups": groups,
            "auth_method": "proxy",
        },
    )
292
```
293
294
### OAuth Integration
295
296
OAuth authentication with support for multiple providers and custom user mapping.
297
298
```python { .api }
299
@cl.oauth_callback
async def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_app_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """
    OAuth authentication callback for handling OAuth provider responses.

    Args:
        provider_id: str - OAuth provider identifier (e.g., "google", "github")
        token: str - OAuth access token
        raw_user_data: Dict[str, str] - Raw user data from OAuth provider
        default_app_user: cl.User - Default user object created by Chainlit
        id_token: Optional[str] - JWT ID token (if available)

    Signature: Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]

    Returns:
        Optional[cl.User] - The user to log in, or None to reject the login

    Usage:
        Customize user data mapping, add additional metadata, validate permissions.
        Return default_app_user to accept the login unchanged, return a custom
        User object to override it, or return None to deny access.
    """
326
```
327
328
Usage examples for OAuth integration:
329
330
```python
331
import chainlit as cl
332
from typing import Dict, Optional
333
334
@cl.oauth_callback
async def process_oauth_user(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """Map OAuth provider data onto an application user.

    Handles "google", "github" and "microsoft"; any other provider gets
    default_user back unchanged.

    Args:
        provider_id: OAuth provider identifier.
        token: OAuth access token, stored with an existing user's record.
        raw_user_data: Profile fields returned by the provider.
        default_user: User object to fall back to.
        id_token: Optional ID token; not used here.

    Returns:
        None for a Google account whose email is not verified; default_user
        for unknown providers, profiles without an email, or a failure in
        the database calls; otherwise a cl.User with database-backed
        metadata.
    """
    # Imported locally: this snippet uses datetime without importing it.
    from datetime import datetime

    # Provider-specific extras; they stay None for providers that lack them.
    avatar_url = None
    github_id = None

    if provider_id == "google":
        # NOTE(review): raw_user_data is annotated Dict[str, str], but a
        # JSON-decoded profile may carry a real boolean - accept both forms.
        if raw_user_data.get("verified_email") not in (True, "true"):
            # Reject unverified email addresses
            return None
        email = raw_user_data.get("email")
        name = raw_user_data.get("name")
        avatar_url = raw_user_data.get("picture")

    elif provider_id == "github":
        email = raw_user_data.get("email")
        name = raw_user_data.get("name") or raw_user_data.get("login")
        avatar_url = raw_user_data.get("avatar_url")
        github_id = raw_user_data.get("id")

    elif provider_id == "microsoft":
        email = raw_user_data.get("mail") or raw_user_data.get("userPrincipalName")
        name = raw_user_data.get("displayName")

    else:
        # Unknown provider - use default user
        return default_user

    if not email:
        # Without an email there is nothing to key the database record or
        # the identifier on, so skip the database work entirely.
        return default_user

    # Check if user exists in database and get additional info
    try:
        existing_user = await db_client.get_user_by_oauth(provider_id, email)

        if existing_user:
            # Update existing user with fresh OAuth data
            await db_client.update_user_oauth_data(
                existing_user.id,
                provider_id,
                raw_user_data,
                token,
            )

            user_metadata = {
                "user_id": existing_user.id,
                "role": existing_user.role,
                "oauth_provider": provider_id,
                "permissions": existing_user.permissions,
                "last_oauth_login": datetime.now().isoformat(),
            }

        else:
            # Create new user in database
            new_user = await db_client.create_user_from_oauth(
                email=email,
                name=name,
                provider_id=provider_id,
                oauth_data=raw_user_data,
            )

            user_metadata = {
                "user_id": new_user.id,
                "role": "user",  # Default role for new users
                "oauth_provider": provider_id,
                "is_new_user": True,
                "permissions": ["read"],  # Default permissions
                "created_at": datetime.now().isoformat(),
            }

        # Add provider-specific metadata
        if avatar_url:
            user_metadata["avatar_url"] = avatar_url
        if github_id is not None:
            # Stored whether or not the profile has an avatar.
            user_metadata["github_id"] = github_id

        # Create customized User object
        return cl.User(
            identifier=email,
            display_name=name or email,
            metadata=user_metadata,
        )

    except Exception as e:
        print(f"OAuth user processing error: {e}")
        # Return default user as fallback
        return default_user
426
427
@cl.oauth_callback
async def admin_only_oauth(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
    id_token: Optional[str] = None
) -> Optional[cl.User]:
    """Example: only allow OAuth logins for admin users.

    Args:
        provider_id: OAuth provider identifier, copied into the metadata.
        token: OAuth access token; not used here.
        raw_user_data: Profile fields; "email" and "name" are read.
        default_user: Default user object; not used here.
        id_token: Optional ID token; not used here.

    Returns:
        A cl.User with admin metadata when the email matches an admin
        domain or address; None otherwise, including when no email is
        present.
    """
    # `or ""` also covers an "email" key that is present with value None,
    # which .get("email", "") would pass straight to .lower().
    email = (raw_user_data.get("email") or "").lower()

    # Define admin email domains/addresses
    admin_domains = ("@yourcompany.com", "@admin.example.org")
    admin_emails = ("admin@example.com", "support@example.com")

    # Check if user is authorized; str.endswith accepts a tuple of suffixes
    is_admin = email.endswith(admin_domains) or email in admin_emails

    if not is_admin:
        # Reject non-admin users
        return None

    # Grant admin privileges
    return cl.User(
        identifier=email,
        # Fall back to the email when "name" is missing or None.
        display_name=raw_user_data.get("name") or email,
        metadata={
            "role": "admin",
            "oauth_provider": provider_id,
            "admin_verified": True,
            "permissions": ["read", "write", "admin"],
        },
    )
464
```
465
466
### Logout Handling
467
468
Handle user logout events and clean up sessions.
469
470
```python { .api }
471
from fastapi import Request, Response
472
473
@cl.on_logout
async def logout_handler(request: Request, response: Response) -> None:
    """
    Hook that runs when a user logs out of the application.

    Args:
        request: Request - FastAPI Request object for the logout call
        response: Response - FastAPI Response object, usable for setting
            cookies/headers

    Signature: Callable[[Request, Response], Any]

    Returns:
        Any - The return value is ignored

    Usage:
        Clean up user sessions, invalidate tokens, log audit events.
        Adjust response headers/cookies when extra cleanup is needed.
    """
491
```
492
493
Usage examples for logout handling:
494
495
```python
496
import chainlit as cl
497
from fastapi import Request, Response
498
499
@cl.on_logout
async def handle_logout(request: Request, response: Response):
    """Handle user logout: audit the event, revoke tokens, clear cookies.

    Args:
        request: Incoming logout request; the client address and the
            Authorization header are read from it.
        response: Outgoing response; the "session_id" and
            "user_preferences" cookies are deleted on it.
    """
    # Imported locally: this snippet uses datetime without importing it.
    from datetime import datetime

    # Get user information before logout
    # NOTE(review): confirm cl.user_session is populated inside the logout
    # request in your Chainlit version.
    user = cl.user_session.get("user")

    if user:
        # request.client can be None when no peer address is available,
        # so do not dereference it unconditionally.
        client_host = request.client.host if request.client else None

        # Log logout event
        await log_user_event(
            user_id=user.metadata.get("user_id"),
            event_type="logout",
            timestamp=datetime.now().isoformat(),
            ip_address=client_host,
        )

        # Invalidate user tokens in database
        if user.metadata.get("auth_method") == "jwt":
            token = request.headers.get("authorization", "").replace("Bearer ", "")
            if token:
                # Nothing to revoke when the header is absent or empty.
                await db_client.invalidate_token(token)

        # Cleanup user-specific resources
        await cleanup_user_resources(user.identifier)

    # Clear additional cookies
    response.delete_cookie("session_id")
    response.delete_cookie("user_preferences")

    # Optional: Redirect to custom logout page
    # response.headers["Location"] = "/logout-success"

    print(f"User {user.identifier if user else 'unknown'} logged out")
531
532
async def log_user_event(user_id: str, event_type: str, timestamp: str, **kwargs):
    """Write one authentication event to the audit log (best effort).

    Extra keyword arguments are stored alongside the three fixed fields.
    A failure is printed and never propagated to the caller.
    """
    try:
        entry = dict(user_id=user_id, event_type=event_type, timestamp=timestamp)
        entry.update(kwargs)
        await db_client.insert_audit_log(entry)
    except Exception as e:
        print(f"Failed to log user event: {e}")
543
544
async def cleanup_user_resources(user_identifier: str):
    """Release per-user resources at logout (best effort).

    Runs three steps in order: cache purge, temp-file cleanup, logout
    notification. The first failure is printed and stops the remaining
    steps; nothing is raised to the caller.
    """
    try:
        # Drop every cache entry under this user's key prefix
        cache_pattern = f"user:{user_identifier}:*"
        await cache_client.delete_pattern(cache_pattern)

        # Remove the user's temporary files
        await cleanup_temp_files(user_identifier)

        # Tell other services that the user logged out
        await notify_logout_event(user_identifier)

    except Exception as e:
        print(f"Failed to cleanup resources for {user_identifier}: {e}")
558
```
559
560
### Authentication Configuration
561
562
Configure authentication settings and customize the authentication flow.
563
564
```python
565
import chainlit as cl
566
567
# Example: Configure authentication in app startup
568
@cl.on_app_startup
async def configure_auth():
    """Run the authentication setup steps once, at application startup."""

    # 1. Database connections used for auth
    await initialize_auth_database()

    # 2. OAuth providers (if using external OAuth)
    configure_oauth_providers()

    # 3. Session management
    configure_session_settings()

    print("Authentication system initialized")
582
583
# Example: Conditional authentication based on environment
584
@cl.password_auth_callback
async def environment_aware_auth(username: str, password: str) -> Optional[cl.User]:
    """Authentication that varies by the ENVIRONMENT variable.

    Args:
        username: Login name supplied by the user.
        password: Password supplied by the user.

    Returns:
        In "development": a fixed admin user for dev/dev, otherwise None.
        In "production", or when ENVIRONMENT is unset: the result of
        full_database_auth. For any other value: None.
    """
    import os

    # Fail closed: an unset ENVIRONMENT is treated as production, so the
    # dev/dev admin shortcut is only reachable when development mode has
    # been requested explicitly.
    environment = os.getenv("ENVIRONMENT", "production")

    if environment == "development":
        # Simplified auth for development
        if username == "dev" and password == "dev":
            return cl.User(
                identifier="dev@example.com",
                display_name="Developer",
                metadata={"role": "admin", "env": "development"},
            )

    elif environment == "production":
        # Full authentication for production
        return await full_database_auth(username, password)

    return None
605
606
# Example: Multi-method authentication
607
@cl.password_auth_callback
async def multi_method_auth(username: str, password: str) -> Optional[cl.User]:
    """Try LDAP, then the local database, then the external API.

    The first backend that yields a truthy user wins and later backends
    are not called. If none does, the external API's result is returned
    as-is.
    """
    # `or` short-circuits, so each fallback is awaited only when the
    # previous backend returned a falsy result.
    return (
        await try_ldap_auth(username, password)
        or await try_database_auth(username, password)
        or await try_external_auth(username, password)
    )
624
```
625
626
## Core Types
627
628
```python { .api }
629
from typing import Any, Awaitable, Callable, Dict, Optional

from chainlit import User
from fastapi import Request, Response
# FastAPI has no `Headers` export; the mapping type lives in Starlette.
from starlette.datastructures import Headers

# Authentication callback types
PasswordAuthCallback = Callable[[str, str], Awaitable[Optional[User]]]
HeaderAuthCallback = Callable[[Headers], Awaitable[Optional[User]]]
OAuthCallback = Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]
LogoutCallback = Callable[[Request, Response], Any]

# OAuth provider data
OAuthProviderData = Dict[str, str]  # Raw user data from OAuth provider
OAuthToken = str  # OAuth access token
ProviderID = str  # OAuth provider identifier

# User metadata structure
UserMetadata = Dict[str, Any]  # Additional user information and permissions
645
```