or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md client.md context.md index.md prompts.md resources.md server.md tools.md transports.md utilities.md

docs/authentication.md

0

# Authentication

1

2

Comprehensive authentication system supporting multiple providers and tokens for secure server and client connections. FastMCP provides flexible authentication mechanisms for both server protection and client authorization.

3

4

## Capabilities

5

6

### Server Authentication Providers

7

8

Authentication providers for securing FastMCP servers against unauthorized access.

9

10

```python { .api }

11

class AuthProvider:
    """Base class for authentication providers."""

    async def authenticate(self, request: Any) -> AccessToken | None:
        """
        Authenticate a request and return access token.

        Parameters:
        - request: Request object to authenticate

        Returns:
        AccessToken if authentication successful, None otherwise
        """

24

25

class OAuthProvider(AuthProvider):
    """OAuth 2.0 authentication provider."""

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scope: list[str] | None = None,
        audience: str | None = None
    ):
        """
        Initialize OAuth provider.

        Parameters:
        - client_id: OAuth client ID
        - client_secret: OAuth client secret
        - token_url: Token endpoint URL
        - scope: Required scopes
        - audience: Token audience
        """

46

47

class JWTVerifier(AuthProvider):
    """JWT token verification provider."""

    def __init__(
        self,
        secret: str,
        algorithms: list[str] = ["HS256"],
        audience: str | None = None,
        issuer: str | None = None
    ):
        """
        Initialize JWT verifier.

        Parameters:
        - secret: JWT signing secret
        - algorithms: Allowed signing algorithms
        - audience: Expected token audience
        - issuer: Expected token issuer
        """

66

67

class StaticTokenVerifier(AuthProvider):
    """Static token verification provider."""

    def __init__(
        self,
        valid_tokens: dict[str, dict] | list[str],
        token_type: str = "bearer"
    ):
        """
        Initialize static token verifier.

        Parameters:
        - valid_tokens: Dictionary of token->metadata or list of valid tokens
        - token_type: Type of tokens to accept
        """

82

83

class RemoteAuthProvider(AuthProvider):
    """Remote authentication provider using external service."""

    def __init__(
        self,
        verify_url: str,
        headers: dict[str, str] | None = None,
        timeout: float = 10.0
    ):
        """
        Initialize remote auth provider.

        Parameters:
        - verify_url: URL for token verification
        - headers: Additional headers for verification requests
        - timeout: Request timeout
        """

100

```

101

102

### Client Authentication

103

104

Authentication classes for clients connecting to secured servers.

105

106

```python { .api }

107

class BearerAuth:
    """Bearer token authentication for clients."""

    def __init__(self, token: str):
        """
        Initialize bearer authentication.

        Parameters:
        - token: Bearer token string
        """

117

118

class OAuth:

119

"""OAuth 2.0 authentication for clients."""

120

121

def __init__(

122

self,

123

client_id: str,

124

client_secret: str,

125

token_url: str,

126

scope: list[str] | None = None,

127

audience: str | None = None

128

):

129

"""

130

Initialize OAuth authentication.

131

132

Parameters:

133

- client_id: OAuth client ID

134

- client_secret: OAuth client secret

135

- token_url: Token endpoint URL

136

- scope: Requested scopes

137

- audience: Token audience

138

"""

139

140

async def get_token(self) -> str:

141

"""

142

Get access token from OAuth provider.

143

144

Returns:

145

Access token string

146

"""

147

```

148

149

### Access Token Model

150

151

Token representation containing authentication information and metadata.

152

153

```python { .api }

154

class AccessToken:

155

def __init__(

156

self,

157

token: str,

158

token_type: str = "bearer",

159

expires_at: datetime | None = None,

160

scope: list[str] | None = None,

161

user_id: str | None = None,

162

metadata: dict | None = None

163

):

164

"""

165

Access token representation.

166

167

Parameters:

168

- token: Token string

169

- token_type: Type of token (bearer, etc.)

170

- expires_at: Token expiration time

171

- scope: Token scopes/permissions

172

- user_id: Associated user ID

173

- metadata: Additional token metadata

174

"""

175

176

def is_expired(self) -> bool:

177

"""Check if token is expired."""

178

179

def has_scope(self, required_scope: str) -> bool:

180

"""Check if token has required scope."""

181

```

182

183

## Usage Examples

184

185

### Server Authentication Setup

186

187

```python

188

from fastmcp import FastMCP

189

from fastmcp.server.auth import JWTVerifier, StaticTokenVerifier, OAuthProvider

190

191

# JWT Authentication

192

jwt_auth = JWTVerifier(

193

secret="your-jwt-secret-key",

194

algorithms=["HS256"],

195

audience="fastmcp-server",

196

issuer="your-auth-service"

197

)

198

199

mcp = FastMCP(

200

name="Secure Server",

201

auth_provider=jwt_auth

202

)

203

204

@mcp.tool
def protected_operation(data: str) -> str:
    """This tool requires authentication.

    Returns the input prefixed with "Processed: ".
    """
    return f"Processed: {data}"

208

209

# Run with authentication

210

mcp.run(transport="http", port=8080)

211

```

