Discover and Export available Agent(s) as MCP Servers
Comprehensive architecture overview of embabel-agent-mcpserver, covering design patterns, component interactions, extensibility points, and architectural decisions.
The embabel-agent-mcpserver follows a layered architecture with clear separation of concerns, enabling both synchronous and asynchronous execution modes through a unified interface.
McpServerStrategy interface abstracts sync/async implementation detailsMono<T> for consistent reactive composition┌─────────────────────────────────────────────────────────────┐
│ MCP Clients │
│ (Claude Desktop, Custom Clients) │
└─────────────────────────┬───────────────────────────────────┘
│ MCP Protocol (SSE/HTTP)
┌─────────────────────────▼───────────────────────────────────┐
│ Transport Layer │
│ McpSyncServer / McpAsyncServer │
│ (io.modelcontextprotocol) │
└─────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────▼───────────────────────────────────┐
│ Strategy Layer │
│ McpServerStrategy Interface │
│ ┌──────────────────┬──────────────────┐ │
│ │ │ │ │
│ SyncServerStrategy AsyncServerStrategy │ │
└─────────┬────────────────┬──────────────────┬──────────────┘
│ │ │
┌─────────▼────────────────▼──────────────────▼──────────────┐
│ Service Layer │
│ ToolRegistry │ ServerInfo │ Publishers │
└─────────┬────────────────┬──────────────────┬──────────────┘
│ │ │
┌─────────▼────────────────▼──────────────────▼──────────────┐
│ Publisher Layer │
│ McpExportToolCallbackPublisher (Tools) │
│ McpResourcePublisher / McpAsyncResourcePublisher │
│ McpPromptPublisher / McpAsyncPromptPublisher │
└─────────┬────────────────────────────────────────────────────┘
│
┌─────────▼────────────────────────────────────────────────────┐
│ Domain Layer │
│ ToolObject │ LlmReference │ McpToolExport │
└──────────────────────────────────────────────────────────────┘Responsibility: Handle MCP protocol communication with clients
Components:
McpSyncServer - Synchronous server implementationMcpAsyncServer - Asynchronous server implementationCharacteristics:
Example: { .api }
// Internal - managed by library
class McpSyncServer(
features: McpServerFeatures,
sseServer: SseServer
) {
fun handleToolCall(request: CallToolRequest): CallToolResult
fun handleResourceRead(request: ReadResourceRequest): ReadResourceResult
fun handlePromptGet(request: GetPromptRequest): GetPromptResult
}Responsibility: Provide unified interface across execution modes
Components:
McpServerStrategy - Core interfaceSyncServerStrategy - Synchronous implementationAsyncServerStrategy - Asynchronous implementationCharacteristics:
Design Pattern: Strategy Pattern
Example: { .api }
interface McpServerStrategy {
fun addToolCallback(toolCallback: ToolCallback): Mono<Void>
fun removeToolCallback(toolName: String): Mono<Void>
fun getServerInfo(): Mono<ServerInfo>
fun getExecutionMode(): McpExecutionMode
fun checkHealth(): Mono<ServerHealthStatus>
}
// Implementations delegate to mode-specific servers
class SyncServerStrategy(
private val mcpSyncServer: McpSyncServer,
private val toolRegistry: ToolRegistry,
private val serverInfo: ServerInfo
) : McpServerStrategy {
override fun addToolCallback(toolCallback: ToolCallback): Mono<Void> {
return Mono.fromRunnable {
mcpSyncServer.registerTool(toolCallback)
toolRegistry.register(toolCallback)
}
}
}Responsibility: Manage server state and business logic
Components:
ToolRegistry - Tool storage and queryServerInfo - Server metadataCharacteristics:
Example: { .api }
interface ToolRegistry {
fun register(toolCallback: ToolCallback): Mono<Void>
fun unregister(toolName: String): Mono<Void>
fun findToolCallback(toolName: String): Mono<ToolCallback>
fun listToolCallbacks(): Mono<List<ToolCallback>>
fun containsTool(toolName: String): Mono<Boolean>
}
// Implementation maintains internal state
class InMemoryToolRegistry : ToolRegistry {
private val tools = ConcurrentHashMap<String, ToolCallback>()
override fun register(toolCallback: ToolCallback): Mono<Void> {
return Mono.fromRunnable {
tools[toolCallback.name] = toolCallback
}
}
}Responsibility: Provide tools, resources, and prompts to server
Components:
McpExportToolCallbackPublisher - Tool publishersMcpResourcePublisher / McpAsyncResourcePublisher - Resource publishersMcpPromptPublisher / McpAsyncPromptPublisher - Prompt publishersCharacteristics:
Design Pattern: Publisher-Subscriber Pattern
Example: { .api }
@Service
class UserApiPublisher : McpExportToolCallbackPublisher {
override val toolCallbacks: List<ToolCallback>
get() = McpToolExport.fromToolObject(
ToolObject(
objects = listOf(createUserTool, updateUserTool),
namingStrategy = { "user_api_$it" }
)
).toolCallbacks
override fun infoString(verbose: Boolean?, indent: Int): String =
"UserApiPublisher: ${toolCallbacks.size} tools"
}Responsibility: Core domain models and conversion logic
Components:
ToolObject - Embabel tool containerLlmReference - Embabel agent referenceMcpToolExport - Tool export factoryCharacteristics:
Example: { .api }
data class ToolObject(
val objects: List<Any>,
val namingStrategy: StringTransformer? = null,
val filter: ((String) -> Boolean)? = null
)
interface McpToolExport {
val toolCallbacks: List<ToolCallback>
companion object {
fun fromToolObject(toolObject: ToolObject): McpToolExport
fun fromLlmReference(llmReference: LlmReference): McpToolExport
}
}Purpose: Enable runtime selection of server implementation
Implementation:
// Strategy interface
interface McpServerStrategy {
fun addToolCallback(toolCallback: ToolCallback): Mono<Void>
}
// Concrete strategies
class SyncServerStrategy : McpServerStrategy { /* blocking impl */ }
class AsyncServerStrategy : McpServerStrategy { /* reactive impl */ }
// Context selection via Spring auto-configuration
@Configuration
@ConditionalOnProperty("spring.ai.mcp.server.type", havingValue = "SYNC")
class SyncConfiguration {
@Bean
fun serverStrategy(): McpServerStrategy = SyncServerStrategy(...)
}Benefits:
Purpose: Simplify creation of complex objects
Implementation: { .api }
// Tool export factory
companion object {
fun fromToolObject(toolObject: ToolObject): McpToolExport {
// Complex conversion logic
return DefaultMcpToolExport(convertedCallbacks)
}
}
// Resource specification factories
object SyncResourceSpecificationFactory {
fun staticSyncResourceSpecification(
uri: String,
name: String,
description: String,
content: String,
mimeType: String
): SyncResourceSpecification {
// Builder logic
}
}Benefits:
Purpose: Decouple tool/resource/prompt providers from server
Implementation: { .api }
// Publisher interface
interface McpExportToolCallbackPublisher {
val toolCallbacks: List<ToolCallback>
}
// Auto-discovery and subscription
@Configuration
class McpServerAutoConfiguration {
@Bean
fun mcpServer(publishers: List<McpExportToolCallbackPublisher>): McpSyncServer {
val allTools = publishers.flatMap { it.toolCallbacks }
return createServerWithTools(allTools)
}
}Benefits:
Purpose: Define skeleton for configuration classes
Implementation: { .api }
abstract class AbstractMcpServerConfiguration {
// Template method
protected abstract fun createServerInfo(
applicationName: String,
executionMode: McpExecutionMode
): ServerInfo
protected abstract fun createToolRegistry(): ToolRegistry
// Concrete methods
protected fun getApplicationName(environment: Environment): String {
return environment.getProperty("spring.application.name", "agent-api")
}
}
// Concrete implementations
class McpSyncServerConfiguration : AbstractMcpServerConfiguration() {
override fun createServerInfo(...) = ServerInfo(...)
override fun createToolRegistry() = SyncToolRegistry()
}Benefits:
Purpose: Construct complex specifications incrementally
Implementation: { .api }
// Implicit builder via factory methods
SyncResourceSpecificationFactory.syncResourceSpecification(
uri = "app://data/users",
name = "Users",
description = "User list",
resourceLoader = { loadUsers() },
mimeType = "application/json"
)
// Chaining for complex objects
ToolObject(
objects = listOf(tool1, tool2),
namingStrategy = { "api_$it" },
filter = { !it.startsWith("_") }
)Benefits:
Purpose: Central tool lookup and management
Implementation: { .api }
interface ToolRegistry {
fun register(toolCallback: ToolCallback): Mono<Void>
fun findToolCallback(toolName: String): Mono<ToolCallback>
fun listToolCallbacks(): Mono<List<ToolCallback>>
}
// Usage
@Service
class ToolManager(private val registry: ToolRegistry) {
fun addTool(tool: ToolCallback) {
registry.register(tool).subscribe()
}
fun findTool(name: String): Mono<ToolCallback> {
return registry.findToolCallback(name)
}
}Benefits:
Application Startup
│
▼
┌──────────────────┐
│ Spring Context │
│ Initialization │
└────────┬─────────┘
│
▼
┌────────────────────────┐
│ Component Scanning │
│ Discovers Publishers │
└────────┬───────────────┘
│
▼
┌─────────────────────────────────┐
│ Auto-Configuration │
│ - Detects execution mode │
│ - Creates McpServerStrategy │
│ - Creates ToolRegistry │
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ Publisher Initialization │
│ - Calls toolCallbacks getter │
│ - Calls resources() / prompts() │
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ Tool Registration │
│ - McpToolExport conversion │
│ - Registry.register() calls │
│ - Server.registerTool() calls │
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ Server Ready │
│ - Exposes SSE endpoints │
│ - Accepts MCP requests │
└──────────────────────────────────┘MCP Client
│
│ CallToolRequest
▼
Transport Layer (McpSyncServer/McpAsyncServer)
│
│ Deserialize request
▼
Strategy Layer (McpServerStrategy)
│
│ getExecutionMode() check
▼
Service Layer (ToolRegistry)
│
│ findToolCallback(toolName)
▼
Tool Callback
│
│ call(arguments)
▼
Tool Implementation (User Code)
│
│ Business logic execution
▼
Result
│
│ Serialize response
▼
MCP ClientUser Code
│
│ serverStrategy.addToolCallback(tool)
▼
Strategy Layer
│
│ Mode-specific wrapper
▼
Service Layer (ToolRegistry)
│
│ Validation check
│ Duplicate check
▼
Registry Storage
│
│ ConcurrentHashMap.put()
▼
Transport Layer
│
│ Server.registerTool()
│ Update capabilities
▼
Tool AvailableCharacteristics:
Thread Model:
┌─────────────────────────────────────┐
│ Request Thread Pool │
│ (Tomcat/Jetty default) │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Thread │ │ Thread │ │ Thread │ │
│ │ 1 │ │ 2 │ │ 3 │ │
│ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │
└──────┼───────────┼───────────┼──────┘
│ │ │
▼ ▼ ▼
Request A Request B Request C
(blocks) (blocks) (blocks)Implementation: { .api }
class SyncServerStrategy(
private val mcpSyncServer: McpSyncServer,
private val toolRegistry: ToolRegistry,
private val serverInfo: ServerInfo
) : McpServerStrategy {
override fun addToolCallback(toolCallback: ToolCallback): Mono<Void> {
return Mono.fromRunnable {
// Blocking operation wrapped in Mono
mcpSyncServer.registerTool(toolCallback)
toolRegistry.register(toolCallback).block()
}
}
override fun getServerInfo(): Mono<ServerInfo> {
return Mono.just(serverInfo) // Immediate return
}
}Use Cases:
Characteristics:
Thread Model:
┌─────────────────────────────────────┐
│ Event Loop (Small Pool) │
│ │
│ ┌────────┐ ┌────────┐ │
│ │ Thread │ │ Thread │ │
│ │ 1 │ │ 2 │ │
│ └───┬────┘ └───┬────┘ │
│ │ │ │
└──────┼───────────┼──────────────────┘
│ │
▼ ▼
┌───────────────────┐
│ Request Queue │
│ (Non-blocking) │
│ │
│ Req A → Req B → │
│ Req C → Req D → │
└───────────────────┘Implementation: { .api }
class AsyncServerStrategy(
private val mcpAsyncServer: McpAsyncServer,
private val toolRegistry: ToolRegistry,
private val serverInfo: ServerInfo
) : McpServerStrategy {
override fun addToolCallback(toolCallback: ToolCallback): Mono<Void> {
return toolRegistry.register(toolCallback)
.then(
Mono.defer {
mcpAsyncServer.registerToolAsync(toolCallback)
}
)
}
override fun getServerInfo(): Mono<ServerInfo> {
return Mono.just(serverInfo)
.subscribeOn(Schedulers.boundedElastic())
}
}Use Cases:
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Concurrency Model | Thread-per-request | Event loop |
| Thread Usage | High (100s-1000s) | Low (10s) |
| Memory Per Request | ~1MB stack | ~KB heap |
| Throughput (idle) | Limited by threads | Very high |
| Throughput (CPU-bound) | Good | Similar |
| Throughput (I/O-bound) | Limited | Excellent |
| Error Handling | Try-catch | Reactive operators |
| Debugging | Straightforward | Complex |
| Learning Curve | Low | Moderate-High |
Purpose: Add domain-specific tools, resources, or prompts
Extension Point: Implement publisher interfaces
Example: { .api }
@Service
class CustomDomainPublisher :
McpExportToolCallbackPublisher,
McpResourcePublisher {
// Tools
override val toolCallbacks: List<ToolCallback>
get() = listOf(
createDomainTool1(),
createDomainTool2()
)
// Resources
override fun resources(): List<SyncResourceSpecification> {
return listOf(
createDomainResource1(),
createDomainResource2()
)
}
override fun infoString(verbose: Boolean?, indent: Int): String {
return "CustomDomainPublisher: ${toolCallbacks.size} tools, " +
"${resources().size} resources"
}
}Purpose: Control tool name generation
Extension Point: Provide StringTransformer implementations
Example: { .api }
class VersionedNamingStrategy(
private val version: String,
private val namespace: String
) : StringTransformer {
override fun transform(input: String): String {
return "${namespace}_${version}_${input}"
}
}
// Usage
val strategy = VersionedNamingStrategy("v2", "api")
val toolObject = ToolObject(
objects = listOf(myTool),
namingStrategy = strategy // Results in "api_v2_toolName"
)Purpose: Alternative storage implementations
Extension Point: Implement ToolRegistry interface
Example: { .api }
@Service
@Primary
class RedisToolRegistry(
private val redisTemplate: RedisTemplate<String, ToolCallback>
) : ToolRegistry {
override fun register(toolCallback: ToolCallback): Mono<Void> {
return Mono.fromRunnable {
redisTemplate.opsForValue()
.set("tool:${toolCallback.name}", toolCallback)
}
}
override fun findToolCallback(toolName: String): Mono<ToolCallback> {
return Mono.fromCallable {
redisTemplate.opsForValue()
.get("tool:$toolName")
}.subscribeOn(Schedulers.boundedElastic())
}
override fun listToolCallbacks(): Mono<List<ToolCallback>> {
return Mono.fromCallable {
val keys = redisTemplate.keys("tool:*")
keys.mapNotNull { key ->
redisTemplate.opsForValue().get(key)
}
}.subscribeOn(Schedulers.boundedElastic())
}
}Purpose: Specialized resource creation patterns
Extension Point: Create factory utility classes
Example: { .api }
object DatabaseResourceFactory {
fun createQueryResource(
uri: String,
name: String,
query: String,
dataSource: DataSource
): SyncResourceSpecification {
return SyncResourceSpecificationFactory.syncResourceSpecification(
uri = uri,
name = name,
description = "Database query: $query",
resourceLoader = { exchange ->
executeQuery(query, dataSource)
},
mimeType = "application/json"
)
}
private fun executeQuery(query: String, dataSource: DataSource): String {
// Execute query and serialize result
return "{}"
}
}Purpose: Domain-specific health monitoring
Extension Point: Override checkHealth() in custom strategies
Example: { .api }
class CustomAsyncServerStrategy(
mcpAsyncServer: McpAsyncServer,
toolRegistry: ToolRegistry,
serverInfo: ServerInfo,
private val healthChecker: CustomHealthChecker
) : AsyncServerStrategy(mcpAsyncServer, toolRegistry, serverInfo) {
override fun checkHealth(): Mono<ServerHealthStatus> {
return super.checkHealth()
.flatMap { baseHealth ->
healthChecker.performCustomChecks()
.map { customHealth ->
ServerHealthStatus(
isHealthy = baseHealth.isHealthy && customHealth.healthy,
message = combineMessages(baseHealth, customHealth),
details = baseHealth.details + customHealth.details
)
}
}
}
}Request Handling:
Tool Execution:
// Executes on request thread
override fun call(functionArguments: String): String {
// Blocking database call
val result = database.query(arguments) // Thread blocks here
return serialize(result)
}Concurrency Control: { .api }
@Service
class ThreadSafePublisher : McpExportToolCallbackPublisher {
private val cache = ConcurrentHashMap<String, String>()
override val toolCallbacks: List<ToolCallback> by lazy {
// Computed once, thread-safe initialization
loadToolsExpensively()
}
}Request Handling:
Tool Execution: { .api }
// Returns immediately, actual work scheduled
override fun call(functionArguments: String): String {
// Non-blocking database call
databaseReactive.query(arguments)
.map { result -> serialize(result) }
.subscribe()
return "Processing..." // Or use callbacks
}Scheduler Selection: { .api }
@Service
class ReactivePublisher : McpAsyncResourcePublisher {
override fun resources(): List<AsyncResourceSpecification> {
return listOf(
createCpuBoundResource(), // Uses parallel scheduler
createIoBoundResource(), // Uses boundedElastic scheduler
createBlockingResource() // Uses boundedElastic scheduler
)
}
private fun createIoBoundResource(): AsyncResourceSpecification {
return AsyncResourceSpecification(...) { exchange, request ->
Mono.fromCallable {
// Blocking I/O
fileSystem.readFile("data.json")
}
.subscribeOn(Schedulers.boundedElastic()) // I/O scheduler
.map { content ->
ReadResourceResult(listOf(
TextResourceContents(uri, "application/json", content)
))
}
}
}
}See Error Handling for comprehensive error handling strategies.
See Performance for detailed performance analysis and optimization strategies.