Spec Registry

Help your agents use open-source better. Learn more.

Find usage specs for your project’s dependencies

>

pypi-fastmcp

Describes: pypi/fastmcp

Description
The fast, Pythonic way to build MCP servers and clients with minimal boilerplate code.
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-fastmcp@2.12.0

docs/authentication.md

1
# Authentication
2
3
FastMCP includes a comprehensive authentication system that supports multiple providers and token types for securing server and client connections. It provides flexible authentication mechanisms for both server protection and client authorization.
4
5
## Capabilities
6
7
### Server Authentication Providers
8
9
Authentication providers for securing FastMCP servers against unauthorized access.
10
11
```python { .api }
12
class AuthProvider:
13
"""Base class for authentication providers."""
14
15
async def authenticate(self, request: Any) -> AccessToken | None:
16
"""
17
Authenticate a request and return access token.
18
19
Parameters:
20
- request: Request object to authenticate
21
22
Returns:
23
AccessToken if authentication successful, None otherwise
24
"""
25
26
class OAuthProvider(AuthProvider):
27
"""OAuth 2.0 authentication provider."""
28
29
def __init__(
30
self,
31
client_id: str,
32
client_secret: str,
33
token_url: str,
34
scope: list[str] | None = None,
35
audience: str | None = None
36
):
37
"""
38
Initialize OAuth provider.
39
40
Parameters:
41
- client_id: OAuth client ID
42
- client_secret: OAuth client secret
43
- token_url: Token endpoint URL
44
- scope: Required scopes
45
- audience: Token audience
46
"""
47
48
class JWTVerifier(AuthProvider):
49
"""JWT token verification provider."""
50
51
def __init__(
52
self,
53
secret: str,
54
algorithms: list[str] = ["HS256"],
55
audience: str | None = None,
56
issuer: str | None = None
57
):
58
"""
59
Initialize JWT verifier.
60
61
Parameters:
62
- secret: JWT signing secret
63
- algorithms: Allowed signing algorithms
64
- audience: Expected token audience
65
- issuer: Expected token issuer
66
"""
67
68
class StaticTokenVerifier(AuthProvider):
69
"""Static token verification provider."""
70
71
def __init__(
72
self,
73
valid_tokens: dict[str, dict] | list[str],
74
token_type: str = "bearer"
75
):
76
"""
77
Initialize static token verifier.
78
79
Parameters:
80
- valid_tokens: Dictionary of token->metadata or list of valid tokens
81
- token_type: Type of tokens to accept
82
"""
83
84
class RemoteAuthProvider(AuthProvider):
85
"""Remote authentication provider using external service."""
86
87
def __init__(
88
self,
89
verify_url: str,
90
headers: dict[str, str] | None = None,
91
timeout: float = 10.0
92
):
93
"""
94
Initialize remote auth provider.
95
96
Parameters:
97
- verify_url: URL for token verification
98
- headers: Additional headers for verification requests
99
- timeout: Request timeout
100
"""
101
```
102
103
### Client Authentication
104
105
Authentication classes for clients connecting to secured servers.
106
107
```python { .api }
108
class BearerAuth:
109
"""Bearer token authentication for clients."""
110
111
def __init__(self, token: str):
112
"""
113
Initialize bearer authentication.
114
115
Parameters:
116
- token: Bearer token string
117
"""
118
119
class OAuth:
120
"""OAuth 2.0 authentication for clients."""
121
122
def __init__(
123
self,
124
client_id: str,
125
client_secret: str,
126
token_url: str,
127
scope: list[str] | None = None,
128
audience: str | None = None
129
):
130
"""
131
Initialize OAuth authentication.
132
133
Parameters:
134
- client_id: OAuth client ID
135
- client_secret: OAuth client secret
136
- token_url: Token endpoint URL
137
- scope: Requested scopes
138
- audience: Token audience
139
"""
140
141
async def get_token(self) -> str:
142
"""
143
Get access token from OAuth provider.
144
145
Returns:
146
Access token string
147
"""
148
```
149
150
### Access Token Model
151
152
Token representation containing authentication information and metadata.
153
154
```python { .api }
155
class AccessToken:
156
def __init__(
157
self,
158
token: str,
159
token_type: str = "bearer",
160
expires_at: datetime | None = None,
161
scope: list[str] | None = None,
162
user_id: str | None = None,
163
metadata: dict | None = None
164
):
165
"""
166
Access token representation.
167
168
Parameters:
169
- token: Token string
170
- token_type: Type of token (bearer, etc.)
171
- expires_at: Token expiration time
172
- scope: Token scopes/permissions
173
- user_id: Associated user ID
174
- metadata: Additional token metadata
175
"""
176
177
def is_expired(self) -> bool:
178
"""Check if token is expired."""
179
180
def has_scope(self, required_scope: str) -> bool:
181
"""Check if token has required scope."""
182
```
183
184
## Usage Examples
185
186
### Server Authentication Setup
187
188
```python
189
from fastmcp import FastMCP
190
from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider
191
192
# JWT Authentication
193
jwt_auth = JWTVerifier(
194
secret="your-jwt-secret-key",
195
algorithms=["HS256"],
196
audience="fastmcp-server",
197
issuer="your-auth-service"
198
)
199
200
mcp = FastMCP(
201
name="Secure Server",
202
auth_provider=jwt_auth
203
)
204
205
@mcp.tool
206
def protected_operation(data: str) -> str:
207
"""This tool requires authentication."""
208
return f"Processed: {data}"
209
210
# Run with authentication
211
mcp.run(transport="http", port=8080)
212
```
213
214
### Static Token Authentication
215
216
```python
217
from fastmcp import FastMCP
218
from fastmcp.server.auth import StaticTokenVerifier
219
220
# Define valid tokens with metadata
221
valid_tokens = {
222
"admin-token-123": {
223
"user_id": "admin",
224
"scope": ["read", "write", "admin"],
225
"role": "administrator"
226
},
227
"user-token-456": {
228
"user_id": "user1",
229
"scope": ["read", "write"],
230
"role": "user"
231
},
232
"readonly-token-789": {
233
"user_id": "readonly",
234
"scope": ["read"],
235
"role": "readonly"
236
}
237
}
238
239
static_auth = StaticTokenVerifier(valid_tokens)
240
241
mcp = FastMCP(
242
name="Token Protected Server",
243
auth_provider=static_auth
244
)
245
246
@mcp.tool
247
def read_data() -> dict:
248
"""Requires 'read' scope."""
249
return {"data": "public information"}
250
251
@mcp.tool
252
def write_data(content: str) -> str:
253
"""Requires 'write' scope."""
254
# This would check token scope in practice
255
return f"Wrote: {content}"
256
257
mcp.run(transport="http", port=8080)
258
```
259
260
### OAuth Server Authentication
261
262
```python
263
from fastmcp import FastMCP
264
from fastmcp.server.auth import OAuthProvider
265
266
oauth_auth = OAuthProvider(
267
client_id="your-oauth-client-id",
268
client_secret="your-oauth-client-secret",
269
token_url="https://auth.example.com/oauth/token",
270
scope=["mcp:access"],
271
audience="mcp-server"
272
)
273
274
mcp = FastMCP(
275
name="OAuth Protected Server",
276
auth_provider=oauth_auth
277
)
278
279
@mcp.tool
280
async def oauth_protected_tool(query: str) -> str:
281
"""Tool that requires OAuth authentication."""
282
return f"Query result: {query}"
283
284
mcp.run(transport="http", port=8080)
285
```
286
287
### Remote Authentication Provider
288
289
```python
290
from fastmcp import FastMCP
291
from fastmcp.server.auth import RemoteAuthProvider
292
293
remote_auth = RemoteAuthProvider(
294
verify_url="https://auth-service.example.com/verify",
295
headers={
296
"X-Service-Key": "your-service-key",
297
"Content-Type": "application/json"
298
},
299
timeout=5.0
300
)
301
302
mcp = FastMCP(
303
name="Remote Auth Server",
304
auth_provider=remote_auth
305
)
306
307
@mcp.tool
308
def external_auth_tool(input_data: str) -> str:
309
"""Tool protected by external authentication service."""
310
return f"Processed with external auth: {input_data}"
311
312
mcp.run(transport="http", port=8080)
313
```
314
315
### Client Authentication Usage
316
317
```python
318
from fastmcp import Client
319
from fastmcp.client.auth import BearerAuth, OAuth
320
321
async def bearer_auth_client():
322
"""Client with bearer token authentication."""
323
auth = BearerAuth("your-bearer-token")
324
325
async with Client(
326
"http://secure-server.example.com/mcp",
327
auth=auth
328
) as client:
329
result = await client.call_tool("protected_operation", {"data": "test"})
330
return result.text
331
332
async def oauth_client():
333
"""Client with OAuth authentication."""
334
oauth = OAuth(
335
client_id="client-id",
336
client_secret="client-secret",
337
token_url="https://auth.example.com/token",
338
scope=["mcp:access"]
339
)
340
341
async with Client(
342
"http://oauth-server.example.com/mcp",
343
auth=oauth
344
) as client:
345
# OAuth token is automatically obtained and used
346
result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
347
return result.text
348
349
async def manual_token_client():
350
"""Client with manually managed token."""
351
# Get token from your auth system
352
token = await get_auth_token_from_somewhere()
353
354
auth = BearerAuth(token)
355
356
async with Client(
357
"https://api.example.com/mcp",
358
auth=auth
359
) as client:
360
result = await client.call_tool("secure_api", {})
361
return result.text
362
```
363
364
### Scope-Based Authorization
365
366
```python
367
from fastmcp import FastMCP, Context
368
from fastmcp.server.auth import JWTVerifier
369
from fastmcp.server.dependencies import get_access_token
370
371
jwt_auth = JWTVerifier(
372
secret="jwt-secret",
373
algorithms=["HS256"]
374
)
375
376
mcp = FastMCP(
377
name="Scope Protected Server",
378
auth_provider=jwt_auth
379
)
380
381
def require_scope(required_scope: str):
382
"""Decorator to require specific scope."""
383
def decorator(func):
384
def wrapper(*args, **kwargs):
385
token = get_access_token()
386
if not token or not token.has_scope(required_scope):
387
raise PermissionError(f"Missing required scope: {required_scope}")
388
return func(*args, **kwargs)
389
return wrapper
390
return decorator
391
392
@mcp.tool
393
@require_scope("read")
394
def read_users() -> list[dict]:
395
"""Read user data - requires 'read' scope."""
396
return [
397
{"id": 1, "name": "Alice"},
398
{"id": 2, "name": "Bob"}
399
]
400
401
@mcp.tool
402
@require_scope("write")
403
def create_user(name: str, email: str) -> dict:
404
"""Create user - requires 'write' scope."""
405
return {
406
"id": 123,
407
"name": name,
408
"email": email,
409
"created": "2024-01-01T00:00:00Z"
410
}
411
412
@mcp.tool
413
@require_scope("admin")
414
def delete_user(user_id: int) -> str:
415
"""Delete user - requires 'admin' scope."""
416
return f"User {user_id} deleted"
417
418
@mcp.tool
419
async def get_user_info(ctx: Context) -> dict:
420
"""Get current user info from token."""
421
token = get_access_token()
422
if not token:
423
await ctx.error("No authentication token")
424
return {"error": "Not authenticated"}
425
426
return {
427
"user_id": token.user_id,
428
"token_type": token.token_type,
429
"scopes": token.scope,
430
"expires_at": token.expires_at.isoformat() if token.expires_at else None
431
}
432
433
mcp.run(transport="http", port=8080)
434
```
435
436
### Advanced Authentication Patterns
437
438
```python
439
from fastmcp import FastMCP, Context
440
from fastmcp.server.auth import AuthProvider, AccessToken
441
from fastmcp.server.dependencies import get_access_token, get_http_headers
442
import jwt
443
import httpx
444
from datetime import datetime, timezone
445
446
class CustomAuthProvider(AuthProvider):
447
"""Custom authentication provider with multiple methods."""
448
449
def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
450
self.api_keys = api_keys
451
self.jwt_secret = jwt_secret
452
453
async def authenticate(self, request) -> AccessToken | None:
454
"""Authenticate using API key or JWT token."""
455
headers = getattr(request, 'headers', {})
456
457
# Try API key authentication
458
api_key = headers.get('x-api-key')
459
if api_key and api_key in self.api_keys:
460
key_info = self.api_keys[api_key]
461
return AccessToken(
462
token=api_key,
463
token_type="api_key",
464
user_id=key_info["user_id"],
465
scope=key_info["scope"],
466
metadata=key_info
467
)
468
469
# Try JWT authentication
470
auth_header = headers.get('authorization', '')
471
if auth_header.startswith('Bearer '):
472
token = auth_header[7:]
473
try:
474
payload = jwt.decode(
475
token,
476
self.jwt_secret,
477
algorithms=["HS256"]
478
)
479
480
expires_at = None
481
if 'exp' in payload:
482
expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)
483
484
return AccessToken(
485
token=token,
486
token_type="bearer",
487
user_id=payload.get('sub'),
488
scope=payload.get('scope', []),
489
expires_at=expires_at,
490
metadata=payload
491
)
492
except jwt.InvalidTokenError:
493
pass
494
495
return None
496
497
# Set up custom auth
498
api_keys = {
499
"key-123": {
500
"user_id": "service_account",
501
"scope": ["read", "write"],
502
"name": "Service Account"
503
}
504
}
505
506
custom_auth = CustomAuthProvider(api_keys, "jwt-secret")
507
508
mcp = FastMCP(
509
name="Multi-Auth Server",
510
auth_provider=custom_auth
511
)
512
513
@mcp.tool
514
async def auth_demo(ctx: Context) -> dict:
515
"""Demonstrate authentication information access."""
516
token = get_access_token()
517
headers = get_http_headers()
518
519
if not token:
520
await ctx.error("No authentication provided")
521
return {"error": "Authentication required"}
522
523
await ctx.info(f"Authenticated user: {token.user_id}")
524
525
return {
526
"authentication": {
527
"user_id": token.user_id,
528
"token_type": token.token_type,
529
"scopes": token.scope,
530
"is_expired": token.is_expired(),
531
"metadata": token.metadata
532
},
533
"request_info": {
534
"user_agent": headers.get("user-agent", "unknown"),
535
"ip_address": headers.get("x-forwarded-for", "unknown"),
536
"timestamp": datetime.now(timezone.utc).isoformat()
537
}
538
}
539
540
mcp.run(transport="http", port=8080)
541
```
542
543
### Authentication Error Handling
544
545
```python
546
from fastmcp import Client
547
from fastmcp.client.auth import BearerAuth
548
from fastmcp.exceptions import ClientError
549
import asyncio
550
551
async def robust_auth_client():
552
"""Client with authentication error handling."""
553
554
# Try with potentially expired token
555
auth = BearerAuth("potentially-expired-token")
556
557
try:
558
async with Client(
559
"https://secure-api.example.com/mcp",
560
auth=auth
561
) as client:
562
result = await client.call_tool("protected_tool", {})
563
return result.text
564
565
except ClientError as e:
566
if "401" in str(e) or "unauthorized" in str(e).lower():
567
print("Authentication failed - token may be expired")
568
569
# Refresh token and retry
570
new_token = await refresh_auth_token()
571
auth = BearerAuth(new_token)
572
573
async with Client(
574
"https://secure-api.example.com/mcp",
575
auth=auth
576
) as client:
577
result = await client.call_tool("protected_tool", {})
578
return result.text
579
else:
580
raise
581
582
async def refresh_auth_token() -> str:
583
"""Refresh authentication token."""
584
# Implementation would depend on your auth system
585
async with httpx.AsyncClient() as client:
586
response = await client.post(
587
"https://auth.example.com/refresh",
588
json={"refresh_token": "stored-refresh-token"}
589
)
590
return response.json()["access_token"]
591
```
592
593
## Security Best Practices
594
595
### Token Management
596
597
```python
598
# Store tokens securely
599
import os
600
from pathlib import Path
601
602
def get_secure_token():
603
"""Get token from secure storage."""
604
# From environment variable
605
token = os.getenv("MCP_AUTH_TOKEN")
606
if token:
607
return token
608
609
# From secure file
610
token_file = Path.home() / ".config" / "mcp" / "token"
611
if token_file.exists():
612
return token_file.read_text().strip()
613
614
raise ValueError("No authentication token found")
615
616
# Use secure token
617
auth = BearerAuth(get_secure_token())
618
```
619
620
### Scope Validation
621
622
```python
623
def validate_scopes(required_scopes: list[str]):
624
"""Validate that token has all required scopes."""
625
token = get_access_token()
626
if not token:
627
raise PermissionError("Authentication required")
628
629
missing_scopes = [
630
scope for scope in required_scopes
631
if not token.has_scope(scope)
632
]
633
634
if missing_scopes:
635
raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")
636
```