The fast, Pythonic way to build MCP servers and clients with minimal boilerplate code.
—
Resources expose read-only data sources with support for static resources and dynamic templates with URI parameters. The FastMCP resources system provides a clean way to expose data that LLMs can access for context and information retrieval.
Base classes for creating and managing resources with different content types and access patterns.
class Resource:
def __init__(
self,
uri: str,
name: str,
description: str,
mime_type: str | None = None
):
"""
Base resource class for static resources.
Parameters:
- uri: Resource URI for client access
- name: Human-readable resource name
- description: Resource description for LLM understanding
- mime_type: MIME type of resource content
"""
class FunctionResource(Resource):
    """Resource whose content is produced by a function."""
# NOTE(review): FunctionResource derives from Resource; this stub shows no
# base class -- confirm against the library whether it should.
class TextResource:
    """Resource that holds text content."""
# NOTE(review): FunctionResource derives from Resource; this stub shows no
# base class -- confirm against the library whether it should.
class BinaryResource:
    """Resource that holds binary content (images, files, etc.)."""
# NOTE(review): FunctionResource derives from Resource; this stub shows no
# base class -- confirm against the library whether it should.
class FileResource:
    """Resource whose content is read from a file on the filesystem."""
# NOTE(review): FunctionResource derives from Resource; this stub shows no
# base class -- confirm against the library whether it should.
class HttpResource:
    """Resource that proxies HTTP requests to an external URL."""
class DirectoryResource:
"""Resource that serves directory listings or file content."""Dynamic resources that accept URI parameters for flexible data access.
class ResourceTemplate:
def __init__(
self,
uri_template: str,
name: str,
description: str,
mime_type: str | None = None
):
"""
Resource template for dynamic resources with URI parameters.
Parameters:
- uri_template: URI template with {parameters} placeholders
- name: Human-readable template name
- description: Template description for LLM understanding
- mime_type: MIME type of resource content
"""Manages resource registration, templates, and resolution within a FastMCP server.
class ResourceManager:
    def add_resource(self, resource: Resource) -> None:
        """
        Register a static resource with the manager.

        Parameters:
        - resource: The Resource instance to register
        """

    def add_template(self, template: ResourceTemplate) -> None:
        """
        Register a template used for dynamic resources.

        Parameters:
        - template: The ResourceTemplate instance to register
        """

    def get_resource(self, uri: str) -> Resource | None:
        """
        Look up a resource by its URI.

        Parameters:
        - uri: URI of the resource to retrieve

        Returns:
        The Resource instance, or None when no resource is found
        """

    def list_resources(self) -> list[Resource]:
        """
        Enumerate every registered static resource.

        Returns:
        List of all resource instances
        """

    def list_templates(self) -> list[ResourceTemplate]:
        """
        Enumerate every registered resource template.

        Returns:
        List of all resource template instances
        """

from fastmcp import FastMCP
mcp = FastMCP("Info Server")
@mcp.resource("config://version")
def get_version():
    """Return the current server version string."""
    version = "1.0.0"
    return version
@mcp.resource("config://settings")
def get_settings():
    """Return the server configuration settings as a dictionary."""
    settings = {}
    settings["debug"] = True
    settings["max_connections"] = 100
    settings["timeout"] = 30
    return settings
@mcp.resource("info://status")
def get_status():
    """
    Report a snapshot of the server's current status.

    Returns:
    Dictionary with uptime, memory usage, CPU usage and active connection count
    """
    import psutil
    import time

    # NOTE(review): assumes the server object exposes `start_time` and
    # `connections` attributes -- confirm against the FastMCP version in use.
    uptime = time.time() - mcp.start_time
    memory_percent = psutil.virtual_memory().percent
    cpu_percent = psutil.cpu_percent()
    connection_count = len(mcp.connections)
    return {
        "uptime": uptime,
        "memory_usage": memory_percent,
        "cpu_usage": cpu_percent,
        "active_connections": connection_count,
    }

from fastmcp import FastMCP
mcp = FastMCP("Data Server")
@mcp.resource("users://{user_id}/profile")
def get_user_profile(user_id: int):
    """
    Look up a user's profile by ID.

    Parameters:
    - user_id: Unique user identifier

    Returns:
    Dictionary with the user ID, the stored profile fields and a timestamp

    Raises:
    ValueError if no (non-empty) profile is stored for the ID
    """
    # In-memory stand-in for what would typically be a database query
    users_db = {
        1: {"name": "Alice", "email": "alice@example.com", "status": "active"},
        2: {"name": "Bob", "email": "bob@example.com", "status": "inactive"},
        3: {"name": "Charlie", "email": "charlie@example.com", "status": "active"},
    }

    profile = users_db.get(user_id)
    if not profile:
        raise ValueError(f"User {user_id} not found")

    # Build the response in the same key order: id, profile fields, timestamp
    result = {"user_id": user_id}
    result.update(profile)
    result["last_updated"] = "2024-01-01T00:00:00Z"
    return result
