The fast, Pythonic way to build MCP servers and clients with minimal boilerplate code.
—
Comprehensive authentication system supporting multiple providers and tokens for secure server and client connections. FastMCP provides flexible authentication mechanisms for both server protection and client authorization.
Authentication providers for securing FastMCP servers against unauthorized access.
class AuthProvider:
"""Base class for authentication providers."""
async def authenticate(self, request: Any) -> AccessToken | None:
"""
Authenticate a request and return access token.
Parameters:
- request: Request object to authenticate
Returns:
AccessToken if authentication successful, None otherwise
"""
class OAuthProvider(AuthProvider):
"""OAuth 2.0 authentication provider."""
def __init__(
self,
client_id: str,
client_secret: str,
token_url: str,
scope: list[str] | None = None,
audience: str | None = None
):
"""
Initialize OAuth provider.
Parameters:
- client_id: OAuth client ID
- client_secret: OAuth client secret
- token_url: Token endpoint URL
- scope: Required scopes
- audience: Token audience
"""
class JWTVerifier(AuthProvider):
"""JWT token verification provider."""
def __init__(
self,
secret: str,
algorithms: list[str] = ["HS256"],
audience: str | None = None,
issuer: str | None = None
):
"""
Initialize JWT verifier.
Parameters:
- secret: JWT signing secret
- algorithms: Allowed signing algorithms
- audience: Expected token audience
- issuer: Expected token issuer
"""
class StaticTokenVerifier(AuthProvider):
"""Static token verification provider."""
def __init__(
self,
valid_tokens: dict[str, dict] | list[str],
token_type: str = "bearer"
):
"""
Initialize static token verifier.
Parameters:
- valid_tokens: Dictionary of token->metadata or list of valid tokens
- token_type: Type of tokens to accept
"""
class RemoteAuthProvider(AuthProvider):
"""Remote authentication provider using external service."""
def __init__(
self,
verify_url: str,
headers: dict[str, str] | None = None,
timeout: float = 10.0
):
"""
Initialize remote auth provider.
Parameters:
- verify_url: URL for token verification
- headers: Additional headers for verification requests
- timeout: Request timeout
"""Authentication classes for clients connecting to secured servers.
class BearerAuth:
"""Bearer token authentication for clients."""
def __init__(self, token: str):
"""
Initialize bearer authentication.
Parameters:
- token: Bearer token string
"""
class OAuth:
"""OAuth 2.0 authentication for clients."""
def __init__(
self,
client_id: str,
client_secret: str,
token_url: str,
scope: list[str] | None = None,
audience: str | None = None
):
"""
Initialize OAuth authentication.
Parameters:
- client_id: OAuth client ID
- client_secret: OAuth client secret
- token_url: Token endpoint URL
- scope: Requested scopes
- audience: Token audience
"""
async def get_token(self) -> str:
"""
Get access token from OAuth provider.
Returns:
Access token string
"""Token representation containing authentication information and metadata.
class AccessToken:
def __init__(
self,
token: str,
token_type: str = "bearer",
expires_at: datetime | None = None,
scope: list[str] | None = None,
user_id: str | None = None,
metadata: dict | None = None
):
"""
Access token representation.
Parameters:
- token: Token string
- token_type: Type of token (bearer, etc.)
- expires_at: Token expiration time
- scope: Token scopes/permissions
- user_id: Associated user ID
- metadata: Additional token metadata
"""
def is_expired(self) -> bool:
"""Check if token is expired."""
def has_scope(self, required_scope: str) -> bool:
"""Check if token has required scope."""from fastmcp import FastMCP
from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider
# JWT Authentication
jwt_auth = JWTVerifier(
secret="your-jwt-secret-key",
algorithms=["HS256"],
audience="fastmcp-server",
issuer="your-auth-service"
)
mcp = FastMCP(
name="Secure Server",
auth_provider=jwt_auth
)
@mcp.tool
def protected_operation(data: str) -> str:
"""This tool requires authentication."""
return f"Processed: {data}"
# Run with authentication
mcp.run(transport="http", port=8080)from fastmcp import FastMCP
from fastmcp.server.auth import StaticTokenVerifier
# Define valid tokens with metadata
valid_tokens = {
"admin-token-123": {
"user_id": "admin",
"scope": ["read", "write", "admin"],
"role": "administrator"
},
"user-token-456": {
"user_id": "user1",
"scope": ["read", "write"],
"role": "user"
},
"readonly-token-789": {
"user_id": "readonly",
"scope": ["read"],
"role": "readonly"
}
}
static_auth = StaticTokenVerifier(valid_tokens)
mcp = FastMCP(
name="Token Protected Server",
auth_provider=static_auth
)
@mcp.tool
def read_data() -> dict:
"""Requires 'read' scope."""
return {"data": "public information"}
@mcp.tool
def write_data(content: str) -> str:
"""Requires 'write' scope."""
# This would check token scope in practice
return f"Wrote: {content}"
mcp.run(transport="http", port=8080)from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider
oauth_auth = OAuthProvider(
client_id="your-oauth-client-id",
client_secret="your-oauth-client-secret",
token_url="https://auth.example.com/oauth/token",
scope=["mcp:access"],
audience="mcp-server"
)
mcp = FastMCP(
name="OAuth Protected Server",
auth_provider=oauth_auth
)
@mcp.tool
async def oauth_protected_tool(query: str) -> str:
"""Tool that requires OAuth authentication."""
return f"Query result: {query}"
mcp.run(transport="http", port=8080)from fastmcp import FastMCP
from fastmcp.server.auth import RemoteAuthProvider
remote_auth = RemoteAuthProvider(
verify_url="https://auth-service.example.com/verify",
headers={
"X-Service-Key": "your-service-key",
"Content-Type": "application/json"
},
timeout=5.0
)
mcp = FastMCP(
name="Remote Auth Server",
auth_provider=remote_auth
)
@mcp.tool
def external_auth_tool(input_data: str) -> str:
"""Tool protected by external authentication service."""
return f"Processed with external auth: {input_data}"
mcp.run(transport="http", port=8080)from fastmcp import Client
from fastmcp.client.auth import BearerAuth, OAuth
async def bearer_auth_client():
"""Client with bearer token authentication."""
auth = BearerAuth("your-bearer-token")
async with Client(
"http://secure-server.example.com/mcp",
auth=auth
) as client:
result = await client.call_tool("protected_operation", {"data": "test"})
return result.text
async def oauth_client():
"""Client with OAuth authentication."""
oauth = OAuth(
client_id="client-id",
client_secret="client-secret",
token_url="https://auth.example.com/token",
scope=["mcp:access"]
)
async with Client(
"http://oauth-server.example.com/mcp",
auth=oauth
) as client:
# OAuth token is automatically obtained and used
result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
return result.text
async def manual_token_client():
"""Client with manually managed token."""
# Get token from your auth system
token = await get_auth_token_from_somewhere()
auth = BearerAuth(token)
async with Client(
"https://api.example.com/mcp",
auth=auth
) as client:
result = await client.call_tool("secure_api", {})
return result.textfrom fastmcp import FastMCP, Context
from fastmcp.server.auth import JWTVerifier
from fastmcp.server.dependencies import get_access_token
jwt_auth = JWTVerifier(
secret="jwt-secret",
algorithms=["HS256"]
)
mcp = FastMCP(
name="Scope Protected Server",
auth_provider=jwt_auth
)
def require_scope(required_scope: str):
"""Decorator to require specific scope."""
def decorator(func):
def wrapper(*args, **kwargs):
token = get_access_token()
if not token or not token.has_scope(required_scope):
raise PermissionError(f"Missing required scope: {required_scope}")
return func(*args, **kwargs)
return wrapper
return decorator
@mcp.tool
@require_scope("read")
def read_users() -> list[dict]:
"""Read user data - requires 'read' scope."""
return [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"}
]
@mcp.tool
@require_scope("write")
def create_user(name: str, email: str) -> dict:
"""Create user - requires 'write' scope."""
return {
"id": 123,
"name": name,
"email": email,
"created": "2024-01-01T00:00:00Z"
}
@mcp.tool
@require_scope("admin")
def delete_user(user_id: int) -> str:
"""Delete user - requires 'admin' scope."""
return f"User {user_id} deleted"
@mcp.tool
async def get_user_info(ctx: Context) -> dict:
"""Get current user info from token."""
token = get_access_token()
if not token:
await ctx.error("No authentication token")
return {"error": "Not authenticated"}
return {
"user_id": token.user_id,
"token_type": token.token_type,
"scopes": token.scope,
"expires_at": token.expires_at.isoformat() if token.expires_at else None
}
mcp.run(transport="http", port=8080)from fastmcp import FastMCP, Context
from fastmcp.server.auth import AuthProvider, AccessToken
from fastmcp.server.dependencies import get_access_token, get_http_headers
import jwt
import httpx
from datetime import datetime, timezone
class CustomAuthProvider(AuthProvider):
"""Custom authentication provider with multiple methods."""
def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
self.api_keys = api_keys
self.jwt_secret = jwt_secret
async def authenticate(self, request) -> AccessToken | None:
"""Authenticate using API key or JWT token."""
headers = getattr(request, 'headers', {})
# Try API key authentication
api_key = headers.get('x-api-key')
if api_key and api_key in self.api_keys:
key_info = self.api_keys[api_key]
return AccessToken(
token=api_key,
token_type="api_key",
user_id=key_info["user_id"],
scope=key_info["scope"],
metadata=key_info
)
# Try JWT authentication
auth_header = headers.get('authorization', '')
if auth_header.startswith('Bearer '):
token = auth_header[7:]
try:
payload = jwt.decode(
token,
self.jwt_secret,
algorithms=["HS256"]
)
expires_at = None
if 'exp' in payload:
expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)
return AccessToken(
token=token,
token_type="bearer",
user_id=payload.get('sub'),
scope=payload.get('scope', []),
expires_at=expires_at,
metadata=payload
)
except jwt.InvalidTokenError:
pass
return None
# Set up custom auth
api_keys = {
"key-123": {
"user_id": "service_account",
"scope": ["read", "write"],
"name": "Service Account"
}
}
custom_auth = CustomAuthProvider(api_keys, "jwt-secret")
mcp = FastMCP(
name="Multi-Auth Server",
auth_provider=custom_auth
)
@mcp.tool
async def auth_demo(ctx: Context) -> dict:
"""Demonstrate authentication information access."""
token = get_access_token()
headers = get_http_headers()
if not token:
await ctx.error("No authentication provided")
return {"error": "Authentication required"}
await ctx.info(f"Authenticated user: {token.user_id}")
return {
"authentication": {
"user_id": token.user_id,
"token_type": token.token_type,
"scopes": token.scope,
"is_expired": token.is_expired(),
"metadata": token.metadata
},
"request_info": {
"user_agent": headers.get("user-agent", "unknown"),
"ip_address": headers.get("x-forwarded-for", "unknown"),
"timestamp": datetime.now(timezone.utc).isoformat()
}
}
mcp.run(transport="http", port=8080)from fastmcp import Client
from fastmcp.client.auth import BearerAuth
from fastmcp.exceptions import ClientError
import asyncio
async def robust_auth_client():
"""Client with authentication error handling."""
# Try with potentially expired token
auth = BearerAuth("potentially-expired-token")
try:
async with Client(
"https://secure-api.example.com/mcp",
auth=auth
) as client:
result = await client.call_tool("protected_tool", {})
return result.text
except ClientError as e:
if "401" in str(e) or "unauthorized" in str(e).lower():
print("Authentication failed - token may be expired")
# Refresh token and retry
new_token = await refresh_auth_token()
auth = BearerAuth(new_token)
async with Client(
"https://secure-api.example.com/mcp",
auth=auth
) as client:
result = await client.call_tool("protected_tool", {})
return result.text
else:
raise
async def refresh_auth_token() -> str:
"""Refresh authentication token."""
# Implementation would depend on your auth system
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/refresh",
json={"refresh_token": "stored-refresh-token"}
)
return response.json()["access_token"]# Store tokens securely
import os
from pathlib import Path
def get_secure_token():
"""Get token from secure storage."""
# From environment variable
token = os.getenv("MCP_AUTH_TOKEN")
if token:
return token
# From secure file
token_file = Path.home() / ".config" / "mcp" / "token"
if token_file.exists():
return token_file.read_text().strip()
raise ValueError("No authentication token found")
# Use secure token
auth = BearerAuth(get_secure_token())def validate_scopes(required_scopes: list[str]):
"""Validate that token has all required scopes."""
token = get_access_token()
if not token:
raise PermissionError("Authentication required")
missing_scopes = [
scope for scope in required_scopes
if not token.has_scope(scope)
]
if missing_scopes:
raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")Install with Tessl CLI
npx tessl i tessl/pypi-fastmcp