212

213

### Static Token Authentication

214

215

```python

216

from fastmcp import FastMCP

217

from fastmcp.server.auth import StaticTokenVerifier

218

219

# Define valid tokens with metadata

220

valid_tokens = {

221

"admin-token-123": {

222

"user_id": "admin",

223

"scope": ["read", "write", "admin"],

224

"role": "administrator"

225

},

226

"user-token-456": {

227

"user_id": "user1",

228

"scope": ["read", "write"],

229

"role": "user"

230

},

231

"readonly-token-789": {

232

"user_id": "readonly",

233

"scope": ["read"],

234

"role": "readonly"

235

}

236

}

237

238

static_auth = StaticTokenVerifier(valid_tokens)

239

240

mcp = FastMCP(

241

name="Token Protected Server",

242

auth_provider=static_auth

243

)

244

245

@mcp.tool
def read_data() -> dict:
    """Requires 'read' scope."""
    return {"data": "public information"}

249

250

@mcp.tool
def write_data(content: str) -> str:
    """Requires 'write' scope."""
    # This would check token scope in practice
    return f"Wrote: {content}"

255

256

mcp.run(transport="http", port=8080)

257

```

258

259

### OAuth Server Authentication

260

261

```python

262

from fastmcp import FastMCP

263

from fastmcp.server.auth import OAuthProvider

264

265

oauth_auth = OAuthProvider(

266

client_id="your-oauth-client-id",

267

client_secret="your-oauth-client-secret",

268

token_url="https://auth.example.com/oauth/token",

269

scope=["mcp:access"],

270

audience="mcp-server"

271

)

272

273

mcp = FastMCP(

274

name="OAuth Protected Server",

275

auth_provider=oauth_auth

276

)

277

278

@mcp.tool
async def oauth_protected_tool(query: str) -> str:
    """Tool that requires OAuth authentication."""
    return f"Query result: {query}"

282

283

mcp.run(transport="http", port=8080)

284

```

285

286

### Remote Authentication Provider

287

288

```python

289

from fastmcp import FastMCP

290

from fastmcp.server.auth import RemoteAuthProvider

291

292

remote_auth = RemoteAuthProvider(

293

verify_url="https://auth-service.example.com/verify",

294

headers={

295

"X-Service-Key": "your-service-key",

296

"Content-Type": "application/json"

297

},

298

timeout=5.0

299

)

300

301

mcp = FastMCP(

302

name="Remote Auth Server",

303

auth_provider=remote_auth

304

)

305

306

@mcp.tool
def external_auth_tool(input_data: str) -> str:
    """Tool protected by external authentication service."""
    return f"Processed with external auth: {input_data}"

310

311

mcp.run(transport="http", port=8080)

312

```

313

314

### Client Authentication Usage

315

316

```python

317

from fastmcp import Client

318

from fastmcp.client.auth import BearerAuth, OAuth

319

320

async def bearer_auth_client():
    """Client with bearer token authentication.

    Connects with a BearerAuth credential, calls the "protected_operation"
    tool and returns the ``text`` attribute of the result.
    """
    auth = BearerAuth("your-bearer-token")

    # Use https:// so the bearer token is not sent in cleartext.
    async with Client(
        "https://secure-server.example.com/mcp",
        auth=auth
    ) as client:
        result = await client.call_tool("protected_operation", {"data": "test"})
        return result.text

330

331

async def oauth_client():
    """Client with OAuth authentication.

    Builds an OAuth credential, calls the "oauth_protected_tool" tool and
    returns the ``text`` attribute of the result.
    """
    oauth = OAuth(
        client_id="client-id",
        client_secret="client-secret",
        token_url="https://auth.example.com/token",
        scope=["mcp:access"]
    )

    # Use https:// so the obtained access token is not sent in cleartext.
    async with Client(
        "https://oauth-server.example.com/mcp",
        auth=oauth
    ) as client:
        # OAuth token is automatically obtained and used
        result = await client.call_tool("oauth_protected_tool", {"query": "hello"})
        return result.text

347

348

async def manual_token_client():
    """Client with manually managed token.

    Awaits ``get_auth_token_from_somewhere()`` (a placeholder that is not
    defined in this example) and uses the result as a bearer token.
    """
    # Get token from your auth system
    token = await get_auth_token_from_somewhere()

    auth = BearerAuth(token)

    async with Client(
        "https://api.example.com/mcp",
        auth=auth
    ) as client:
        result = await client.call_tool("secure_api", {})
        return result.text

361

```

