Spec Registry
Help your agents use open-source better. Learn more.
Find usage specs for your project’s dependencies
- Author
- tessl
- Last updated
- Spec files
pypi-fastmcp
Describes: pypi/fastmcp
- Description
- The fast, Pythonic way to build MCP servers and clients with minimal boilerplate code.
- Author
- tessl
- Last updated
authentication.md docs/
# Authentication

Comprehensive authentication system supporting multiple providers and tokens for secure server and client connections. FastMCP provides flexible authentication mechanisms for both server protection and client authorization.

## Capabilities

### Server Authentication Providers

Authentication providers for securing FastMCP servers against unauthorized access.

```python { .api }
class AuthProvider:
    """Base class for authentication providers."""

    async def authenticate(self, request: Any) -> AccessToken | None:
        """
        Authenticate a request and return access token.

        Parameters:
        - request: Request object to authenticate

        Returns:
        AccessToken if authentication successful, None otherwise
        """

class OAuthProvider(AuthProvider):
    """OAuth 2.0 authentication provider."""

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None
    ):
        """
        Initialize OAuth provider.

        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Required scopes
        - audience: Token audience
        """

class JWTVerifier(AuthProvider):
    """JWT token verification provider."""

    def __init__(
        self,
        secret: str,
        algorithms: list[str] = ["HS256"],
        audience: str | None = None,
        issuer: str | None = None
    ):
        """
        Initialize JWT verifier.

        Parameters:
        - secret: JWT signing secret
        - algorithms: Allowed signing algorithms
        - audience: Expected token audience
        - issuer: Expected token issuer
        """

class StaticTokenVerifier(AuthProvider):
    """Static token verification provider."""

    def __init__(
        self,
        valid_tokens: dict[str, dict] | list[str],
        token_type: str = "bearer"
    ):
        """
        Initialize static token verifier.

        Parameters:
        - valid_tokens: Dictionary of token->metadata or list of valid tokens
        - token_type: Type of tokens to accept
        """

class RemoteAuthProvider(AuthProvider):
    """Remote authentication provider using external service."""

    def __init__(
        self,
        verify_url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 10.0
    ):
        """
        Initialize remote auth provider.

        Parameters:
        - verify_url: URL for token verification
        - headers: Additional headers for verification requests
        - timeout: Request timeout
        """
```

### Client Authentication

Authentication classes for clients connecting to secured servers.

```python { .api }
class BearerAuth:
    """Bearer token authentication for clients."""

    def __init__(self, token: str):
        """
        Initialize bearer authentication.

        Parameters:
        - token: Bearer token string
        """

class OAuth:
    """OAuth 2.0 authentication for clients."""

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None
    ):
        """
        Initialize OAuth authentication.

        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Requested scopes
        - audience: Token audience
        """

    async def get_token(self) -> str:
        """
        Get access token from OAuth provider.

        Returns:
        Access token string
        """
```

### Access Token Model

Token representation containing authentication information and metadata.

```python { .api }
class AccessToken:
    def __init__(
        self,
        token: str,
        token_type: str = "bearer",
        expires_at: datetime | None = None,
        scope: list[str] | None = None,
        user_id: str | None = None,
        metadata: dict | None = None
    ):
        """
        Access token representation.

        Parameters:
        - token: Token string
        - token_type: Type of token (bearer, etc.)
        - expires_at: Token expiration time
        - scope: Token scopes/permissions
        - user_id: Associated user ID
        - metadata: Additional token metadata
        """

    def is_expired(self) -> bool:
        """Check if token is expired."""

    def has_scope(self, required_scope: str) -> bool:
        """Check if token has required scope."""
```

## Usage Examples

### Server Authentication Setup

```python
from fastmcp import FastMCP
from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider

# JWT Authentication
jwt_auth = JWTVerifier(
    secret="your-jwt-secret-key",
    algorithms=["HS256"],
    audience="fastmcp-server",
    issuer="your-auth-service"
)

mcp = FastMCP(
    name="Secure Server",
    auth_provider=jwt_auth
)

@mcp.tool
def protected_operation(data: str) -> str:
    """This tool requires authentication."""
    return f"Processed: {data}"

# Run with authentication
mcp.run(transport="http", port=8080)
```

### Static Token Authentication

