CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fastmcp

The fast, Pythonic way to build MCP servers and clients with minimal boilerplate code.

Pending
Overview
Eval results
Files

authentication.mddocs/

Authentication

Comprehensive authentication system supporting multiple providers and tokens for secure server and client connections. FastMCP provides flexible authentication mechanisms for both server protection and client authorization.

Capabilities

Server Authentication Providers

Authentication providers for securing FastMCP servers against unauthorized access.

class AuthProvider:
    """Base class for authentication providers."""
    
    async def authenticate(self, request: Any) -> AccessToken | None:
        """
        Authenticate a request and return access token.
        
        Parameters:
        - request: Request object to authenticate
        
        Returns:
        AccessToken if authentication successful, None otherwise
        """

class OAuthProvider(AuthProvider):
    """OAuth 2.0 authentication provider."""
    
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None
    ):
        """
        Initialize OAuth provider.
        
        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Required scopes
        - audience: Token audience
        """

class JWTVerifier(AuthProvider):
    """JWT token verification provider."""
    
    def __init__(
        self,
        secret: str,
        algorithms: list[str] = ["HS256"],
        audience: str | None = None,
        issuer: str | None = None
    ):
        """
        Initialize JWT verifier.
        
        Parameters:
        - secret: JWT signing secret
        - algorithms: Allowed signing algorithms
        - audience: Expected token audience
        - issuer: Expected token issuer
        """

class StaticTokenVerifier(AuthProvider):
    """Static token verification provider."""
    
    def __init__(
        self,
        valid_tokens: dict[str, dict] | list[str],
        token_type: str = "bearer"
    ):
        """
        Initialize static token verifier.
        
        Parameters:
        - valid_tokens: Dictionary of token->metadata or list of valid tokens
        - token_type: Type of tokens to accept
        """

class RemoteAuthProvider(AuthProvider):
    """Remote authentication provider using external service."""
    
    def __init__(
        self,
        verify_url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 10.0
    ):
        """
        Initialize remote auth provider.
        
        Parameters:
        - verify_url: URL for token verification
        - headers: Additional headers for verification requests
        - timeout: Request timeout
        """

Client Authentication

Authentication classes for clients connecting to secured servers.

class BearerAuth:
    """Bearer token authentication for clients."""
    
    def __init__(self, token: str):
        """
        Initialize bearer authentication.
        
        Parameters:
        - token: Bearer token string
        """

class OAuth:
    """OAuth 2.0 authentication for clients."""
    
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None
    ):
        """
        Initialize OAuth authentication.
        
        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Requested scopes
        - audience: Token audience
        """
    
    async def get_token(self) -> str:
        """
        Get access token from OAuth provider.
        
        Returns:
        Access token string
        """

Access Token Model

Token representation containing authentication information and metadata.

class AccessToken:
    def __init__(
        self,
        token: str,
        token_type: str = "bearer",
        expires_at: datetime | None = None,
        scope: list[str] | None = None,
        user_id: str | None = None,
        metadata: dict | None = None
    ):
        """
        Access token representation.
        
        Parameters:
        - token: Token string
        - token_type: Type of token (bearer, etc.)
        - expires_at: Token expiration time
        - scope: Token scopes/permissions
        - user_id: Associated user ID
        - metadata: Additional token metadata
        """
    
    def is_expired(self) -> bool:
        """Check if token is expired."""
    
    def has_scope(self, required_scope: str) -> bool:
        """Check if token has required scope."""

Usage Examples

Server Authentication Setup

from fastmcp import FastMCP
from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider

# JWT Authentication
jwt_auth = JWTVerifier(
    secret="your-jwt-secret-key",
    algorithms=["HS256"],
    audience="fastmcp-server",
    issuer="your-auth-service"
)

mcp = FastMCP(
    name="Secure Server",
    auth_provider=jwt_auth
)

@mcp.tool
def protected_operation(data: str) -> str:
    """This tool requires authentication."""
    return f"Processed: {data}"

# Run with authentication
mcp.run(transport="http", port=8080)

Static Token Authentication

from fastmcp import FastMCP
from fastmcp.server.auth import StaticTokenVerifier

# Define valid tokens with metadata
valid_tokens = {
    "admin-token-123": {
        "user_id": "admin",
        "scope": ["read", "write", "admin"],
        "role": "administrator"
    },
    "user-token-456": {
        "user_id": "user1", 
        "scope": ["read", "write"],
        "role": "user"
    },
    "readonly-token-789": {
        "user_id": "readonly",
        "scope": ["read"],
        "role": "readonly"
    }
}

static_auth = StaticTokenVerifier(valid_tokens)