@mcp.resource("data://{dataset}/{table}")
def get_table_data(dataset: str, table: str):
    """
    Return the rows of one table inside a named dataset.

    Parameters:
    - dataset: Dataset name
    - table: Table name within the dataset

    Returns:
    Dictionary with the dataset name, table name, rows and record count

    Raises:
    ValueError if the dataset is not recognised or the table is missing from it
    """
    # Reject unknown datasets before touching the data
    valid_datasets = ["sales", "marketing", "finance"]
    if dataset not in valid_datasets:
        raise ValueError(f"Invalid dataset: {dataset}")

    # Mock data structure
    sales_tables = {
        "orders": [
            {"id": 1, "amount": 100.0, "date": "2024-01-01"},
            {"id": 2, "amount": 250.0, "date": "2024-01-02"},
        ],
        "customers": [
            {"id": 1, "name": "Customer A", "region": "North"},
            {"id": 2, "name": "Customer B", "region": "South"},
        ],
    }
    marketing_tables = {
        "campaigns": [
            {"id": 1, "name": "Q1 Campaign", "budget": 10000},
            {"id": 2, "name": "Q2 Campaign", "budget": 15000},
        ],
    }
    data = {"sales": sales_tables, "marketing": marketing_tables}

    # A valid dataset with no mock data (e.g. "finance") has no tables at all
    tables = data.get(dataset, {})
    if table not in tables:
        raise ValueError(f"Table {table} not found in dataset {dataset}")

    rows = tables[table]
    return {
        "dataset": dataset,
        "table": table,
        "data": rows,
        "record_count": len(rows),
    }