```python
from fastmcp import FastMCP
from fastmcp.server.auth import StaticTokenVerifier

# Define valid tokens with metadata
valid_tokens = {
    "admin-token-123": {
        "user_id": "admin",
        "scope": ["read", "write", "admin"],
        "role": "administrator"
    },
    "user-token-456": {
        "user_id": "user1",
        "scope": ["read", "write"],
        "role": "user"
    },
    "readonly-token-789": {
        "user_id": "readonly",
        "scope": ["read"],
        "role": "readonly"
    }
}

static_auth = StaticTokenVerifier(valid_tokens)

mcp = FastMCP(
    name="Token Protected Server",
    auth_provider=static_auth
)

@mcp.tool
def read_data() -> dict:
    """Requires 'read' scope."""
    return {"data": "public information"}

@mcp.tool
def write_data(content: str) -> str:
    """Requires 'write' scope."""
    # This would check token scope in practice
    return f"Wrote: {content}"

mcp.run(transport="http", port=8080)
```

### OAuth Server Authentication

```python
from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider

oauth_auth = OAuthProvider(
    client_id="your-oauth-client-id",
    client_secret="your-oauth-client-secret",
    token_url="https://auth.example.com/oauth/token",
    scope=["mcp:access"],
    audience="mcp-server"
)

mcp = FastMCP(
    name="OAuth Protected Server",
    auth_provider=oauth_auth
)

@mcp.tool
async def oauth_protected_tool(query: str) -> str:
    """Tool that requires OAuth authentication."""
    return f"Query result: {query}"

mcp.run(transport="http", port=8080)
```

### Remote Authentication Provider

```python
from fastmcp import FastMCP
from fastmcp.server.auth import RemoteAuthProvider

remote_auth = RemoteAuthProvider(
    verify_url="https://auth-service.example.com/verify",
    headers={
        "X-Service-Key": "your-service-key",
        "Content-Type": "application/json"
    },
    timeout=5.0
)

mcp = FastMCP(
    name="Remote Auth Server",
    auth_provider=remote_auth
)

@mcp.tool
def external_auth_tool(input_data: str) -> str:
    """Tool protected by external authentication service."""
    return f"Processed with external auth: {input_data}"

mcp.run(transport="http", port=8080)
```

### Client Authentication Usage

```python
from fastmcp import Client
from fastmcp.client.auth import BearerAuth, OAuth

async def bearer_auth_client():
    """Client with bearer token authentication."""
    auth = BearerAuth("your-bearer-token")

    async with Client(
        "http://secure-server.example.com/mcp",
        auth=auth
    ) as client:
        result = await client.call_tool("protected_operation", {"data": "test"})
        return result.text

async def oauth_client():
    """Client with OAuth authentication."""
    oauth = OAuth(
        client_id="client-id",
        client_secret="client-secret",
        token_url="https://auth.example.com/token",
        scope=["mcp:access"]
    )

    async with Client(
        "http://oauth-server.example.com/mcp",
        auth=oauth
    ) as client:
        # OAuth token is automatically obtained and used
        result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
        return result.text

async def manual_token_client():
    """Client with manually managed token."""
    # Get token from your auth system
    token = await get_auth_token_from_somewhere()

    auth = BearerAuth(token)

    async with Client(
        "https://api.example.com/mcp",
        auth=auth
    ) as client:
        result = await client.call_tool("secure_api", {})
        return result.text
```

### Scope-Based Authorization

```python
from fastmcp import FastMCP, Context
from fastmcp.server.auth import JWTVerifier
from fastmcp.server.dependencies import get_access_token

jwt_auth = JWTVerifier(
    secret="jwt-secret",
    algorithms=["HS256"]
)

mcp = FastMCP(
    name="Scope Protected Server",
    auth_provider=jwt_auth
)

def require_scope(required_scope: str):
    """Decorator to require specific scope."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            token = get_access_token()
            if not token or not token.has_scope(required_scope):
                raise PermissionError(f"Missing required scope: {required_scope}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@mcp.tool
@require_scope("read")
def read_users() -> list[dict]:
    """Read user data - requires 'read' scope."""
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"}
    ]

@mcp.tool
@require_scope("write")
def create_user(name: str, email: str) -> dict:
    """Create user - requires 'write' scope."""
    return {
        "id": 123,
        "name": name,
        "email": email,
        "created": "2024-01-01T00:00:00Z"
    }

@mcp.tool
@require_scope("admin")
def delete_user(user_id: int) -> str:
    """Delete user - requires 'admin' scope."""
    return f"User {user_id} deleted"

@mcp.tool
async def get_user_info(ctx: Context) -> dict:
    """Get current user info from token."""
    token = get_access_token()
    if not token:
        await ctx.error("No authentication token")
        return {"error": "Not authenticated"}

    return {
        "user_id": token.user_id,
        "token_type": token.token_type,
        "scopes": token.scope,
        "expires_at": token.expires_at.isoformat() if token.expires_at else None
    }

mcp.run(transport="http", port=8080)
```

