CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-ai--spring-ai-mcp

Spring Framework integration for Model Context Protocol (MCP), providing Spring AI function calling capabilities and Spring-friendly abstractions for MCP clients and MCP servers

Overview
Eval results
Files

async-tool-callbacks.mddocs/reference/

Asynchronous Tool Callbacks

The AsyncMcpToolCallback class adapts MCP tools to Spring AI's ToolCallback interface with asynchronous execution capabilities using Project Reactor. It enables non-blocking tool execution suitable for long-running operations or reactive applications.

Import

import org.springframework.ai.mcp.AsyncMcpToolCallback;
import org.springframework.ai.mcp.ToolContextToMcpMetaConverter;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.spec.McpSchema;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.ai.tool.definition.ToolDefinition;
import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.execution.ToolExecutionException;
import reactor.core.publisher.Mono;
import org.springframework.ai.model.tool.internal.ToolCallReactiveContextHolder;

Class Declaration

public class AsyncMcpToolCallback implements ToolCallback {
    // Immutable, thread-safe implementation
    // Uses Project Reactor for non-blocking execution
}

Creating Tool Callbacks

Using Builder (Recommended)

static AsyncMcpToolCallback.Builder builder();

class Builder {
    Builder mcpClient(McpAsyncClient mcpClient);                                     // required
    Builder tool(McpSchema.Tool tool);                                               // required
    Builder prefixedToolName(String prefixedToolName);                              // optional
    Builder toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter converter); // optional
    AsyncMcpToolCallback build();                                                   // returns AsyncMcpToolCallback
}

Builder Parameters

  • mcpClient (required): The McpAsyncClient instance for asynchronous tool execution

    • Must be non-null and initialized
    • Should be configured with appropriate reactive settings
    • Connection should be established before creating callback
    • Uses Project Reactor (Mono/Flux) for non-blocking operations
  • tool (required): The McpSchema.Tool definition from the MCP server

    • Obtained via mcpClient.listTools().block()
    • Contains name, description, and input schema
    • Must be non-null
  • prefixedToolName (optional): Custom prefixed name for the tool

    • If not specified, defaults to the formatted original tool name
    • Used to avoid naming conflicts with multiple servers
    • Should be unique across all registered tools
    • Maximum length: 64 characters
  • toolContextToMcpMetaConverter (optional): Converter for tool context to MCP metadata

    • Defaults to ToolContextToMcpMetaConverter.defaultConverter()
    • Can be set to ToolContextToMcpMetaConverter.noOp() to skip conversion
    • Custom implementations can transform context data

Usage Example

McpAsyncClient mcpClient = // ... initialize async MCP client
McpSchema.Tool tool = // ... get tool from MCP server

// Basic usage
AsyncMcpToolCallback callback = AsyncMcpToolCallback.builder()
    .mcpClient(mcpClient)
    .tool(tool)
    .build();

// With custom prefix
AsyncMcpToolCallback callback = AsyncMcpToolCallback.builder()
    .mcpClient(mcpClient)
    .tool(tool)
    .prefixedToolName("my_async_server_list_files")
    .build();

// With custom converter
AsyncMcpToolCallback callback = AsyncMcpToolCallback.builder()
    .mcpClient(mcpClient)
    .tool(tool)
    .toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter.noOp())
    .build();

// With all options
AsyncMcpToolCallback callback = AsyncMcpToolCallback.builder()
    .mcpClient(mcpClient)
    .tool(tool)
    .prefixedToolName("prod_weather_get_forecast_async")
    .toolContextToMcpMetaConverter(customConverter)
    .build();

Using Constructor (Deprecated)

@Deprecated
public AsyncMcpToolCallback(McpAsyncClient mcpClient, McpSchema.Tool tool);

Note: This constructor is deprecated since version 1.1.0. Use the builder pattern instead for better flexibility and readability.

Tool Callback Methods

Get Tool Definition

ToolDefinition getToolDefinition();

Returns the Spring AI ToolDefinition for this MCP tool, including the name, description, and input schema.

Returns

  • ToolDefinition containing:
    • name(): Tool name (prefixed if specified)
    • description(): Tool description from MCP server
    • inputSchema(): JSON schema for tool inputs (normalized)

Usage Example

AsyncMcpToolCallback callback = // ... create callback
ToolDefinition definition = callback.getToolDefinition();

String name = definition.name();
String description = definition.description();
String inputSchema = definition.inputSchema();

System.out.println("Tool: " + name);
System.out.println("Description: " + description);
System.out.println("Schema: " + inputSchema);

Get Original Tool Name