362

363

### Scope-Based Authorization

364

365

```python

366

from fastmcp import FastMCP, Context

367

from fastmcp.server.auth import JWTVerifier

368

from fastmcp.server.dependencies import get_access_token

369

370

jwt_auth = JWTVerifier(

371

secret="jwt-secret",

372

algorithms=["HS256"]

373

)

374

375

mcp = FastMCP(

376

name="Scope Protected Server",

377

auth_provider=jwt_auth

378

)

379

380

def require_scope(required_scope: str):
    """Decorator to require specific scope.

    Parameters:
    - required_scope: Scope the current access token must carry

    Returns:
    A decorator. The decorated function raises PermissionError when
    get_access_token() returns nothing or the token lacks the scope;
    otherwise it calls the original function and returns its result.
    """
    import functools

    def decorator(func):
        # wraps() keeps the name, docstring and signature of the original
        # function visible on the wrapper (e.g. to @mcp.tool stacked on top),
        # instead of a bare ``wrapper(*args, **kwargs)``.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            token = get_access_token()
            if not token or not token.has_scope(required_scope):
                raise PermissionError(f"Missing required scope: {required_scope}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

390

391

@mcp.tool
@require_scope("read")
def read_users() -> list[dict]:
    """Read user data - requires 'read' scope."""
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"}
    ]

399

400

@mcp.tool
@require_scope("write")
def create_user(name: str, email: str) -> dict:
    """Create user - requires 'write' scope.

    Returns a sample record with a fixed id and creation timestamp.
    """
    return {
        "id": 123,
        "name": name,
        "email": email,
        "created": "2024-01-01T00:00:00Z"
    }

410

411

@mcp.tool
@require_scope("admin")
def delete_user(user_id: int) -> str:
    """Delete user - requires 'admin' scope."""
    return f"User {user_id} deleted"

416

417

@mcp.tool
async def get_user_info(ctx: Context) -> dict:
    """Get current user info from token.

    Returns an error dict (after logging through ``ctx.error``) when no
    access token is available; otherwise returns the token's user id,
    type, scopes and ISO-formatted expiry (None when it has no expiry).
    """
    token = get_access_token()
    if not token:
        await ctx.error("No authentication token")
        return {"error": "Not authenticated"}

    return {
        "user_id": token.user_id,
        "token_type": token.token_type,
        "scopes": token.scope,
        "expires_at": token.expires_at.isoformat() if token.expires_at else None
    }

431

432

mcp.run(transport="http", port=8080)

433

```

434

435

### Advanced Authentication Patterns

436

437

```python

438

from fastmcp import FastMCP, Context

439

from fastmcp.server.auth import AuthProvider, AccessToken

440

from fastmcp.server.dependencies import get_access_token, get_http_headers

441

import jwt

442

import httpx

443

from datetime import datetime, timezone

444

445

class CustomAuthProvider(AuthProvider):
    """Custom authentication provider with multiple methods.

    Accepts either an API key in the ``x-api-key`` header or an
    HS256-signed JWT in an ``Authorization: Bearer <token>`` header.
    """

    def __init__(self, api_keys: dict[str, dict], jwt_secret: str):
        """
        Parameters:
        - api_keys: Mapping of API key -> info dict; authenticate() reads
          the "user_id" and "scope" entries of the matching info dict
        - jwt_secret: Secret used to verify bearer JWTs
        """
        self.api_keys = api_keys
        self.jwt_secret = jwt_secret

    async def authenticate(self, request) -> AccessToken | None:
        """Authenticate using API key or JWT token.

        Parameters:
        - request: Object whose ``headers`` attribute (if any) is searched

        Returns:
        AccessToken for a known API key or a valid JWT, None otherwise
        """
        headers = getattr(request, 'headers', {})

        # Try API key authentication
        api_key = headers.get('x-api-key')
        if api_key and api_key in self.api_keys:
            key_info = self.api_keys[api_key]
            return AccessToken(
                token=api_key,
                token_type="api_key",
                user_id=key_info["user_id"],
                scope=key_info["scope"],
                metadata=key_info
            )

        # Try JWT authentication
        auth_header = headers.get('authorization', '')
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            try:
                payload = jwt.decode(
                    token,
                    self.jwt_secret,
                    algorithms=["HS256"]
                )
            except jwt.InvalidTokenError:
                # Best effort: an invalid token means "not authenticated".
                return None

            expires_at = None
            if 'exp' in payload:
                expires_at = datetime.fromtimestamp(payload['exp'], timezone.utc)

            # The "scope" claim is commonly one space-delimited string
            # (RFC 8693); AccessToken takes a list, so split it here.
            scope = payload.get('scope', [])
            if isinstance(scope, str):
                scope = scope.split()

            return AccessToken(
                token=token,
                token_type="bearer",
                user_id=payload.get('sub'),
                scope=scope,
                expires_at=expires_at,
                metadata=payload
            )

        return None

495

496

# Set up custom auth

497

api_keys = {

498

"key-123": {

499

"user_id": "service_account",

500

"scope": ["read", "write"],

501

"name": "Service Account"

502

}

503

}

504

505

custom_auth = CustomAuthProvider(api_keys, "jwt-secret")

506

507

mcp = FastMCP(

508

name="Multi-Auth Server",

509

auth_provider=custom_auth

510

)

511

512

@mcp.tool
async def auth_demo(ctx: Context) -> dict:
    """Demonstrate authentication information access.

    Returns an error dict when no access token is available; otherwise
    returns the token's details plus selected request headers and the
    current UTC timestamp.
    """
    token = get_access_token()
    headers = get_http_headers()

    if not token:
        await ctx.error("No authentication provided")
        return {"error": "Authentication required"}

    await ctx.info(f"Authenticated user: {token.user_id}")

    return {
        "authentication": {
            "user_id": token.user_id,
            "token_type": token.token_type,
            "scopes": token.scope,
            "is_expired": token.is_expired(),
            "metadata": token.metadata
        },
        "request_info": {
            "user_agent": headers.get("user-agent", "unknown"),
            "ip_address": headers.get("x-forwarded-for", "unknown"),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    }

538

539

mcp.run(transport="http", port=8080)

540

```

