0
# Authentication
1
2
FastMCP includes a comprehensive authentication system that supports multiple providers and token types for securing server and client connections. It offers flexible mechanisms both for protecting servers and for authorizing clients.
3
4
## Capabilities
5
6
### Server Authentication Providers
7
8
Authentication providers for securing FastMCP servers against unauthorized access.
9
10
```python { .api }
11
class AuthProvider:
    """Base class for authentication providers."""

    async def authenticate(self, request: Any) -> "AccessToken | None":
        """
        Authenticate a request and return an access token.

        Parameters:
        - request: Request object to authenticate

        Returns:
        AccessToken if authentication is successful, None otherwise
        """
24
25
class OAuthProvider(AuthProvider):
    """OAuth 2.0 authentication provider."""

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None,
    ):
        """
        Initialize OAuth provider.

        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Required scopes
        - audience: Token audience
        """
46
47
class JWTVerifier(AuthProvider):
    """JWT token verification provider."""

    def __init__(
        self,
        secret: str,
        # None instead of a list literal: a mutable default would be one
        # shared object across every call that omits the argument.
        algorithms: list[str] | None = None,
        audience: str | None = None,
        issuer: str | None = None,
    ):
        """
        Initialize JWT verifier.

        Parameters:
        - secret: JWT signing secret
        - algorithms: Allowed signing algorithms; None selects the default, ["HS256"]
        - audience: Expected token audience
        - issuer: Expected token issuer
        """
66
67
class StaticTokenVerifier(AuthProvider):
    """Static token verification provider."""

    def __init__(
        self,
        valid_tokens: dict[str, dict] | list[str],
        token_type: str = "bearer",
    ):
        """
        Initialize static token verifier.

        Parameters:
        - valid_tokens: Dictionary of token->metadata or list of valid tokens
        - token_type: Type of tokens to accept
        """
82
83
class RemoteAuthProvider(AuthProvider):
    """Remote authentication provider using external service."""

    def __init__(
        self,
        verify_url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 10.0,
    ):
        """
        Initialize remote auth provider.

        Parameters:
        - verify_url: URL for token verification
        - headers: Additional headers for verification requests
        - timeout: Request timeout (presumably seconds - confirm against the implementation)
        """
100
```
101
102
### Client Authentication
103
104
Authentication classes for clients connecting to secured servers.
105
106
```python { .api }
107
class BearerAuth:
    """Bearer token authentication for clients."""

    def __init__(self, token: str):
        """
        Initialize bearer authentication.

        Parameters:
        - token: Bearer token string
        """
117
118
class OAuth:
119
"""OAuth 2.0 authentication for clients."""
120
121
def __init__(
122
self,
123
client_id: str,
124
client_secret: str,
125
token_url: str,
126
scope: list[str] | None = None,
127
audience: str | None = None
128
):
129
"""
130
Initialize OAuth authentication.
131
132
Parameters:
133
- client_id: OAuth client ID
134
- client_secret: OAuth client secret
135
- token_url: Token endpoint URL
136
- scope: Requested scopes
137
- audience: Token audience
138
"""
139
140
async def get_token(self) -> str:
141
"""
142
Get access token from OAuth provider.
143
144
Returns:
145
Access token string
146
"""
147
```
148
149
### Access Token Model
150
151
Token representation containing authentication information and metadata.
152
153
```python { .api }
154
class AccessToken:
155
def __init__(
156
self,
157
token: str,
158
token_type: str = "bearer",
159
expires_at: datetime | None = None,
160
scope: list[str] | None = None,
161
user_id: str | None = None,
162
metadata: dict | None = None
163
):
164
"""
165
Access token representation.
166
167
Parameters:
168
- token: Token string
169
- token_type: Type of token (bearer, etc.)
170
- expires_at: Token expiration time
171
- scope: Token scopes/permissions
172
- user_id: Associated user ID
173
- metadata: Additional token metadata
174
"""
175
176
def is_expired(self) -> bool:
177
"""Check if token is expired."""
178
179
def has_scope(self, required_scope: str) -> bool:
180
"""Check if token has required scope."""
181
```
182
183
## Usage Examples
184
185
### Server Authentication Setup
186
187
```python
188
from fastmcp import FastMCP
189
from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider
190
191
# JWT Authentication
# NOTE: the literal values below are placeholders; load the real signing
# secret from secure configuration rather than committing it to source.
jwt_auth = JWTVerifier(
    secret="your-jwt-secret-key",
    algorithms=["HS256"],
    audience="fastmcp-server",
    issuer="your-auth-service",
)

mcp = FastMCP(
    name="Secure Server",
    auth_provider=jwt_auth,
)


@mcp.tool
def protected_operation(data: str) -> str:
    """This tool requires authentication."""
    return f"Processed: {data}"


# Run with authentication (only when executed as a script, so importing
# this module does not start a server)
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
211
```
212
213
### Static Token Authentication
214
215
```python
216
from fastmcp import FastMCP
217
from fastmcp.server.auth import StaticTokenVerifier
218
219
# Define valid tokens with metadata
# (placeholder token strings; real tokens should not live in source control)
valid_tokens = {
    "admin-token-123": {
        "user_id": "admin",
        "scope": ["read", "write", "admin"],
        "role": "administrator",
    },
    "user-token-456": {
        "user_id": "user1",
        "scope": ["read", "write"],
        "role": "user",
    },
    "readonly-token-789": {
        "user_id": "readonly",
        "scope": ["read"],
        "role": "readonly",
    },
}

static_auth = StaticTokenVerifier(valid_tokens)

mcp = FastMCP(
    name="Token Protected Server",
    auth_provider=static_auth,
)


@mcp.tool
def read_data() -> dict:
    """Requires 'read' scope."""
    # NOTE: nothing in this function enforces the scope; the docstring
    # describes the intended policy only.
    return {"data": "public information"}


@mcp.tool
def write_data(content: str) -> str:
    """Requires 'write' scope."""
    # This would check token scope in practice
    return f"Wrote: {content}"


# Start the server only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
257
```
258
259
### OAuth Server Authentication
260
261
```python
262
from fastmcp import FastMCP
263
from fastmcp.server.auth import OAuthProvider
264
265
# Placeholder credentials; supply real values from secure configuration.
oauth_auth = OAuthProvider(
    client_id="your-oauth-client-id",
    client_secret="your-oauth-client-secret",
    token_url="https://auth.example.com/oauth/token",
    scope=["mcp:access"],
    audience="mcp-server",
)

mcp = FastMCP(
    name="OAuth Protected Server",
    auth_provider=oauth_auth,
)


@mcp.tool
async def oauth_protected_tool(query: str) -> str:
    """Tool that requires OAuth authentication."""
    return f"Query result: {query}"


# Start the server only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
284
```
285
286
### Remote Authentication Provider
287
288
```python
289
from fastmcp import FastMCP
290
from fastmcp.server.auth import RemoteAuthProvider
291
292
# Placeholder service key; supply the real value from secure configuration.
remote_auth = RemoteAuthProvider(
    verify_url="https://auth-service.example.com/verify",
    headers={
        "X-Service-Key": "your-service-key",
        "Content-Type": "application/json",
    },
    timeout=5.0,
)

mcp = FastMCP(
    name="Remote Auth Server",
    auth_provider=remote_auth,
)


@mcp.tool
def external_auth_tool(input_data: str) -> str:
    """Tool protected by external authentication service."""
    return f"Processed with external auth: {input_data}"


# Start the server only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
312
```
313
314
### Client Authentication Usage
315
316
```python
317
from fastmcp import Client
318
from fastmcp.client.auth import BearerAuth, OAuth
319
320
async def bearer_auth_client():
    """
    Client with bearer token authentication.

    Returns:
    The `text` of the result of calling the "protected_operation" tool
    """
    auth = BearerAuth("your-bearer-token")

    async with Client(
        "http://secure-server.example.com/mcp",
        auth=auth,
    ) as client:
        result = await client.call_tool("protected_operation", {"data": "test"})
        return result.text
330
331
async def oauth_client():
    """
    Client with OAuth authentication.

    Returns:
    The `text` of the result of calling the "oauth_protected_tool" tool
    """
    oauth = OAuth(
        client_id="client-id",
        client_secret="client-secret",
        token_url="https://auth.example.com/token",
        scope=["mcp:access"],
    )

    async with Client(
        "http://oauth-server.example.com/mcp",
        auth=oauth,
    ) as client:
        # OAuth token is automatically obtained and used
        result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
        return result.text
347
348
async def manual_token_client():
    """
    Client with manually managed token.

    Returns:
    The `text` of the result of calling the "secure_api" tool
    """
    # Get token from your auth system.
    # NOTE: get_auth_token_from_somewhere is a placeholder that this example
    # does not define; replace it with your own token retrieval coroutine.
    token = await get_auth_token_from_somewhere()

    auth = BearerAuth(token)

    async with Client(
        "https://api.example.com/mcp",
        auth=auth,
    ) as client:
        result = await client.call_tool("secure_api", {})
        return result.text
361
```
362
363
### Scope-Based Authorization
364
365
```python
366
from fastmcp import FastMCP, Context
367
from fastmcp.server.auth import JWTVerifier
368
from fastmcp.server.dependencies import get_access_token
369
370
# Placeholder secret; load the real signing secret from secure configuration.
jwt_auth = JWTVerifier(
    secret="jwt-secret",
    algorithms=["HS256"],
)

mcp = FastMCP(
    name="Scope Protected Server",
    auth_provider=jwt_auth,
)
379
380
def require_scope(required_scope: str):
    """
    Decorator to require specific scope.

    Parameters:
    - required_scope: Scope the current access token must carry

    Returns:
    A decorator; the decorated function raises PermissionError when called
    without an access token or with a token lacking `required_scope`
    """
    # Local imports keep this snippet self-contained.
    import functools
    import inspect

    def check_scope():
        token = get_access_token()
        if not token or not token.has_scope(required_scope):
            raise PermissionError(f"Missing required scope: {required_scope}")

    def decorator(func):
        # functools.wraps preserves the wrapped function's name, docstring and
        # signature metadata. Without it every decorated function would look
        # like `wrapper(*args, **kwargs)` to signature-based introspection
        # (for example a tool registry deriving a schema from the function).
        if inspect.iscoroutinefunction(func):
            # Keep coroutine functions as coroutine functions.
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                check_scope()
                return await func(*args, **kwargs)

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            check_scope()
            return func(*args, **kwargs)

        return wrapper

    return decorator
390
391
@mcp.tool
@require_scope("read")
def read_users() -> list[dict]:
    """Read user data - requires 'read' scope."""
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]
399
400
@mcp.tool
@require_scope("write")
def create_user(name: str, email: str) -> dict:
    """Create user - requires 'write' scope."""
    # Example data only: the id and creation timestamp are fixed values.
    return {
        "id": 123,
        "name": name,
        "email": email,
        "created": "2024-01-01T00:00:00Z",
    }
410
411
@mcp.tool
@require_scope("admin")
def delete_user(user_id: int) -> str:
    """Delete user - requires 'admin' scope."""
    return f"User {user_id} deleted"
416
417
@mcp.tool
async def get_user_info(ctx: Context) -> dict:
    """
    Get current user info from token.

    Returns:
    A dict with the token's user ID, type, scopes and ISO-formatted
    expiration (None when the token has no expiration), or an error dict
    when there is no access token
    """
    token = get_access_token()
    if not token:
        await ctx.error("No authentication token")
        return {"error": "Not authenticated"}

    return {
        "user_id": token.user_id,
        "token_type": token.token_type,
        "scopes": token.scope,
        "expires_at": token.expires_at.isoformat() if token.expires_at else None,
    }


# Start the server only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
433
```
434
435
### Advanced Authentication Patterns
436
437
```python
438
from fastmcp import FastMCP, Context
439
from fastmcp.server.auth import AuthProvider, AccessToken
440
from fastmcp.server.dependencies import get_access_token, get_http_headers
441
import jwt
442
import httpx
443
from datetime import datetime, timezone
444
445
class CustomAuthProvider(AuthProvider):
    """Custom authentication provider with multiple methods."""

    def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
        """
        Initialize the provider.

        Parameters:
        - api_keys: Mapping of API key -> key info; each info dict is read
          for its "user_id" and "scope" entries
        - jwt_secret: Secret used to verify HS256-signed JWTs
        """
        self.api_keys = api_keys
        self.jwt_secret = jwt_secret

    async def authenticate(self, request) -> AccessToken | None:
        """
        Authenticate using API key or JWT token.

        Parameters:
        - request: Object whose `headers` attribute, when present, is a
          mapping of header names to values

        Returns:
        AccessToken for a known API key or a JWT that verifies, None otherwise
        """
        # Tolerate both a missing attribute and an explicit None.
        headers = getattr(request, 'headers', None) or {}

        # Try API key authentication
        api_key = headers.get('x-api-key')
        if api_key and api_key in self.api_keys:
            key_info = self.api_keys[api_key]
            return AccessToken(
                token=api_key,
                token_type="api_key",
                user_id=key_info["user_id"],
                scope=key_info["scope"],
                # Copy so that mutating the token's metadata cannot alter
                # the provider's key table.
                metadata=dict(key_info),
            )

        # Try JWT authentication. The auth scheme name is matched
        # case-insensitively ("Bearer", "bearer", ...).
        scheme, _, credentials = headers.get('authorization', '').partition(' ')
        token = credentials.strip()
        if scheme.lower() != 'bearer' or not token:
            return None

        try:
            payload = jwt.decode(
                token,
                self.jwt_secret,
                algorithms=["HS256"],
            )
        except jwt.InvalidTokenError:
            # Deliberate: a token that fails verification is treated as
            # unauthenticated rather than as an error.
            return None

        expires_at = None
        if 'exp' in payload:
            expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)

        # The "scope" claim is commonly a single space-delimited string;
        # normalize both that form and a JSON array to a list of strings.
        raw_scope = payload.get('scope', [])
        if isinstance(raw_scope, str):
            scope = raw_scope.split()
        else:
            scope = list(raw_scope or [])

        return AccessToken(
            token=token,
            token_type="bearer",
            user_id=payload.get('sub'),
            scope=scope,
            expires_at=expires_at,
            metadata=payload,
        )
495
496
# Set up custom auth
# (placeholder key and secret; load real values from secure configuration)
api_keys = {
    "key-123": {
        "user_id": "service_account",
        "scope": ["read", "write"],
        "name": "Service Account",
    },
}

custom_auth = CustomAuthProvider(api_keys, "jwt-secret")

mcp = FastMCP(
    name="Multi-Auth Server",
    auth_provider=custom_auth,
)
511
512
@mcp.tool
async def auth_demo(ctx: Context) -> dict:
    """
    Demonstrate authentication information access.

    Returns:
    A dict with an "authentication" section built from the access token and
    a "request_info" section built from the HTTP headers, or an error dict
    when there is no access token
    """
    token = get_access_token()
    headers = get_http_headers()

    if not token:
        await ctx.error("No authentication provided")
        return {"error": "Authentication required"}

    await ctx.info(f"Authenticated user: {token.user_id}")

    return {
        "authentication": {
            "user_id": token.user_id,
            "token_type": token.token_type,
            "scopes": token.scope,
            "is_expired": token.is_expired(),
            "metadata": token.metadata,
        },
        "request_info": {
            "user_agent": headers.get("user-agent", "unknown"),
            # X-Forwarded-For is a request header and can be set by the
            # client; treat it as informational, not as a trusted address.
            "ip_address": headers.get("x-forwarded-for", "unknown"),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    }


# Start the server only when executed as a script.
if __name__ == "__main__":
    mcp.run(transport="http", port=8080)
540
```
541
542
### Authentication Error Handling
543
544
```python
545
from fastmcp import Client
546
from fastmcp.client.auth import BearerAuth
547
from fastmcp.exceptions import ClientError
548
import asyncio
549
550
async def robust_auth_client():
    """
    Client with authentication error handling.

    Calls the "protected_tool" tool with a possibly expired token; when the
    call fails with an error that looks like an authorization failure, the
    token is refreshed and the call is retried once.

    Returns:
    The `text` of the tool result

    Raises:
    ClientError when the first failure is not an authorization failure, or
    whatever the refresh or the retried call raises
    """
    server_url = "https://secure-api.example.com/mcp"

    async def call_protected_tool(token):
        # One connect-and-call attempt with the given bearer token.
        async with Client(server_url, auth=BearerAuth(token)) as client:
            result = await client.call_tool("protected_tool", {})
            return result.text

    try:
        # Try with potentially expired token
        return await call_protected_tool("potentially-expired-token")
    except ClientError as e:
        message = str(e)
        # Heuristic: the failure is recognized by its message text.
        if "401" not in message and "unauthorized" not in message.lower():
            raise
        print("Authentication failed - token may be expired")

    # Refresh token and retry. This runs after the except block has finished
    # so that a failure here is reported on its own instead of being chained
    # to the original authorization error.
    new_token = await refresh_auth_token()
    return await call_protected_tool(new_token)
580
581
async def refresh_auth_token() -> str:
    """
    Refresh authentication token.

    Returns:
    The "access_token" value from the refresh endpoint's JSON response

    Raises:
    httpx.HTTPStatusError when the endpoint answers with an error status
    """
    # This example's import list does not include httpx, so import it here.
    import httpx

    # Implementation would depend on your auth system
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/refresh",
            # Placeholder; a real client would read its stored refresh token.
            json={"refresh_token": "stored-refresh-token"},
        )
        # Surface HTTP errors explicitly instead of failing later with a
        # confusing KeyError on an error payload.
        response.raise_for_status()
        return response.json()["access_token"]
590
```
591
592
## Security Best Practices
593
594
### Token Management
595
596
```python
597
# Store tokens securely
598
import os
599
from pathlib import Path
600
601
def get_secure_token() -> str:
    """
    Get token from secure storage.

    Lookup order: the MCP_AUTH_TOKEN environment variable, then the file
    ~/.config/mcp/token. Surrounding whitespace is stripped, and a blank
    value is treated as "not set".

    Returns:
    The first non-empty token found

    Raises:
    ValueError if no non-empty token is found in either location
    """
    # From environment variable
    token = os.getenv("MCP_AUTH_TOKEN", "").strip()
    if token:
        return token

    # From secure file (is_file() also skips a directory at that path)
    token_file = Path.home() / ".config" / "mcp" / "token"
    if token_file.is_file():
        token = token_file.read_text(encoding="utf-8").strip()
        # An empty file must not be handed out as an (empty) token.
        if token:
            return token

    raise ValueError("No authentication token found")
614
615
# Use secure token
# (BearerAuth is not imported earlier in this example, so import it here)
from fastmcp.client.auth import BearerAuth

auth = BearerAuth(get_secure_token())
617
```
618
619
### Scope Validation
620
621
```python
622
def validate_scopes(required_scopes: list[str]):
    """
    Validate that token has all required scopes.

    Parameters:
    - required_scopes: Scopes the current access token must carry; a single
      scope passed as a plain string is accepted as a one-element list

    Raises:
    PermissionError if there is no access token, or if any required scope
    is missing (the message lists every missing scope)
    """
    # This example has no import section of its own, so import locally.
    from fastmcp.server.dependencies import get_access_token

    # A bare string would otherwise be iterated character by character.
    if isinstance(required_scopes, str):
        required_scopes = [required_scopes]

    token = get_access_token()
    if not token:
        raise PermissionError("Authentication required")

    missing_scopes = [
        scope for scope in required_scopes
        if not token.has_scope(scope)
    ]

    if missing_scopes:
        raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")
635
```