String getOriginalToolName();

Returns the original MCP tool name without any prefix modification. Useful when you need to reference the server-side tool name.

Returns

  • String: Original tool name from MCP server (without prefix)

Usage Example

AsyncMcpToolCallback callback = // ... create callback with prefix
String originalName = callback.getOriginalToolName();
String prefixedName = callback.getToolDefinition().name();

System.out.println("Original: " + originalName);    // "list_files"
System.out.println("Prefixed: " + prefixedName);    // "my_async_server_list_files"

Execute Tool

String call(String toolCallInput);
String call(String toolCallInput, ToolContext toolContext);

Executes the MCP tool asynchronously with the provided input and optional context. The call blocks on the reactive execution using .block() from Project Reactor.

Important: While the underlying execution is non-blocking and reactive, the call() method blocks to satisfy the ToolCallback interface contract. For fully reactive workflows, use the underlying McpAsyncClient directly or the AsyncMcpToolCallbackProvider.asyncToolCallbacks() factory method.

Parameters

  • toolCallInput: JSON string containing the tool arguments

    • Must be valid JSON object syntax
    • If null or empty, defaults to "{}"
    • Should match the tool's input schema
    • Example: "{\"path\": \"/home/user\", \"recursive\": true}"
  • toolContext (optional): Spring AI tool context that can be converted to MCP metadata

    • Can contain arbitrary key-value pairs
    • Converted via toolContextToMcpMetaConverter
    • Filtered to exclude reserved keys (e.g., "exchange")
    • Automatically propagated through reactive context
    • Example: new ToolContext(Map.of("requestId", "req-12345"))

Returns

  • String: JSON string containing the tool execution results from the MCP server
    • Always returns a JSON-formatted string
    • May contain success data or error information
    • Should be parsed according to tool's output schema

Throws

  • ToolExecutionException: If tool execution fails or the MCP server returns an error
    • Contains the ToolDefinition for context
    • Wraps the underlying exception cause
    • Includes error message from MCP server if available

Execution Flow

  1. Input validation and normalization (null/empty → "{}")
  2. Context conversion to MCP metadata (if provided)
  3. Build CallToolRequest with tool name, arguments, and metadata
  4. Invoke mcpClient.callTool() returning Mono<CallToolResult>
  5. Apply reactive context propagation via ToolCallReactiveContextHolder
  6. Map errors to ToolExecutionException
  7. Block on Mono to get result
  8. Check result for errors via isError()
  9. Extract and return result content

Reactive Context Propagation

// Automatic reactive context propagation
mcpClient.callTool(request)
    .contextWrite(ctx -> ctx.putAll(ToolCallReactiveContextHolder.getContext()))
    .onErrorMap(exception -> new ToolExecutionException(toolDefinition, exception))
    .block();

This ensures context values (trace IDs, user info, etc.) flow through the reactive chain automatically.

Usage Example

AsyncMcpToolCallback callback = // ... create callback

// Simple execution without context
String input = "{\"path\": \"/home/user\", \"recursive\": true}";
String result = callback.call(input);
System.out.println("Result: " + result);

// Execution with context
ToolContext context = new ToolContext(Map.of(
    "requestId", "req-12345",
    "userId", "user789"
));
String resultWithContext = callback.call(input, context);

// Handling null/empty input (automatically becomes "{}")
String emptyResult = callback.call(null);  // Same as call("{}")
String emptyResult2 = callback.call("");   // Same as call("{}")

// Error handling
try {
    String result = callback.call(input);
} catch (ToolExecutionException e) {
    ToolDefinition failedTool = e.getToolDefinition();
    System.err.println("Async tool " + failedTool.name() + " failed");
    System.err.println("Error: " + e.getMessage());
    Throwable cause = e.getCause();  // Original exception
}

Complete Example

import org.springframework.ai.mcp.AsyncMcpToolCallback;
import org.springframework.ai.mcp.ToolContextToMcpMetaConverter;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.spec.McpSchema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.execution.ToolExecutionException;
import java.util.Map;
import java.util.List;

// Initialize async MCP client
McpAsyncClient mcpClient = // ... create your async MCP client

// Get available tools from the MCP server (returns Mono)
Mono<McpSchema.ListToolsResult> toolsResultMono = mcpClient.listTools();
McpSchema.ListToolsResult toolsResult = toolsResultMono.block();
List<McpSchema.Tool> tools = toolsResult.tools();

