Spring Framework integration for Model Context Protocol (MCP), providing Spring AI function calling capabilities and Spring-friendly abstractions for MCP clients and MCP servers
The AsyncMcpToolCallback class adapts MCP tools to Spring AI's ToolCallback interface and executes them through an McpAsyncClient built on Project Reactor. The MCP request itself is non-blocking, but call() waits for its result to satisfy the ToolCallback contract. The class suits long-running tools and applications that already use reactive MCP clients.
import org.springframework.ai.mcp.AsyncMcpToolCallback;
import org.springframework.ai.mcp.ToolContextToMcpMetaConverter;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.spec.McpSchema;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.ai.tool.definition.ToolDefinition;
import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.execution.ToolExecutionException;
import reactor.core.publisher.Mono;
import org.springframework.ai.model.tool.internal.ToolCallReactiveContextHolder;

public class AsyncMcpToolCallback implements ToolCallback {
// Immutable, thread-safe implementation
// Uses Project Reactor for non-blocking execution
}

static AsyncMcpToolCallback.Builder builder();
class Builder {
Builder mcpClient(McpAsyncClient mcpClient); // required
Builder tool(McpSchema.Tool tool); // required
Builder prefixedToolName(String prefixedToolName); // optional
Builder toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter converter); // optional
AsyncMcpToolCallback build(); // returns AsyncMcpToolCallback
}

mcpClient (required): The McpAsyncClient instance used to execute the tool
tool (required): The McpSchema.Tool definition from the MCP server, typically obtained via mcpClient.listTools().block()
prefixedToolName (optional): Custom name exposed to the model; the server-side name stays available through getOriginalToolName()
toolContextToMcpMetaConverter (optional): Converter from the Spring AI tool context to MCP request metadata
Default: ToolContextToMcpMetaConverter.defaultConverter(); use ToolContextToMcpMetaConverter.noOp() to skip conversion

McpAsyncClient mcpClient = // ... initialize async MCP client
McpSchema.Tool tool = // ... get tool from MCP server
// Basic usage
AsyncMcpToolCallback callback = AsyncMcpToolCallback.builder()
.mcpClient(mcpClient)
.tool(tool)
.build();
// With custom prefix
AsyncMcpToolCallback prefixedCallback = AsyncMcpToolCallback.builder()
.mcpClient(mcpClient)
.tool(tool)
.prefixedToolName("my_async_server_list_files")
.build();
// With the no-op converter (tool context is not forwarded as MCP metadata)
AsyncMcpToolCallback noMetaCallback = AsyncMcpToolCallback.builder()
.mcpClient(mcpClient)
.tool(tool)
.toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter.noOp())
.build();
// With all options (customConverter is your own ToolContextToMcpMetaConverter implementation)
AsyncMcpToolCallback fullCallback = AsyncMcpToolCallback.builder()
.mcpClient(mcpClient)
.tool(tool)
.prefixedToolName("prod_weather_get_forecast_async")
.toolContextToMcpMetaConverter(customConverter)
.build();

@Deprecated
public AsyncMcpToolCallback(McpAsyncClient mcpClient, McpSchema.Tool tool);

Note: This constructor is deprecated since version 1.1.0. Use the builder instead. The builder also exposes the optional prefixedToolName and toolContextToMcpMetaConverter settings.
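The builder contract above can be modelled in plain Java: two required fields, an optional prefix, and an immutable result. `ToolCallbackSketch` and its `Builder` are hypothetical stand-ins, not Spring AI code. The sketch also assumes that the exposed name falls back to the original tool name when no prefix is set; the real default may differ. The two accessors mirror `getToolDefinition().name()` and `getOriginalToolName()`.

```java
import java.util.Objects;

// Hypothetical stdlib-only model of the builder contract; NOT Spring AI's AsyncMcpToolCallback.
final class ToolCallbackSketch {

    private final Object client;        // stands in for the required McpAsyncClient
    private final String originalName;  // stands in for the required McpSchema.Tool name
    private final String exposedName;   // optional prefixed name (assumed fallback: originalName)

    private ToolCallbackSketch(Builder b) {
        this.client = b.client;
        this.originalName = b.toolName;
        this.exposedName = (b.prefixedName != null) ? b.prefixedName : b.toolName;
    }

    static Builder builder() {
        return new Builder();
    }

    // Analogue of getToolDefinition().name(): the name the model sees.
    String definitionName() {
        return exposedName;
    }

    // Analogue of getOriginalToolName(): the name the MCP server knows.
    String originalToolName() {
        return originalName;
    }

    static final class Builder {
        private Object client;
        private String toolName;
        private String prefixedName;

        Builder client(Object client) { this.client = client; return this; }

        Builder toolName(String toolName) { this.toolName = toolName; return this; }

        Builder prefixedToolName(String prefixedName) { this.prefixedName = prefixedName; return this; }

        ToolCallbackSketch build() {
            // Required fields are checked once, when the immutable instance is created.
            Objects.requireNonNull(client, "client is required");
            Objects.requireNonNull(toolName, "tool is required");
            return new ToolCallbackSketch(this);
        }
    }

    public static void main(String[] args) {
        ToolCallbackSketch cb = ToolCallbackSketch.builder()
                .client(new Object())
                .toolName("list_files")
                .prefixedToolName("my_async_server_list_files")
                .build();
        System.out.println(cb.definitionName());   // my_async_server_list_files
        System.out.println(cb.originalToolName()); // list_files
    }
}
```

Validating at `build()` time means a misconfigured callback fails at startup rather than on the first tool call.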
ToolDefinition getToolDefinition();

Returns the Spring AI ToolDefinition for this MCP tool, including the name, description, and input schema.
ToolDefinition containing:
name(): Tool name (prefixed if specified)
description(): Tool description from the MCP server
inputSchema(): JSON schema for the tool inputs (normalized)

AsyncMcpToolCallback callback = // ... create callback
ToolDefinition definition = callback.getToolDefinition();
String name = definition.name();
String description = definition.description();
String inputSchema = definition.inputSchema();
System.out.println("Tool: " + name);
System.out.println("Description: " + description);
System.out.println("Schema: " + inputSchema);

String getOriginalToolName();

Returns the original MCP tool name without any prefix. Use it when you need to reference the tool by its server-side name.
String: Original tool name from the MCP server (without prefix)

AsyncMcpToolCallback callback = // ... create callback with prefix
String originalName = callback.getOriginalToolName();
String prefixedName = callback.getToolDefinition().name();
System.out.println("Original: " + originalName); // "list_files"
System.out.println("Prefixed: " + prefixedName); // "my_async_server_list_files"

String call(String toolCallInput);
String call(String toolCallInput, ToolContext toolContext);

Executes the MCP tool with the provided input and optional context. The request is issued through the McpAsyncClient as a Reactor Mono, and call() then waits for the result using Project Reactor's .block().
Important: Although the underlying request is non-blocking, call() blocks the calling thread to satisfy the ToolCallback interface contract. Do not invoke it on Reactor's non-blocking threads (such as those of Schedulers.parallel()), because Reactor rejects block() there. For fully reactive workflows, use the underlying McpAsyncClient directly or the AsyncMcpToolCallbackProvider.asyncToolCallbacks() factory method.
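The Important note above recommends staying asynchronous for fully reactive workflows. Staying asynchronous means composing results instead of blocking once per call. The stdlib-only sketch below uses `CompletableFuture` as a rough stand-in for `Mono`. `ComposeSketch`, `callAll`, and the `asyncTool` function are hypothetical names, not part of Spring AI or the MCP SDK. The sketch starts every call before waiting on any of them and hands the caller a single future for all results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stdlib analogue of composing async tool calls instead of blocking per call.
final class ComposeSketch {

    // Starts every call up front and returns one future that completes with all results,
    // in input order. If any call fails, the combined future completes exceptionally.
    static CompletableFuture<List<String>> callAll(
            List<String> inputs, Function<String, CompletableFuture<String>> asyncTool) {
        List<CompletableFuture<String>> futures = inputs.stream().map(asyncTool).toList();
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is already complete here, so join() returns without waiting.
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> all = callAll(
                List.of("{\"id\": 1}", "{\"id\": 2}"),
                input -> CompletableFuture.supplyAsync(() -> "result for " + input));
        // Only this top-level demo blocks, to print the combined result.
        System.out.println(all.join());
    }
}
```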
toolCallInput: JSON string containing the tool arguments
Null or empty input is converted to "{}"
Example: "{\"path\": \"/home/user\", \"recursive\": true}"
toolContext (optional): Spring AI tool context that can be converted to MCP metadata
Converted using the configured toolContextToMcpMetaConverter
Example: new ToolContext(Map.of("requestId", "req-12345"))
Returns: JSON string containing the tool execution results from the MCP server
Throws: ToolExecutionException if tool execution fails or the MCP server returns an error

Execution flow:
Builds a CallToolRequest with the tool name, arguments, and metadata
Calls mcpClient.callTool(), which returns Mono<CallToolResult>
Propagates the reactive context from ToolCallReactiveContextHolder
Maps failures to ToolExecutionException, which carries the ToolDefinition for context
Blocks on the Mono to get the result
Checks isError() on the result and throws ToolExecutionException if it is set

// Automatic reactive context propagation
mcpClient.callTool(request)
.contextWrite(ctx -> ctx.putAll(ToolCallReactiveContextHolder.getContext()))
.onErrorMap(exception -> new ToolExecutionException(toolDefinition, exception))
.block();

This ensures context values (trace IDs, user info, etc.) flow through the reactive chain automatically.
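Here is a stdlib-only sketch of the blocking-adapter shape of call(). `CompletableFuture` stands in for `Mono<CallToolResult>` and `join()` for `.block()`. `BlockingAdapterSketch`, `FakeResult`, `ToolFailure`, and `callBlocking` are hypothetical. The real class builds a `CallToolRequest`, calls `mcpClient.callTool()`, and throws `ToolExecutionException`. The sketch follows the same order: normalize the input, issue the async request, map failures, block, then reject error results.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical stdlib-only model of the call() flow; not Spring AI code.
final class BlockingAdapterSketch {

    // Stand-in for McpSchema.CallToolResult: content plus an isError flag.
    record FakeResult(String content, boolean isError) {}

    // Stand-in for ToolExecutionException: carries the tool name for context.
    static final class ToolFailure extends RuntimeException {
        final String toolName;

        ToolFailure(String toolName, Throwable cause) {
            super("Tool '" + toolName + "' failed: " + cause.getMessage(), cause);
            this.toolName = toolName;
        }
    }

    // Adapts an async "client" function to a blocking String -> String call.
    static String callBlocking(String toolName, String input,
                               Function<String, CompletableFuture<FakeResult>> asyncClient) {
        // 1. Null or blank input becomes an empty JSON object.
        String args = (input == null || input.isBlank()) ? "{}" : input;
        try {
            FakeResult result = asyncClient.apply(args)
                    // 2. Map transport/protocol failures to the tool exception type.
                    .exceptionally(ex -> { throw new ToolFailure(toolName, unwrap(ex)); })
                    // 3. Block the calling thread until the async result arrives.
                    .join();
            // 4. A result flagged as an error is also surfaced as an exception.
            if (result.isError()) {
                throw new ToolFailure(toolName, new IllegalStateException(result.content()));
            }
            return result.content();
        } catch (CompletionException e) {
            // join() wraps exceptions thrown inside stages; rethrow the tool exception itself.
            if (e.getCause() instanceof ToolFailure tf) {
                throw tf;
            }
            throw e;
        }
    }

    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        // Echo "client": completes asynchronously with the arguments it received.
        Function<String, CompletableFuture<FakeResult>> echo =
                a -> CompletableFuture.supplyAsync(() -> new FakeResult(a, false));
        System.out.println(callBlocking("list_files", null, echo)); // {}
    }
}
```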
AsyncMcpToolCallback callback = // ... create callback
// Simple execution without context
String input = "{\"path\": \"/home/user\", \"recursive\": true}";
String result = callback.call(input);
System.out.println("Result: " + result);
// Execution with context
ToolContext context = new ToolContext(Map.of(
"requestId", "req-12345",
"userId", "user789"
));
String resultWithContext = callback.call(input, context);
// Handling null/empty input (automatically becomes "{}")
String emptyResult = callback.call(null); // Same as call("{}")
String emptyResult2 = callback.call(""); // Same as call("{}")
// Error handling
try {
String guardedResult = callback.call(input);
} catch (ToolExecutionException e) {
ToolDefinition failedTool = e.getToolDefinition();
System.err.println("Async tool " + failedTool.name() + " failed");
System.err.println("Error: " + e.getMessage());
Throwable cause = e.getCause(); // Original exception
}

import org.springframework.ai.mcp.AsyncMcpToolCallback;
import org.springframework.ai.tool.definition.ToolDefinition;
import org.springframework.ai.mcp.ToolContextToMcpMetaConverter;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.spec.McpSchema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.execution.ToolExecutionException;
import java.util.Map;
import java.util.List;
// Initialize async MCP client
McpAsyncClient mcpClient = // ... create your async MCP client
// Get available tools from the MCP server (returns Mono)
Mono<McpSchema.ListToolsResult> toolsResultMono = mcpClient.listTools();
McpSchema.ListToolsResult toolsResult = toolsResultMono.block();
List<McpSchema.Tool> tools = toolsResult.tools();
// Create async callbacks for each tool
List<AsyncMcpToolCallback> callbacks = tools.stream()
.map(tool -> AsyncMcpToolCallback.builder()
.mcpClient(mcpClient)
.tool(tool)
.prefixedToolName("async_server_" + tool.name())
.toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter.defaultConverter())
.build())
.toList();
// Use a specific callback
AsyncMcpToolCallback callback = callbacks.get(0);
// Prepare input
String input = "{\"query\": \"example\"}";
// Execute with context
ToolContext context = new ToolContext(Map.of(
"requestId", "req-123",
"timestamp", System.currentTimeMillis()
));
try {
String result = callback.call(input, context);
System.out.println("Tool: " + callback.getToolDefinition().name());
System.out.println("Result: " + result);
} catch (ToolExecutionException e) {
System.err.println("Async execution failed: " + e.getMessage());
}
// Get tool information
ToolDefinition definition = callback.getToolDefinition();
System.out.println("Name: " + definition.name());
System.out.println("Description: " + definition.description());
System.out.println("Original Name: " + callback.getOriginalToolName());

The AsyncMcpToolCallback automatically propagates reactive context using ToolCallReactiveContextHolder. This ensures context values are available throughout the reactive chain.
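A thread-bound holder has to be read and handed over explicitly, and the reason is easiest to see without Reactor. In this stdlib-only sketch a `ThreadLocal` plays the role of a thread-bound context holder; the sketch assumes ToolCallReactiveContextHolder behaves this way. The snapshot is taken on the calling thread, and work running on another thread sees only what it was given. `ContextPropagationSketch`, `HOLDER`, and `runOnWorker` are hypothetical names. In the real class, Reactor's `contextWrite` performs the hand-off.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stdlib model of thread-bound context being handed to async work.
final class ContextPropagationSketch {

    // Thread-bound context, visible only on the thread that set it.
    static final ThreadLocal<Map<String, Object>> HOLDER = ThreadLocal.withInitial(Map::of);

    // Returns what the worker thread saw, under the keys "threadLocal" and "captured".
    static Map<String, Object> runOnWorker(ExecutorService worker) {
        // Snapshot taken on the calling thread, before switching threads.
        Map<String, Object> captured = Map.copyOf(HOLDER.get());
        return CompletableFuture.supplyAsync(() -> Map.<String, Object>of(
                "threadLocal", HOLDER.get(), // read on the worker: its own (empty) value
                "captured", captured),       // the snapshot passed along explicitly
                worker).join();
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            HOLDER.set(Map.of("traceId", "trace-123"));
            Map<String, Object> seen = runOnWorker(worker);
            System.out.println(seen.get("threadLocal")); // {}
            System.out.println(seen.get("captured"));    // {traceId=trace-123}
        } finally {
            HOLDER.remove();
            worker.shutdown();
        }
    }
}
```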
import org.springframework.ai.model.tool.internal.ToolCallReactiveContextHolder;
import reactor.util.context.Context;
// Context is automatically propagated from the reactive chain
// Internal implementation:
mcpClient.callTool(request)
.contextWrite(ctx -> ctx.putAll(ToolCallReactiveContextHolder.getContext()))
.block();
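// This allows trace IDs, user info, and other context to flow through to the MCP request

import reactor.core.publisher.Mono;
import reactor.util.context.Context;
// For fully reactive workflows (not using the blocking call() method)
// request is an McpSchema.CallToolRequest built for the target tool
Mono<String> reactiveExecution = mcpClient.callTool(request)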
.contextWrite(Context.of(
"traceId", "trace-123",
"userId", "user-456"
))
.map(result -> result.content().toString());
// Subscribe or chain further operations
reactiveExecution.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Error: " + error.getMessage())
);

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.mcp.AsyncMcpToolCallback;
import org.springframework.ai.tool.ToolCallback;
import io.modelcontextprotocol.client.McpAsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
import java.util.Map;
@Service
public class AsyncMcpChatService {
private final ChatClient chatClient;
private final List<McpAsyncClient> asyncClients;
@Autowired
public AsyncMcpChatService(ChatModel chatModel, List<McpAsyncClient> asyncClients) {
this.asyncClients = asyncClients;
// Create async tool callbacks from MCP tools (listTools() is blocked on once, at startup)
List<AsyncMcpToolCallback> mcpCallbacks = asyncClients.stream()
.flatMap(client -> {
try {
return client.listTools().block().tools().stream()
.map(tool -> AsyncMcpToolCallback.builder()
.mcpClient(client)
.tool(tool)
.build());
} catch (Exception e) {
System.err.println("Failed to list tools: " + e.getMessage());
return java.util.stream.Stream.<AsyncMcpToolCallback>empty();
}
})
.toList();
// Convert to ToolCallback array
ToolCallback[] toolCallbacks = mcpCallbacks.toArray(new ToolCallback[0]);
// Register the MCP tools as default tool callbacks on the ChatClient
this.chatClient = ChatClient.builder(chatModel)
.defaultToolCallbacks(toolCallbacks)
.build();
}
public String chat(String userMessage) {
return chatClient.prompt()
.user(userMessage)
.call()
.content();
}
public String chatWithContext(String userMessage, Map<String, Object> contextData) {
return chatClient.prompt()
.user(userMessage)
.toolContext(contextData)
.call()
.content();
}
// Reactive facade: the blocking chat() call runs on boundedElastic threads,
// so callers on event-loop threads are not blocked (a worker thread still is)
public Mono<String> chatReactive(String userMessage) {
return Mono.fromCallable(() -> chat(userMessage))
.subscribeOn(Schedulers.boundedElastic());
}
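}

The call methods throw ToolExecutionException when:
The underlying McpAsyncClient call fails (for example, a transport or protocol error)
The MCP server returns a result with isError() set to true

Error mapping is applied through the reactive chain: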
mcpClient.callTool(request)
.onErrorMap(exception -> {
logger.error("Exception while tool calling: ", exception);
return new ToolExecutionException(this.getToolDefinition(), exception);
})
.block();

import org.springframework.ai.tool.execution.ToolExecutionException;
import reactor.core.publisher.Mono;
import java.time.Duration;
// Pattern 1: Basic error handling
try {
String result = callback.call(input);
} catch (ToolExecutionException e) {
System.err.println("Async tool execution failed: " + e.getMessage());
ToolDefinition failedTool = e.getToolDefinition();
// Handle the error appropriately
}
// Pattern 2: Differentiate reactive error types
try {
String result = callback.call(input);
} catch (ToolExecutionException e) {
Throwable cause = e.getCause();
// Exceptions.unwrap strips Reactor's internal wrapper for checked exceptions, if present;
// depending on the transport, a connection error may be nested deeper in the cause chain
Throwable root = reactor.core.Exceptions.unwrap(cause);
if (root instanceof java.util.concurrent.TimeoutException) {
System.err.println("Async tool execution timed out");
} else if (root instanceof java.net.ConnectException) {
System.err.println("Connection failed to MCP server");
} else {
System.err.println("Tool error: " + e.getMessage());
}
}
// Pattern 3: Retry with exponential backoff (using reactor)
String executeWithRetry(AsyncMcpToolCallback callback, String input, int maxRetries) {
// For fully reactive approach, work with Mono directly
return Mono.defer(() -> {
try {
return Mono.just(callback.call(input));
} catch (ToolExecutionException e) {
return Mono.error(e);
}
})
.retryWhen(reactor.util.retry.Retry.backoff(maxRetries, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(throwable -> throwable instanceof ToolExecutionException))
.block();
}
// Pattern 4: Timeout with fallback
String executeWithTimeout(AsyncMcpToolCallback callback, String input,
Duration timeout, String fallback) {
try {
return Mono.defer(() -> {
try {
return Mono.just(callback.call(input));
} catch (ToolExecutionException e) {
return Mono.error(e);
}
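})
// Run the blocking call() on a worker so the timeout can release the caller
// (the in-flight MCP request is not necessarily cancelled)
.subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic())
.timeout(timeout)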
.onErrorReturn(fallback)
.block();
} catch (Exception e) {
System.err.println("Execution failed or timed out: " + e.getMessage());
return fallback;
}
}

AsyncMcpToolCallback instances are immutable after construction and thread-safe, so a single instance can be used concurrently from multiple threads. Each call() still blocks its own calling thread while Project Reactor manages the underlying request.
Multiple threads can invoke call() simultaneously
Thread safety of the underlying execution depends on the McpAsyncClient implementation

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.List;
import java.util.ArrayList;
AsyncMcpToolCallback callback = // ... create once
ExecutorService executor = Executors.newFixedThreadPool(10);
// Submit concurrent tasks
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String input = "{\"id\": " + i + "}";
Future<String> future = executor.submit(() -> callback.call(input));
futures.add(future);
}
// Collect results
for (Future<String> future : futures) {
try {
String result = future.get();
System.out.println("Result: " + result);
} catch (Exception e) {
System.err.println("Task failed: " + e.getMessage());
}
}
executor.shutdown();

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
AsyncMcpToolCallback callback = // ... create once
// Process multiple inputs concurrently. call() blocks, so run it on boundedElastic
// threads; Schedulers.parallel() threads reject Reactor's block()
Flux.range(0, 100)
.parallel()
.runOn(Schedulers.boundedElastic())
.map(i -> "{\"id\": " + i + "}")
.map(input -> {
try {
return callback.call(input);
} catch (ToolExecutionException e) {
return "ERROR: " + e.getMessage();
}
})
.sequential()
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Stream error: " + error.getMessage()),
() -> System.out.println("All tasks completed")
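);

Blocking happens only when .call() is invoked
For fully non-blocking execution, use McpAsyncClient directly
Run blocking call() invocations on Schedulers.boundedElastic(); Schedulers.parallel() threads are non-blocking, and Reactor rejects block() on them
Apply timeouts with Mono operators
Context propagation uses ContextView from Project Reactor
Null or empty input is converted to "{}" to ensure valid JSON

import java.time.Duration;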
import reactor.core.publisher.Mono;
// Configure timeout at reactive level
String executeWithTimeout(AsyncMcpToolCallback callback, String input, Duration timeout) {
return Mono.defer(() -> {
try {
return Mono.just(callback.call(input));
} catch (ToolExecutionException e) {
return Mono.error(e);
}
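})
// Run the blocking call() on a worker thread so the timeout can release the caller
.subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic())
.timeout(timeout)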
.block();
}
// Usage
try {
String result = executeWithTimeout(callback, input, Duration.ofSeconds(30));
} catch (Exception e) {
if (e.getCause() instanceof java.util.concurrent.TimeoutException) {
System.err.println("Operation timed out after 30 seconds");
}
}

| Feature | AsyncMcpToolCallback | SyncMcpToolCallback |
|---|---|---|
| Execution Model | Reactive request (Project Reactor); call() blocks until the result arrives | Blocking |
| Client Type | McpAsyncClient | McpSyncClient |
| Context Propagation | Automatic reactive context | Standard Java context |
| Best For | Long-running operations, reactive apps | Simple operations, blocking apps |
| Backpressure | Single-value Mono per call, so there is little to apply | Not applicable |
| Thread Model | Request runs on Reactor threads; the caller's thread waits in call() | One thread per request |
| Timeout Handling | Reactive timeout operators | Blocking timeout |
| Error Handling | Reactive error propagation | Synchronous exceptions |
| Concurrency | High when McpAsyncClient is used directly; through call(), one waiting thread per request | Medium (thread-per-request) |
| Memory Usage | Lower when calls stay reactive (fewer threads) | Higher (thread stack per request) |
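Consider the Timeout Handling row and the Reactor-based timeout helpers earlier in this section. A timeout only bounds how long the caller waits if the blocking call runs on a different thread from the one that waits. This stdlib-only sketch shows the idea with `CompletableFuture.orTimeout`. `BoundedWaitSketch` and `callWithTimeout` are hypothetical, and the `slowTool` supplier stands in for `callback.call(input)`. As with the Reactor version, the worker thread may keep running after the caller has given up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical stdlib sketch: bound the wait for a blocking call by running it on a worker.
final class BoundedWaitSketch {

    // Runs blockingCall on the worker and returns fallback if it does not finish in time.
    static String callWithTimeout(Supplier<String> blockingCall, ExecutorService worker,
                                  long timeoutMillis, String fallback) {
        try {
            return CompletableFuture.supplyAsync(blockingCall, worker)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                // The caller is released here; the worker may still be running blockingCall.
                return fallback;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newCachedThreadPool();
        Supplier<String> slowTool = () -> { // stands in for callback.call(input)
            try {
                Thread.sleep(2_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "{\"ok\":true}";
        };
        System.out.println(callWithTimeout(slowTool, worker, 100, "{\"error\":\"timeout\"}"));
        worker.shutdownNow(); // interrupts the still-sleeping worker
    }
}
```

If the blocking call ran on the waiting thread itself, the timeout could still fire, but the caller would only notice after the call had returned.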
// MCP server may return empty results
String result = callback.call("{}");
// result could be: "", "{}", "[]", or "null"
// Always check result before parsing
if (result != null && !result.isEmpty() && !result.equals("null")) {
// Process result
}

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.List;
// For large batches of inputs, bound upstream demand and concurrency
Flux<String> processLargeDataset(AsyncMcpToolCallback callback, List<String> inputs) {
return Flux.fromIterable(inputs)
.limitRate(10) // Prefetch at most 10 inputs from the source at a time
.flatMap(input -> Mono.fromCallable(() -> callback.call(input))
// call() blocks, so each invocation runs on a boundedElastic worker
.subscribeOn(Schedulers.boundedElastic()),
5); // At most 5 concurrent executions
}

// Some tools accept no parameters
AsyncMcpToolCallback callback = // ... tool with empty input schema
String result = callback.call("{}"); // Empty object
String result2 = callback.call(null); // Also becomes "{}"

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
// Reactive cancellation support
Mono<String> cancellableTool = Mono.fromCallable(() -> callback.call(input))
// call() blocks, so run it off the subscribing thread; otherwise subscribe() would not return until the tool finished
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(30))
.doOnCancel(() -> System.out.println("Tool execution was cancelled"));
// Cancel after 1 second
var subscription = cancellableTool.subscribe();
Thread.sleep(1000); // throws InterruptedException
subscription.dispose(); // Cancels the subscription; a call() already in progress may still run to completion

import reactor.core.publisher.Mono;
// Automatic retry on transient errors
Mono<String> resilientExecution = Mono.defer(() -> {
try {
return Mono.just(callback.call(input));
} catch (ToolExecutionException e) {
return Mono.error(e);
}
})
.retryWhen(reactor.util.retry.Retry.fixedDelay(3, java.time.Duration.ofSeconds(1))
.filter(throwable -> isTransientError(throwable)))
.onErrorResume(throwable -> {
System.err.println("Tool execution failed: " + throwable.getMessage());
return Mono.just("{\"error\": \"fallback\"}");
});
boolean isTransientError(Throwable t) {
// ToolExecutionException wraps the original failure, so check the whole cause chain
for (Throwable cur = t; cur != null; cur = cur.getCause()) {
if (cur instanceof java.net.ConnectException ||
cur instanceof java.net.SocketTimeoutException) {
return true;
}
}
return false;
}