### Advanced Authentication Patterns

```python
from fastmcp import FastMCP, Context
from fastmcp.server.auth import AuthProvider, AccessToken
from fastmcp.server.dependencies import get_access_token, get_http_headers
import jwt
import httpx
from datetime import datetime, timezone

class CustomAuthProvider(AuthProvider):
    """Custom authentication provider with multiple methods."""

    def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
        self.api_keys = api_keys
        self.jwt_secret = jwt_secret

    async def authenticate(self, request) -> AccessToken | None:
        """Authenticate using API key or JWT token."""
        headers = getattr(request, 'headers', {})

        # Try API key authentication
        api_key = headers.get('x-api-key')
        if api_key and api_key in self.api_keys:
            key_info = self.api_keys[api_key]
            return AccessToken(
                token=api_key,
                token_type="api_key",
                user_id=key_info["user_id"],
                scope=key_info["scope"],
                metadata=key_info
            )

        # Try JWT authentication
        auth_header = headers.get('authorization', '')
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            try:
                payload = jwt.decode(
                    token,
                    self.jwt_secret,
                    algorithms=["HS256"]
                )

                expires_at = None
                if 'exp' in payload:
                    expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)

                return AccessToken(
                    token=token,
                    token_type="bearer",
                    user_id=payload.get('sub'),
                    scope=payload.get('scope', []),
                    expires_at=expires_at,
                    metadata=payload
                )
            except jwt.InvalidTokenError:
                pass

        return None

# Set up custom auth
api_keys = {
    "key-123": {
        "user_id": "service_account",
        "scope": ["read", "write"],
        "name": "Service Account"
    }
}

custom_auth = CustomAuthProvider(api_keys, "jwt-secret")

mcp = FastMCP(
    name="Multi-Auth Server",
    auth_provider=custom_auth
)

@mcp.tool
async def auth_demo(ctx: Context) -> dict:
    """Demonstrate authentication information access."""
    token = get_access_token()
    headers = get_http_headers()

    if not token:
        await ctx.error("No authentication provided")
        return {"error": "Authentication required"}

    await ctx.info(f"Authenticated user: {token.user_id}")

    return {
        "authentication": {
            "user_id": token.user_id,
            "token_type": token.token_type,
            "scopes": token.scope,
            "is_expired": token.is_expired(),
            "metadata": token.metadata
        },
        "request_info": {
            "user_agent": headers.get("user-agent", "unknown"),
            "ip_address": headers.get("x-forwarded-for", "unknown"),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    }

mcp.run(transport="http", port=8080)
```

### Authentication Error Handling

```python
from fastmcp import Client
from fastmcp.client.auth import BearerAuth
from fastmcp.exceptions import ClientError
import asyncio

async def robust_auth_client():
    """Client with authentication error handling."""

    # Try with potentially expired token
    auth = BearerAuth("potentially-expired-token")

    try:
        async with Client(
            "https://secure-api.example.com/mcp",
            auth=auth
        ) as client:
            result = await client.call_tool("protected_tool", {})
            return result.text

    except ClientError as e:
        if "401" in str(e) or "unauthorized" in str(e).lower():
            print("Authentication failed - token may be expired")

            # Refresh token and retry
            new_token = await refresh_auth_token()
            auth = BearerAuth(new_token)

            async with Client(
                "https://secure-api.example.com/mcp",
                auth=auth
            ) as client:
                result = await client.call_tool("protected_tool", {})
                return result.text
        else:
            raise

async def refresh_auth_token() -> str:
    """Refresh authentication token."""
    # Implementation would depend on your auth system
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/refresh",
            json={"refresh_token": "stored-refresh-token"}
        )
        return response.json()["access_token"]
```

## Security Best Practices

### Token Management

```python
# Store tokens securely
import os
from pathlib import Path

def get_secure_token():
    """Get token from secure storage."""
    # From environment variable
    token = os.getenv("MCP_AUTH_TOKEN")
    if token:
        return token

    # From secure file
    token_file = Path.home() / ".config" / "mcp" / "token"
    if token_file.exists():
        return token_file.read_text().strip()

    raise ValueError("No authentication token found")

# Use secure token
auth = BearerAuth(get_secure_token())
```

### Scope Validation

```python
def validate_scopes(required_scopes: list[str]):
    """Validate that token has all required scopes."""
    token = get_access_token()
    if not token:
        raise PermissionError("Authentication required")

    missing_scopes = [
        scope for scope in required_scopes
        if not token.has_scope(scope)
    ]

    if missing_scopes:
        raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")
```