// Create async callbacks for each tool
List<AsyncMcpToolCallback> callbacks = tools.stream()
    .map(tool -> AsyncMcpToolCallback.builder()
        .mcpClient(mcpClient)
        .tool(tool)
        .prefixedToolName("async_server_" + tool.name())
        .toolContextToMcpMetaConverter(ToolContextToMcpMetaConverter.defaultConverter())
        .build())
    .toList();

// Use a specific callback
AsyncMcpToolCallback callback = callbacks.get(0);

// Prepare input
String input = "{\"query\": \"example\"}";

// Execute with context
ToolContext context = new ToolContext(Map.of(
    "requestId", "req-123",
    "timestamp", System.currentTimeMillis()
));

try {
    String result = callback.call(input, context);
    System.out.println("Tool: " + callback.getToolDefinition().name());
    System.out.println("Result: " + result);
} catch (ToolExecutionException e) {
    System.err.println("Async execution failed: " + e.getMessage());
}

// Get tool information
ToolDefinition definition = callback.getToolDefinition();
System.out.println("Name: " + definition.name());
System.out.println("Description: " + definition.description());
System.out.println("Original Name: " + callback.getOriginalToolName());

Reactive Context Propagation

The AsyncMcpToolCallback automatically propagates reactive context using ToolCallReactiveContextHolder. This ensures context values are available throughout the reactive chain.

import org.springframework.ai.model.tool.internal.ToolCallReactiveContextHolder;
import reactor.util.context.Context;

// Context is automatically propagated from the reactive chain
// Internal implementation:
mcpClient.callTool(request)
    .contextWrite(ctx -> ctx.putAll(ToolCallReactiveContextHolder.getContext()))
    .block();

// This allows trace IDs, user info, and other context to flow through

Custom Reactive Context

import reactor.util.context.Context;

// For fully reactive workflows (not using the blocking call() method)
Mono<String> reactiveExecution = mcpClient.callTool(request)
    .contextWrite(Context.of(
        "traceId", "trace-123",
        "userId", "user-456"
    ))
    .map(result -> result.content().toString());

// Subscribe or chain further operations
reactiveExecution.subscribe(
    result -> System.out.println("Result: " + result),
    error -> System.err.println("Error: " + error.getMessage())
);

Integration with Spring AI

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
public class AsyncMcpChatService {

    private final ChatClient chatClient;
    private final List<McpAsyncClient> asyncClients;

    @Autowired
    public AsyncMcpChatService(ChatModel chatModel, List<McpAsyncClient> asyncClients) {
        this.asyncClients = asyncClients;
        
        // Create async tool callbacks from MCP tools
        List<AsyncMcpToolCallback> mcpCallbacks = asyncClients.stream()
            .flatMap(client -> {
                try {
                    return client.listTools().block().tools().stream()
                        .map(tool -> AsyncMcpToolCallback.builder()
                            .mcpClient(client)
                            .tool(tool)
                            .build());
                } catch (Exception e) {
                    System.err.println("Failed to list tools: " + e.getMessage());
                    return java.util.stream.Stream.empty();
                }
            })
            .toList();

        // Convert to ToolCallback array
        ToolCallback[] toolCallbacks = mcpCallbacks.toArray(new ToolCallback[0]);

        // Create ChatClient with async MCP tools
        this.chatClient = ChatClient.builder(chatModel)
            .defaultFunctions(toolCallbacks)
            .build();
    }

    public String chat(String userMessage) {
        return chatClient.prompt()
            .user(userMessage)
            .call()
            .content();
    }

    public String chatWithContext(String userMessage, Map<String, Object> contextData) {
        return chatClient.prompt()
            .user(userMessage)
            .toolContext(contextData)
            .call()
            .content();
    }
    
    // Fully reactive variant
    public Mono<String> chatReactive(String userMessage) {
        // For truly reactive applications
        return Mono.fromCallable(() -> chat(userMessage));
    }
}

Error Handling

The call methods throw ToolExecutionException when:

  1. The MCP async client throws an exception during tool invocation
  2. The MCP server returns a result with isError() set to true
  3. JSON parsing fails during input/output processing
  4. Network connectivity issues occur
  5. Reactive stream errors occur (timeout, cancellation, etc.)

Error mapping is applied through the reactive chain:

mcpClient.callTool(request)
    .onErrorMap(exception -> {
        logger.error("Exception while tool calling: ", exception);
        return new ToolExecutionException(this.getToolDefinition(), exception);
    })
    .block();

Error Handling Patterns

import org.springframework.ai.tool.execution.ToolExecutionException;
import reactor.core.publisher.Mono;
import java.time.Duration;