@mcp.resource("files://{path}")
def get_file_content(path: str):
    """
    Get file content by path (with security restrictions).

    Parameters:
    - path: File path relative to the allowed directory ("./data")

    Returns:
    A dictionary whose "type" is one of:
    - "directory": with "files" and "directories" name lists
    - "text": with the decoded UTF-8 "content" and its length in characters
    - "binary": with the size in bytes and base64-encoded "content_base64"

    Raises:
    ValueError if the path escapes the allowed directory or does not exist
    """
    import base64
    from pathlib import Path

    # Security: restrict to allowed directory
    allowed_dir = Path("./data")
    safe_path = allowed_dir / path

    # Prevent directory traversal: resolve symlinks and ".." before comparing
    if not safe_path.resolve().is_relative_to(allowed_dir.resolve()):
        raise ValueError("Access denied: path outside allowed directory")

    if not safe_path.exists():
        raise ValueError(f"File not found: {path}")

    if safe_path.is_dir():
        # Return directory listing; a single pass classifies each entry
        files = []
        dirs = []
        for entry in safe_path.iterdir():
            if entry.is_file():
                files.append(entry.name)
            elif entry.is_dir():
                dirs.append(entry.name)
        return {
            "type": "directory",
            "path": path,
            "files": files,
            "directories": dirs
        }

    # Return file content: try text first, fall back to binary
    try:
        with open(safe_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return {
            "type": "text",
            "path": path,
            "content": content,
            "size": len(content)
        }
    except UnicodeDecodeError:
        # Binary file
        with open(safe_path, 'rb') as f:
            content = f.read()
        return {
            "type": "binary",
            "path": path,
            "size": len(content),
            # The key promises base64, so encode as base64 (not hex)
            "content_base64": base64.b64encode(content).decode("ascii")
        }

from fastmcp import FastMCP, Context
import json
mcp = FastMCP("API Proxy Server")
@mcp.resource("api://{service}/{endpoint}")
async def proxy_api_request(service: str, endpoint: str, ctx: Context):
    """
    Forward a GET request to a known external API and wrap the outcome.

    Parameters:
    - service: Service name (key into the base-URL mapping)
    - endpoint: API endpoint path appended to the service base URL
    - ctx: Execution context used for the HTTP request and for logging

    Returns:
    Dictionary with "service", "endpoint" and "status"; on success it also
    carries "data" and "timestamp", on failure an "error" message

    Raises:
    ValueError if the service name is not in the mapping
    """
    # Service URL mapping
    service_urls = {
        "weather": "https://api.weather.com/v1",
        "news": "https://api.news.com/v2",
        "stocks": "https://api.stocks.com/v1",
    }

    if service not in service_urls:
        await ctx.error(f"Unknown service: {service}")
        raise ValueError(f"Service {service} not supported")

    base_url = service_urls[service]
    full_url = f"{base_url}/{endpoint}"
    await ctx.info(f"Proxying request to {service}: {endpoint}")

    # Fields shared by every response shape
    envelope = {"service": service, "endpoint": endpoint}

    try:
        # Make HTTP request with authentication
        response = await ctx.http_request(
            method="GET",
            url=full_url,
            headers={
                "Authorization": f"Bearer {get_api_key(service)}",
                "User-Agent": "FastMCP-Proxy/1.0",
            },
        )

        # Any non-200 response is reported as an error payload, not raised
        if response.status_code != 200:
            await ctx.error(f"API request failed: {response.status_code}")
            return {
                **envelope,
                "error": f"HTTP {response.status_code}",
                "status": "error",
            }

        await ctx.info(f"Successfully fetched data from {service}")
        data = response.json()
        return {
            **envelope,
            "data": data,
            "timestamp": "2024-01-01T00:00:00Z",
            "status": "success",
        }
    except Exception as e:
        # Failures in the request path are logged and returned to the caller
        await ctx.error(f"Request failed: {str(e)}")
        return {
            **envelope,
            "error": str(e),
            "status": "error",
        }
def get_api_key(service: str) -> str:
    """
    Return the API key for a service (would load from secure config).

    Parameters:
    - service: Service name to look up

    Returns:
    The key for the service, or an empty string when the service is unknown
    """
    known_keys = dict(
        weather="weather_api_key_here",
        news="news_api_key_here",
        stocks="stocks_api_key_here",
    )
    key = known_keys.get(service, "")
    return key

from fastmcp import FastMCP
from fastmcp.utilities.types import Image, File
import mimetypes
from pathlib import Path
mcp = FastMCP("File Server")
@mcp.resource("docs://{filename}")
def get_document(filename: str):
    """
    Serve a documentation file from the "./docs" directory as text.

    Parameters:
    - filename: Name of the documentation file

    Returns:
    Dictionary with the filename, UTF-8 text content, guessed MIME type
    (falling back to "text/plain") and content length in characters

    Raises:
    ValueError if the path escapes the docs directory or the file is missing
    """
    docs_dir = Path("./docs")
    file_path = docs_dir / filename

    # Security check: the resolved path must stay inside the docs directory
    inside_docs = file_path.resolve().is_relative_to(docs_dir.resolve())
    if not inside_docs:
        raise ValueError("Access denied")

    if not file_path.exists():
        raise ValueError(f"Document not found: {filename}")

    guessed_type = mimetypes.guess_type(str(file_path))[0]

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    document = {"filename": filename, "content": content}
    document["mime_type"] = guessed_type or "text/plain"
    document["size"] = len(content)
    return document
@mcp.resource("images://{image_name}")
def get_image(image_name: str):
    """
    Serve image files as Image objects.

    Parameters:
    - image_name: Name of image file inside the "./images" directory

    Returns:
    Image object with binary data

    Raises:
    ValueError if the path escapes the images directory or the file is missing
    """
    images_dir = Path("./images")
    image_path = images_dir / image_name

    # Security check: reject names such as "../secret" that resolve outside
    # the images directory (same guard the docs resource applies)
    if not image_path.resolve().is_relative_to(images_dir.resolve()):
        raise ValueError("Access denied")

    if not image_path.exists():
        raise ValueError(f"Image not found: {image_name}")

    with open(image_path, 'rb') as f:
        image_data = f.read()

    mime_type, _ = mimetypes.guess_type(str(image_path))

    # NOTE(review): confirm that Image accepts a `mime_type` keyword in the
    # installed FastMCP version.
    return Image(
        data=image_data,
        mime_type=mime_type or "application/octet-stream"
    )
@mcp.resource("downloads://{file_name}")
def get_download_file(file_name: str):
    """
    Serve downloadable files as File objects.

    Parameters:
    - file_name: Name of file to download from the "./downloads" directory

    Returns:
    File object with metadata

    Raises:
    ValueError if the path escapes the downloads directory or the file is missing
    """
    downloads_dir = Path("./downloads")
    file_path = downloads_dir / file_name

    # Security check: reject names such as "../secret" that resolve outside
    # the downloads directory (same guard the docs resource applies)
    if not file_path.resolve().is_relative_to(downloads_dir.resolve()):
        raise ValueError("Access denied")

    if not file_path.exists():
        raise ValueError(f"File not found: {file_name}")

    mime_type, _ = mimetypes.guess_type(str(file_path))

    if mime_type and mime_type.startswith('text/'):
        # Text file
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    else:
        # Binary file
        with open(file_path, 'rb') as f:
            content = f.read()

    # NOTE(review): `content` is str for text files and bytes otherwise --
    # confirm that File accepts both, and that it takes a `mime_type` keyword.
    return File(
        data=content,
        name=file_name,
        mime_type=mime_type
    )

from fastmcp import FastMCP
import sqlite3
import json
mcp = FastMCP("Database Server")
@mcp.resource("db://{table}")
def get_table_contents(table: str):
    """
    Get contents of a database table.

    Parameters:
    - table: Name of database table (must be in the allow-list)

    Returns:
    Dictionary with the table name, column names, row count, up to 100 rows
    as dictionaries, and a timestamp

    Raises:
    ValueError if the table is not allowed or the database reports an error
    """
    # Whitelist allowed tables for security; this is what makes interpolating
    # the table name into the SQL below safe
    allowed_tables = ["users", "orders", "products", "categories"]
    if table not in allowed_tables:
        raise ValueError(f"Access to table '{table}' not allowed")

    conn = sqlite3.connect("database.db")
    conn.row_factory = sqlite3.Row  # Enable column access by name
    cursor = conn.cursor()

    try:
        # Get table schema
        cursor.execute(f"PRAGMA table_info({table})")
        columns = [row['name'] for row in cursor.fetchall()]

        # Get table data
        cursor.execute(f"SELECT * FROM {table} LIMIT 100")  # Limit for safety
        rows = cursor.fetchall()

        # Convert to list of dictionaries
        data = [dict(row) for row in rows]

        return {
            "table": table,
            "columns": columns,
            "row_count": len(data),
            "data": data,
            "query_timestamp": "2024-01-01T00:00:00Z"
        }
    except sqlite3.Error as e:
        # Chain the original error so the sqlite3 traceback is not lost
        raise ValueError(f"Database error: {str(e)}") from e
    finally:
        conn.close()
@mcp.resource("db://query/{query_name}")
def get_predefined_query(query_name: str):
"""
Execute predefined database queries.
Parameters:
- query_name: Name of predefined query
Returns:
Query results as JSON
"""
# Predefined queries for security
queries = {
"user_stats": "SELECT COUNT(*) as total_users, COUNT(CASE WHEN status='active' THEN 1 END) as active_users FROM users",
"recent_orders": "SELECT * FROM orders WHERE created_at > datetime('now', '-7 days') ORDER BY created_at DESC",
"top_products": "SELECT p.*, COUNT(o.product_id) as order_count FROM products p LEFT JOIN orders o ON p.id = o.product_id GROUP BY p.id ORDER BY order_count DESC LIMIT 10"
}
if query_name not in queries:
raise ValueError(f"Query '{query_name}' not found")
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
cursor.execute(queries[query_name])
rows = cursor.fetchall()
data = [dict(row) for row in rows]
return {
"query_name": query_name,
"sql": queries[query_name],
"result_count": len(data),
"data": data,
"executed_at": "2024-01-01T00:00:00Z"
}
except sqlite3.Error as e:
raise ValueError(f"Query error: {str(e)}")
finally:
conn.close()Resources can return various content types:
# Text content
@mcp.resource("text://example")
def text_resource():
    """Return a plain string as the resource content."""
    body = "Plain text content"
    return body
# JSON data
@mcp.resource("json://example")
def json_resource():
    """Return a dictionary as the resource content."""
    payload = {"key": "value"}
    payload["numbers"] = [1, 2, 3]
    return payload
# Binary data as Image
@mcp.resource("image://example")
def image_resource():
    """Return the bytes of example.png wrapped in an Image object."""
    with open("example.png", "rb") as f:
        # Pass the bytes by keyword, as the images:// example does, so they
        # cannot be bound to a different leading parameter of Image
        return Image(data=f.read(), mime_type="image/png")
# Binary data as File
@mcp.resource("file://example")
def file_resource():
return File(
data=b"binary content",
name="example.bin",
mime_type="application/octet-stream"
)Install with Tessl CLI
npx tessl i tessl/pypi-fastmcp