mcp = FastMCP(
    name="Token Protected Server",
    auth_provider=static_auth
)

@mcp.tool
def read_data() -> dict:
    """Requires 'read' scope."""
    return {"data": "public information"}

@mcp.tool  
def write_data(content: str) -> str:
    """Requires 'write' scope."""
    # This would check token scope in practice
    return f"Wrote: {content}"

mcp.run(transport="http", port=8080)

OAuth Server Authentication

from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider

oauth_auth = OAuthProvider(
    client_id="your-oauth-client-id",
    client_secret="your-oauth-client-secret", 
    token_url="https://auth.example.com/oauth/token",
    scope=["mcp:access"],
    audience="mcp-server"
)

mcp = FastMCP(
    name="OAuth Protected Server",
    auth_provider=oauth_auth
)

@mcp.tool
async def oauth_protected_tool(query: str) -> str:
    """Tool that requires OAuth authentication."""
    return f"Query result: {query}"

mcp.run(transport="http", port=8080)

Remote Authentication Provider

from fastmcp import FastMCP
from fastmcp.server.auth import RemoteAuthProvider

remote_auth = RemoteAuthProvider(
    verify_url="https://auth-service.example.com/verify",
    headers={
        "X-Service-Key": "your-service-key",
        "Content-Type": "application/json"
    },
    timeout=5.0
)

mcp = FastMCP(
    name="Remote Auth Server",
    auth_provider=remote_auth
)

@mcp.tool
def external_auth_tool(input_data: str) -> str:
    """Tool protected by external authentication service."""
    return f"Processed with external auth: {input_data}"

mcp.run(transport="http", port=8080)

Client Authentication Usage

from fastmcp import Client
from fastmcp.client.auth import BearerAuth, OAuth

async def bearer_auth_client():
    """Client with bearer token authentication."""
    auth = BearerAuth("your-bearer-token")
    
    async with Client(
        "http://secure-server.example.com/mcp",
        auth=auth
    ) as client:
        result = await client.call_tool("protected_operation", {"data": "test"})
        return result.text

async def oauth_client():
    """Client with OAuth authentication."""
    oauth = OAuth(
        client_id="client-id",
        client_secret="client-secret",
        token_url="https://auth.example.com/token",
        scope=["mcp:access"]
    )
    
    async with Client(
        "http://oauth-server.example.com/mcp",
        auth=oauth
    ) as client:
        # OAuth token is automatically obtained and used
        result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
        return result.text

async def manual_token_client():
    """Client with manually managed token."""
    # Get token from your auth system
    token = await get_auth_token_from_somewhere()
    
    auth = BearerAuth(token)
    
    async with Client(
        "https://api.example.com/mcp", 
        auth=auth
    ) as client:
        result = await client.call_tool("secure_api", {})
        return result.text

Scope-Based Authorization

from fastmcp import FastMCP, Context
from fastmcp.server.auth import JWTVerifier
from fastmcp.server.dependencies import get_access_token

jwt_auth = JWTVerifier(
    secret="jwt-secret",
    algorithms=["HS256"]
)

mcp = FastMCP(
    name="Scope Protected Server",
    auth_provider=jwt_auth
)