541

542

### Authentication Error Handling

543

544

```python

545

from fastmcp import Client

546

from fastmcp.client.auth import BearerAuth

547

from fastmcp.exceptions import ClientError

548

import asyncio

549

550

async def robust_auth_client():
    """Client with authentication error handling.

    Calls "protected_tool" with a possibly expired bearer token. If the
    ClientError text contains "401" or "unauthorized", refreshes the token
    via refresh_auth_token() and retries once; any other ClientError is
    re-raised.
    """

    async def call_protected_tool(token: str):
        # One connect-and-call round trip, shared by both attempts.
        async with Client(
            "https://secure-api.example.com/mcp",
            auth=BearerAuth(token)
        ) as client:
            result = await client.call_tool("protected_tool", {})
            return result.text

    try:
        # Try with potentially expired token
        return await call_protected_tool("potentially-expired-token")
    except ClientError as e:
        if "401" not in str(e) and "unauthorized" not in str(e).lower():
            raise

        print("Authentication failed - token may be expired")

        # Refresh token and retry
        new_token = await refresh_auth_token()
        return await call_protected_tool(new_token)

580

581

async def refresh_auth_token() -> str:
    """Refresh authentication token.

    Posts the stored refresh token to the auth service and returns the
    "access_token" field of the JSON response.

    Raises:
    httpx.HTTPStatusError if the auth service answers with a 4xx/5xx status
    """
    # Imported here because this example's import list does not include httpx.
    import httpx

    # Implementation would depend on your auth system
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/refresh",
            json={"refresh_token": "stored-refresh-token"}
        )
        # Fail on an error status instead of a KeyError from the lookup below.
        response.raise_for_status()
        return response.json()["access_token"]

590

```

591

592

## Security Best Practices

593

594

### Token Management

595

596

```python

597

# Store tokens securely

598

import os

599

from pathlib import Path

600

601

def get_secure_token():
    """Get token from secure storage.

    Lookup order:
    1. The MCP_AUTH_TOKEN environment variable, if set and non-empty
    2. The file ~/.config/mcp/token, stripped of surrounding whitespace

    Returns:
    The token string

    Raises:
    ValueError if neither source yields a non-empty token
    """
    # From environment variable
    token = os.getenv("MCP_AUTH_TOKEN")
    if token:
        return token

    # From secure file
    token_file = Path.home() / ".config" / "mcp" / "token"
    if token_file.is_file():
        file_token = token_file.read_text(encoding="utf-8").strip()
        # A blank file is treated like an empty environment variable:
        # as "no token", rather than returning "".
        if file_token:
            return file_token

    raise ValueError("No authentication token found")

614

615

# Use secure token

616

auth = BearerAuth(get_secure_token())

617

```

618

619

### Scope Validation

620

621

```python

622

def validate_scopes(required_scopes: list[str]):
    """Validate that token has all required scopes.

    Parameters:
    - required_scopes: Scopes the current access token must carry

    Raises:
    PermissionError if get_access_token() returns nothing, or if the
    token lacks any of the required scopes (all missing ones are listed)
    """
    token = get_access_token()
    if not token:
        raise PermissionError("Authentication required")

    missing_scopes = [
        scope for scope in required_scopes
        if not token.has_scope(scope)
    ]

    if missing_scopes:
        raise PermissionError(f"Missing scopes: {', '.join(missing_scopes)}")

635

```