// Pattern 1: Basic error handling
try {
    String result = callback.call(input);
} catch (ToolExecutionException e) {
    System.err.println("Async tool execution failed: " + e.getMessage());
    ToolDefinition failedTool = e.getToolDefinition();
    // Handle the error appropriately
}

// Pattern 2: Differentiate reactive error types
try {
    String result = callback.call(input);
} catch (ToolExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof reactor.core.Exceptions.ReactiveException) {
        System.err.println("Reactive stream error");
    } else if (cause instanceof java.util.concurrent.TimeoutException) {
        System.err.println("Async tool execution timed out");
    } else if (cause instanceof java.net.ConnectException) {
        System.err.println("Connection failed to MCP server");
    } else {
        System.err.println("Tool error: " + e.getMessage());
    }
}

// Pattern 3: Retry with exponential backoff (using reactor)
String executeWithRetry(AsyncMcpToolCallback callback, String input, int maxRetries) {
    // For fully reactive approach, work with Mono directly
    return Mono.defer(() -> {
        try {
            return Mono.just(callback.call(input));
        } catch (ToolExecutionException e) {
            return Mono.error(e);
        }
    })
    .retryWhen(reactor.util.retry.Retry.backoff(maxRetries, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(10))
        .filter(throwable -> throwable instanceof ToolExecutionException))
    .block();
}

// Pattern 4: Timeout with fallback
String executeWithTimeout(AsyncMcpToolCallback callback, String input, 
                          Duration timeout, String fallback) {
    try {
        return Mono.defer(() -> {
            try {
                return Mono.just(callback.call(input));
            } catch (ToolExecutionException e) {
                return Mono.error(e);
            }
        })
        .timeout(timeout)
        .onErrorReturn(fallback)
        .block();
    } catch (Exception e) {
        System.err.println("Execution failed or timed out: " + e.getMessage());
        return fallback;
    }
}

Thread Safety

AsyncMcpToolCallback instances are immutable after construction and are thread-safe. The same callback instance can be safely used concurrently across multiple threads. The underlying reactive execution is managed by Project Reactor.

Thread Safety Characteristics

  • Immutable State: All fields are final and set during construction
  • No Shared Mutable State: No instance variables are modified after construction
  • Concurrent Execution: Multiple threads can call call() simultaneously
  • Reactive Scheduler: Async operations use Project Reactor's default schedulers
  • Client Thread Safety: Thread safety depends on the underlying McpAsyncClient implementation

Concurrent Usage Example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.List;
import java.util.ArrayList;

AsyncMcpToolCallback callback = // ... create once
ExecutorService executor = Executors.newFixedThreadPool(10);

// Submit concurrent tasks
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    String input = "{\"id\": " + i + "}";
    Future<String> future = executor.submit(() -> callback.call(input));
    futures.add(future);
}

// Collect results
for (Future<String> future : futures) {
    try {
        String result = future.get();
        System.out.println("Result: " + result);
    } catch (Exception e) {
        System.err.println("Task failed: " + e.getMessage());
    }
}

executor.shutdown();

Reactive Concurrency

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

AsyncMcpToolCallback callback = // ... create once

// Process multiple inputs concurrently using reactive streams
Flux.range(0, 100)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(i -> "{\"id\": " + i + "}")
    .map(input -> {
        try {
            return callback.call(input);
        } catch (ToolExecutionException e) {
            return "ERROR: " + e.getMessage();
        }
    })
    .sequential()
    .subscribe(
        result -> System.out.println("Result: " + result),
        error -> System.err.println("Stream error: " + error.getMessage()),
        () -> System.out.println("All tasks completed")
    );

Performance Considerations

Non-blocking Execution

  • Tool execution uses reactive patterns but blocks when .call() is invoked
  • For fully reactive workflows, use the underlying McpAsyncClient directly
  • Execution uses event loop threads, not blocking I/O threads
  • More efficient than sync for high-concurrency scenarios

Scheduler Configuration

  • Async operations use Project Reactor's default schedulers
  • Can be customized via Schedulers.boundedElastic() or Schedulers.parallel()
  • The tool execution itself runs on the reactive pipeline
  • Backpressure is supported through Mono operators

Context Propagation Performance

  • Reactive context is automatically propagated
  • Ensures trace IDs and other context values flow through the execution chain
  • Minimal overhead (< 1% of execution time)
  • Uses ContextView from Project Reactor

Null Input Handling

  • Empty or null input is automatically converted to "{}" to ensure valid JSON
  • This conversion happens before creating the reactive pipeline
  • No performance penalty for null/empty input handling

Timeout Configuration

import java.time.Duration;
import reactor.core.publisher.Mono;