def require_scope(required_scope: str):
    """Decorator to require specific scope."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            token = get_access_token()
            if not token or not token.has_scope(required_scope):
                raise PermissionError(f"Missing required scope: {required_scope}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@mcp.tool
@require_scope("read")
def read_users() -> list[dict]:
    """Read user data - requires 'read' scope."""
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"}
    ]

@mcp.tool
@require_scope("write")
def create_user(name: str, email: str) -> dict:
    """Create user - requires 'write' scope."""
    return {
        "id": 123,
        "name": name,
        "email": email,
        "created": "2024-01-01T00:00:00Z"
    }

@mcp.tool
@require_scope("admin")
def delete_user(user_id: int) -> str:
    """Delete user - requires 'admin' scope."""
    return f"User {user_id} deleted"

@mcp.tool
async def get_user_info(ctx: Context) -> dict:
    """Get current user info from token."""
    token = get_access_token()
    if not token:
        await ctx.error("No authentication token")
        return {"error": "Not authenticated"}
    
    return {
        "user_id": token.user_id,
        "token_type": token.token_type,
        "scopes": token.scope,
        "expires_at": token.expires_at.isoformat() if token.expires_at else None
    }

mcp.run(transport="http", port=8080)

Advanced Authentication Patterns

from fastmcp import FastMCP, Context
from fastmcp.server.auth import AuthProvider, AccessToken
from fastmcp.server.dependencies import get_access_token, get_http_headers
import jwt
import httpx
from datetime import datetime, timezone

class CustomAuthProvider(AuthProvider):
    """Custom authentication provider with multiple methods."""
    
    def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
        self.api_keys = api_keys
        self.jwt_secret = jwt_secret
    
    async def authenticate(self, request) -> AccessToken | None:
        """Authenticate using API key or JWT token."""
        headers = getattr(request, 'headers', {})
        
        # Try API key authentication
        api_key = headers.get('x-api-key')
        if api_key and api_key in self.api_keys:
            key_info = self.api_keys[api_key]
            return AccessToken(
                token=api_key,
                token_type="api_key",
                user_id=key_info["user_id"],
                scope=key_info["scope"],
                metadata=key_info
            )
        
        # Try JWT authentication
        auth_header = headers.get('authorization', '')
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            try:
                payload = jwt.decode(
                    token, 
                    self.jwt_secret, 
                    algorithms=["HS256"]
                )
                
                expires_at = None
                if 'exp' in payload:
                    expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)
                
                return AccessToken(
                    token=token,
                    token_type="bearer",
                    user_id=payload.get('sub'),
                    scope=payload.get('scope', []),
                    expires_at=expires_at,
                    metadata=payload
                )
            except jwt.InvalidTokenError:
                pass
        
        return None

# Set up custom auth
api_keys = {
    "key-123": {
        "user_id": "service_account",
        "scope": ["read", "write"],
        "name": "Service Account"
    }
}

custom_auth = CustomAuthProvider(api_keys, "jwt-secret")

mcp = FastMCP(
    name="Multi-Auth Server",
    auth_provider=custom_auth
)

@mcp.tool
async def auth_demo(ctx: Context) -> dict:
    """Demonstrate authentication information access."""
    token = get_access_token()
    headers = get_http_headers()
    
    if not token:
        await ctx.error("No authentication provided")
        return {"error": "Authentication required"}
    
    await ctx.info(f"Authenticated user: {token.user_id}")
    
    return {
        "authentication": {
            "user_id": token.user_id,
            "token_type": token.token_type,
            "scopes": token.scope,
            "is_expired": token.is_expired(),
            "metadata": token.metadata
        },
        "request_info": {
            "user_agent": headers.get("user-agent", "unknown"),
            "ip_address": headers.get("x-forwarded-for", "unknown"),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    }

mcp.run(transport="http", port=8080)

Authentication Error Handling

from fastmcp import Client
from fastmcp.client.auth import BearerAuth
from fastmcp.exceptions import ClientError
import asyncio

async def robust_auth_client():
    """Client with authentication error handling."""
    
    # Try with potentially expired token
    auth = BearerAuth("potentially-expired-token")
    
    try:
        async with Client(
            "https://secure-api.example.com/mcp",
            auth=auth
        ) as client:
            result = await client.call_tool("protected_tool", {})
            return result.text
            
    except ClientError as e:
        if "401" in str(e) or "unauthorized" in str(e).lower():
            print("Authentication failed - token may be expired")
            
            # Refresh token and retry
            new_token = await refresh_auth_token()
            auth = BearerAuth(new_token)
            
            async with Client(
                "https://secure-api.example.com/mcp",
                auth=auth
            ) as client:
                result = await client.call_tool("protected_tool", {})
                return result.text
        else:
            raise

async def refresh_auth_token() -> str:
    """Refresh authentication token."""
    # Implementation would depend on your auth system
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/refresh",
            json={"refresh_token": "stored-refresh-token"}
        )
        return response.json()["access_token"]

Security Best Practices

Token Management

# Store tokens securely
import os
from pathlib import Path

def get_secure_token():
    """Get token from secure storage."""
    # From environment variable
    token = os.getenv("MCP_AUTH_TOKEN")
    if token:
        return token
    
    # From secure file
    token_file = Path.home() / ".config" / "mcp" / "token"
    if token_file.exists():
        return token_file.read_text().strip()
    
    raise ValueError("No authentication token found")

# Use secure token
auth = BearerAuth(get_secure_token())

Scope Validation

def validate_scopes(required_scopes: list[str]):
    """Validate that token has all required scopes."""
    token = get_access_token()
    if not token:
        raise PermissionError("Authentication required")
    
    missing_scopes = [
        scope for scope in required_scopes 
        if not token.has_scope(scope)
    ]
    
    if missing_scopes:
        raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")

Install with Tessl CLI

npx tessl i tessl/pypi-fastmcp

docs

authentication.md

client.md

context.md

index.md

prompts.md

resources.md

server.md

tools.md

transports.md

utilities.md

tile.json