// Configure timeout at reactive level
String executeWithTimeout(AsyncMcpToolCallback callback, String input, Duration timeout) {
    return Mono.defer(() -> {
        try {
            return Mono.just(callback.call(input));
        } catch (ToolExecutionException e) {
            return Mono.error(e);
        }
    })
    .timeout(timeout)
    .block();
}

// Usage
try {
    String result = executeWithTimeout(callback, input, Duration.ofSeconds(30));
} catch (Exception e) {
    if (e.getCause() instanceof java.util.concurrent.TimeoutException) {
        System.err.println("Operation timed out after 30 seconds");
    }
}

Comparison with SyncMcpToolCallback

FeatureAsyncMcpToolCallbackSyncMcpToolCallback
Execution ModelReactive (Project Reactor)Blocking
Client TypeMcpAsyncClientMcpSyncClient
Context PropagationAutomatic reactive contextStandard Java context
Best ForLong-running operations, reactive appsSimple operations, blocking apps
BackpressureSupported through MonoNot applicable
Thread ModelEvent loop / scheduler threadsOne thread per request
Timeout HandlingReactive timeout operatorsBlocking timeout
Error HandlingReactive error propagationSynchronous exceptions
ConcurrencyHigh (non-blocking)Medium (thread-per-request)
Memory UsageLower (fewer threads)Higher (thread stack per request)

When to Use AsyncMcpToolCallback

  • Long-running tool operations (> 5 seconds)
  • High-concurrency environments (> 100 concurrent requests)
  • Reactive Spring applications (WebFlux)
  • Microservices with async communication
  • Operations requiring backpressure handling
  • Systems with limited thread pools

When to Use SyncMcpToolCallback

  • Simple, short-running operations (< 5 seconds)
  • Traditional Spring MVC applications
  • Sequential batch processing
  • Immediate result requirements
  • Simpler error handling needs
  • Applications without reactive dependencies

Edge Cases and Special Scenarios

Empty Tool Results

// MCP server may return empty results
String result = callback.call("{}");
// result could be: "", "{}", "[]", or "null"
// Always check result before parsing
if (result != null && !result.isEmpty() && !result.equals("null")) {
    // Process result
}

Large Input/Output with Backpressure

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// For large payloads, use reactive backpressure
Flux<String> processLargeDataset(AsyncMcpToolCallback callback, List<String> inputs) {
    return Flux.fromIterable(inputs)
        .limitRate(10)  // Process 10 at a time
        .flatMap(input -> Mono.defer(() -> {
            try {
                return Mono.just(callback.call(input));
            } catch (ToolExecutionException e) {
                return Mono.error(e);
            }
        }), 5);  // Max 5 concurrent executions
}

Tool with No Parameters

// Some tools accept no parameters
AsyncMcpToolCallback callback = // ... tool with empty input schema
String result = callback.call("{}");  // Empty object
String result2 = callback.call(null);  // Also becomes "{}"

Cancellation Handling

import reactor.core.publisher.Mono;
import java.time.Duration;

// Reactive cancellation support
Mono<String> cancellableTool = Mono.defer(() -> {
    try {
        return Mono.just(callback.call(input));
    } catch (ToolExecutionException e) {
        return Mono.error(e);
    }
})
.timeout(Duration.ofSeconds(30))
.doOnCancel(() -> System.out.println("Tool execution was cancelled"));

// Cancel after 1 second
var subscription = cancellableTool.subscribe();
Thread.sleep(1000);
subscription.dispose();  // Cancels execution

Reactive Error Recovery

import reactor.core.publisher.Mono;

// Automatic retry on transient errors
Mono<String> resilientExecution = Mono.defer(() -> {
    try {
        return Mono.just(callback.call(input));
    } catch (ToolExecutionException e) {
        return Mono.error(e);
    }
})
.retryWhen(reactor.util.retry.Retry.fixedDelay(3, Duration.ofSeconds(1))
    .filter(throwable -> isTransientError(throwable)))
.onErrorResume(throwable -> {
    System.err.println("All retries failed: " + throwable.getMessage());
    return Mono.just("{\"error\": \"fallback\"}");
});

boolean isTransientError(Throwable t) {
    return t instanceof java.net.ConnectException ||
           t instanceof java.net.SocketTimeoutException;
}

Related Components

  • Synchronous Tool Callbacks - Blocking alternative
  • Asynchronous Tool Discovery - Automatic async tool discovery
  • Context and Metadata Conversion - Converting tool context to MCP metadata
  • MCP Tool Utilities - Utility methods for tool management

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-ai--spring-ai-mcp@1.1.0

docs

index.md

tile.json